context.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // Copyright 2016-2018 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "context"
  16. "reflect"
  17. )
  18. // RequestMsgWithContext takes a context, a subject and payload
  19. // in bytes and request expecting a single response.
  20. func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) {
  21. var hdr []byte
  22. var err error
  23. if len(msg.Header) > 0 {
  24. if !nc.info.Headers {
  25. return nil, ErrHeadersNotSupported
  26. }
  27. hdr, err = msg.headerBytes()
  28. if err != nil {
  29. return nil, err
  30. }
  31. }
  32. return nc.requestWithContext(ctx, msg.Subject, hdr, msg.Data)
  33. }
  34. // RequestWithContext takes a context, a subject and payload
  35. // in bytes and request expecting a single response.
  36. func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
  37. return nc.requestWithContext(ctx, subj, nil, data)
  38. }
  39. func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
  40. if ctx == nil {
  41. return nil, ErrInvalidContext
  42. }
  43. if nc == nil {
  44. return nil, ErrInvalidConnection
  45. }
  46. // Check whether the context is done already before making
  47. // the request.
  48. if ctx.Err() != nil {
  49. return nil, ctx.Err()
  50. }
  51. var m *Msg
  52. var err error
  53. // If user wants the old style.
  54. if nc.useOldRequestStyle() {
  55. m, err = nc.oldRequestWithContext(ctx, subj, hdr, data)
  56. } else {
  57. mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
  58. if err != nil {
  59. return nil, err
  60. }
  61. var ok bool
  62. select {
  63. case m, ok = <-mch:
  64. if !ok {
  65. return nil, ErrConnectionClosed
  66. }
  67. case <-ctx.Done():
  68. nc.mu.Lock()
  69. delete(nc.respMap, token)
  70. nc.mu.Unlock()
  71. return nil, ctx.Err()
  72. }
  73. }
  74. // Check for no responder status.
  75. if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
  76. m, err = nil, ErrNoResponders
  77. }
  78. return m, err
  79. }
  80. // oldRequestWithContext utilizes inbox and subscription per request.
  81. func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
  82. inbox := NewInbox()
  83. ch := make(chan *Msg, RequestChanLen)
  84. s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
  85. if err != nil {
  86. return nil, err
  87. }
  88. s.AutoUnsubscribe(1)
  89. defer s.Unsubscribe()
  90. err = nc.publish(subj, inbox, hdr, data)
  91. if err != nil {
  92. return nil, err
  93. }
  94. return s.NextMsgWithContext(ctx)
  95. }
  96. // NextMsgWithContext takes a context and returns the next message
  97. // available to a synchronous subscriber, blocking until it is delivered
  98. // or context gets canceled.
  99. func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
  100. if ctx == nil {
  101. return nil, ErrInvalidContext
  102. }
  103. if s == nil {
  104. return nil, ErrBadSubscription
  105. }
  106. if ctx.Err() != nil {
  107. return nil, ctx.Err()
  108. }
  109. s.mu.Lock()
  110. err := s.validateNextMsgState()
  111. if err != nil {
  112. s.mu.Unlock()
  113. return nil, err
  114. }
  115. // snapshot
  116. mch := s.mch
  117. s.mu.Unlock()
  118. var ok bool
  119. var msg *Msg
  120. // If something is available right away, let's optimize that case.
  121. select {
  122. case msg, ok = <-mch:
  123. if !ok {
  124. return nil, s.getNextMsgErr()
  125. }
  126. if err := s.processNextMsgDelivered(msg); err != nil {
  127. return nil, err
  128. } else {
  129. return msg, nil
  130. }
  131. default:
  132. }
  133. select {
  134. case msg, ok = <-mch:
  135. if !ok {
  136. return nil, s.getNextMsgErr()
  137. }
  138. if err := s.processNextMsgDelivered(msg); err != nil {
  139. return nil, err
  140. }
  141. case <-ctx.Done():
  142. return nil, ctx.Err()
  143. }
  144. return msg, nil
  145. }
  146. // FlushWithContext will allow a context to control the duration
  147. // of a Flush() call. This context should be non-nil and should
  148. // have a deadline set. We will return an error if none is present.
  149. func (nc *Conn) FlushWithContext(ctx context.Context) error {
  150. if nc == nil {
  151. return ErrInvalidConnection
  152. }
  153. if ctx == nil {
  154. return ErrInvalidContext
  155. }
  156. _, ok := ctx.Deadline()
  157. if !ok {
  158. return ErrNoDeadlineContext
  159. }
  160. nc.mu.Lock()
  161. if nc.isClosed() {
  162. nc.mu.Unlock()
  163. return ErrConnectionClosed
  164. }
  165. // Create a buffered channel to prevent chan send to block
  166. // in processPong()
  167. ch := make(chan struct{}, 1)
  168. nc.sendPing(ch)
  169. nc.mu.Unlock()
  170. var err error
  171. select {
  172. case _, ok := <-ch:
  173. if !ok {
  174. err = ErrConnectionClosed
  175. } else {
  176. close(ch)
  177. }
  178. case <-ctx.Done():
  179. err = ctx.Err()
  180. }
  181. if err != nil {
  182. nc.removeFlushEntry(ch)
  183. }
  184. return err
  185. }
  186. // RequestWithContext will create an Inbox and perform a Request
  187. // using the provided cancellation context with the Inbox reply
  188. // for the data v. A response will be decoded into the vPtrResponse.
  189. func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error {
  190. if ctx == nil {
  191. return ErrInvalidContext
  192. }
  193. b, err := c.Enc.Encode(subject, v)
  194. if err != nil {
  195. return err
  196. }
  197. m, err := c.Conn.RequestWithContext(ctx, subject, b)
  198. if err != nil {
  199. return err
  200. }
  201. if reflect.TypeOf(vPtr) == emptyMsgType {
  202. mPtr := vPtr.(*Msg)
  203. *mPtr = *m
  204. } else {
  205. err := c.Enc.Decode(m.Subject, m.Data, vPtr)
  206. if err != nil {
  207. return err
  208. }
  209. }
  210. return nil
  211. }