context.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. // Copyright 2016-2018 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. // +build go1.7
  14. // A Go client for the NATS messaging system (https://nats.io).
  15. package nats
  16. import (
  17. "context"
  18. "fmt"
  19. "reflect"
  20. )
  21. // RequestWithContext takes a context, a subject and payload
  22. // in bytes and request expecting a single response.
  23. func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
  24. if ctx == nil {
  25. return nil, ErrInvalidContext
  26. }
  27. if nc == nil {
  28. return nil, ErrInvalidConnection
  29. }
  30. nc.mu.Lock()
  31. // If user wants the old style.
  32. if nc.Opts.UseOldRequestStyle {
  33. nc.mu.Unlock()
  34. return nc.oldRequestWithContext(ctx, subj, data)
  35. }
  36. // Do setup for the new style.
  37. if nc.respMap == nil {
  38. // _INBOX wildcard
  39. nc.respSub = fmt.Sprintf("%s.*", NewInbox())
  40. nc.respMap = make(map[string]chan *Msg)
  41. }
  42. // Create literal Inbox and map to a chan msg.
  43. mch := make(chan *Msg, RequestChanLen)
  44. respInbox := nc.newRespInbox()
  45. token := respToken(respInbox)
  46. nc.respMap[token] = mch
  47. createSub := nc.respMux == nil
  48. ginbox := nc.respSub
  49. nc.mu.Unlock()
  50. if createSub {
  51. // Make sure scoped subscription is setup only once.
  52. var err error
  53. nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
  54. if err != nil {
  55. return nil, err
  56. }
  57. }
  58. err := nc.PublishRequest(subj, respInbox, data)
  59. if err != nil {
  60. return nil, err
  61. }
  62. var ok bool
  63. var msg *Msg
  64. select {
  65. case msg, ok = <-mch:
  66. if !ok {
  67. return nil, ErrConnectionClosed
  68. }
  69. case <-ctx.Done():
  70. nc.mu.Lock()
  71. delete(nc.respMap, token)
  72. nc.mu.Unlock()
  73. return nil, ctx.Err()
  74. }
  75. return msg, nil
  76. }
  77. // oldRequestWithContext utilizes inbox and subscription per request.
  78. func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
  79. inbox := NewInbox()
  80. ch := make(chan *Msg, RequestChanLen)
  81. s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
  82. if err != nil {
  83. return nil, err
  84. }
  85. s.AutoUnsubscribe(1)
  86. defer s.Unsubscribe()
  87. err = nc.PublishRequest(subj, inbox, data)
  88. if err != nil {
  89. return nil, err
  90. }
  91. return s.NextMsgWithContext(ctx)
  92. }
  93. // NextMsgWithContext takes a context and returns the next message
  94. // available to a synchronous subscriber, blocking until it is delivered
  95. // or context gets canceled.
  96. func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
  97. if ctx == nil {
  98. return nil, ErrInvalidContext
  99. }
  100. if s == nil {
  101. return nil, ErrBadSubscription
  102. }
  103. s.mu.Lock()
  104. err := s.validateNextMsgState()
  105. if err != nil {
  106. s.mu.Unlock()
  107. return nil, err
  108. }
  109. // snapshot
  110. mch := s.mch
  111. s.mu.Unlock()
  112. var ok bool
  113. var msg *Msg
  114. select {
  115. case msg, ok = <-mch:
  116. if !ok {
  117. return nil, ErrConnectionClosed
  118. }
  119. err := s.processNextMsgDelivered(msg)
  120. if err != nil {
  121. return nil, err
  122. }
  123. case <-ctx.Done():
  124. return nil, ctx.Err()
  125. }
  126. return msg, nil
  127. }
  128. // RequestWithContext will create an Inbox and perform a Request
  129. // using the provided cancellation context with the Inbox reply
  130. // for the data v. A response will be decoded into the vPtrResponse.
  131. func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v interface{}, vPtr interface{}) error {
  132. if ctx == nil {
  133. return ErrInvalidContext
  134. }
  135. b, err := c.Enc.Encode(subject, v)
  136. if err != nil {
  137. return err
  138. }
  139. m, err := c.Conn.RequestWithContext(ctx, subject, b)
  140. if err != nil {
  141. return err
  142. }
  143. if reflect.TypeOf(vPtr) == emptyMsgType {
  144. mPtr := vPtr.(*Msg)
  145. *mPtr = *m
  146. } else {
  147. err := c.Enc.Decode(m.Subject, m.Data, vPtr)
  148. if err != nil {
  149. return err
  150. }
  151. }
  152. return nil
  153. }