123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- // Copyright 2012-2018 The NATS Authors
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package nats
- import (
- "errors"
- "fmt"
- "reflect"
- "sync"
- "time"
- // Default Encoders
- "github.com/nats-io/go-nats/encoders/builtin"
- )
- // Encoder interface is for all register encoders
- type Encoder interface {
- Encode(subject string, v interface{}) ([]byte, error)
- Decode(subject string, data []byte, vPtr interface{}) error
- }
- var encMap map[string]Encoder
- var encLock sync.Mutex
- // Indexe names into the Registered Encoders.
- const (
- JSON_ENCODER = "json"
- GOB_ENCODER = "gob"
- DEFAULT_ENCODER = "default"
- )
- func init() {
- encMap = make(map[string]Encoder)
- // Register json, gob and default encoder
- RegisterEncoder(JSON_ENCODER, &builtin.JsonEncoder{})
- RegisterEncoder(GOB_ENCODER, &builtin.GobEncoder{})
- RegisterEncoder(DEFAULT_ENCODER, &builtin.DefaultEncoder{})
- }
- // EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
- // a nats server and have an extendable encoder system that will encode and decode messages
- // from raw Go types.
- type EncodedConn struct {
- Conn *Conn
- Enc Encoder
- }
- // NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
- // encoder.
- func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) {
- if c == nil {
- return nil, errors.New("nats: Nil Connection")
- }
- if c.IsClosed() {
- return nil, ErrConnectionClosed
- }
- ec := &EncodedConn{Conn: c, Enc: EncoderForType(encType)}
- if ec.Enc == nil {
- return nil, fmt.Errorf("no encoder registered for '%s'", encType)
- }
- return ec, nil
- }
- // RegisterEncoder will register the encType with the given Encoder. Useful for customization.
- func RegisterEncoder(encType string, enc Encoder) {
- encLock.Lock()
- defer encLock.Unlock()
- encMap[encType] = enc
- }
- // EncoderForType will return the registered Encoder for the encType.
- func EncoderForType(encType string) Encoder {
- encLock.Lock()
- defer encLock.Unlock()
- return encMap[encType]
- }
- // Publish publishes the data argument to the given subject. The data argument
- // will be encoded using the associated encoder.
- func (c *EncodedConn) Publish(subject string, v interface{}) error {
- b, err := c.Enc.Encode(subject, v)
- if err != nil {
- return err
- }
- return c.Conn.publish(subject, _EMPTY_, b)
- }
- // PublishRequest will perform a Publish() expecting a response on the
- // reply subject. Use Request() for automatically waiting for a response
- // inline.
- func (c *EncodedConn) PublishRequest(subject, reply string, v interface{}) error {
- b, err := c.Enc.Encode(subject, v)
- if err != nil {
- return err
- }
- return c.Conn.publish(subject, reply, b)
- }
- // Request will create an Inbox and perform a Request() call
- // with the Inbox reply for the data v. A response will be
- // decoded into the vPtrResponse.
- func (c *EncodedConn) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
- b, err := c.Enc.Encode(subject, v)
- if err != nil {
- return err
- }
- m, err := c.Conn.Request(subject, b, timeout)
- if err != nil {
- return err
- }
- if reflect.TypeOf(vPtr) == emptyMsgType {
- mPtr := vPtr.(*Msg)
- *mPtr = *m
- } else {
- err = c.Enc.Decode(m.Subject, m.Data, vPtr)
- }
- return err
- }
- // Handler is a specific callback used for Subscribe. It is generalized to
- // an interface{}, but we will discover its format and arguments at runtime
- // and perform the correct callback, including de-marshaling JSON strings
- // back into the appropriate struct based on the signature of the Handler.
- //
- // Handlers are expected to have one of four signatures.
- //
- // type person struct {
- // Name string `json:"name,omitempty"`
- // Age uint `json:"age,omitempty"`
- // }
- //
- // handler := func(m *Msg)
- // handler := func(p *person)
- // handler := func(subject string, o *obj)
- // handler := func(subject, reply string, o *obj)
- //
- // These forms allow a callback to request a raw Msg ptr, where the processing
- // of the message from the wire is untouched. Process a JSON representation
- // and demarshal it into the given struct, e.g. person.
- // There are also variants where the callback wants either the subject, or the
- // subject and the reply subject.
- type Handler interface{}
- // Dissect the cb Handler's signature
- func argInfo(cb Handler) (reflect.Type, int) {
- cbType := reflect.TypeOf(cb)
- if cbType.Kind() != reflect.Func {
- panic("nats: Handler needs to be a func")
- }
- numArgs := cbType.NumIn()
- if numArgs == 0 {
- return nil, numArgs
- }
- return cbType.In(numArgs - 1), numArgs
- }
- var emptyMsgType = reflect.TypeOf(&Msg{})
- // Subscribe will create a subscription on the given subject and process incoming
- // messages using the specified Handler. The Handler should be a func that matches
- // a signature from the description of Handler from above.
- func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) {
- return c.subscribe(subject, _EMPTY_, cb)
- }
- // QueueSubscribe will create a queue subscription on the given subject and process
- // incoming messages using the specified Handler. The Handler should be a func that
- // matches a signature from the description of Handler from above.
- func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) {
- return c.subscribe(subject, queue, cb)
- }
- // Internal implementation that all public functions will use.
- func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
- if cb == nil {
- return nil, errors.New("nats: Handler required for EncodedConn Subscription")
- }
- argType, numArgs := argInfo(cb)
- if argType == nil {
- return nil, errors.New("nats: Handler requires at least one argument")
- }
- cbValue := reflect.ValueOf(cb)
- wantsRaw := (argType == emptyMsgType)
- natsCB := func(m *Msg) {
- var oV []reflect.Value
- if wantsRaw {
- oV = []reflect.Value{reflect.ValueOf(m)}
- } else {
- var oPtr reflect.Value
- if argType.Kind() != reflect.Ptr {
- oPtr = reflect.New(argType)
- } else {
- oPtr = reflect.New(argType.Elem())
- }
- if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
- if c.Conn.Opts.AsyncErrorCB != nil {
- c.Conn.ach.push(func() {
- c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
- })
- }
- return
- }
- if argType.Kind() != reflect.Ptr {
- oPtr = reflect.Indirect(oPtr)
- }
- // Callback Arity
- switch numArgs {
- case 1:
- oV = []reflect.Value{oPtr}
- case 2:
- subV := reflect.ValueOf(m.Subject)
- oV = []reflect.Value{subV, oPtr}
- case 3:
- subV := reflect.ValueOf(m.Subject)
- replyV := reflect.ValueOf(m.Reply)
- oV = []reflect.Value{subV, replyV, oPtr}
- }
- }
- cbValue.Call(oV)
- }
- return c.Conn.subscribe(subject, queue, natsCB, nil)
- }
- // FlushTimeout allows a Flush operation to have an associated timeout.
- func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) {
- return c.Conn.FlushTimeout(timeout)
- }
- // Flush will perform a round trip to the server and return when it
- // receives the internal reply.
- func (c *EncodedConn) Flush() error {
- return c.Conn.Flush()
- }
- // Close will close the connection to the server. This call will release
- // all blocking calls, such as Flush(), etc.
- func (c *EncodedConn) Close() {
- c.Conn.Close()
- }
- // Drain will put a connection into a drain state. All subscriptions will
- // immediately be put into a drain state. Upon completion, the publishers
- // will be drained and can not publish any additional messages. Upon draining
- // of the publishers, the connection will be closed. Use the ClosedCB()
- // option to know when the connection has moved from draining to closed.
- func (c *EncodedConn) Drain() error {
- return c.Conn.Drain()
- }
- // LastError reports the last error encountered via the Connection.
- func (c *EncodedConn) LastError() error {
- return c.Conn.err
- }
|