|
@@ -0,0 +1,2345 @@
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+package nats
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "context"
|
|
|
+ "crypto/sha256"
|
|
|
+ "encoding/json"
|
|
|
+ "errors"
|
|
|
+ "fmt"
|
|
|
+ "math/rand"
|
|
|
+ "strconv"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+ "sync/atomic"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/nats-io/nuid"
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+ defaultAPIPrefix = "$JS.API."
|
|
|
+
|
|
|
+
|
|
|
+ apiAccountInfo = "INFO"
|
|
|
+
|
|
|
+
|
|
|
+ apiConsumerCreateT = "CONSUMER.CREATE.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiConsumerListT = "CONSUMER.LIST.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiConsumerNamesT = "CONSUMER.NAMES.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreams = "STREAM.NAMES"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreamCreateT = "STREAM.CREATE.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreamInfoT = "STREAM.INFO.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreamUpdateT = "STREAM.UPDATE.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreamDeleteT = "STREAM.DELETE.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreamPurgeT = "STREAM.PURGE.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiStreamList = "STREAM.LIST"
|
|
|
+
|
|
|
+
|
|
|
+ apiMsgGetT = "STREAM.MSG.GET.%s"
|
|
|
+
|
|
|
+
|
|
|
+ apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+type JetStream interface {
|
|
|
+
|
|
|
+ Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
|
|
|
+
|
|
|
+
|
|
|
+ PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
|
|
|
+
|
|
|
+
|
|
|
+ PublishAsyncPending() int
|
|
|
+
|
|
|
+
|
|
|
+ PublishAsyncComplete() <-chan struct{}
|
|
|
+
|
|
|
+
|
|
|
+ Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
|
|
|
+
|
|
|
+
|
|
|
+ SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
|
|
|
+
|
|
|
+
|
|
|
+ ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
|
|
|
+
|
|
|
+
|
|
|
+ QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
|
|
|
+
|
|
|
+
|
|
|
+ QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
|
|
|
+
|
|
|
+
|
|
|
+ PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type JetStreamContext interface {
|
|
|
+ JetStream
|
|
|
+ JetStreamManager
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type js struct {
|
|
|
+ nc *Conn
|
|
|
+ opts *jsOpts
|
|
|
+
|
|
|
+
|
|
|
+ mu sync.RWMutex
|
|
|
+ rpre string
|
|
|
+ rsub *Subscription
|
|
|
+ pafs map[string]*pubAckFuture
|
|
|
+ stc chan struct{}
|
|
|
+ dch chan struct{}
|
|
|
+ rr *rand.Rand
|
|
|
+}
|
|
|
+
|
|
|
+type jsOpts struct {
|
|
|
+ ctx context.Context
|
|
|
+
|
|
|
+ pre string
|
|
|
+
|
|
|
+ wait time.Duration
|
|
|
+
|
|
|
+ aecb MsgErrHandler
|
|
|
+
|
|
|
+ maxap int
|
|
|
+}
|
|
|
+
|
|
|
+const (
|
|
|
+ defaultRequestWait = 5 * time.Second
|
|
|
+ defaultAccountCheck = 20 * time.Second
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
|
|
|
+ js := &js{
|
|
|
+ nc: nc,
|
|
|
+ opts: &jsOpts{
|
|
|
+ pre: defaultAPIPrefix,
|
|
|
+ wait: defaultRequestWait,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ for _, opt := range opts {
|
|
|
+ if err := opt.configureJSContext(js.opts); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ nc.mu.Lock()
|
|
|
+ now := time.Now()
|
|
|
+ checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
|
|
|
+ if checkAccount {
|
|
|
+ nc.jsLastCheck = now
|
|
|
+ }
|
|
|
+ nc.mu.Unlock()
|
|
|
+
|
|
|
+ if checkAccount {
|
|
|
+ if _, err := js.AccountInfo(); err != nil {
|
|
|
+ if err == ErrNoResponders {
|
|
|
+ err = ErrJetStreamNotEnabled
|
|
|
+ }
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return js, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type JSOpt interface {
|
|
|
+ configureJSContext(opts *jsOpts) error
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type jsOptFn func(opts *jsOpts) error
|
|
|
+
|
|
|
+func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
|
|
|
+ return opt(opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func APIPrefix(pre string) JSOpt {
|
|
|
+ return jsOptFn(func(js *jsOpts) error {
|
|
|
+ js.pre = pre
|
|
|
+ if !strings.HasSuffix(js.pre, ".") {
|
|
|
+ js.pre = js.pre + "."
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) apiSubj(subj string) string {
|
|
|
+ if js.opts.pre == _EMPTY_ {
|
|
|
+ return subj
|
|
|
+ }
|
|
|
+ var b strings.Builder
|
|
|
+ b.WriteString(js.opts.pre)
|
|
|
+ b.WriteString(subj)
|
|
|
+ return b.String()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type PubOpt interface {
|
|
|
+ configurePublish(opts *pubOpts) error
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type pubOptFn func(opts *pubOpts) error
|
|
|
+
|
|
|
+func (opt pubOptFn) configurePublish(opts *pubOpts) error {
|
|
|
+ return opt(opts)
|
|
|
+}
|
|
|
+
|
|
|
+type pubOpts struct {
|
|
|
+ ctx context.Context
|
|
|
+ ttl time.Duration
|
|
|
+ id string
|
|
|
+ lid string
|
|
|
+ str string
|
|
|
+ seq uint64
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type pubAckResponse struct {
|
|
|
+ apiResponse
|
|
|
+ *PubAck
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type PubAck struct {
|
|
|
+ Stream string `json:"stream"`
|
|
|
+ Sequence uint64 `json:"seq"`
|
|
|
+ Duplicate bool `json:"duplicate,omitempty"`
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+const (
|
|
|
+ MsgIdHdr = "Nats-Msg-Id"
|
|
|
+ ExpectedStreamHdr = "Nats-Expected-Stream"
|
|
|
+ ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
|
|
|
+ ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
|
|
|
+ var o pubOpts
|
|
|
+ if len(opts) > 0 {
|
|
|
+ if m.Header == nil {
|
|
|
+ m.Header = Header{}
|
|
|
+ }
|
|
|
+ for _, opt := range opts {
|
|
|
+ if err := opt.configurePublish(&o); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if o.ctx != nil && o.ttl != 0 {
|
|
|
+ return nil, ErrContextAndTimeout
|
|
|
+ }
|
|
|
+ if o.ttl == 0 && o.ctx == nil {
|
|
|
+ o.ttl = js.opts.wait
|
|
|
+ }
|
|
|
+
|
|
|
+ if o.id != _EMPTY_ {
|
|
|
+ m.Header.Set(MsgIdHdr, o.id)
|
|
|
+ }
|
|
|
+ if o.lid != _EMPTY_ {
|
|
|
+ m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
|
|
|
+ }
|
|
|
+ if o.str != _EMPTY_ {
|
|
|
+ m.Header.Set(ExpectedStreamHdr, o.str)
|
|
|
+ }
|
|
|
+ if o.seq > 0 {
|
|
|
+ m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
|
|
|
+ }
|
|
|
+
|
|
|
+ var resp *Msg
|
|
|
+ var err error
|
|
|
+
|
|
|
+ if o.ttl > 0 {
|
|
|
+ resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
|
|
|
+ } else {
|
|
|
+ resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ if err == ErrNoResponders {
|
|
|
+ err = ErrNoStreamResponse
|
|
|
+ }
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ var pa pubAckResponse
|
|
|
+ if err := json.Unmarshal(resp.Data, &pa); err != nil {
|
|
|
+ return nil, ErrInvalidJSAck
|
|
|
+ }
|
|
|
+ if pa.Error != nil {
|
|
|
+ return nil, fmt.Errorf("nats: %s", pa.Error.Description)
|
|
|
+ }
|
|
|
+ if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
|
|
|
+ return nil, ErrInvalidJSAck
|
|
|
+ }
|
|
|
+ return pa.PubAck, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
|
|
|
+ return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type PubAckFuture interface {
|
|
|
+
|
|
|
+ Ok() <-chan *PubAck
|
|
|
+
|
|
|
+
|
|
|
+ Err() <-chan error
|
|
|
+
|
|
|
+
|
|
|
+ Msg() *Msg
|
|
|
+}
|
|
|
+
|
|
|
+type pubAckFuture struct {
|
|
|
+ js *js
|
|
|
+ msg *Msg
|
|
|
+ pa *PubAck
|
|
|
+ st time.Time
|
|
|
+ err error
|
|
|
+ errCh chan error
|
|
|
+ doneCh chan *PubAck
|
|
|
+}
|
|
|
+
|
|
|
+func (paf *pubAckFuture) Ok() <-chan *PubAck {
|
|
|
+ paf.js.mu.Lock()
|
|
|
+ defer paf.js.mu.Unlock()
|
|
|
+
|
|
|
+ if paf.doneCh == nil {
|
|
|
+ paf.doneCh = make(chan *PubAck, 1)
|
|
|
+ if paf.pa != nil {
|
|
|
+ paf.doneCh <- paf.pa
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return paf.doneCh
|
|
|
+}
|
|
|
+
|
|
|
+func (paf *pubAckFuture) Err() <-chan error {
|
|
|
+ paf.js.mu.Lock()
|
|
|
+ defer paf.js.mu.Unlock()
|
|
|
+
|
|
|
+ if paf.errCh == nil {
|
|
|
+ paf.errCh = make(chan error, 1)
|
|
|
+ if paf.err != nil {
|
|
|
+ paf.errCh <- paf.err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return paf.errCh
|
|
|
+}
|
|
|
+
|
|
|
+func (paf *pubAckFuture) Msg() *Msg {
|
|
|
+ paf.js.mu.RLock()
|
|
|
+ defer paf.js.mu.RUnlock()
|
|
|
+ return paf.msg
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) pullSubscribe(subj string) (*Subscription, error) {
|
|
|
+ jsi := &jsSub{js: js, pull: true}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
+ jsi.rpre = fmt.Sprintf("%s.", NewInbox())
|
|
|
+ sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ jsi.psub = sub
|
|
|
+
|
|
|
+ return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+const aReplyPreLen = 14
|
|
|
+const aReplyTokensize = 6
|
|
|
+
|
|
|
+func (js *js) newAsyncReply() string {
|
|
|
+ js.mu.Lock()
|
|
|
+ if js.rsub == nil {
|
|
|
+
|
|
|
+ sha := sha256.New()
|
|
|
+ sha.Write([]byte(nuid.Next()))
|
|
|
+ b := sha.Sum(nil)
|
|
|
+ for i := 0; i < aReplyTokensize; i++ {
|
|
|
+ b[i] = rdigits[int(b[i]%base)]
|
|
|
+ }
|
|
|
+ js.rpre = fmt.Sprintf("%s%s.", InboxPrefix, b[:aReplyTokensize])
|
|
|
+ sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
|
|
|
+ if err != nil {
|
|
|
+ js.mu.Unlock()
|
|
|
+ return _EMPTY_
|
|
|
+ }
|
|
|
+ js.rsub = sub
|
|
|
+ js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
+ }
|
|
|
+ var sb strings.Builder
|
|
|
+ sb.WriteString(js.rpre)
|
|
|
+ rn := js.rr.Int63()
|
|
|
+ var b [aReplyTokensize]byte
|
|
|
+ for i, l := 0, rn; i < len(b); i++ {
|
|
|
+ b[i] = rdigits[l%base]
|
|
|
+ l /= base
|
|
|
+ }
|
|
|
+ sb.Write(b[:])
|
|
|
+ js.mu.Unlock()
|
|
|
+ return sb.String()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
|
|
|
+ js.mu.Lock()
|
|
|
+ if js.pafs == nil {
|
|
|
+ js.pafs = make(map[string]*pubAckFuture)
|
|
|
+ }
|
|
|
+ paf.js = js
|
|
|
+ js.pafs[id] = paf
|
|
|
+ np := len(js.pafs)
|
|
|
+ maxap := js.opts.maxap
|
|
|
+ js.mu.Unlock()
|
|
|
+ return np, maxap
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) getPAF(id string) *pubAckFuture {
|
|
|
+ if js.pafs == nil {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ return js.pafs[id]
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) clearPAF(id string) {
|
|
|
+ js.mu.Lock()
|
|
|
+ delete(js.pafs, id)
|
|
|
+ js.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) PublishAsyncPending() int {
|
|
|
+ js.mu.RLock()
|
|
|
+ defer js.mu.RUnlock()
|
|
|
+ return len(js.pafs)
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) asyncStall() <-chan struct{} {
|
|
|
+ js.mu.Lock()
|
|
|
+ if js.stc == nil {
|
|
|
+ js.stc = make(chan struct{})
|
|
|
+ }
|
|
|
+ stc := js.stc
|
|
|
+ js.mu.Unlock()
|
|
|
+ return stc
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) handleAsyncReply(m *Msg) {
|
|
|
+ if len(m.Subject) <= aReplyPreLen {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ id := m.Subject[aReplyPreLen:]
|
|
|
+
|
|
|
+ js.mu.Lock()
|
|
|
+ paf := js.getPAF(id)
|
|
|
+ if paf == nil {
|
|
|
+ js.mu.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ delete(js.pafs, id)
|
|
|
+
|
|
|
+
|
|
|
+ if js.stc != nil && len(js.pafs) < js.opts.maxap {
|
|
|
+ close(js.stc)
|
|
|
+ js.stc = nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if js.dch != nil && len(js.pafs) == 0 {
|
|
|
+ dch := js.dch
|
|
|
+ js.dch = nil
|
|
|
+
|
|
|
+ defer close(dch)
|
|
|
+ }
|
|
|
+
|
|
|
+ doErr := func(err error) {
|
|
|
+ paf.err = err
|
|
|
+ if paf.errCh != nil {
|
|
|
+ paf.errCh <- paf.err
|
|
|
+ }
|
|
|
+ cb := js.opts.aecb
|
|
|
+ js.mu.Unlock()
|
|
|
+ if cb != nil {
|
|
|
+ cb(paf.js, paf.msg, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
|
|
|
+ doErr(ErrNoResponders)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var pa pubAckResponse
|
|
|
+ if err := json.Unmarshal(m.Data, &pa); err != nil {
|
|
|
+ doErr(ErrInvalidJSAck)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if pa.Error != nil {
|
|
|
+ doErr(fmt.Errorf("nats: %s", pa.Error.Description))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
|
|
|
+ doErr(ErrInvalidJSAck)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ paf.pa = pa.PubAck
|
|
|
+ if paf.doneCh != nil {
|
|
|
+ paf.doneCh <- paf.pa
|
|
|
+ }
|
|
|
+ js.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+type MsgErrHandler func(JetStream, *Msg, error)
|
|
|
+
|
|
|
+
|
|
|
+func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
|
|
|
+ return jsOptFn(func(js *jsOpts) error {
|
|
|
+ js.aecb = cb
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func PublishAsyncMaxPending(max int) JSOpt {
|
|
|
+ return jsOptFn(func(js *jsOpts) error {
|
|
|
+ if max < 1 {
|
|
|
+ return errors.New("nats: max ack pending should be >= 1")
|
|
|
+ }
|
|
|
+ js.maxap = max
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
|
|
|
+ return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
|
|
|
+ var o pubOpts
|
|
|
+ if len(opts) > 0 {
|
|
|
+ if m.Header == nil {
|
|
|
+ m.Header = Header{}
|
|
|
+ }
|
|
|
+ for _, opt := range opts {
|
|
|
+ if err := opt.configurePublish(&o); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if o.ttl != 0 || o.ctx != nil {
|
|
|
+ return nil, ErrContextAndTimeout
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if o.id != _EMPTY_ {
|
|
|
+ m.Header.Set(MsgIdHdr, o.id)
|
|
|
+ }
|
|
|
+ if o.lid != _EMPTY_ {
|
|
|
+ m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
|
|
|
+ }
|
|
|
+ if o.str != _EMPTY_ {
|
|
|
+ m.Header.Set(ExpectedStreamHdr, o.str)
|
|
|
+ }
|
|
|
+ if o.seq > 0 {
|
|
|
+ m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if m.Reply != _EMPTY_ {
|
|
|
+ return nil, errors.New("nats: reply subject should be empty")
|
|
|
+ }
|
|
|
+ m.Reply = js.newAsyncReply()
|
|
|
+ if m.Reply == _EMPTY_ {
|
|
|
+ return nil, errors.New("nats: error creating async reply handler")
|
|
|
+ }
|
|
|
+ id := m.Reply[aReplyPreLen:]
|
|
|
+ paf := &pubAckFuture{msg: m, st: time.Now()}
|
|
|
+ numPending, maxPending := js.registerPAF(id, paf)
|
|
|
+
|
|
|
+ if maxPending > 0 && numPending >= maxPending {
|
|
|
+ select {
|
|
|
+ case <-js.asyncStall():
|
|
|
+ case <-time.After(200 * time.Millisecond):
|
|
|
+ js.clearPAF(id)
|
|
|
+ return nil, errors.New("nats: stalled with too many outstanding async published messages")
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := js.nc.PublishMsg(m); err != nil {
|
|
|
+ js.clearPAF(id)
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return paf, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) PublishAsyncComplete() <-chan struct{} {
|
|
|
+ js.mu.Lock()
|
|
|
+ defer js.mu.Unlock()
|
|
|
+ if js.dch == nil {
|
|
|
+ js.dch = make(chan struct{})
|
|
|
+ }
|
|
|
+ dch := js.dch
|
|
|
+ if len(js.pafs) == 0 {
|
|
|
+ close(js.dch)
|
|
|
+ js.dch = nil
|
|
|
+ }
|
|
|
+ return dch
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func MsgId(id string) PubOpt {
|
|
|
+ return pubOptFn(func(opts *pubOpts) error {
|
|
|
+ opts.id = id
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func ExpectStream(stream string) PubOpt {
|
|
|
+ return pubOptFn(func(opts *pubOpts) error {
|
|
|
+ opts.str = stream
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func ExpectLastSequence(seq uint64) PubOpt {
|
|
|
+ return pubOptFn(func(opts *pubOpts) error {
|
|
|
+ opts.seq = seq
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func ExpectLastMsgId(id string) PubOpt {
|
|
|
+ return pubOptFn(func(opts *pubOpts) error {
|
|
|
+ opts.lid = id
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+type ackOpts struct {
|
|
|
+ ttl time.Duration
|
|
|
+ ctx context.Context
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type AckOpt interface {
|
|
|
+ configureAck(opts *ackOpts) error
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type MaxWait time.Duration
|
|
|
+
|
|
|
+func (ttl MaxWait) configureJSContext(js *jsOpts) error {
|
|
|
+ js.wait = time.Duration(ttl)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ttl MaxWait) configurePull(opts *pullOpts) error {
|
|
|
+ opts.ttl = time.Duration(ttl)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type AckWait time.Duration
|
|
|
+
|
|
|
+func (ttl AckWait) configurePublish(opts *pubOpts) error {
|
|
|
+ opts.ttl = time.Duration(ttl)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ttl AckWait) configureSubscribe(opts *subOpts) error {
|
|
|
+ opts.cfg.AckWait = time.Duration(ttl)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ttl AckWait) configureAck(opts *ackOpts) error {
|
|
|
+ opts.ttl = time.Duration(ttl)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type ContextOpt struct {
|
|
|
+ context.Context
|
|
|
+}
|
|
|
+
|
|
|
+func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
|
|
|
+ opts.ctx = ctx
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
|
|
|
+ opts.ctx = ctx
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ctx ContextOpt) configurePull(opts *pullOpts) error {
|
|
|
+ opts.ctx = ctx
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (ctx ContextOpt) configureAck(opts *ackOpts) error {
|
|
|
+ opts.ctx = ctx
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func Context(ctx context.Context) ContextOpt {
|
|
|
+ return ContextOpt{ctx}
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+type ConsumerConfig struct {
|
|
|
+ Durable string `json:"durable_name,omitempty"`
|
|
|
+ DeliverSubject string `json:"deliver_subject,omitempty"`
|
|
|
+ DeliverPolicy DeliverPolicy `json:"deliver_policy"`
|
|
|
+ OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
|
|
|
+ OptStartTime *time.Time `json:"opt_start_time,omitempty"`
|
|
|
+ AckPolicy AckPolicy `json:"ack_policy"`
|
|
|
+ AckWait time.Duration `json:"ack_wait,omitempty"`
|
|
|
+ MaxDeliver int `json:"max_deliver,omitempty"`
|
|
|
+ FilterSubject string `json:"filter_subject,omitempty"`
|
|
|
+ ReplayPolicy ReplayPolicy `json:"replay_policy"`
|
|
|
+ RateLimit uint64 `json:"rate_limit_bps,omitempty"`
|
|
|
+ SampleFrequency string `json:"sample_freq,omitempty"`
|
|
|
+ MaxWaiting int `json:"max_waiting,omitempty"`
|
|
|
+ MaxAckPending int `json:"max_ack_pending,omitempty"`
|
|
|
+ FlowControl bool `json:"flow_control,omitempty"`
|
|
|
+ Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type ConsumerInfo struct {
|
|
|
+ Stream string `json:"stream_name"`
|
|
|
+ Name string `json:"name"`
|
|
|
+ Created time.Time `json:"created"`
|
|
|
+ Config ConsumerConfig `json:"config"`
|
|
|
+ Delivered SequencePair `json:"delivered"`
|
|
|
+ AckFloor SequencePair `json:"ack_floor"`
|
|
|
+ NumAckPending int `json:"num_ack_pending"`
|
|
|
+ NumRedelivered int `json:"num_redelivered"`
|
|
|
+ NumWaiting int `json:"num_waiting"`
|
|
|
+ NumPending uint64 `json:"num_pending"`
|
|
|
+ Cluster *ClusterInfo `json:"cluster,omitempty"`
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type SequencePair struct {
|
|
|
+ Consumer uint64 `json:"consumer_seq"`
|
|
|
+ Stream uint64 `json:"stream_seq"`
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type nextRequest struct {
|
|
|
+ Expires time.Duration `json:"expires,omitempty"`
|
|
|
+ Batch int `json:"batch,omitempty"`
|
|
|
+ NoWait bool `json:"no_wait,omitempty"`
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type jsSub struct {
|
|
|
+ js *js
|
|
|
+
|
|
|
+
|
|
|
+ mu sync.RWMutex
|
|
|
+ psub *Subscription
|
|
|
+ rpre string
|
|
|
+ rr *rand.Rand
|
|
|
+ freqs []chan *Msg
|
|
|
+
|
|
|
+ consumer string
|
|
|
+ stream string
|
|
|
+ deliver string
|
|
|
+ pull bool
|
|
|
+ durable bool
|
|
|
+ attached bool
|
|
|
+
|
|
|
+
|
|
|
+ hbs bool
|
|
|
+ fc bool
|
|
|
+ cmeta string
|
|
|
+ fcs map[uint64]string
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (jsi *jsSub) newFetchReply() string {
|
|
|
+ jsi.mu.Lock()
|
|
|
+ rpre := jsi.rpre
|
|
|
+ rn := jsi.rr.Int63()
|
|
|
+ jsi.mu.Unlock()
|
|
|
+ var sb strings.Builder
|
|
|
+ sb.WriteString(rpre)
|
|
|
+ var b [aReplyTokensize]byte
|
|
|
+ for i, l := 0, rn; i < len(b); i++ {
|
|
|
+ b[i] = rdigits[l%base]
|
|
|
+ l /= base
|
|
|
+ }
|
|
|
+ sb.Write(b[:])
|
|
|
+ return sb.String()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (jsi *jsSub) handleFetch(m *Msg) {
|
|
|
+ jsi.mu.Lock()
|
|
|
+ if len(jsi.freqs) == 0 {
|
|
|
+ nc := jsi.js.nc
|
|
|
+ sub := jsi.psub
|
|
|
+ nc.mu.Lock()
|
|
|
+ errCB := nc.Opts.AsyncErrorCB
|
|
|
+ err := fmt.Errorf("nats: fetch response delivered but requestor has gone away")
|
|
|
+ if errCB != nil {
|
|
|
+ nc.ach.push(func() { errCB(nc, sub, err) })
|
|
|
+ }
|
|
|
+ nc.mu.Unlock()
|
|
|
+ jsi.mu.Unlock()
|
|
|
+ return
|
|
|
+ }
|
|
|
+ mch := jsi.freqs[0]
|
|
|
+ if len(jsi.freqs) > 1 {
|
|
|
+ jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...)
|
|
|
+ } else {
|
|
|
+ jsi.freqs = jsi.freqs[:0]
|
|
|
+ }
|
|
|
+ jsi.mu.Unlock()
|
|
|
+ mch <- m
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) {
|
|
|
+ nc := jsi.js.nc
|
|
|
+ m := NewMsg(subj)
|
|
|
+ m.Reply = jsi.newFetchReply()
|
|
|
+ m.Data = payload
|
|
|
+
|
|
|
+ mch := make(chan *Msg, 1)
|
|
|
+ jsi.mu.Lock()
|
|
|
+ jsi.freqs = append(jsi.freqs, mch)
|
|
|
+ jsi.mu.Unlock()
|
|
|
+ if err := nc.PublishMsg(m); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ var ok bool
|
|
|
+ var msg *Msg
|
|
|
+
|
|
|
+ select {
|
|
|
+ case msg, ok = <-mch:
|
|
|
+ if !ok {
|
|
|
+ return nil, ErrConnectionClosed
|
|
|
+ }
|
|
|
+ case <-ctx.Done():
|
|
|
+ return nil, ctx.Err()
|
|
|
+ }
|
|
|
+
|
|
|
+ return msg, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (jsi *jsSub) unsubscribe(drainMode bool) error {
|
|
|
+ if drainMode && (jsi.durable || jsi.attached) {
|
|
|
+
|
|
|
+
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ if jsi.psub != nil {
|
|
|
+ jsi.psub.Drain()
|
|
|
+ }
|
|
|
+
|
|
|
+ js := jsi.js
|
|
|
+ return js.DeleteConsumer(jsi.stream, jsi.consumer)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type SubOpt interface {
|
|
|
+ configureSubscribe(opts *subOpts) error
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type subOptFn func(opts *subOpts) error
|
|
|
+
|
|
|
+func (opt subOptFn) configureSubscribe(opts *subOpts) error {
|
|
|
+ return opt(opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
|
|
|
+ if cb == nil {
|
|
|
+ return nil, ErrBadSubscription
|
|
|
+ }
|
|
|
+ return js.subscribe(subj, _EMPTY_, cb, nil, false, opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
|
|
|
+ mch := make(chan *Msg, js.nc.Opts.SubChanLen)
|
|
|
+ return js.subscribe(subj, _EMPTY_, nil, mch, true, opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
|
|
|
+ if cb == nil {
|
|
|
+ return nil, ErrBadSubscription
|
|
|
+ }
|
|
|
+ return js.subscribe(subj, queue, cb, nil, false, opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
|
|
|
+ mch := make(chan *Msg, js.nc.Opts.SubChanLen)
|
|
|
+ return js.subscribe(subj, queue, nil, mch, true, opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
|
|
|
+ return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
|
|
|
+ return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) {
|
|
|
+ cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
|
|
|
+ o := subOpts{cfg: &cfg}
|
|
|
+ if len(opts) > 0 {
|
|
|
+ for _, opt := range opts {
|
|
|
+ if err := opt.configureSubscribe(&o); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ isPullMode := ch == nil && cb == nil
|
|
|
+ badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
|
|
|
+ hasHeartbeats := o.cfg.Heartbeat > 0
|
|
|
+ hasFC := o.cfg.FlowControl
|
|
|
+ if isPullMode && badPullAck {
|
|
|
+ return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
|
|
|
+ }
|
|
|
+
|
|
|
+ var (
|
|
|
+ err error
|
|
|
+ shouldCreate bool
|
|
|
+ ccfg *ConsumerConfig
|
|
|
+ info *ConsumerInfo
|
|
|
+ deliver string
|
|
|
+ attached bool
|
|
|
+ stream = o.stream
|
|
|
+ consumer = o.consumer
|
|
|
+ isDurable = o.cfg.Durable != _EMPTY_
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+ if o.stream == _EMPTY_ {
|
|
|
+ stream, err = js.lookupStreamBySubject(subj)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ stream = o.stream
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ consumer = o.cfg.Durable
|
|
|
+ if consumer != _EMPTY_ {
|
|
|
+
|
|
|
+ info, err = js.ConsumerInfo(stream, consumer)
|
|
|
+ if err != nil && err.Error() != "nats: consumer not found" {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if info != nil {
|
|
|
+
|
|
|
+ ccfg = &info.Config
|
|
|
+ attached = true
|
|
|
+
|
|
|
+
|
|
|
+ if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
|
|
|
+ return nil, ErrSubjectMismatch
|
|
|
+ }
|
|
|
+
|
|
|
+ if ccfg.DeliverSubject != _EMPTY_ {
|
|
|
+ deliver = ccfg.DeliverSubject
|
|
|
+ } else {
|
|
|
+ deliver = NewInbox()
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ shouldCreate = true
|
|
|
+ deliver = NewInbox()
|
|
|
+ if !isPullMode {
|
|
|
+ cfg.DeliverSubject = deliver
|
|
|
+ }
|
|
|
+
|
|
|
+ cfg.FilterSubject = subj
|
|
|
+ }
|
|
|
+
|
|
|
+ var sub *Subscription
|
|
|
+
|
|
|
+
|
|
|
+ if cb != nil && !o.mack {
|
|
|
+ ocb := cb
|
|
|
+ cb = func(m *Msg) { ocb(m); m.Ack() }
|
|
|
+ }
|
|
|
+
|
|
|
+ if isPullMode {
|
|
|
+ sub, err = js.pullSubscribe(subj)
|
|
|
+ } else {
|
|
|
+ sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if !isPullMode && cb != nil && hasFC {
|
|
|
+ sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if shouldCreate {
|
|
|
+
|
|
|
+ if cfg.AckPolicy == ackPolicyNotSet {
|
|
|
+ cfg.AckPolicy = AckExplicitPolicy
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
|
|
|
+ maxMsgs, _, _ := sub.PendingLimits()
|
|
|
+ cfg.MaxAckPending = maxMsgs
|
|
|
+ }
|
|
|
+
|
|
|
+ req := &createConsumerRequest{
|
|
|
+ Stream: stream,
|
|
|
+ Config: &cfg,
|
|
|
+ }
|
|
|
+
|
|
|
+ j, err := json.Marshal(req)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ var ccSubj string
|
|
|
+ if isDurable {
|
|
|
+ ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
|
|
|
+ } else {
|
|
|
+ ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
|
|
|
+ }
|
|
|
+
|
|
|
+ resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
|
|
|
+ if err != nil {
|
|
|
+ sub.Drain()
|
|
|
+ if err == ErrNoResponders {
|
|
|
+ err = ErrJetStreamNotEnabled
|
|
|
+ }
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ var cinfo consumerResponse
|
|
|
+ err = json.Unmarshal(resp.Data, &cinfo)
|
|
|
+ if err != nil {
|
|
|
+ sub.Drain()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ info = cinfo.ConsumerInfo
|
|
|
+ if cinfo.Error != nil {
|
|
|
+
|
|
|
+
|
|
|
+ sub.Drain()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
|
|
|
+ info, err = js.ConsumerInfo(stream, consumer)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ ccfg = &info.Config
|
|
|
+
|
|
|
+
|
|
|
+ if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
|
|
|
+ return nil, ErrSubjectMismatch
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if ccfg.DeliverSubject != _EMPTY_ {
|
|
|
+
|
|
|
+ if ch != nil {
|
|
|
+ ch = make(chan *Msg, cap(ch))
|
|
|
+ }
|
|
|
+ sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
|
|
|
+ &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ attached = true
|
|
|
+ } else {
|
|
|
+ return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ stream = info.Stream
|
|
|
+ consumer = info.Name
|
|
|
+ deliver = info.Config.DeliverSubject
|
|
|
+ }
|
|
|
+ sub.mu.Lock()
|
|
|
+ sub.jsi.stream = stream
|
|
|
+ sub.jsi.consumer = consumer
|
|
|
+ sub.jsi.durable = isDurable
|
|
|
+ sub.jsi.attached = attached
|
|
|
+ sub.jsi.deliver = deliver
|
|
|
+ sub.mu.Unlock()
|
|
|
+
|
|
|
+ return sub, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+type ErrConsumerSequenceMismatch struct {
|
|
|
+
|
|
|
+
|
|
|
+ StreamResumeSequence uint64
|
|
|
+
|
|
|
+
|
|
|
+ ConsumerSequence uint64
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ LastConsumerSequence uint64
|
|
|
+}
|
|
|
+
|
|
|
+func (ecs *ErrConsumerSequenceMismatch) Error() string {
|
|
|
+ return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
|
|
|
+ ecs.ConsumerSequence,
|
|
|
+ ecs.LastConsumerSequence-ecs.ConsumerSequence,
|
|
|
+ ecs.StreamResumeSequence,
|
|
|
+ )
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func isControlMessage(msg *Msg) bool {
|
|
|
+ return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
|
|
|
+}
|
|
|
+
|
|
|
+func (jsi *jsSub) trackSequences(reply string) {
|
|
|
+ jsi.mu.Lock()
|
|
|
+ jsi.cmeta = reply
|
|
|
+ jsi.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
|
|
|
+ jsi, nc := sub.jsi, sub.conn
|
|
|
+ if jsi == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ jsi.mu.Lock()
|
|
|
+ defer jsi.mu.Unlock()
|
|
|
+
|
|
|
+ if len(jsi.fcs) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
|
|
|
+ delete(jsi.fcs, delivered)
|
|
|
+ nc.Publish(reply, nil)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
|
|
|
+ jsi.mu.Lock()
|
|
|
+ if jsi.fcs == nil {
|
|
|
+ jsi.fcs = make(map[uint64]string)
|
|
|
+ }
|
|
|
+ jsi.fcs[dfuture] = reply
|
|
|
+ jsi.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
|
|
|
+ nc.mu.Lock()
|
|
|
+ errCB := nc.Opts.AsyncErrorCB
|
|
|
+ if errCB != nil {
|
|
|
+ nc.ach.push(func() { errCB(nc, sub, err) })
|
|
|
+ }
|
|
|
+ nc.mu.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
|
|
|
+
|
|
|
+ jsi.mu.RLock()
|
|
|
+ ctrl := jsi.cmeta
|
|
|
+ jsi.mu.RUnlock()
|
|
|
+
|
|
|
+ if ctrl == _EMPTY_ {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ tokens, err := getMetadataFields(ctrl)
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ var ldseq string
|
|
|
+ dseq := tokens[6]
|
|
|
+ hdr := msg.Header[lastConsumerSeqHdr]
|
|
|
+ if len(hdr) == 1 {
|
|
|
+ ldseq = hdr[0]
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if ldseq != dseq {
|
|
|
+
|
|
|
+
|
|
|
+ sseq := parseNum(tokens[5])
|
|
|
+ ecs := &ErrConsumerSequenceMismatch{
|
|
|
+ StreamResumeSequence: uint64(sseq),
|
|
|
+ ConsumerSequence: uint64(parseNum(dseq)),
|
|
|
+ LastConsumerSequence: uint64(parseNum(ldseq)),
|
|
|
+ }
|
|
|
+ nc.handleConsumerSequenceMismatch(s, ecs)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+type streamRequest struct {
|
|
|
+ Subject string `json:"subject,omitempty"`
|
|
|
+}
|
|
|
+
|
|
|
+type streamNamesResponse struct {
|
|
|
+ apiResponse
|
|
|
+ apiPaged
|
|
|
+ Streams []string `json:"streams"`
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) lookupStreamBySubject(subj string) (string, error) {
|
|
|
+ var slr streamNamesResponse
|
|
|
+ req := &streamRequest{subj}
|
|
|
+ j, err := json.Marshal(req)
|
|
|
+ if err != nil {
|
|
|
+ return _EMPTY_, err
|
|
|
+ }
|
|
|
+ resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
|
|
|
+ if err != nil {
|
|
|
+ if err == ErrNoResponders {
|
|
|
+ err = ErrJetStreamNotEnabled
|
|
|
+ }
|
|
|
+ return _EMPTY_, err
|
|
|
+ }
|
|
|
+ if err := json.Unmarshal(resp.Data, &slr); err != nil {
|
|
|
+ return _EMPTY_, err
|
|
|
+ }
|
|
|
+ if slr.Error != nil || len(slr.Streams) != 1 {
|
|
|
+ return _EMPTY_, ErrNoMatchingStream
|
|
|
+ }
|
|
|
+ return slr.Streams[0], nil
|
|
|
+}
|
|
|
+
|
|
|
+type subOpts struct {
|
|
|
+
|
|
|
+ stream, consumer string
|
|
|
+
|
|
|
+ mack bool
|
|
|
+
|
|
|
+ cfg *ConsumerConfig
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func ManualAck() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.mack = true
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func Durable(name string) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ if opts.cfg.Durable != "" {
|
|
|
+ return fmt.Errorf("nats: option Durable set more than once")
|
|
|
+ }
|
|
|
+ if strings.Contains(name, ".") {
|
|
|
+ return ErrInvalidDurableName
|
|
|
+ }
|
|
|
+
|
|
|
+ opts.cfg.Durable = name
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func DeliverAll() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.DeliverPolicy = DeliverAllPolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func DeliverLast() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.DeliverPolicy = DeliverLastPolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func DeliverNew() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.DeliverPolicy = DeliverNewPolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func StartSequence(seq uint64) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
|
|
|
+ opts.cfg.OptStartSeq = seq
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func StartTime(startTime time.Time) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
|
|
|
+ opts.cfg.OptStartTime = &startTime
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func AckNone() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.AckPolicy = AckNonePolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func AckAll() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.AckPolicy = AckAllPolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func AckExplicit() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.AckPolicy = AckExplicitPolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func MaxDeliver(n int) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.MaxDeliver = n
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func MaxAckPending(n int) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.MaxAckPending = n
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func ReplayOriginal() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.ReplayPolicy = ReplayOriginalPolicy
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func RateLimit(n uint64) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.RateLimit = n
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func BindStream(name string) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.stream = name
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func EnableFlowControl() SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.FlowControl = true
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func IdleHeartbeat(duration time.Duration) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.Heartbeat = duration
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
|
|
|
+ sub.mu.Lock()
|
|
|
+
|
|
|
+ if sub.jsi.consumer == _EMPTY_ {
|
|
|
+ sub.mu.Unlock()
|
|
|
+ return nil, ErrTypeSubscription
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ js := sub.jsi.js
|
|
|
+ stream, consumer := sub.jsi.stream, sub.jsi.consumer
|
|
|
+ sub.mu.Unlock()
|
|
|
+
|
|
|
+ return js.getConsumerInfo(stream, consumer)
|
|
|
+}
|
|
|
+
|
|
|
+type pullOpts struct {
|
|
|
+ ttl time.Duration
|
|
|
+ ctx context.Context
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type PullOpt interface {
|
|
|
+ configurePull(opts *pullOpts) error
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func PullMaxWaiting(n int) SubOpt {
|
|
|
+ return subOptFn(func(opts *subOpts) error {
|
|
|
+ opts.cfg.MaxWaiting = n
|
|
|
+ return nil
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+var errNoMessages = errors.New("nats: no messages")
|
|
|
+
|
|
|
+
|
|
|
+func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
|
|
|
+ if sub == nil {
|
|
|
+ return nil, ErrBadSubscription
|
|
|
+ }
|
|
|
+
|
|
|
+ var o pullOpts
|
|
|
+ for _, opt := range opts {
|
|
|
+ if err := opt.configurePull(&o); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if o.ctx != nil && o.ttl != 0 {
|
|
|
+ return nil, ErrContextAndTimeout
|
|
|
+ }
|
|
|
+
|
|
|
+ sub.mu.Lock()
|
|
|
+ jsi := sub.jsi
|
|
|
+ if jsi == nil || sub.typ != PullSubscription {
|
|
|
+ sub.mu.Unlock()
|
|
|
+ return nil, ErrTypeSubscription
|
|
|
+ }
|
|
|
+
|
|
|
+ nc, _ := sub.conn, sub.Subject
|
|
|
+ stream, consumer := sub.jsi.stream, sub.jsi.consumer
|
|
|
+ js := sub.jsi.js
|
|
|
+
|
|
|
+ ttl := o.ttl
|
|
|
+ if ttl == 0 {
|
|
|
+ ttl = js.opts.wait
|
|
|
+ }
|
|
|
+ sub.mu.Unlock()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ var (
|
|
|
+ ctx = o.ctx
|
|
|
+ err error
|
|
|
+ cancel context.CancelFunc
|
|
|
+ )
|
|
|
+ if o.ctx == nil {
|
|
|
+ ctx, cancel = context.WithTimeout(context.Background(), ttl)
|
|
|
+ defer cancel()
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ if ctx.Err() == context.Canceled {
|
|
|
+ err = ctx.Err()
|
|
|
+ } else {
|
|
|
+ err = ErrTimeout
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ checkMsg := func(msg *Msg) error {
|
|
|
+ if len(msg.Data) == 0 {
|
|
|
+ switch msg.Header.Get(statusHdr) {
|
|
|
+ case noResponders:
|
|
|
+ return ErrNoResponders
|
|
|
+ case noMessages:
|
|
|
+ return errNoMessages
|
|
|
+ case "400", "408", "409":
|
|
|
+ return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ checkCtxErr := func(err error) error {
|
|
|
+ if o.ctx == nil && err == context.DeadlineExceeded {
|
|
|
+ return ErrTimeout
|
|
|
+ }
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ var (
|
|
|
+ gotNoMessages bool
|
|
|
+ nr = &nextRequest{Batch: batch, NoWait: true}
|
|
|
+ req, _ = json.Marshal(nr)
|
|
|
+ reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
|
|
|
+ expires = ttl - 10*time.Millisecond
|
|
|
+ msgs = make([]*Msg, 0)
|
|
|
+ )
|
|
|
+
|
|
|
+ if batch == 1 {
|
|
|
+
|
|
|
+
|
|
|
+ resp, err := jsi.fetchNoWait(ctx, reqNext, req)
|
|
|
+ if err != nil {
|
|
|
+ return nil, checkCtxErr(err)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ err = checkMsg(resp)
|
|
|
+ if err != nil {
|
|
|
+ if err == errNoMessages {
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ nr.NoWait = false
|
|
|
+ nr.Expires = expires
|
|
|
+ req, _ = json.Marshal(nr)
|
|
|
+ resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req)
|
|
|
+ if err != nil {
|
|
|
+ return nil, checkCtxErr(err)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ err = checkMsg(resp)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return []*Msg{resp}, nil
|
|
|
+ } else {
|
|
|
+
|
|
|
+ return nil, checkCtxErr(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return []*Msg{resp}, nil
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ inbox := NewInbox()
|
|
|
+
|
|
|
+ mch := make(chan *Msg, batch)
|
|
|
+ s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ defer s.Unsubscribe()
|
|
|
+
|
|
|
+
|
|
|
+ err = nc.publish(reqNext, inbox, nil, req)
|
|
|
+ if err != nil {
|
|
|
+ s.Unsubscribe()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ var (
|
|
|
+ firstMsg *Msg
|
|
|
+ ok bool
|
|
|
+ )
|
|
|
+ select {
|
|
|
+ case firstMsg, ok = <-mch:
|
|
|
+ if !ok {
|
|
|
+ err = s.getNextMsgErr()
|
|
|
+ } else {
|
|
|
+ err = s.processNextMsgDelivered(firstMsg)
|
|
|
+ if err == nil {
|
|
|
+ err = checkMsg(firstMsg)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case <-ctx.Done():
|
|
|
+ err = checkCtxErr(ctx.Err())
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if err == errNoMessages {
|
|
|
+ gotNoMessages = true
|
|
|
+ } else if err != nil {
|
|
|
+
|
|
|
+
|
|
|
+ s.Unsubscribe()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ if gotNoMessages {
|
|
|
+
|
|
|
+
|
|
|
+ nr.NoWait = false
|
|
|
+ nr.Expires = expires
|
|
|
+ req, _ = json.Marshal(nr)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ err = s.AutoUnsubscribe(batch + 1)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ err = nc.publish(reqNext, inbox, nil, req)
|
|
|
+ if err != nil {
|
|
|
+ s.Unsubscribe()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ select {
|
|
|
+ case firstMsg, ok = <-mch:
|
|
|
+ if !ok {
|
|
|
+ err = s.getNextMsgErr()
|
|
|
+ } else {
|
|
|
+ err = s.processNextMsgDelivered(firstMsg)
|
|
|
+ if err == nil {
|
|
|
+ err = checkMsg(firstMsg)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case <-ctx.Done():
|
|
|
+ err = checkCtxErr(ctx.Err())
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+ s.Unsubscribe()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ err = checkMsg(firstMsg)
|
|
|
+ if err != nil {
|
|
|
+ s.Unsubscribe()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+
|
|
|
+
|
|
|
+ err = s.AutoUnsubscribe(batch)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ msgs = append(msgs, firstMsg)
|
|
|
+ for {
|
|
|
+ var (
|
|
|
+ msg *Msg
|
|
|
+ ok bool
|
|
|
+ )
|
|
|
+ select {
|
|
|
+ case msg, ok = <-mch:
|
|
|
+ if !ok {
|
|
|
+ err = s.getNextMsgErr()
|
|
|
+ } else {
|
|
|
+ err = s.processNextMsgDelivered(msg)
|
|
|
+ if err == nil {
|
|
|
+ err = checkMsg(msg)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case <-ctx.Done():
|
|
|
+ return msgs, checkCtxErr(err)
|
|
|
+ }
|
|
|
+ if err != nil {
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if msg != nil {
|
|
|
+ msgs = append(msgs, msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(msgs) == batch {
|
|
|
+
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return msgs, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
|
|
|
+ ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
|
|
|
+ defer cancel()
|
|
|
+ return js.getConsumerInfoContext(ctx, stream, consumer)
|
|
|
+}
|
|
|
+
|
|
|
+func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
|
|
|
+ ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
|
|
|
+ resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
|
|
|
+ if err != nil {
|
|
|
+ if err == ErrNoResponders {
|
|
|
+ err = ErrJetStreamNotEnabled
|
|
|
+ }
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ var info consumerResponse
|
|
|
+ if err := json.Unmarshal(resp.Data, &info); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if info.Error != nil {
|
|
|
+ return nil, fmt.Errorf("nats: %s", info.Error.Description)
|
|
|
+ }
|
|
|
+ return info.ConsumerInfo, nil
|
|
|
+}
|
|
|
+
|
|
|
+func (m *Msg) checkReply() (*js, *jsSub, error) {
|
|
|
+ if m == nil || m.Sub == nil {
|
|
|
+ return nil, nil, ErrMsgNotBound
|
|
|
+ }
|
|
|
+ if m.Reply == "" {
|
|
|
+ return nil, nil, ErrMsgNoReply
|
|
|
+ }
|
|
|
+ sub := m.Sub
|
|
|
+ sub.mu.Lock()
|
|
|
+ if sub.jsi == nil {
|
|
|
+ sub.mu.Unlock()
|
|
|
+
|
|
|
+
|
|
|
+ return nil, nil, nil
|
|
|
+ }
|
|
|
+ js := sub.jsi.js
|
|
|
+ jsi := sub.jsi
|
|
|
+ sub.mu.Unlock()
|
|
|
+
|
|
|
+ return js, jsi, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
|
|
|
+ var o ackOpts
|
|
|
+ for _, opt := range opts {
|
|
|
+ if err := opt.configureAck(&o); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ js, _, err := m.checkReply()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if atomic.LoadUint32(&m.ackd) == 1 {
|
|
|
+ return ErrInvalidJSAck
|
|
|
+ }
|
|
|
+
|
|
|
+ m.Sub.mu.Lock()
|
|
|
+ nc := m.Sub.conn
|
|
|
+ m.Sub.mu.Unlock()
|
|
|
+
|
|
|
+ usesCtx := o.ctx != nil
|
|
|
+ usesWait := o.ttl > 0
|
|
|
+ sync = sync || usesCtx || usesWait
|
|
|
+ ctx := o.ctx
|
|
|
+ wait := defaultRequestWait
|
|
|
+ if usesWait {
|
|
|
+ wait = o.ttl
|
|
|
+ } else if js != nil {
|
|
|
+ wait = js.opts.wait
|
|
|
+ }
|
|
|
+
|
|
|
+ if sync {
|
|
|
+ if usesCtx {
|
|
|
+ _, err = nc.RequestWithContext(ctx, m.Reply, ackType)
|
|
|
+ } else {
|
|
|
+ _, err = nc.Request(m.Reply, ackType, wait)
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ err = nc.Publish(m.Reply, ackType)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if err == nil && !bytes.Equal(ackType, ackProgress) {
|
|
|
+ atomic.StoreUint32(&m.ackd, 1)
|
|
|
+ }
|
|
|
+
|
|
|
+ return err
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) Ack(opts ...AckOpt) error {
|
|
|
+ return m.ackReply(ackAck, false, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) AckSync(opts ...AckOpt) error {
|
|
|
+ return m.ackReply(ackAck, true, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) Nak(opts ...AckOpt) error {
|
|
|
+ return m.ackReply(ackNak, false, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) Term(opts ...AckOpt) error {
|
|
|
+ return m.ackReply(ackTerm, false, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) InProgress(opts ...AckOpt) error {
|
|
|
+ return m.ackReply(ackProgress, false, opts...)
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type MsgMetadata struct {
|
|
|
+ Sequence SequencePair
|
|
|
+ NumDelivered uint64
|
|
|
+ NumPending uint64
|
|
|
+ Timestamp time.Time
|
|
|
+ Stream string
|
|
|
+ Consumer string
|
|
|
+}
|
|
|
+
|
|
|
+func getMetadataFields(subject string) ([]string, error) {
|
|
|
+ const expectedTokens = 9
|
|
|
+ const btsep = '.'
|
|
|
+
|
|
|
+ tsa := [expectedTokens]string{}
|
|
|
+ start, tokens := 0, tsa[:0]
|
|
|
+ for i := 0; i < len(subject); i++ {
|
|
|
+ if subject[i] == btsep {
|
|
|
+ tokens = append(tokens, subject[start:i])
|
|
|
+ start = i + 1
|
|
|
+ }
|
|
|
+ }
|
|
|
+ tokens = append(tokens, subject[start:])
|
|
|
+ if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
|
|
|
+ return nil, ErrNotJSMessage
|
|
|
+ }
|
|
|
+ return tokens, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (m *Msg) Metadata() (*MsgMetadata, error) {
|
|
|
+ if _, _, err := m.checkReply(); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ tokens, err := getMetadataFields(m.Reply)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ meta := &MsgMetadata{
|
|
|
+ NumDelivered: uint64(parseNum(tokens[4])),
|
|
|
+ NumPending: uint64(parseNum(tokens[8])),
|
|
|
+ Timestamp: time.Unix(0, parseNum(tokens[7])),
|
|
|
+ Stream: tokens[2],
|
|
|
+ Consumer: tokens[3],
|
|
|
+ }
|
|
|
+ meta.Sequence.Stream = uint64(parseNum(tokens[5]))
|
|
|
+ meta.Sequence.Consumer = uint64(parseNum(tokens[6]))
|
|
|
+ return meta, nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+func parseNum(d string) (n int64) {
|
|
|
+ if len(d) == 0 {
|
|
|
+ return -1
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ const (
|
|
|
+ asciiZero = 48
|
|
|
+ asciiNine = 57
|
|
|
+ )
|
|
|
+
|
|
|
+ for _, dec := range d {
|
|
|
+ if dec < asciiZero || dec > asciiNine {
|
|
|
+ return -1
|
|
|
+ }
|
|
|
+ n = n*10 + (int64(dec) - asciiZero)
|
|
|
+ }
|
|
|
+ return n
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type AckPolicy int
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+ AckNonePolicy AckPolicy = iota
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ AckAllPolicy
|
|
|
+
|
|
|
+
|
|
|
+ AckExplicitPolicy
|
|
|
+
|
|
|
+
|
|
|
+ ackPolicyNotSet = 99
|
|
|
+)
|
|
|
+
|
|
|
+func jsonString(s string) string {
|
|
|
+ return "\"" + s + "\""
|
|
|
+}
|
|
|
+
|
|
|
+func (p *AckPolicy) UnmarshalJSON(data []byte) error {
|
|
|
+ switch string(data) {
|
|
|
+ case jsonString("none"):
|
|
|
+ *p = AckNonePolicy
|
|
|
+ case jsonString("all"):
|
|
|
+ *p = AckAllPolicy
|
|
|
+ case jsonString("explicit"):
|
|
|
+ *p = AckExplicitPolicy
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (p AckPolicy) MarshalJSON() ([]byte, error) {
|
|
|
+ switch p {
|
|
|
+ case AckNonePolicy:
|
|
|
+ return json.Marshal("none")
|
|
|
+ case AckAllPolicy:
|
|
|
+ return json.Marshal("all")
|
|
|
+ case AckExplicitPolicy:
|
|
|
+ return json.Marshal("explicit")
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("nats: unknown acknowlegement policy %v", p)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (p AckPolicy) String() string {
|
|
|
+ switch p {
|
|
|
+ case AckNonePolicy:
|
|
|
+ return "AckNone"
|
|
|
+ case AckAllPolicy:
|
|
|
+ return "AckAll"
|
|
|
+ case AckExplicitPolicy:
|
|
|
+ return "AckExplicit"
|
|
|
+ case ackPolicyNotSet:
|
|
|
+ return "Not Initialized"
|
|
|
+ default:
|
|
|
+ return "Unknown AckPolicy"
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type ReplayPolicy int
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+ ReplayInstantPolicy ReplayPolicy = iota
|
|
|
+
|
|
|
+
|
|
|
+ ReplayOriginalPolicy
|
|
|
+)
|
|
|
+
|
|
|
+func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
|
|
|
+ switch string(data) {
|
|
|
+ case jsonString("instant"):
|
|
|
+ *p = ReplayInstantPolicy
|
|
|
+ case jsonString("original"):
|
|
|
+ *p = ReplayOriginalPolicy
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
|
|
|
+ switch p {
|
|
|
+ case ReplayOriginalPolicy:
|
|
|
+ return json.Marshal("original")
|
|
|
+ case ReplayInstantPolicy:
|
|
|
+ return json.Marshal("instant")
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("nats: unknown replay policy %v", p)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+var (
|
|
|
+ ackAck = []byte("+ACK")
|
|
|
+ ackNak = []byte("-NAK")
|
|
|
+ ackProgress = []byte("+WPI")
|
|
|
+ ackTerm = []byte("+TERM")
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+type DeliverPolicy int
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+
|
|
|
+ DeliverAllPolicy DeliverPolicy = iota
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ DeliverLastPolicy
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ DeliverNewPolicy
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ DeliverByStartSequencePolicy
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ DeliverByStartTimePolicy
|
|
|
+)
|
|
|
+
|
|
|
+func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
|
|
|
+ switch string(data) {
|
|
|
+ case jsonString("all"), jsonString("undefined"):
|
|
|
+ *p = DeliverAllPolicy
|
|
|
+ case jsonString("last"):
|
|
|
+ *p = DeliverLastPolicy
|
|
|
+ case jsonString("new"):
|
|
|
+ *p = DeliverNewPolicy
|
|
|
+ case jsonString("by_start_sequence"):
|
|
|
+ *p = DeliverByStartSequencePolicy
|
|
|
+ case jsonString("by_start_time"):
|
|
|
+ *p = DeliverByStartTimePolicy
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
|
|
|
+ switch p {
|
|
|
+ case DeliverAllPolicy:
|
|
|
+ return json.Marshal("all")
|
|
|
+ case DeliverLastPolicy:
|
|
|
+ return json.Marshal("last")
|
|
|
+ case DeliverNewPolicy:
|
|
|
+ return json.Marshal("new")
|
|
|
+ case DeliverByStartSequencePolicy:
|
|
|
+ return json.Marshal("by_start_sequence")
|
|
|
+ case DeliverByStartTimePolicy:
|
|
|
+ return json.Marshal("by_start_time")
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type RetentionPolicy int
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+
|
|
|
+ LimitsPolicy RetentionPolicy = iota
|
|
|
+
|
|
|
+ InterestPolicy
|
|
|
+
|
|
|
+ WorkQueuePolicy
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+type DiscardPolicy int
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+
|
|
|
+ DiscardOld DiscardPolicy = iota
|
|
|
+
|
|
|
+ DiscardNew
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ limitsPolicyString = "limits"
|
|
|
+ interestPolicyString = "interest"
|
|
|
+ workQueuePolicyString = "workqueue"
|
|
|
+)
|
|
|
+
|
|
|
+func (rp RetentionPolicy) String() string {
|
|
|
+ switch rp {
|
|
|
+ case LimitsPolicy:
|
|
|
+ return "Limits"
|
|
|
+ case InterestPolicy:
|
|
|
+ return "Interest"
|
|
|
+ case WorkQueuePolicy:
|
|
|
+ return "WorkQueue"
|
|
|
+ default:
|
|
|
+ return "Unknown Retention Policy"
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
|
|
|
+ switch rp {
|
|
|
+ case LimitsPolicy:
|
|
|
+ return json.Marshal(limitsPolicyString)
|
|
|
+ case InterestPolicy:
|
|
|
+ return json.Marshal(interestPolicyString)
|
|
|
+ case WorkQueuePolicy:
|
|
|
+ return json.Marshal(workQueuePolicyString)
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("nats: can not marshal %v", rp)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
|
|
|
+ switch string(data) {
|
|
|
+ case jsonString(limitsPolicyString):
|
|
|
+ *rp = LimitsPolicy
|
|
|
+ case jsonString(interestPolicyString):
|
|
|
+ *rp = InterestPolicy
|
|
|
+ case jsonString(workQueuePolicyString):
|
|
|
+ *rp = WorkQueuePolicy
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (dp DiscardPolicy) String() string {
|
|
|
+ switch dp {
|
|
|
+ case DiscardOld:
|
|
|
+ return "DiscardOld"
|
|
|
+ case DiscardNew:
|
|
|
+ return "DiscardNew"
|
|
|
+ default:
|
|
|
+ return "Unknown Discard Policy"
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
|
|
|
+ switch dp {
|
|
|
+ case DiscardOld:
|
|
|
+ return json.Marshal("old")
|
|
|
+ case DiscardNew:
|
|
|
+ return json.Marshal("new")
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("nats: can not marshal %v", dp)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
|
|
|
+ switch strings.ToLower(string(data)) {
|
|
|
+ case jsonString("old"):
|
|
|
+ *dp = DiscardOld
|
|
|
+ case jsonString("new"):
|
|
|
+ *dp = DiscardNew
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+type StorageType int
|
|
|
+
|
|
|
+const (
|
|
|
+
|
|
|
+ FileStorage StorageType = iota
|
|
|
+
|
|
|
+ MemoryStorage
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ memoryStorageString = "memory"
|
|
|
+ fileStorageString = "file"
|
|
|
+)
|
|
|
+
|
|
|
+func (st StorageType) String() string {
|
|
|
+ switch st {
|
|
|
+ case MemoryStorage:
|
|
|
+ return strings.Title(memoryStorageString)
|
|
|
+ case FileStorage:
|
|
|
+ return strings.Title(fileStorageString)
|
|
|
+ default:
|
|
|
+ return "Unknown Storage Type"
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (st StorageType) MarshalJSON() ([]byte, error) {
|
|
|
+ switch st {
|
|
|
+ case MemoryStorage:
|
|
|
+ return json.Marshal(memoryStorageString)
|
|
|
+ case FileStorage:
|
|
|
+ return json.Marshal(fileStorageString)
|
|
|
+ default:
|
|
|
+ return nil, fmt.Errorf("nats: can not marshal %v", st)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (st *StorageType) UnmarshalJSON(data []byte) error {
|
|
|
+ switch string(data) {
|
|
|
+ case jsonString(memoryStorageString):
|
|
|
+ *st = MemoryStorage
|
|
|
+ case jsonString(fileStorageString):
|
|
|
+ *st = FileStorage
|
|
|
+ default:
|
|
|
+ return fmt.Errorf("nats: can not unmarshal %q", data)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|