12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345 |
- package nats
- import (
- "bytes"
- "context"
- "crypto/sha256"
- "encoding/json"
- "errors"
- "fmt"
- "math/rand"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nats-io/nuid"
- )
- const (
-
- defaultAPIPrefix = "$JS.API."
-
- apiAccountInfo = "INFO"
-
- apiConsumerCreateT = "CONSUMER.CREATE.%s"
-
- apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
-
- apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
-
- apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
-
- apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
-
- apiConsumerListT = "CONSUMER.LIST.%s"
-
- apiConsumerNamesT = "CONSUMER.NAMES.%s"
-
- apiStreams = "STREAM.NAMES"
-
- apiStreamCreateT = "STREAM.CREATE.%s"
-
- apiStreamInfoT = "STREAM.INFO.%s"
-
- apiStreamUpdateT = "STREAM.UPDATE.%s"
-
- apiStreamDeleteT = "STREAM.DELETE.%s"
-
- apiStreamPurgeT = "STREAM.PURGE.%s"
-
- apiStreamList = "STREAM.LIST"
-
- apiMsgGetT = "STREAM.MSG.GET.%s"
-
- apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
- )
- type JetStream interface {
-
- Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
-
- PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
-
-
- PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
-
-
- PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
-
- PublishAsyncPending() int
-
- PublishAsyncComplete() <-chan struct{}
-
- Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
-
- SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
-
- ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
-
- QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
-
- QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
-
- PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
- }
- type JetStreamContext interface {
- JetStream
- JetStreamManager
- }
- type js struct {
- nc *Conn
- opts *jsOpts
-
- mu sync.RWMutex
- rpre string
- rsub *Subscription
- pafs map[string]*pubAckFuture
- stc chan struct{}
- dch chan struct{}
- rr *rand.Rand
- }
- type jsOpts struct {
- ctx context.Context
-
- pre string
-
- wait time.Duration
-
- aecb MsgErrHandler
-
- maxap int
- }
- const (
- defaultRequestWait = 5 * time.Second
- defaultAccountCheck = 20 * time.Second
- )
- func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
- js := &js{
- nc: nc,
- opts: &jsOpts{
- pre: defaultAPIPrefix,
- wait: defaultRequestWait,
- },
- }
- for _, opt := range opts {
- if err := opt.configureJSContext(js.opts); err != nil {
- return nil, err
- }
- }
-
-
- nc.mu.Lock()
- now := time.Now()
- checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
- if checkAccount {
- nc.jsLastCheck = now
- }
- nc.mu.Unlock()
- if checkAccount {
- if _, err := js.AccountInfo(); err != nil {
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return nil, err
- }
- }
- return js, nil
- }
- type JSOpt interface {
- configureJSContext(opts *jsOpts) error
- }
- type jsOptFn func(opts *jsOpts) error
- func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
- return opt(opts)
- }
- func APIPrefix(pre string) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- js.pre = pre
- if !strings.HasSuffix(js.pre, ".") {
- js.pre = js.pre + "."
- }
- return nil
- })
- }
- func (js *js) apiSubj(subj string) string {
- if js.opts.pre == _EMPTY_ {
- return subj
- }
- var b strings.Builder
- b.WriteString(js.opts.pre)
- b.WriteString(subj)
- return b.String()
- }
- type PubOpt interface {
- configurePublish(opts *pubOpts) error
- }
- type pubOptFn func(opts *pubOpts) error
- func (opt pubOptFn) configurePublish(opts *pubOpts) error {
- return opt(opts)
- }
- type pubOpts struct {
- ctx context.Context
- ttl time.Duration
- id string
- lid string
- str string
- seq uint64
- }
- type pubAckResponse struct {
- apiResponse
- *PubAck
- }
- type PubAck struct {
- Stream string `json:"stream"`
- Sequence uint64 `json:"seq"`
- Duplicate bool `json:"duplicate,omitempty"`
- }
- const (
- MsgIdHdr = "Nats-Msg-Id"
- ExpectedStreamHdr = "Nats-Expected-Stream"
- ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
- ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
- )
- func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
- var o pubOpts
- if len(opts) > 0 {
- if m.Header == nil {
- m.Header = Header{}
- }
- for _, opt := range opts {
- if err := opt.configurePublish(&o); err != nil {
- return nil, err
- }
- }
- }
-
- if o.ctx != nil && o.ttl != 0 {
- return nil, ErrContextAndTimeout
- }
- if o.ttl == 0 && o.ctx == nil {
- o.ttl = js.opts.wait
- }
- if o.id != _EMPTY_ {
- m.Header.Set(MsgIdHdr, o.id)
- }
- if o.lid != _EMPTY_ {
- m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
- }
- if o.str != _EMPTY_ {
- m.Header.Set(ExpectedStreamHdr, o.str)
- }
- if o.seq > 0 {
- m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
- }
- var resp *Msg
- var err error
- if o.ttl > 0 {
- resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
- } else {
- resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
- }
- if err != nil {
- if err == ErrNoResponders {
- err = ErrNoStreamResponse
- }
- return nil, err
- }
- var pa pubAckResponse
- if err := json.Unmarshal(resp.Data, &pa); err != nil {
- return nil, ErrInvalidJSAck
- }
- if pa.Error != nil {
- return nil, fmt.Errorf("nats: %s", pa.Error.Description)
- }
- if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
- return nil, ErrInvalidJSAck
- }
- return pa.PubAck, nil
- }
- func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
- return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
- }
- type PubAckFuture interface {
-
- Ok() <-chan *PubAck
-
- Err() <-chan error
-
- Msg() *Msg
- }
- type pubAckFuture struct {
- js *js
- msg *Msg
- pa *PubAck
- st time.Time
- err error
- errCh chan error
- doneCh chan *PubAck
- }
- func (paf *pubAckFuture) Ok() <-chan *PubAck {
- paf.js.mu.Lock()
- defer paf.js.mu.Unlock()
- if paf.doneCh == nil {
- paf.doneCh = make(chan *PubAck, 1)
- if paf.pa != nil {
- paf.doneCh <- paf.pa
- }
- }
- return paf.doneCh
- }
- func (paf *pubAckFuture) Err() <-chan error {
- paf.js.mu.Lock()
- defer paf.js.mu.Unlock()
- if paf.errCh == nil {
- paf.errCh = make(chan error, 1)
- if paf.err != nil {
- paf.errCh <- paf.err
- }
- }
- return paf.errCh
- }
- func (paf *pubAckFuture) Msg() *Msg {
- paf.js.mu.RLock()
- defer paf.js.mu.RUnlock()
- return paf.msg
- }
- func (js *js) pullSubscribe(subj string) (*Subscription, error) {
- jsi := &jsSub{js: js, pull: true}
-
-
-
-
-
- jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
- jsi.rpre = fmt.Sprintf("%s.", NewInbox())
- sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch)
- if err != nil {
- return nil, err
- }
- jsi.psub = sub
- return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil
- }
- const aReplyPreLen = 14
- const aReplyTokensize = 6
- func (js *js) newAsyncReply() string {
- js.mu.Lock()
- if js.rsub == nil {
-
- sha := sha256.New()
- sha.Write([]byte(nuid.Next()))
- b := sha.Sum(nil)
- for i := 0; i < aReplyTokensize; i++ {
- b[i] = rdigits[int(b[i]%base)]
- }
- js.rpre = fmt.Sprintf("%s%s.", InboxPrefix, b[:aReplyTokensize])
- sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
- if err != nil {
- js.mu.Unlock()
- return _EMPTY_
- }
- js.rsub = sub
- js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
- }
- var sb strings.Builder
- sb.WriteString(js.rpre)
- rn := js.rr.Int63()
- var b [aReplyTokensize]byte
- for i, l := 0, rn; i < len(b); i++ {
- b[i] = rdigits[l%base]
- l /= base
- }
- sb.Write(b[:])
- js.mu.Unlock()
- return sb.String()
- }
- func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
- js.mu.Lock()
- if js.pafs == nil {
- js.pafs = make(map[string]*pubAckFuture)
- }
- paf.js = js
- js.pafs[id] = paf
- np := len(js.pafs)
- maxap := js.opts.maxap
- js.mu.Unlock()
- return np, maxap
- }
- func (js *js) getPAF(id string) *pubAckFuture {
- if js.pafs == nil {
- return nil
- }
- return js.pafs[id]
- }
- func (js *js) clearPAF(id string) {
- js.mu.Lock()
- delete(js.pafs, id)
- js.mu.Unlock()
- }
- func (js *js) PublishAsyncPending() int {
- js.mu.RLock()
- defer js.mu.RUnlock()
- return len(js.pafs)
- }
- func (js *js) asyncStall() <-chan struct{} {
- js.mu.Lock()
- if js.stc == nil {
- js.stc = make(chan struct{})
- }
- stc := js.stc
- js.mu.Unlock()
- return stc
- }
- func (js *js) handleAsyncReply(m *Msg) {
- if len(m.Subject) <= aReplyPreLen {
- return
- }
- id := m.Subject[aReplyPreLen:]
- js.mu.Lock()
- paf := js.getPAF(id)
- if paf == nil {
- js.mu.Unlock()
- return
- }
-
- delete(js.pafs, id)
-
- if js.stc != nil && len(js.pafs) < js.opts.maxap {
- close(js.stc)
- js.stc = nil
- }
-
- if js.dch != nil && len(js.pafs) == 0 {
- dch := js.dch
- js.dch = nil
-
- defer close(dch)
- }
- doErr := func(err error) {
- paf.err = err
- if paf.errCh != nil {
- paf.errCh <- paf.err
- }
- cb := js.opts.aecb
- js.mu.Unlock()
- if cb != nil {
- cb(paf.js, paf.msg, err)
- }
- }
-
- if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
- doErr(ErrNoResponders)
- return
- }
- var pa pubAckResponse
- if err := json.Unmarshal(m.Data, &pa); err != nil {
- doErr(ErrInvalidJSAck)
- return
- }
- if pa.Error != nil {
- doErr(fmt.Errorf("nats: %s", pa.Error.Description))
- return
- }
- if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
- doErr(ErrInvalidJSAck)
- return
- }
-
- paf.pa = pa.PubAck
- if paf.doneCh != nil {
- paf.doneCh <- paf.pa
- }
- js.mu.Unlock()
- }
- type MsgErrHandler func(JetStream, *Msg, error)
- func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- js.aecb = cb
- return nil
- })
- }
- func PublishAsyncMaxPending(max int) JSOpt {
- return jsOptFn(func(js *jsOpts) error {
- if max < 1 {
- return errors.New("nats: max ack pending should be >= 1")
- }
- js.maxap = max
- return nil
- })
- }
- func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
- return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
- }
- func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
- var o pubOpts
- if len(opts) > 0 {
- if m.Header == nil {
- m.Header = Header{}
- }
- for _, opt := range opts {
- if err := opt.configurePublish(&o); err != nil {
- return nil, err
- }
- }
- }
-
- if o.ttl != 0 || o.ctx != nil {
- return nil, ErrContextAndTimeout
- }
-
- if o.id != _EMPTY_ {
- m.Header.Set(MsgIdHdr, o.id)
- }
- if o.lid != _EMPTY_ {
- m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
- }
- if o.str != _EMPTY_ {
- m.Header.Set(ExpectedStreamHdr, o.str)
- }
- if o.seq > 0 {
- m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
- }
-
- if m.Reply != _EMPTY_ {
- return nil, errors.New("nats: reply subject should be empty")
- }
- m.Reply = js.newAsyncReply()
- if m.Reply == _EMPTY_ {
- return nil, errors.New("nats: error creating async reply handler")
- }
- id := m.Reply[aReplyPreLen:]
- paf := &pubAckFuture{msg: m, st: time.Now()}
- numPending, maxPending := js.registerPAF(id, paf)
- if maxPending > 0 && numPending >= maxPending {
- select {
- case <-js.asyncStall():
- case <-time.After(200 * time.Millisecond):
- js.clearPAF(id)
- return nil, errors.New("nats: stalled with too many outstanding async published messages")
- }
- }
- if err := js.nc.PublishMsg(m); err != nil {
- js.clearPAF(id)
- return nil, err
- }
- return paf, nil
- }
- func (js *js) PublishAsyncComplete() <-chan struct{} {
- js.mu.Lock()
- defer js.mu.Unlock()
- if js.dch == nil {
- js.dch = make(chan struct{})
- }
- dch := js.dch
- if len(js.pafs) == 0 {
- close(js.dch)
- js.dch = nil
- }
- return dch
- }
- func MsgId(id string) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.id = id
- return nil
- })
- }
- func ExpectStream(stream string) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.str = stream
- return nil
- })
- }
- func ExpectLastSequence(seq uint64) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.seq = seq
- return nil
- })
- }
- func ExpectLastMsgId(id string) PubOpt {
- return pubOptFn(func(opts *pubOpts) error {
- opts.lid = id
- return nil
- })
- }
- type ackOpts struct {
- ttl time.Duration
- ctx context.Context
- }
- type AckOpt interface {
- configureAck(opts *ackOpts) error
- }
- type MaxWait time.Duration
- func (ttl MaxWait) configureJSContext(js *jsOpts) error {
- js.wait = time.Duration(ttl)
- return nil
- }
- func (ttl MaxWait) configurePull(opts *pullOpts) error {
- opts.ttl = time.Duration(ttl)
- return nil
- }
- type AckWait time.Duration
- func (ttl AckWait) configurePublish(opts *pubOpts) error {
- opts.ttl = time.Duration(ttl)
- return nil
- }
- func (ttl AckWait) configureSubscribe(opts *subOpts) error {
- opts.cfg.AckWait = time.Duration(ttl)
- return nil
- }
- func (ttl AckWait) configureAck(opts *ackOpts) error {
- opts.ttl = time.Duration(ttl)
- return nil
- }
- type ContextOpt struct {
- context.Context
- }
- func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configurePull(opts *pullOpts) error {
- opts.ctx = ctx
- return nil
- }
- func (ctx ContextOpt) configureAck(opts *ackOpts) error {
- opts.ctx = ctx
- return nil
- }
- func Context(ctx context.Context) ContextOpt {
- return ContextOpt{ctx}
- }
- type ConsumerConfig struct {
- Durable string `json:"durable_name,omitempty"`
- DeliverSubject string `json:"deliver_subject,omitempty"`
- DeliverPolicy DeliverPolicy `json:"deliver_policy"`
- OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
- OptStartTime *time.Time `json:"opt_start_time,omitempty"`
- AckPolicy AckPolicy `json:"ack_policy"`
- AckWait time.Duration `json:"ack_wait,omitempty"`
- MaxDeliver int `json:"max_deliver,omitempty"`
- FilterSubject string `json:"filter_subject,omitempty"`
- ReplayPolicy ReplayPolicy `json:"replay_policy"`
- RateLimit uint64 `json:"rate_limit_bps,omitempty"`
- SampleFrequency string `json:"sample_freq,omitempty"`
- MaxWaiting int `json:"max_waiting,omitempty"`
- MaxAckPending int `json:"max_ack_pending,omitempty"`
- FlowControl bool `json:"flow_control,omitempty"`
- Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
- }
- type ConsumerInfo struct {
- Stream string `json:"stream_name"`
- Name string `json:"name"`
- Created time.Time `json:"created"`
- Config ConsumerConfig `json:"config"`
- Delivered SequencePair `json:"delivered"`
- AckFloor SequencePair `json:"ack_floor"`
- NumAckPending int `json:"num_ack_pending"`
- NumRedelivered int `json:"num_redelivered"`
- NumWaiting int `json:"num_waiting"`
- NumPending uint64 `json:"num_pending"`
- Cluster *ClusterInfo `json:"cluster,omitempty"`
- }
- type SequencePair struct {
- Consumer uint64 `json:"consumer_seq"`
- Stream uint64 `json:"stream_seq"`
- }
- type nextRequest struct {
- Expires time.Duration `json:"expires,omitempty"`
- Batch int `json:"batch,omitempty"`
- NoWait bool `json:"no_wait,omitempty"`
- }
- type jsSub struct {
- js *js
-
- mu sync.RWMutex
- psub *Subscription
- rpre string
- rr *rand.Rand
- freqs []chan *Msg
- consumer string
- stream string
- deliver string
- pull bool
- durable bool
- attached bool
-
- hbs bool
- fc bool
- cmeta string
- fcs map[uint64]string
- }
- func (jsi *jsSub) newFetchReply() string {
- jsi.mu.Lock()
- rpre := jsi.rpre
- rn := jsi.rr.Int63()
- jsi.mu.Unlock()
- var sb strings.Builder
- sb.WriteString(rpre)
- var b [aReplyTokensize]byte
- for i, l := 0, rn; i < len(b); i++ {
- b[i] = rdigits[l%base]
- l /= base
- }
- sb.Write(b[:])
- return sb.String()
- }
- func (jsi *jsSub) handleFetch(m *Msg) {
- jsi.mu.Lock()
- if len(jsi.freqs) == 0 {
- nc := jsi.js.nc
- sub := jsi.psub
- nc.mu.Lock()
- errCB := nc.Opts.AsyncErrorCB
- err := fmt.Errorf("nats: fetch response delivered but requestor has gone away")
- if errCB != nil {
- nc.ach.push(func() { errCB(nc, sub, err) })
- }
- nc.mu.Unlock()
- jsi.mu.Unlock()
- return
- }
- mch := jsi.freqs[0]
- if len(jsi.freqs) > 1 {
- jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...)
- } else {
- jsi.freqs = jsi.freqs[:0]
- }
- jsi.mu.Unlock()
- mch <- m
- }
- func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) {
- nc := jsi.js.nc
- m := NewMsg(subj)
- m.Reply = jsi.newFetchReply()
- m.Data = payload
- mch := make(chan *Msg, 1)
- jsi.mu.Lock()
- jsi.freqs = append(jsi.freqs, mch)
- jsi.mu.Unlock()
- if err := nc.PublishMsg(m); err != nil {
- return nil, err
- }
- var ok bool
- var msg *Msg
- select {
- case msg, ok = <-mch:
- if !ok {
- return nil, ErrConnectionClosed
- }
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- return msg, nil
- }
- func (jsi *jsSub) unsubscribe(drainMode bool) error {
- if drainMode && (jsi.durable || jsi.attached) {
-
-
- return nil
- }
-
- if jsi.psub != nil {
- jsi.psub.Drain()
- }
- js := jsi.js
- return js.DeleteConsumer(jsi.stream, jsi.consumer)
- }
- type SubOpt interface {
- configureSubscribe(opts *subOpts) error
- }
- type subOptFn func(opts *subOpts) error
- func (opt subOptFn) configureSubscribe(opts *subOpts) error {
- return opt(opts)
- }
- func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
- if cb == nil {
- return nil, ErrBadSubscription
- }
- return js.subscribe(subj, _EMPTY_, cb, nil, false, opts)
- }
- func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
- mch := make(chan *Msg, js.nc.Opts.SubChanLen)
- return js.subscribe(subj, _EMPTY_, nil, mch, true, opts)
- }
- func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
- if cb == nil {
- return nil, ErrBadSubscription
- }
- return js.subscribe(subj, queue, cb, nil, false, opts)
- }
- func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
- mch := make(chan *Msg, js.nc.Opts.SubChanLen)
- return js.subscribe(subj, queue, nil, mch, true, opts)
- }
- func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
- return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
- }
- func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
- return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
- }
- func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) {
- cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
- o := subOpts{cfg: &cfg}
- if len(opts) > 0 {
- for _, opt := range opts {
- if err := opt.configureSubscribe(&o); err != nil {
- return nil, err
- }
- }
- }
- isPullMode := ch == nil && cb == nil
- badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
- hasHeartbeats := o.cfg.Heartbeat > 0
- hasFC := o.cfg.FlowControl
- if isPullMode && badPullAck {
- return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
- }
- var (
- err error
- shouldCreate bool
- ccfg *ConsumerConfig
- info *ConsumerInfo
- deliver string
- attached bool
- stream = o.stream
- consumer = o.consumer
- isDurable = o.cfg.Durable != _EMPTY_
- )
-
- if o.stream == _EMPTY_ {
- stream, err = js.lookupStreamBySubject(subj)
- if err != nil {
- return nil, err
- }
- } else {
- stream = o.stream
- }
-
-
- consumer = o.cfg.Durable
- if consumer != _EMPTY_ {
-
- info, err = js.ConsumerInfo(stream, consumer)
- if err != nil && err.Error() != "nats: consumer not found" {
- return nil, err
- }
- }
- if info != nil {
-
- ccfg = &info.Config
- attached = true
-
- if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
- return nil, ErrSubjectMismatch
- }
- if ccfg.DeliverSubject != _EMPTY_ {
- deliver = ccfg.DeliverSubject
- } else {
- deliver = NewInbox()
- }
- } else {
- shouldCreate = true
- deliver = NewInbox()
- if !isPullMode {
- cfg.DeliverSubject = deliver
- }
-
- cfg.FilterSubject = subj
- }
- var sub *Subscription
-
- if cb != nil && !o.mack {
- ocb := cb
- cb = func(m *Msg) { ocb(m); m.Ack() }
- }
- if isPullMode {
- sub, err = js.pullSubscribe(subj)
- } else {
- sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
- }
- if err != nil {
- return nil, err
- }
-
-
- if !isPullMode && cb != nil && hasFC {
- sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
- }
-
- if shouldCreate {
-
- if cfg.AckPolicy == ackPolicyNotSet {
- cfg.AckPolicy = AckExplicitPolicy
- }
-
-
-
- if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
- maxMsgs, _, _ := sub.PendingLimits()
- cfg.MaxAckPending = maxMsgs
- }
- req := &createConsumerRequest{
- Stream: stream,
- Config: &cfg,
- }
- j, err := json.Marshal(req)
- if err != nil {
- return nil, err
- }
- var ccSubj string
- if isDurable {
- ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
- } else {
- ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
- }
- resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
- if err != nil {
- sub.Drain()
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return nil, err
- }
- var cinfo consumerResponse
- err = json.Unmarshal(resp.Data, &cinfo)
- if err != nil {
- sub.Drain()
- return nil, err
- }
- info = cinfo.ConsumerInfo
- if cinfo.Error != nil {
-
-
- sub.Drain()
-
-
-
- if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
- info, err = js.ConsumerInfo(stream, consumer)
- if err != nil {
- return nil, err
- }
- ccfg = &info.Config
-
- if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
- return nil, ErrSubjectMismatch
- }
-
- if ccfg.DeliverSubject != _EMPTY_ {
-
- if ch != nil {
- ch = make(chan *Msg, cap(ch))
- }
- sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
- &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
- if err != nil {
- return nil, err
- }
- }
- attached = true
- } else {
- return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
- }
- }
- stream = info.Stream
- consumer = info.Name
- deliver = info.Config.DeliverSubject
- }
- sub.mu.Lock()
- sub.jsi.stream = stream
- sub.jsi.consumer = consumer
- sub.jsi.durable = isDurable
- sub.jsi.attached = attached
- sub.jsi.deliver = deliver
- sub.mu.Unlock()
- return sub, nil
- }
- type ErrConsumerSequenceMismatch struct {
-
-
- StreamResumeSequence uint64
-
- ConsumerSequence uint64
-
-
- LastConsumerSequence uint64
- }
- func (ecs *ErrConsumerSequenceMismatch) Error() string {
- return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
- ecs.ConsumerSequence,
- ecs.LastConsumerSequence-ecs.ConsumerSequence,
- ecs.StreamResumeSequence,
- )
- }
- func isControlMessage(msg *Msg) bool {
- return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
- }
- func (jsi *jsSub) trackSequences(reply string) {
- jsi.mu.Lock()
- jsi.cmeta = reply
- jsi.mu.Unlock()
- }
- func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
- jsi, nc := sub.jsi, sub.conn
- if jsi == nil {
- return
- }
- jsi.mu.Lock()
- defer jsi.mu.Unlock()
- if len(jsi.fcs) == 0 {
- return
- }
- if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
- delete(jsi.fcs, delivered)
- nc.Publish(reply, nil)
- }
- }
- func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
- jsi.mu.Lock()
- if jsi.fcs == nil {
- jsi.fcs = make(map[uint64]string)
- }
- jsi.fcs[dfuture] = reply
- jsi.mu.Unlock()
- }
- func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
- nc.mu.Lock()
- errCB := nc.Opts.AsyncErrorCB
- if errCB != nil {
- nc.ach.push(func() { errCB(nc, sub, err) })
- }
- nc.mu.Unlock()
- }
- func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
-
- jsi.mu.RLock()
- ctrl := jsi.cmeta
- jsi.mu.RUnlock()
- if ctrl == _EMPTY_ {
- return
- }
- tokens, err := getMetadataFields(ctrl)
- if err != nil {
- return
- }
-
- var ldseq string
- dseq := tokens[6]
- hdr := msg.Header[lastConsumerSeqHdr]
- if len(hdr) == 1 {
- ldseq = hdr[0]
- }
-
-
- if ldseq != dseq {
-
-
- sseq := parseNum(tokens[5])
- ecs := &ErrConsumerSequenceMismatch{
- StreamResumeSequence: uint64(sseq),
- ConsumerSequence: uint64(parseNum(dseq)),
- LastConsumerSequence: uint64(parseNum(ldseq)),
- }
- nc.handleConsumerSequenceMismatch(s, ecs)
- }
- }
- type streamRequest struct {
- Subject string `json:"subject,omitempty"`
- }
- type streamNamesResponse struct {
- apiResponse
- apiPaged
- Streams []string `json:"streams"`
- }
- func (js *js) lookupStreamBySubject(subj string) (string, error) {
- var slr streamNamesResponse
- req := &streamRequest{subj}
- j, err := json.Marshal(req)
- if err != nil {
- return _EMPTY_, err
- }
- resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
- if err != nil {
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return _EMPTY_, err
- }
- if err := json.Unmarshal(resp.Data, &slr); err != nil {
- return _EMPTY_, err
- }
- if slr.Error != nil || len(slr.Streams) != 1 {
- return _EMPTY_, ErrNoMatchingStream
- }
- return slr.Streams[0], nil
- }
- type subOpts struct {
-
- stream, consumer string
-
- mack bool
-
- cfg *ConsumerConfig
- }
- func ManualAck() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.mack = true
- return nil
- })
- }
- func Durable(name string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- if opts.cfg.Durable != "" {
- return fmt.Errorf("nats: option Durable set more than once")
- }
- if strings.Contains(name, ".") {
- return ErrInvalidDurableName
- }
- opts.cfg.Durable = name
- return nil
- })
- }
- func DeliverAll() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverAllPolicy
- return nil
- })
- }
- func DeliverLast() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverLastPolicy
- return nil
- })
- }
- func DeliverNew() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverNewPolicy
- return nil
- })
- }
- func StartSequence(seq uint64) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
- opts.cfg.OptStartSeq = seq
- return nil
- })
- }
- func StartTime(startTime time.Time) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
- opts.cfg.OptStartTime = &startTime
- return nil
- })
- }
- func AckNone() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.AckPolicy = AckNonePolicy
- return nil
- })
- }
- func AckAll() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.AckPolicy = AckAllPolicy
- return nil
- })
- }
- func AckExplicit() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.AckPolicy = AckExplicitPolicy
- return nil
- })
- }
- func MaxDeliver(n int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxDeliver = n
- return nil
- })
- }
- func MaxAckPending(n int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxAckPending = n
- return nil
- })
- }
- func ReplayOriginal() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.ReplayPolicy = ReplayOriginalPolicy
- return nil
- })
- }
- func RateLimit(n uint64) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.RateLimit = n
- return nil
- })
- }
- func BindStream(name string) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.stream = name
- return nil
- })
- }
- func EnableFlowControl() SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.FlowControl = true
- return nil
- })
- }
- func IdleHeartbeat(duration time.Duration) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.Heartbeat = duration
- return nil
- })
- }
- func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
- sub.mu.Lock()
-
- if sub.jsi.consumer == _EMPTY_ {
- sub.mu.Unlock()
- return nil, ErrTypeSubscription
- }
-
- js := sub.jsi.js
- stream, consumer := sub.jsi.stream, sub.jsi.consumer
- sub.mu.Unlock()
- return js.getConsumerInfo(stream, consumer)
- }
- type pullOpts struct {
- ttl time.Duration
- ctx context.Context
- }
- type PullOpt interface {
- configurePull(opts *pullOpts) error
- }
- func PullMaxWaiting(n int) SubOpt {
- return subOptFn(func(opts *subOpts) error {
- opts.cfg.MaxWaiting = n
- return nil
- })
- }
- var errNoMessages = errors.New("nats: no messages")
- func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
- if sub == nil {
- return nil, ErrBadSubscription
- }
- var o pullOpts
- for _, opt := range opts {
- if err := opt.configurePull(&o); err != nil {
- return nil, err
- }
- }
- if o.ctx != nil && o.ttl != 0 {
- return nil, ErrContextAndTimeout
- }
- sub.mu.Lock()
- jsi := sub.jsi
- if jsi == nil || sub.typ != PullSubscription {
- sub.mu.Unlock()
- return nil, ErrTypeSubscription
- }
- nc, _ := sub.conn, sub.Subject
- stream, consumer := sub.jsi.stream, sub.jsi.consumer
- js := sub.jsi.js
- ttl := o.ttl
- if ttl == 0 {
- ttl = js.opts.wait
- }
- sub.mu.Unlock()
-
-
- var (
- ctx = o.ctx
- err error
- cancel context.CancelFunc
- )
- if o.ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), ttl)
- defer cancel()
- }
-
- select {
- case <-ctx.Done():
- if ctx.Err() == context.Canceled {
- err = ctx.Err()
- } else {
- err = ErrTimeout
- }
- default:
- }
- if err != nil {
- return nil, err
- }
-
-
- checkMsg := func(msg *Msg) error {
- if len(msg.Data) == 0 {
- switch msg.Header.Get(statusHdr) {
- case noResponders:
- return ErrNoResponders
- case noMessages:
- return errNoMessages
- case "400", "408", "409":
- return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
- }
- }
- return nil
- }
- checkCtxErr := func(err error) error {
- if o.ctx == nil && err == context.DeadlineExceeded {
- return ErrTimeout
- }
- return err
- }
- var (
- gotNoMessages bool
- nr = &nextRequest{Batch: batch, NoWait: true}
- req, _ = json.Marshal(nr)
- reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
- expires = ttl - 10*time.Millisecond
- msgs = make([]*Msg, 0)
- )
- if batch == 1 {
-
-
- resp, err := jsi.fetchNoWait(ctx, reqNext, req)
- if err != nil {
- return nil, checkCtxErr(err)
- }
-
-
- err = checkMsg(resp)
- if err != nil {
- if err == errNoMessages {
-
-
-
- nr.NoWait = false
- nr.Expires = expires
- req, _ = json.Marshal(nr)
- resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req)
- if err != nil {
- return nil, checkCtxErr(err)
- }
-
-
- err = checkMsg(resp)
- if err != nil {
- return nil, err
- }
- return []*Msg{resp}, nil
- } else {
-
- return nil, checkCtxErr(err)
- }
- }
- return []*Msg{resp}, nil
- }
-
-
-
- inbox := NewInbox()
- mch := make(chan *Msg, batch)
- s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil)
- if err != nil {
- return nil, err
- }
-
-
-
- defer s.Unsubscribe()
-
- err = nc.publish(reqNext, inbox, nil, req)
- if err != nil {
- s.Unsubscribe()
- return nil, err
- }
-
- var (
- firstMsg *Msg
- ok bool
- )
- select {
- case firstMsg, ok = <-mch:
- if !ok {
- err = s.getNextMsgErr()
- } else {
- err = s.processNextMsgDelivered(firstMsg)
- if err == nil {
- err = checkMsg(firstMsg)
- }
- }
- case <-ctx.Done():
- err = checkCtxErr(ctx.Err())
- }
-
-
- if err == errNoMessages {
- gotNoMessages = true
- } else if err != nil {
-
-
- s.Unsubscribe()
- return nil, err
- }
- if gotNoMessages {
-
-
- nr.NoWait = false
- nr.Expires = expires
- req, _ = json.Marshal(nr)
-
-
- err = s.AutoUnsubscribe(batch + 1)
- if err != nil {
- return nil, err
- }
-
- err = nc.publish(reqNext, inbox, nil, req)
- if err != nil {
- s.Unsubscribe()
- return nil, err
- }
-
- select {
- case firstMsg, ok = <-mch:
- if !ok {
- err = s.getNextMsgErr()
- } else {
- err = s.processNextMsgDelivered(firstMsg)
- if err == nil {
- err = checkMsg(firstMsg)
- }
- }
- case <-ctx.Done():
- err = checkCtxErr(ctx.Err())
- }
- if err != nil {
- s.Unsubscribe()
- return nil, err
- }
-
- err = checkMsg(firstMsg)
- if err != nil {
- s.Unsubscribe()
- return nil, err
- }
- } else {
-
-
- err = s.AutoUnsubscribe(batch)
- if err != nil {
- return nil, err
- }
- }
- msgs = append(msgs, firstMsg)
- for {
- var (
- msg *Msg
- ok bool
- )
- select {
- case msg, ok = <-mch:
- if !ok {
- err = s.getNextMsgErr()
- } else {
- err = s.processNextMsgDelivered(msg)
- if err == nil {
- err = checkMsg(msg)
- }
- }
- case <-ctx.Done():
- return msgs, checkCtxErr(err)
- }
- if err != nil {
-
-
-
- break
- }
- if msg != nil {
- msgs = append(msgs, msg)
- }
- if len(msgs) == batch {
-
- break
- }
- }
- return msgs, nil
- }
- func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
- ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
- defer cancel()
- return js.getConsumerInfoContext(ctx, stream, consumer)
- }
- func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
- ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
- resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
- if err != nil {
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return nil, err
- }
- var info consumerResponse
- if err := json.Unmarshal(resp.Data, &info); err != nil {
- return nil, err
- }
- if info.Error != nil {
- return nil, fmt.Errorf("nats: %s", info.Error.Description)
- }
- return info.ConsumerInfo, nil
- }
- func (m *Msg) checkReply() (*js, *jsSub, error) {
- if m == nil || m.Sub == nil {
- return nil, nil, ErrMsgNotBound
- }
- if m.Reply == "" {
- return nil, nil, ErrMsgNoReply
- }
- sub := m.Sub
- sub.mu.Lock()
- if sub.jsi == nil {
- sub.mu.Unlock()
-
- return nil, nil, nil
- }
- js := sub.jsi.js
- jsi := sub.jsi
- sub.mu.Unlock()
- return js, jsi, nil
- }
- func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
- var o ackOpts
- for _, opt := range opts {
- if err := opt.configureAck(&o); err != nil {
- return err
- }
- }
- js, _, err := m.checkReply()
- if err != nil {
- return err
- }
-
- if atomic.LoadUint32(&m.ackd) == 1 {
- return ErrInvalidJSAck
- }
- m.Sub.mu.Lock()
- nc := m.Sub.conn
- m.Sub.mu.Unlock()
- usesCtx := o.ctx != nil
- usesWait := o.ttl > 0
- sync = sync || usesCtx || usesWait
- ctx := o.ctx
- wait := defaultRequestWait
- if usesWait {
- wait = o.ttl
- } else if js != nil {
- wait = js.opts.wait
- }
- if sync {
- if usesCtx {
- _, err = nc.RequestWithContext(ctx, m.Reply, ackType)
- } else {
- _, err = nc.Request(m.Reply, ackType, wait)
- }
- } else {
- err = nc.Publish(m.Reply, ackType)
- }
-
-
- if err == nil && !bytes.Equal(ackType, ackProgress) {
- atomic.StoreUint32(&m.ackd, 1)
- }
- return err
- }
- func (m *Msg) Ack(opts ...AckOpt) error {
- return m.ackReply(ackAck, false, opts...)
- }
- func (m *Msg) AckSync(opts ...AckOpt) error {
- return m.ackReply(ackAck, true, opts...)
- }
- func (m *Msg) Nak(opts ...AckOpt) error {
- return m.ackReply(ackNak, false, opts...)
- }
- func (m *Msg) Term(opts ...AckOpt) error {
- return m.ackReply(ackTerm, false, opts...)
- }
- func (m *Msg) InProgress(opts ...AckOpt) error {
- return m.ackReply(ackProgress, false, opts...)
- }
- type MsgMetadata struct {
- Sequence SequencePair
- NumDelivered uint64
- NumPending uint64
- Timestamp time.Time
- Stream string
- Consumer string
- }
- func getMetadataFields(subject string) ([]string, error) {
- const expectedTokens = 9
- const btsep = '.'
- tsa := [expectedTokens]string{}
- start, tokens := 0, tsa[:0]
- for i := 0; i < len(subject); i++ {
- if subject[i] == btsep {
- tokens = append(tokens, subject[start:i])
- start = i + 1
- }
- }
- tokens = append(tokens, subject[start:])
- if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
- return nil, ErrNotJSMessage
- }
- return tokens, nil
- }
- func (m *Msg) Metadata() (*MsgMetadata, error) {
- if _, _, err := m.checkReply(); err != nil {
- return nil, err
- }
- tokens, err := getMetadataFields(m.Reply)
- if err != nil {
- return nil, err
- }
- meta := &MsgMetadata{
- NumDelivered: uint64(parseNum(tokens[4])),
- NumPending: uint64(parseNum(tokens[8])),
- Timestamp: time.Unix(0, parseNum(tokens[7])),
- Stream: tokens[2],
- Consumer: tokens[3],
- }
- meta.Sequence.Stream = uint64(parseNum(tokens[5]))
- meta.Sequence.Consumer = uint64(parseNum(tokens[6]))
- return meta, nil
- }
- func parseNum(d string) (n int64) {
- if len(d) == 0 {
- return -1
- }
-
- const (
- asciiZero = 48
- asciiNine = 57
- )
- for _, dec := range d {
- if dec < asciiZero || dec > asciiNine {
- return -1
- }
- n = n*10 + (int64(dec) - asciiZero)
- }
- return n
- }
- type AckPolicy int
- const (
-
- AckNonePolicy AckPolicy = iota
-
-
- AckAllPolicy
-
- AckExplicitPolicy
-
- ackPolicyNotSet = 99
- )
- func jsonString(s string) string {
- return "\"" + s + "\""
- }
- func (p *AckPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString("none"):
- *p = AckNonePolicy
- case jsonString("all"):
- *p = AckAllPolicy
- case jsonString("explicit"):
- *p = AckExplicitPolicy
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- func (p AckPolicy) MarshalJSON() ([]byte, error) {
- switch p {
- case AckNonePolicy:
- return json.Marshal("none")
- case AckAllPolicy:
- return json.Marshal("all")
- case AckExplicitPolicy:
- return json.Marshal("explicit")
- default:
- return nil, fmt.Errorf("nats: unknown acknowlegement policy %v", p)
- }
- }
- func (p AckPolicy) String() string {
- switch p {
- case AckNonePolicy:
- return "AckNone"
- case AckAllPolicy:
- return "AckAll"
- case AckExplicitPolicy:
- return "AckExplicit"
- case ackPolicyNotSet:
- return "Not Initialized"
- default:
- return "Unknown AckPolicy"
- }
- }
- type ReplayPolicy int
- const (
-
- ReplayInstantPolicy ReplayPolicy = iota
-
- ReplayOriginalPolicy
- )
- func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString("instant"):
- *p = ReplayInstantPolicy
- case jsonString("original"):
- *p = ReplayOriginalPolicy
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
- switch p {
- case ReplayOriginalPolicy:
- return json.Marshal("original")
- case ReplayInstantPolicy:
- return json.Marshal("instant")
- default:
- return nil, fmt.Errorf("nats: unknown replay policy %v", p)
- }
- }
- var (
- ackAck = []byte("+ACK")
- ackNak = []byte("-NAK")
- ackProgress = []byte("+WPI")
- ackTerm = []byte("+TERM")
- )
- type DeliverPolicy int
- const (
-
-
- DeliverAllPolicy DeliverPolicy = iota
-
-
- DeliverLastPolicy
-
-
- DeliverNewPolicy
-
-
- DeliverByStartSequencePolicy
-
-
- DeliverByStartTimePolicy
- )
- func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString("all"), jsonString("undefined"):
- *p = DeliverAllPolicy
- case jsonString("last"):
- *p = DeliverLastPolicy
- case jsonString("new"):
- *p = DeliverNewPolicy
- case jsonString("by_start_sequence"):
- *p = DeliverByStartSequencePolicy
- case jsonString("by_start_time"):
- *p = DeliverByStartTimePolicy
- }
- return nil
- }
- func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
- switch p {
- case DeliverAllPolicy:
- return json.Marshal("all")
- case DeliverLastPolicy:
- return json.Marshal("last")
- case DeliverNewPolicy:
- return json.Marshal("new")
- case DeliverByStartSequencePolicy:
- return json.Marshal("by_start_sequence")
- case DeliverByStartTimePolicy:
- return json.Marshal("by_start_time")
- default:
- return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
- }
- }
- type RetentionPolicy int
- const (
-
-
- LimitsPolicy RetentionPolicy = iota
-
- InterestPolicy
-
- WorkQueuePolicy
- )
- type DiscardPolicy int
- const (
-
-
- DiscardOld DiscardPolicy = iota
-
- DiscardNew
- )
- const (
- limitsPolicyString = "limits"
- interestPolicyString = "interest"
- workQueuePolicyString = "workqueue"
- )
- func (rp RetentionPolicy) String() string {
- switch rp {
- case LimitsPolicy:
- return "Limits"
- case InterestPolicy:
- return "Interest"
- case WorkQueuePolicy:
- return "WorkQueue"
- default:
- return "Unknown Retention Policy"
- }
- }
- func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
- switch rp {
- case LimitsPolicy:
- return json.Marshal(limitsPolicyString)
- case InterestPolicy:
- return json.Marshal(interestPolicyString)
- case WorkQueuePolicy:
- return json.Marshal(workQueuePolicyString)
- default:
- return nil, fmt.Errorf("nats: can not marshal %v", rp)
- }
- }
- func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString(limitsPolicyString):
- *rp = LimitsPolicy
- case jsonString(interestPolicyString):
- *rp = InterestPolicy
- case jsonString(workQueuePolicyString):
- *rp = WorkQueuePolicy
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- func (dp DiscardPolicy) String() string {
- switch dp {
- case DiscardOld:
- return "DiscardOld"
- case DiscardNew:
- return "DiscardNew"
- default:
- return "Unknown Discard Policy"
- }
- }
- func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
- switch dp {
- case DiscardOld:
- return json.Marshal("old")
- case DiscardNew:
- return json.Marshal("new")
- default:
- return nil, fmt.Errorf("nats: can not marshal %v", dp)
- }
- }
- func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
- switch strings.ToLower(string(data)) {
- case jsonString("old"):
- *dp = DiscardOld
- case jsonString("new"):
- *dp = DiscardNew
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
- type StorageType int
- const (
-
- FileStorage StorageType = iota
-
- MemoryStorage
- )
- const (
- memoryStorageString = "memory"
- fileStorageString = "file"
- )
- func (st StorageType) String() string {
- switch st {
- case MemoryStorage:
- return strings.Title(memoryStorageString)
- case FileStorage:
- return strings.Title(fileStorageString)
- default:
- return "Unknown Storage Type"
- }
- }
- func (st StorageType) MarshalJSON() ([]byte, error) {
- switch st {
- case MemoryStorage:
- return json.Marshal(memoryStorageString)
- case FileStorage:
- return json.Marshal(fileStorageString)
- default:
- return nil, fmt.Errorf("nats: can not marshal %v", st)
- }
- }
- func (st *StorageType) UnmarshalJSON(data []byte) error {
- switch string(data) {
- case jsonString(memoryStorageString):
- *st = MemoryStorage
- case jsonString(fileStorageString):
- *st = FileStorage
- default:
- return fmt.Errorf("nats: can not unmarshal %q", data)
- }
- return nil
- }
|