enc.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. // Copyright 2012-2018 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "errors"
  16. "fmt"
  17. "reflect"
  18. "sync"
  19. "time"
  20. // Default Encoders
  21. . "github.com/nats-io/go-nats/encoders/builtin"
  22. )
  23. // Encoder interface is for all register encoders
  24. type Encoder interface {
  25. Encode(subject string, v interface{}) ([]byte, error)
  26. Decode(subject string, data []byte, vPtr interface{}) error
  27. }
  28. var encMap map[string]Encoder
  29. var encLock sync.Mutex
  30. // Indexe names into the Registered Encoders.
  31. const (
  32. JSON_ENCODER = "json"
  33. GOB_ENCODER = "gob"
  34. DEFAULT_ENCODER = "default"
  35. )
  36. func init() {
  37. encMap = make(map[string]Encoder)
  38. // Register json, gob and default encoder
  39. RegisterEncoder(JSON_ENCODER, &JsonEncoder{})
  40. RegisterEncoder(GOB_ENCODER, &GobEncoder{})
  41. RegisterEncoder(DEFAULT_ENCODER, &DefaultEncoder{})
  42. }
  43. // EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
  44. // a nats server and have an extendable encoder system that will encode and decode messages
  45. // from raw Go types.
  46. type EncodedConn struct {
  47. Conn *Conn
  48. Enc Encoder
  49. }
  50. // NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
  51. // encoder.
  52. func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
  53. if c == nil {
  54. return nil, errors.New("nats: Nil Connection")
  55. }
  56. if c.IsClosed() {
  57. return nil, ErrConnectionClosed
  58. }
  59. ec := &EncodedConn{Conn: c, Enc: EncoderForType(encType)}
  60. if ec.Enc == nil {
  61. return nil, fmt.Errorf("No encoder registered for '%s'", encType)
  62. }
  63. return ec, nil
  64. }
  65. // RegisterEncoder will register the encType with the given Encoder. Useful for customization.
  66. func RegisterEncoder(encType string, enc Encoder) {
  67. encLock.Lock()
  68. defer encLock.Unlock()
  69. encMap[encType] = enc
  70. }
  71. // EncoderForType will return the registered Encoder for the encType.
  72. func EncoderForType(encType string) Encoder {
  73. encLock.Lock()
  74. defer encLock.Unlock()
  75. return encMap[encType]
  76. }
  77. // Publish publishes the data argument to the given subject. The data argument
  78. // will be encoded using the associated encoder.
  79. func (c *EncodedConn) Publish(subject string, v interface{}) error {
  80. b, err := c.Enc.Encode(subject, v)
  81. if err != nil {
  82. return err
  83. }
  84. return c.Conn.publish(subject, _EMPTY_, b)
  85. }
  86. // PublishRequest will perform a Publish() expecting a response on the
  87. // reply subject. Use Request() for automatically waiting for a response
  88. // inline.
  89. func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error {
  90. b, err := c.Enc.Encode(subject, v)
  91. if err != nil {
  92. return err
  93. }
  94. return c.Conn.publish(subject, reply, b)
  95. }
  96. // Request will create an Inbox and perform a Request() call
  97. // with the Inbox reply for the data v. A response will be
  98. // decoded into the vPtrResponse.
  99. func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
  100. b, err := c.Enc.Encode(subject, v)
  101. if err != nil {
  102. return err
  103. }
  104. m, err := c.Conn.Request(subject, b, timeout)
  105. if err != nil {
  106. return err
  107. }
  108. if reflect.TypeOf(vPtr) == emptyMsgType {
  109. mPtr := vPtr.(*Msg)
  110. *mPtr = *m
  111. } else {
  112. err = c.Enc.Decode(m.Subject, m.Data, vPtr)
  113. }
  114. return err
  115. }
  116. // Handler is a specific callback used for Subscribe. It is generalized to
  117. // an interface{}, but we will discover its format and arguments at runtime
  118. // and perform the correct callback, including de-marshaling JSON strings
  119. // back into the appropriate struct based on the signature of the Handler.
  120. //
  121. // Handlers are expected to have one of four signatures.
  122. //
  123. // type person struct {
  124. // Name string `json:"name,omitempty"`
  125. // Age uint `json:"age,omitempty"`
  126. // }
  127. //
  128. // handler := func(m *Msg)
  129. // handler := func(p *person)
  130. // handler := func(subject string, o *obj)
  131. // handler := func(subject, reply string, o *obj)
  132. //
  133. // These forms allow a callback to request a raw Msg ptr, where the processing
  134. // of the message from the wire is untouched. Process a JSON representation
  135. // and demarshal it into the given struct, e.g. person.
  136. // There are also variants where the callback wants either the subject, or the
  137. // subject and the reply subject.
  138. type Handler interface{}
  139. // Dissect the cb Handler's signature
  140. func argInfo(cb Handler) (reflect.Type, int) {
  141. cbType := reflect.TypeOf(cb)
  142. if cbType.Kind() != reflect.Func {
  143. panic("nats: Handler needs to be a func")
  144. }
  145. numArgs := cbType.NumIn()
  146. if numArgs == 0 {
  147. return nil, numArgs
  148. }
  149. return cbType.In(numArgs - 1), numArgs
  150. }
  151. var emptyMsgType = reflect.TypeOf(&Msg{})
  152. // Subscribe will create a subscription on the given subject and process incoming
  153. // messages using the specified Handler. The Handler should be a func that matches
  154. // a signature from the description of Handler from above.
  155. func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
  156. return c.subscribe(subject, _EMPTY_, cb)
  157. }
  158. // QueueSubscribe will create a queue subscription on the given subject and process
  159. // incoming messages using the specified Handler. The Handler should be a func that
  160. // matches a signature from the description of Handler from above.
  161. func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
  162. return c.subscribe(subject, queue, cb)
  163. }
  164. // Internal implementation that all public functions will use.
  165. func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
  166. if cb == nil {
  167. return nil, errors.New("nats: Handler required for EncodedConn Subscription")
  168. }
  169. argType, numArgs := argInfo(cb)
  170. if argType == nil {
  171. return nil, errors.New("nats: Handler requires at least one argument")
  172. }
  173. cbValue := reflect.ValueOf(cb)
  174. wantsRaw := (argType == emptyMsgType)
  175. natsCB := func(m *Msg) {
  176. var oV []reflect.Value
  177. if wantsRaw {
  178. oV = []reflect.Value{reflect.ValueOf(m)}
  179. } else {
  180. var oPtr reflect.Value
  181. if argType.Kind() != reflect.Ptr {
  182. oPtr = reflect.New(argType)
  183. } else {
  184. oPtr = reflect.New(argType.Elem())
  185. }
  186. if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
  187. if c.Conn.Opts.AsyncErrorCB != nil {
  188. c.Conn.ach <- func() {
  189. c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
  190. }
  191. }
  192. return
  193. }
  194. if argType.Kind() != reflect.Ptr {
  195. oPtr = reflect.Indirect(oPtr)
  196. }
  197. // Callback Arity
  198. switch numArgs {
  199. case 1:
  200. oV = []reflect.Value{oPtr}
  201. case 2:
  202. subV := reflect.ValueOf(m.Subject)
  203. oV = []reflect.Value{subV, oPtr}
  204. case 3:
  205. subV := reflect.ValueOf(m.Subject)
  206. replyV := reflect.ValueOf(m.Reply)
  207. oV = []reflect.Value{subV, replyV, oPtr}
  208. }
  209. }
  210. cbValue.Call(oV)
  211. }
  212. return c.Conn.subscribe(subject, queue, natsCB, nil)
  213. }
  214. // FlushTimeout allows a Flush operation to have an associated timeout.
  215. func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
  216. return c.Conn.FlushTimeout(timeout)
  217. }
  218. // Flush will perform a round trip to the server and return when it
  219. // receives the internal reply.
  220. func (c *EncodedConn) Flush() error {
  221. return c.Conn.Flush()
  222. }
  223. // Close will close the connection to the server. This call will release
  224. // all blocking calls, such as Flush(), etc.
  225. func (c *EncodedConn) Close() {
  226. c.Conn.Close()
  227. }
  228. // LastError reports the last error encountered via the Connection.
  229. func (c *EncodedConn) LastError() error {
  230. return c.Conn.err
  231. }