123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package nats
- import (
- "errors"
- "reflect"
- )
- func (c *EncodedConn) BindSendChan(subject string, channel interface{}) error {
- chVal := reflect.ValueOf(channel)
- if chVal.Kind() != reflect.Chan {
- return ErrChanArg
- }
- go chPublish(c, chVal, subject)
- return nil
- }
- func chPublish(c *EncodedConn, chVal reflect.Value, subject string) {
- for {
- val, ok := chVal.Recv()
- if !ok {
-
- return
- }
- if e := c.Publish(subject, val.Interface()); e != nil {
-
- c.Conn.mu.Lock()
- defer c.Conn.mu.Unlock()
- if c.Conn.Opts.AsyncErrorCB != nil {
-
-
- if c.Conn.isClosed() {
- go c.Conn.Opts.AsyncErrorCB(c.Conn, nil, e)
- } else {
- c.Conn.ach <- func() { c.Conn.Opts.AsyncErrorCB(c.Conn, nil, e) }
- }
- }
- return
- }
- }
- }
- func (c *EncodedConn) BindRecvChan(subject string, channel interface{}) (*Subscription, error) {
- return c.bindRecvChan(subject, _EMPTY_, channel)
- }
- func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel interface{}) (*Subscription, error) {
- return c.bindRecvChan(subject, queue, channel)
- }
- func (c *EncodedConn) bindRecvChan(subject, queue string, channel interface{}) (*Subscription, error) {
- chVal := reflect.ValueOf(channel)
- if chVal.Kind() != reflect.Chan {
- return nil, ErrChanArg
- }
- argType := chVal.Type().Elem()
- cb := func(m *Msg) {
- var oPtr reflect.Value
- if argType.Kind() != reflect.Ptr {
- oPtr = reflect.New(argType)
- } else {
- oPtr = reflect.New(argType.Elem())
- }
- if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
- c.Conn.err = errors.New("nats: Got an error trying to unmarshal: " + err.Error())
- if c.Conn.Opts.AsyncErrorCB != nil {
- c.Conn.ach <- func() { c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, c.Conn.err) }
- }
- return
- }
- if argType.Kind() != reflect.Ptr {
- oPtr = reflect.Indirect(oPtr)
- }
-
-
- defer func() {
-
- if r := recover(); r != nil {
- m.Sub.Unsubscribe()
- }
- }()
-
- chVal.Send(oPtr)
- }
- return c.Conn.subscribe(subject, queue, cb, nil)
- }
|