js.go 59 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345
  1. // Copyright 2020-2021 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "bytes"
  16. "context"
  17. "crypto/sha256"
  18. "encoding/json"
  19. "errors"
  20. "fmt"
  21. "math/rand"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "github.com/nats-io/nuid"
  28. )
  29. // Request API subjects for JetStream.
  30. const (
  31. // defaultAPIPrefix is the default prefix for the JetStream API.
  32. defaultAPIPrefix = "$JS.API."
  33. // apiAccountInfo is for obtaining general information about JetStream.
  34. apiAccountInfo = "INFO"
  35. // apiConsumerCreateT is used to create consumers.
  36. apiConsumerCreateT = "CONSUMER.CREATE.%s"
  37. // apiDurableCreateT is used to create durable consumers.
  38. apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
  39. // apiConsumerInfoT is used to create consumers.
  40. apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
  41. // apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
  42. apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
  43. // apiDeleteConsumerT is used to delete consumers.
  44. apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
  45. // apiConsumerListT is used to return all detailed consumer information
  46. apiConsumerListT = "CONSUMER.LIST.%s"
  47. // apiConsumerNamesT is used to return a list with all consumer names for the stream.
  48. apiConsumerNamesT = "CONSUMER.NAMES.%s"
  49. // apiStreams can lookup a stream by subject.
  50. apiStreams = "STREAM.NAMES"
  51. // apiStreamCreateT is the endpoint to create new streams.
  52. apiStreamCreateT = "STREAM.CREATE.%s"
  53. // apiStreamInfoT is the endpoint to get information on a stream.
  54. apiStreamInfoT = "STREAM.INFO.%s"
  55. // apiStreamUpdate is the endpoint to update existing streams.
  56. apiStreamUpdateT = "STREAM.UPDATE.%s"
  57. // apiStreamDeleteT is the endpoint to delete streams.
  58. apiStreamDeleteT = "STREAM.DELETE.%s"
  59. // apiPurgeStreamT is the endpoint to purge streams.
  60. apiStreamPurgeT = "STREAM.PURGE.%s"
  61. // apiStreamListT is the endpoint that will return all detailed stream information
  62. apiStreamList = "STREAM.LIST"
  63. // apiMsgGetT is the endpoint to get a message.
  64. apiMsgGetT = "STREAM.MSG.GET.%s"
  65. // apiMsgDeleteT is the endpoint to remove a message.
  66. apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
  67. )
  68. // JetStream allows persistent messaging through JetStream.
  69. type JetStream interface {
  70. // Publish publishes a message to JetStream.
  71. Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
  72. // PublishMsg publishes a Msg to JetStream.
  73. PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)
  74. // PublishAsync publishes a message to JetStream and returns a PubAckFuture.
  75. // The data should not be changed until the PubAckFuture has been processed.
  76. PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)
  77. // PublishMsgAsync publishes a Msg to JetStream and returms a PubAckFuture.
  78. // The message should not be changed until the PubAckFuture has been processed.
  79. PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
  80. // PublishAsyncPending returns the number of async publishes outstanding for this context.
  81. PublishAsyncPending() int
  82. // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
  83. PublishAsyncComplete() <-chan struct{}
  84. // Subscribe creates an async Subscription for JetStream.
  85. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
  86. // SubscribeSync creates a Subscription that can be used to process messages synchronously.
  87. SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
  88. // ChanSubscribe creates channel based Subscription.
  89. ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
  90. // QueueSubscribe creates a Subscription with a queue group.
  91. QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
  92. // QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
  93. QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
  94. // PullSubscribe creates a Subscription that can fetch messages.
  95. PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
  96. }
  97. // JetStreamContext allows JetStream messaging and stream management.
  98. type JetStreamContext interface {
  99. JetStream
  100. JetStreamManager
  101. }
  102. // js is an internal struct from a JetStreamContext.
  103. type js struct {
  104. nc *Conn
  105. opts *jsOpts
  106. // For async publish context.
  107. mu sync.RWMutex
  108. rpre string
  109. rsub *Subscription
  110. pafs map[string]*pubAckFuture
  111. stc chan struct{}
  112. dch chan struct{}
  113. rr *rand.Rand
  114. }
  115. type jsOpts struct {
  116. ctx context.Context
  117. // For importing JetStream from other accounts.
  118. pre string
  119. // Amount of time to wait for API requests.
  120. wait time.Duration
  121. // For async publish error handling.
  122. aecb MsgErrHandler
  123. // Maximum in flight.
  124. maxap int
  125. }
  126. const (
  127. defaultRequestWait = 5 * time.Second
  128. defaultAccountCheck = 20 * time.Second
  129. )
  130. // JetStream returns a JetStreamContext for messaging and stream management.
  131. func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
  132. js := &js{
  133. nc: nc,
  134. opts: &jsOpts{
  135. pre: defaultAPIPrefix,
  136. wait: defaultRequestWait,
  137. },
  138. }
  139. for _, opt := range opts {
  140. if err := opt.configureJSContext(js.opts); err != nil {
  141. return nil, err
  142. }
  143. }
  144. // If we have check recently we can avoid another account lookup here.
  145. // We want these to be lighweight and created at will.
  146. nc.mu.Lock()
  147. now := time.Now()
  148. checkAccount := now.Sub(nc.jsLastCheck) > defaultAccountCheck
  149. if checkAccount {
  150. nc.jsLastCheck = now
  151. }
  152. nc.mu.Unlock()
  153. if checkAccount {
  154. if _, err := js.AccountInfo(); err != nil {
  155. if err == ErrNoResponders {
  156. err = ErrJetStreamNotEnabled
  157. }
  158. return nil, err
  159. }
  160. }
  161. return js, nil
  162. }
  163. // JSOpt configures a JetStreamContext.
  164. type JSOpt interface {
  165. configureJSContext(opts *jsOpts) error
  166. }
  167. // jsOptFn configures an option for the JetStreamContext.
  168. type jsOptFn func(opts *jsOpts) error
  169. func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
  170. return opt(opts)
  171. }
  172. // APIPrefix changes the default prefix used for the JetStream API.
  173. func APIPrefix(pre string) JSOpt {
  174. return jsOptFn(func(js *jsOpts) error {
  175. js.pre = pre
  176. if !strings.HasSuffix(js.pre, ".") {
  177. js.pre = js.pre + "."
  178. }
  179. return nil
  180. })
  181. }
  182. func (js *js) apiSubj(subj string) string {
  183. if js.opts.pre == _EMPTY_ {
  184. return subj
  185. }
  186. var b strings.Builder
  187. b.WriteString(js.opts.pre)
  188. b.WriteString(subj)
  189. return b.String()
  190. }
  191. // PubOpt configures options for publishing JetStream messages.
  192. type PubOpt interface {
  193. configurePublish(opts *pubOpts) error
  194. }
  195. // pubOptFn is a function option used to configure JetStream Publish.
  196. type pubOptFn func(opts *pubOpts) error
  197. func (opt pubOptFn) configurePublish(opts *pubOpts) error {
  198. return opt(opts)
  199. }
  200. type pubOpts struct {
  201. ctx context.Context
  202. ttl time.Duration
  203. id string
  204. lid string // Expected last msgId
  205. str string // Expected stream name
  206. seq uint64 // Expected last sequence
  207. }
  208. // pubAckResponse is the ack response from the JetStream API when publishing a message.
  209. type pubAckResponse struct {
  210. apiResponse
  211. *PubAck
  212. }
  213. // PubAck is an ack received after successfully publishing a message.
  214. type PubAck struct {
  215. Stream string `json:"stream"`
  216. Sequence uint64 `json:"seq"`
  217. Duplicate bool `json:"duplicate,omitempty"`
  218. }
  219. // Headers for published messages.
  220. const (
  221. MsgIdHdr = "Nats-Msg-Id"
  222. ExpectedStreamHdr = "Nats-Expected-Stream"
  223. ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
  224. ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
  225. )
  226. // PublishMsg publishes a Msg to a stream from JetStream.
  227. func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
  228. var o pubOpts
  229. if len(opts) > 0 {
  230. if m.Header == nil {
  231. m.Header = Header{}
  232. }
  233. for _, opt := range opts {
  234. if err := opt.configurePublish(&o); err != nil {
  235. return nil, err
  236. }
  237. }
  238. }
  239. // Check for option collisions. Right now just timeout and context.
  240. if o.ctx != nil && o.ttl != 0 {
  241. return nil, ErrContextAndTimeout
  242. }
  243. if o.ttl == 0 && o.ctx == nil {
  244. o.ttl = js.opts.wait
  245. }
  246. if o.id != _EMPTY_ {
  247. m.Header.Set(MsgIdHdr, o.id)
  248. }
  249. if o.lid != _EMPTY_ {
  250. m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
  251. }
  252. if o.str != _EMPTY_ {
  253. m.Header.Set(ExpectedStreamHdr, o.str)
  254. }
  255. if o.seq > 0 {
  256. m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
  257. }
  258. var resp *Msg
  259. var err error
  260. if o.ttl > 0 {
  261. resp, err = js.nc.RequestMsg(m, time.Duration(o.ttl))
  262. } else {
  263. resp, err = js.nc.RequestMsgWithContext(o.ctx, m)
  264. }
  265. if err != nil {
  266. if err == ErrNoResponders {
  267. err = ErrNoStreamResponse
  268. }
  269. return nil, err
  270. }
  271. var pa pubAckResponse
  272. if err := json.Unmarshal(resp.Data, &pa); err != nil {
  273. return nil, ErrInvalidJSAck
  274. }
  275. if pa.Error != nil {
  276. return nil, fmt.Errorf("nats: %s", pa.Error.Description)
  277. }
  278. if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
  279. return nil, ErrInvalidJSAck
  280. }
  281. return pa.PubAck, nil
  282. }
  283. // Publish publishes a message to a stream from JetStream.
  284. func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) {
  285. return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
  286. }
  287. // PubAckFuture is a future for a PubAck.
  288. type PubAckFuture interface {
  289. // Ok returns a receive only channel that can be used to get a PubAck.
  290. Ok() <-chan *PubAck
  291. // Err returns a receive only channel that can be used to get the error from an async publish.
  292. Err() <-chan error
  293. // Msg returns the message that was sent to the server.
  294. Msg() *Msg
  295. }
  296. type pubAckFuture struct {
  297. js *js
  298. msg *Msg
  299. pa *PubAck
  300. st time.Time
  301. err error
  302. errCh chan error
  303. doneCh chan *PubAck
  304. }
  305. func (paf *pubAckFuture) Ok() <-chan *PubAck {
  306. paf.js.mu.Lock()
  307. defer paf.js.mu.Unlock()
  308. if paf.doneCh == nil {
  309. paf.doneCh = make(chan *PubAck, 1)
  310. if paf.pa != nil {
  311. paf.doneCh <- paf.pa
  312. }
  313. }
  314. return paf.doneCh
  315. }
  316. func (paf *pubAckFuture) Err() <-chan error {
  317. paf.js.mu.Lock()
  318. defer paf.js.mu.Unlock()
  319. if paf.errCh == nil {
  320. paf.errCh = make(chan error, 1)
  321. if paf.err != nil {
  322. paf.errCh <- paf.err
  323. }
  324. }
  325. return paf.errCh
  326. }
  327. func (paf *pubAckFuture) Msg() *Msg {
  328. paf.js.mu.RLock()
  329. defer paf.js.mu.RUnlock()
  330. return paf.msg
  331. }
  332. // pullSubscribe creates the wildcard subscription used per pull subscriber
  333. // to make fetch requests.
  334. func (js *js) pullSubscribe(subj string) (*Subscription, error) {
  335. jsi := &jsSub{js: js, pull: true}
  336. // Similar to async request handler we create a wildcard subscription for making requests,
  337. // though we do not use the token based approach since we cannot match the response to
  338. // the requestor due to JS subject being remapped on delivery. Instead, we just use an array
  339. // of channels similar to how ping/pong interval is handled and send the message to the first
  340. // available requestor via a channel.
  341. jsi.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
  342. jsi.rpre = fmt.Sprintf("%s.", NewInbox())
  343. sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", jsi.rpre), jsi.handleFetch)
  344. if err != nil {
  345. return nil, err
  346. }
  347. jsi.psub = sub
  348. return &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: jsi}, nil
  349. }
  350. // For quick token lookup etc.
  351. const aReplyPreLen = 14
  352. const aReplyTokensize = 6
  353. func (js *js) newAsyncReply() string {
  354. js.mu.Lock()
  355. if js.rsub == nil {
  356. // Create our wildcard reply subject.
  357. sha := sha256.New()
  358. sha.Write([]byte(nuid.Next()))
  359. b := sha.Sum(nil)
  360. for i := 0; i < aReplyTokensize; i++ {
  361. b[i] = rdigits[int(b[i]%base)]
  362. }
  363. js.rpre = fmt.Sprintf("%s%s.", InboxPrefix, b[:aReplyTokensize])
  364. sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
  365. if err != nil {
  366. js.mu.Unlock()
  367. return _EMPTY_
  368. }
  369. js.rsub = sub
  370. js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
  371. }
  372. var sb strings.Builder
  373. sb.WriteString(js.rpre)
  374. rn := js.rr.Int63()
  375. var b [aReplyTokensize]byte
  376. for i, l := 0, rn; i < len(b); i++ {
  377. b[i] = rdigits[l%base]
  378. l /= base
  379. }
  380. sb.Write(b[:])
  381. js.mu.Unlock()
  382. return sb.String()
  383. }
  384. // registerPAF will register for a PubAckFuture.
  385. func (js *js) registerPAF(id string, paf *pubAckFuture) (int, int) {
  386. js.mu.Lock()
  387. if js.pafs == nil {
  388. js.pafs = make(map[string]*pubAckFuture)
  389. }
  390. paf.js = js
  391. js.pafs[id] = paf
  392. np := len(js.pafs)
  393. maxap := js.opts.maxap
  394. js.mu.Unlock()
  395. return np, maxap
  396. }
  397. // Lock should be held.
  398. func (js *js) getPAF(id string) *pubAckFuture {
  399. if js.pafs == nil {
  400. return nil
  401. }
  402. return js.pafs[id]
  403. }
  404. // clearPAF will remove a PubAckFuture that was registered.
  405. func (js *js) clearPAF(id string) {
  406. js.mu.Lock()
  407. delete(js.pafs, id)
  408. js.mu.Unlock()
  409. }
  410. // PublishAsyncPending returns how many PubAckFutures are pending.
  411. func (js *js) PublishAsyncPending() int {
  412. js.mu.RLock()
  413. defer js.mu.RUnlock()
  414. return len(js.pafs)
  415. }
  416. func (js *js) asyncStall() <-chan struct{} {
  417. js.mu.Lock()
  418. if js.stc == nil {
  419. js.stc = make(chan struct{})
  420. }
  421. stc := js.stc
  422. js.mu.Unlock()
  423. return stc
  424. }
  425. // Handle an async reply from PublishAsync.
  426. func (js *js) handleAsyncReply(m *Msg) {
  427. if len(m.Subject) <= aReplyPreLen {
  428. return
  429. }
  430. id := m.Subject[aReplyPreLen:]
  431. js.mu.Lock()
  432. paf := js.getPAF(id)
  433. if paf == nil {
  434. js.mu.Unlock()
  435. return
  436. }
  437. // Remove
  438. delete(js.pafs, id)
  439. // Check on anyone stalled and waiting.
  440. if js.stc != nil && len(js.pafs) < js.opts.maxap {
  441. close(js.stc)
  442. js.stc = nil
  443. }
  444. // Check on anyone one waiting on done status.
  445. if js.dch != nil && len(js.pafs) == 0 {
  446. dch := js.dch
  447. js.dch = nil
  448. // Defer here so error is processed and can be checked.
  449. defer close(dch)
  450. }
  451. doErr := func(err error) {
  452. paf.err = err
  453. if paf.errCh != nil {
  454. paf.errCh <- paf.err
  455. }
  456. cb := js.opts.aecb
  457. js.mu.Unlock()
  458. if cb != nil {
  459. cb(paf.js, paf.msg, err)
  460. }
  461. }
  462. // Process no responders etc.
  463. if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
  464. doErr(ErrNoResponders)
  465. return
  466. }
  467. var pa pubAckResponse
  468. if err := json.Unmarshal(m.Data, &pa); err != nil {
  469. doErr(ErrInvalidJSAck)
  470. return
  471. }
  472. if pa.Error != nil {
  473. doErr(fmt.Errorf("nats: %s", pa.Error.Description))
  474. return
  475. }
  476. if pa.PubAck == nil || pa.PubAck.Stream == _EMPTY_ {
  477. doErr(ErrInvalidJSAck)
  478. return
  479. }
  480. // So here we have received a proper puback.
  481. paf.pa = pa.PubAck
  482. if paf.doneCh != nil {
  483. paf.doneCh <- paf.pa
  484. }
  485. js.mu.Unlock()
  486. }
  487. // MsgErrHandler is used to process asynchronous errors from
  488. // JetStream PublishAsync and PublishAsynMsg. It will return the original
  489. // message sent to the server for possible retransmitting and the error encountered.
  490. type MsgErrHandler func(JetStream, *Msg, error)
  491. // PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
  492. func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt {
  493. return jsOptFn(func(js *jsOpts) error {
  494. js.aecb = cb
  495. return nil
  496. })
  497. }
  498. // PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
  499. func PublishAsyncMaxPending(max int) JSOpt {
  500. return jsOptFn(func(js *jsOpts) error {
  501. if max < 1 {
  502. return errors.New("nats: max ack pending should be >= 1")
  503. }
  504. js.maxap = max
  505. return nil
  506. })
  507. }
  508. // PublishAsync publishes a message to JetStream and returns a PubAckFuture
  509. func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
  510. return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
  511. }
  512. func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
  513. var o pubOpts
  514. if len(opts) > 0 {
  515. if m.Header == nil {
  516. m.Header = Header{}
  517. }
  518. for _, opt := range opts {
  519. if err := opt.configurePublish(&o); err != nil {
  520. return nil, err
  521. }
  522. }
  523. }
  524. // Timeouts and contexts do not make sense for these.
  525. if o.ttl != 0 || o.ctx != nil {
  526. return nil, ErrContextAndTimeout
  527. }
  528. // FIXME(dlc) - Make common.
  529. if o.id != _EMPTY_ {
  530. m.Header.Set(MsgIdHdr, o.id)
  531. }
  532. if o.lid != _EMPTY_ {
  533. m.Header.Set(ExpectedLastMsgIdHdr, o.lid)
  534. }
  535. if o.str != _EMPTY_ {
  536. m.Header.Set(ExpectedStreamHdr, o.str)
  537. }
  538. if o.seq > 0 {
  539. m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
  540. }
  541. // Reply
  542. if m.Reply != _EMPTY_ {
  543. return nil, errors.New("nats: reply subject should be empty")
  544. }
  545. m.Reply = js.newAsyncReply()
  546. if m.Reply == _EMPTY_ {
  547. return nil, errors.New("nats: error creating async reply handler")
  548. }
  549. id := m.Reply[aReplyPreLen:]
  550. paf := &pubAckFuture{msg: m, st: time.Now()}
  551. numPending, maxPending := js.registerPAF(id, paf)
  552. if maxPending > 0 && numPending >= maxPending {
  553. select {
  554. case <-js.asyncStall():
  555. case <-time.After(200 * time.Millisecond):
  556. js.clearPAF(id)
  557. return nil, errors.New("nats: stalled with too many outstanding async published messages")
  558. }
  559. }
  560. if err := js.nc.PublishMsg(m); err != nil {
  561. js.clearPAF(id)
  562. return nil, err
  563. }
  564. return paf, nil
  565. }
  566. // PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd.
  567. func (js *js) PublishAsyncComplete() <-chan struct{} {
  568. js.mu.Lock()
  569. defer js.mu.Unlock()
  570. if js.dch == nil {
  571. js.dch = make(chan struct{})
  572. }
  573. dch := js.dch
  574. if len(js.pafs) == 0 {
  575. close(js.dch)
  576. js.dch = nil
  577. }
  578. return dch
  579. }
  580. // MsgId sets the message ID used for de-duplication.
  581. func MsgId(id string) PubOpt {
  582. return pubOptFn(func(opts *pubOpts) error {
  583. opts.id = id
  584. return nil
  585. })
  586. }
  587. // ExpectStream sets the expected stream to respond from the publish.
  588. func ExpectStream(stream string) PubOpt {
  589. return pubOptFn(func(opts *pubOpts) error {
  590. opts.str = stream
  591. return nil
  592. })
  593. }
  594. // ExpectLastSequence sets the expected sequence in the response from the publish.
  595. func ExpectLastSequence(seq uint64) PubOpt {
  596. return pubOptFn(func(opts *pubOpts) error {
  597. opts.seq = seq
  598. return nil
  599. })
  600. }
  601. // ExpectLastSequence sets the expected sequence in the response from the publish.
  602. func ExpectLastMsgId(id string) PubOpt {
  603. return pubOptFn(func(opts *pubOpts) error {
  604. opts.lid = id
  605. return nil
  606. })
  607. }
  608. type ackOpts struct {
  609. ttl time.Duration
  610. ctx context.Context
  611. }
  612. // AckOpt are the options that can be passed when acknowledge a message.
  613. type AckOpt interface {
  614. configureAck(opts *ackOpts) error
  615. }
  616. // MaxWait sets the maximum amount of time we will wait for a response.
  617. type MaxWait time.Duration
  618. func (ttl MaxWait) configureJSContext(js *jsOpts) error {
  619. js.wait = time.Duration(ttl)
  620. return nil
  621. }
  622. func (ttl MaxWait) configurePull(opts *pullOpts) error {
  623. opts.ttl = time.Duration(ttl)
  624. return nil
  625. }
  626. // AckWait sets the maximum amount of time we will wait for an ack.
  627. type AckWait time.Duration
  628. func (ttl AckWait) configurePublish(opts *pubOpts) error {
  629. opts.ttl = time.Duration(ttl)
  630. return nil
  631. }
  632. func (ttl AckWait) configureSubscribe(opts *subOpts) error {
  633. opts.cfg.AckWait = time.Duration(ttl)
  634. return nil
  635. }
  636. func (ttl AckWait) configureAck(opts *ackOpts) error {
  637. opts.ttl = time.Duration(ttl)
  638. return nil
  639. }
  640. // ContextOpt is an option used to set a context.Context.
  641. type ContextOpt struct {
  642. context.Context
  643. }
  644. func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
  645. opts.ctx = ctx
  646. return nil
  647. }
  648. func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
  649. opts.ctx = ctx
  650. return nil
  651. }
  652. func (ctx ContextOpt) configurePull(opts *pullOpts) error {
  653. opts.ctx = ctx
  654. return nil
  655. }
  656. func (ctx ContextOpt) configureAck(opts *ackOpts) error {
  657. opts.ctx = ctx
  658. return nil
  659. }
  660. // Context returns an option that can be used to configure a context for APIs
  661. // that are context aware such as those part of the JetStream interface.
  662. func Context(ctx context.Context) ContextOpt {
  663. return ContextOpt{ctx}
  664. }
  665. // Subscribe
  666. // ConsumerConfig is the configuration of a JetStream consumer.
  667. type ConsumerConfig struct {
  668. Durable string `json:"durable_name,omitempty"`
  669. DeliverSubject string `json:"deliver_subject,omitempty"`
  670. DeliverPolicy DeliverPolicy `json:"deliver_policy"`
  671. OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
  672. OptStartTime *time.Time `json:"opt_start_time,omitempty"`
  673. AckPolicy AckPolicy `json:"ack_policy"`
  674. AckWait time.Duration `json:"ack_wait,omitempty"`
  675. MaxDeliver int `json:"max_deliver,omitempty"`
  676. FilterSubject string `json:"filter_subject,omitempty"`
  677. ReplayPolicy ReplayPolicy `json:"replay_policy"`
  678. RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
  679. SampleFrequency string `json:"sample_freq,omitempty"`
  680. MaxWaiting int `json:"max_waiting,omitempty"`
  681. MaxAckPending int `json:"max_ack_pending,omitempty"`
  682. FlowControl bool `json:"flow_control,omitempty"`
  683. Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
  684. }
  685. // ConsumerInfo is the info from a JetStream consumer.
  686. type ConsumerInfo struct {
  687. Stream string `json:"stream_name"`
  688. Name string `json:"name"`
  689. Created time.Time `json:"created"`
  690. Config ConsumerConfig `json:"config"`
  691. Delivered SequencePair `json:"delivered"`
  692. AckFloor SequencePair `json:"ack_floor"`
  693. NumAckPending int `json:"num_ack_pending"`
  694. NumRedelivered int `json:"num_redelivered"`
  695. NumWaiting int `json:"num_waiting"`
  696. NumPending uint64 `json:"num_pending"`
  697. Cluster *ClusterInfo `json:"cluster,omitempty"`
  698. }
  699. // SequencePair includes the consumer and stream sequence info from a JetStream consumer.
  700. type SequencePair struct {
  701. Consumer uint64 `json:"consumer_seq"`
  702. Stream uint64 `json:"stream_seq"`
  703. }
  704. // nextRequest is for getting next messages for pull based consumers from JetStream.
  705. type nextRequest struct {
  706. Expires time.Duration `json:"expires,omitempty"`
  707. Batch int `json:"batch,omitempty"`
  708. NoWait bool `json:"no_wait,omitempty"`
  709. }
  710. // jsSub includes JetStream subscription info.
  711. type jsSub struct {
  712. js *js
  713. // To setup request mux handler for pull subscribers.
  714. mu sync.RWMutex
  715. psub *Subscription
  716. rpre string
  717. rr *rand.Rand
  718. freqs []chan *Msg
  719. consumer string
  720. stream string
  721. deliver string
  722. pull bool
  723. durable bool
  724. attached bool
  725. // Heartbeats and Flow Control handling from push consumers.
  726. hbs bool
  727. fc bool
  728. cmeta string
  729. fcs map[uint64]string
  730. }
  731. // newFetchReply generates a unique inbox used for a fetch request.
  732. func (jsi *jsSub) newFetchReply() string {
  733. jsi.mu.Lock()
  734. rpre := jsi.rpre
  735. rn := jsi.rr.Int63()
  736. jsi.mu.Unlock()
  737. var sb strings.Builder
  738. sb.WriteString(rpre)
  739. var b [aReplyTokensize]byte
  740. for i, l := 0, rn; i < len(b); i++ {
  741. b[i] = rdigits[l%base]
  742. l /= base
  743. }
  744. sb.Write(b[:])
  745. return sb.String()
  746. }
  747. // handleFetch is delivered a message requested by pull subscribers
  748. // when calling Fetch.
  749. func (jsi *jsSub) handleFetch(m *Msg) {
  750. jsi.mu.Lock()
  751. if len(jsi.freqs) == 0 {
  752. nc := jsi.js.nc
  753. sub := jsi.psub
  754. nc.mu.Lock()
  755. errCB := nc.Opts.AsyncErrorCB
  756. err := fmt.Errorf("nats: fetch response delivered but requestor has gone away")
  757. if errCB != nil {
  758. nc.ach.push(func() { errCB(nc, sub, err) })
  759. }
  760. nc.mu.Unlock()
  761. jsi.mu.Unlock()
  762. return
  763. }
  764. mch := jsi.freqs[0]
  765. if len(jsi.freqs) > 1 {
  766. jsi.freqs = append(jsi.freqs[:0], jsi.freqs[1:]...)
  767. } else {
  768. jsi.freqs = jsi.freqs[:0]
  769. }
  770. jsi.mu.Unlock()
  771. mch <- m
  772. }
  773. // fetchNoWait makes a request to get a single message using no wait.
  774. func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) (*Msg, error) {
  775. nc := jsi.js.nc
  776. m := NewMsg(subj)
  777. m.Reply = jsi.newFetchReply()
  778. m.Data = payload
  779. mch := make(chan *Msg, 1)
  780. jsi.mu.Lock()
  781. jsi.freqs = append(jsi.freqs, mch)
  782. jsi.mu.Unlock()
  783. if err := nc.PublishMsg(m); err != nil {
  784. return nil, err
  785. }
  786. var ok bool
  787. var msg *Msg
  788. select {
  789. case msg, ok = <-mch:
  790. if !ok {
  791. return nil, ErrConnectionClosed
  792. }
  793. case <-ctx.Done():
  794. return nil, ctx.Err()
  795. }
  796. return msg, nil
  797. }
  798. func (jsi *jsSub) unsubscribe(drainMode bool) error {
  799. if drainMode && (jsi.durable || jsi.attached) {
  800. // Skip deleting consumer for durables/attached
  801. // consumers when using drain mode.
  802. return nil
  803. }
  804. // Clear the extra async pull subscription used for fetch requests.
  805. if jsi.psub != nil {
  806. jsi.psub.Drain()
  807. }
  808. js := jsi.js
  809. return js.DeleteConsumer(jsi.stream, jsi.consumer)
  810. }
  811. // SubOpt configures options for subscribing to JetStream consumers.
  812. type SubOpt interface {
  813. configureSubscribe(opts *subOpts) error
  814. }
  815. // subOptFn is a function option used to configure a JetStream Subscribe.
  816. type subOptFn func(opts *subOpts) error
  817. func (opt subOptFn) configureSubscribe(opts *subOpts) error {
  818. return opt(opts)
  819. }
  820. // Subscribe will create a subscription to the appropriate stream and consumer.
  821. func (js *js) Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
  822. if cb == nil {
  823. return nil, ErrBadSubscription
  824. }
  825. return js.subscribe(subj, _EMPTY_, cb, nil, false, opts)
  826. }
  827. // SubscribeSync will create a sync subscription to the appropriate stream and consumer.
  828. func (js *js) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) {
  829. mch := make(chan *Msg, js.nc.Opts.SubChanLen)
  830. return js.subscribe(subj, _EMPTY_, nil, mch, true, opts)
  831. }
  832. // QueueSubscribe will create a subscription to the appropriate stream and consumer with queue semantics.
  833. func (js *js) QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) {
  834. if cb == nil {
  835. return nil, ErrBadSubscription
  836. }
  837. return js.subscribe(subj, queue, cb, nil, false, opts)
  838. }
  839. // QueueSubscribeSync will create a sync subscription to the appropriate stream and consumer with queue semantics.
  840. func (js *js) QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) {
  841. mch := make(chan *Msg, js.nc.Opts.SubChanLen)
  842. return js.subscribe(subj, queue, nil, mch, true, opts)
  843. }
  844. // Subscribe will create a subscription to the appropriate stream and consumer.
  845. func (js *js) ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) {
  846. return js.subscribe(subj, _EMPTY_, nil, ch, false, opts)
  847. }
  848. // PullSubscribe creates a pull subscriber.
  849. func (js *js) PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) {
  850. return js.subscribe(subj, _EMPTY_, nil, nil, false, append(opts, Durable(durable)))
  851. }
  852. func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, opts []SubOpt) (*Subscription, error) {
  853. cfg := ConsumerConfig{AckPolicy: ackPolicyNotSet}
  854. o := subOpts{cfg: &cfg}
  855. if len(opts) > 0 {
  856. for _, opt := range opts {
  857. if err := opt.configureSubscribe(&o); err != nil {
  858. return nil, err
  859. }
  860. }
  861. }
  862. isPullMode := ch == nil && cb == nil
  863. badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
  864. hasHeartbeats := o.cfg.Heartbeat > 0
  865. hasFC := o.cfg.FlowControl
  866. if isPullMode && badPullAck {
  867. return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
  868. }
  869. var (
  870. err error
  871. shouldCreate bool
  872. ccfg *ConsumerConfig
  873. info *ConsumerInfo
  874. deliver string
  875. attached bool
  876. stream = o.stream
  877. consumer = o.consumer
  878. isDurable = o.cfg.Durable != _EMPTY_
  879. )
  880. // Find the stream mapped to the subject if not bound to a stream already.
  881. if o.stream == _EMPTY_ {
  882. stream, err = js.lookupStreamBySubject(subj)
  883. if err != nil {
  884. return nil, err
  885. }
  886. } else {
  887. stream = o.stream
  888. }
  889. // With an explicit durable name, then can lookup
  890. // the consumer to which it should be attaching to.
  891. consumer = o.cfg.Durable
  892. if consumer != _EMPTY_ {
  893. // Only create in case there is no consumer already.
  894. info, err = js.ConsumerInfo(stream, consumer)
  895. if err != nil && err.Error() != "nats: consumer not found" {
  896. return nil, err
  897. }
  898. }
  899. if info != nil {
  900. // Attach using the found consumer config.
  901. ccfg = &info.Config
  902. attached = true
  903. // Make sure this new subject matches or is a subset.
  904. if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
  905. return nil, ErrSubjectMismatch
  906. }
  907. if ccfg.DeliverSubject != _EMPTY_ {
  908. deliver = ccfg.DeliverSubject
  909. } else {
  910. deliver = NewInbox()
  911. }
  912. } else {
  913. shouldCreate = true
  914. deliver = NewInbox()
  915. if !isPullMode {
  916. cfg.DeliverSubject = deliver
  917. }
  918. // Do filtering always, server will clear as needed.
  919. cfg.FilterSubject = subj
  920. }
  921. var sub *Subscription
  922. // Check if we are manual ack.
  923. if cb != nil && !o.mack {
  924. ocb := cb
  925. cb = func(m *Msg) { ocb(m); m.Ack() }
  926. }
  927. if isPullMode {
  928. sub, err = js.pullSubscribe(subj)
  929. } else {
  930. sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
  931. }
  932. if err != nil {
  933. return nil, err
  934. }
  935. // With flow control enabled async subscriptions we will disable msgs
  936. // limits, and set a larger pending bytes limit by default.
  937. if !isPullMode && cb != nil && hasFC {
  938. sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit)
  939. }
  940. // If we are creating or updating let's process that request.
  941. if shouldCreate {
  942. // If not set default to ack explicit.
  943. if cfg.AckPolicy == ackPolicyNotSet {
  944. cfg.AckPolicy = AckExplicitPolicy
  945. }
  946. // If we have acks at all and the MaxAckPending is not set go ahead
  947. // and set to the internal max.
  948. // TODO(dlc) - We should be able to update this if client updates PendingLimits.
  949. if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
  950. maxMsgs, _, _ := sub.PendingLimits()
  951. cfg.MaxAckPending = maxMsgs
  952. }
  953. req := &createConsumerRequest{
  954. Stream: stream,
  955. Config: &cfg,
  956. }
  957. j, err := json.Marshal(req)
  958. if err != nil {
  959. return nil, err
  960. }
  961. var ccSubj string
  962. if isDurable {
  963. ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
  964. } else {
  965. ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
  966. }
  967. resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
  968. if err != nil {
  969. sub.Drain()
  970. if err == ErrNoResponders {
  971. err = ErrJetStreamNotEnabled
  972. }
  973. return nil, err
  974. }
  975. var cinfo consumerResponse
  976. err = json.Unmarshal(resp.Data, &cinfo)
  977. if err != nil {
  978. sub.Drain()
  979. return nil, err
  980. }
  981. info = cinfo.ConsumerInfo
  982. if cinfo.Error != nil {
  983. // Remove interest from previous subscribe since it
  984. // may have an incorrect delivery subject.
  985. sub.Drain()
  986. // Multiple subscribers could compete in creating the first consumer
  987. // that will be shared using the same durable name. If this happens, then
  988. // do a lookup of the consumer info and resubscribe using the latest info.
  989. if consumer != _EMPTY_ && (strings.Contains(cinfo.Error.Description, `consumer already exists`) || strings.Contains(cinfo.Error.Description, `consumer name already in use`)) {
  990. info, err = js.ConsumerInfo(stream, consumer)
  991. if err != nil {
  992. return nil, err
  993. }
  994. ccfg = &info.Config
  995. // Validate that the original subject does still match.
  996. if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
  997. return nil, ErrSubjectMismatch
  998. }
  999. // Use the deliver subject from latest consumer config to attach.
  1000. if ccfg.DeliverSubject != _EMPTY_ {
  1001. // We can't reuse the channel, so if one was passed, we need to create a new one.
  1002. if ch != nil {
  1003. ch = make(chan *Msg, cap(ch))
  1004. }
  1005. sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
  1006. &jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
  1007. if err != nil {
  1008. return nil, err
  1009. }
  1010. }
  1011. attached = true
  1012. } else {
  1013. return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
  1014. }
  1015. }
  1016. stream = info.Stream
  1017. consumer = info.Name
  1018. deliver = info.Config.DeliverSubject
  1019. }
  1020. sub.mu.Lock()
  1021. sub.jsi.stream = stream
  1022. sub.jsi.consumer = consumer
  1023. sub.jsi.durable = isDurable
  1024. sub.jsi.attached = attached
  1025. sub.jsi.deliver = deliver
  1026. sub.mu.Unlock()
  1027. return sub, nil
  1028. }
  1029. // ErrConsumerSequenceMismatch represents an error from a consumer
  1030. // that received a Heartbeat including sequence different to the
  1031. // one expected from the view of the client.
  1032. type ErrConsumerSequenceMismatch struct {
  1033. // StreamResumeSequence is the stream sequence from where the consumer
  1034. // should resume consuming from the stream.
  1035. StreamResumeSequence uint64
  1036. // ConsumerSequence is the sequence of the consumer that is behind.
  1037. ConsumerSequence uint64
  1038. // LastConsumerSequence is the sequence of the consumer when the heartbeat
  1039. // was received.
  1040. LastConsumerSequence uint64
  1041. }
  1042. func (ecs *ErrConsumerSequenceMismatch) Error() string {
  1043. return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
  1044. ecs.ConsumerSequence,
  1045. ecs.LastConsumerSequence-ecs.ConsumerSequence,
  1046. ecs.StreamResumeSequence,
  1047. )
  1048. }
  1049. // isControlMessage will return true if this is an empty control status message.
  1050. func isControlMessage(msg *Msg) bool {
  1051. return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
  1052. }
  1053. func (jsi *jsSub) trackSequences(reply string) {
  1054. jsi.mu.Lock()
  1055. jsi.cmeta = reply
  1056. jsi.mu.Unlock()
  1057. }
  1058. // checkForFlowControlResponse will check to see if we should send a flow control response
  1059. // based on the delivered index.
  1060. // Lock should be held.
  1061. func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
  1062. jsi, nc := sub.jsi, sub.conn
  1063. if jsi == nil {
  1064. return
  1065. }
  1066. jsi.mu.Lock()
  1067. defer jsi.mu.Unlock()
  1068. if len(jsi.fcs) == 0 {
  1069. return
  1070. }
  1071. if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
  1072. delete(jsi.fcs, delivered)
  1073. nc.Publish(reply, nil)
  1074. }
  1075. }
  1076. // Record an inbound flow control message.
  1077. func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
  1078. jsi.mu.Lock()
  1079. if jsi.fcs == nil {
  1080. jsi.fcs = make(map[uint64]string)
  1081. }
  1082. jsi.fcs[dfuture] = reply
  1083. jsi.mu.Unlock()
  1084. }
  1085. // handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
  1086. func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
  1087. nc.mu.Lock()
  1088. errCB := nc.Opts.AsyncErrorCB
  1089. if errCB != nil {
  1090. nc.ach.push(func() { errCB(nc, sub, err) })
  1091. }
  1092. nc.mu.Unlock()
  1093. }
  1094. // processControlFlow will automatically respond to control messages sent by the server.
  1095. func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
  1096. // Process heartbeat received, get latest control metadata if present.
  1097. jsi.mu.RLock()
  1098. ctrl := jsi.cmeta
  1099. jsi.mu.RUnlock()
  1100. if ctrl == _EMPTY_ {
  1101. return
  1102. }
  1103. tokens, err := getMetadataFields(ctrl)
  1104. if err != nil {
  1105. return
  1106. }
  1107. // Consumer sequence.
  1108. var ldseq string
  1109. dseq := tokens[6]
  1110. hdr := msg.Header[lastConsumerSeqHdr]
  1111. if len(hdr) == 1 {
  1112. ldseq = hdr[0]
  1113. }
  1114. // Detect consumer sequence mismatch and whether
  1115. // should restart the consumer.
  1116. if ldseq != dseq {
  1117. // Dispatch async error including details such as
  1118. // from where the consumer could be restarted.
  1119. sseq := parseNum(tokens[5])
  1120. ecs := &ErrConsumerSequenceMismatch{
  1121. StreamResumeSequence: uint64(sseq),
  1122. ConsumerSequence: uint64(parseNum(dseq)),
  1123. LastConsumerSequence: uint64(parseNum(ldseq)),
  1124. }
  1125. nc.handleConsumerSequenceMismatch(s, ecs)
  1126. }
  1127. }
  1128. type streamRequest struct {
  1129. Subject string `json:"subject,omitempty"`
  1130. }
  1131. type streamNamesResponse struct {
  1132. apiResponse
  1133. apiPaged
  1134. Streams []string `json:"streams"`
  1135. }
  1136. func (js *js) lookupStreamBySubject(subj string) (string, error) {
  1137. var slr streamNamesResponse
  1138. req := &streamRequest{subj}
  1139. j, err := json.Marshal(req)
  1140. if err != nil {
  1141. return _EMPTY_, err
  1142. }
  1143. resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
  1144. if err != nil {
  1145. if err == ErrNoResponders {
  1146. err = ErrJetStreamNotEnabled
  1147. }
  1148. return _EMPTY_, err
  1149. }
  1150. if err := json.Unmarshal(resp.Data, &slr); err != nil {
  1151. return _EMPTY_, err
  1152. }
  1153. if slr.Error != nil || len(slr.Streams) != 1 {
  1154. return _EMPTY_, ErrNoMatchingStream
  1155. }
  1156. return slr.Streams[0], nil
  1157. }
  1158. type subOpts struct {
  1159. // For attaching.
  1160. stream, consumer string
  1161. // For manual ack
  1162. mack bool
  1163. // For creating or updating.
  1164. cfg *ConsumerConfig
  1165. }
  1166. // ManualAck disables auto ack functionality for async subscriptions.
  1167. func ManualAck() SubOpt {
  1168. return subOptFn(func(opts *subOpts) error {
  1169. opts.mack = true
  1170. return nil
  1171. })
  1172. }
  1173. // Durable defines the consumer name for JetStream durable subscribers.
  1174. func Durable(name string) SubOpt {
  1175. return subOptFn(func(opts *subOpts) error {
  1176. if opts.cfg.Durable != "" {
  1177. return fmt.Errorf("nats: option Durable set more than once")
  1178. }
  1179. if strings.Contains(name, ".") {
  1180. return ErrInvalidDurableName
  1181. }
  1182. opts.cfg.Durable = name
  1183. return nil
  1184. })
  1185. }
  1186. // DeliverAll will configure a Consumer to receive all the
  1187. // messages from a Stream.
  1188. func DeliverAll() SubOpt {
  1189. return subOptFn(func(opts *subOpts) error {
  1190. opts.cfg.DeliverPolicy = DeliverAllPolicy
  1191. return nil
  1192. })
  1193. }
  1194. // DeliverLast configures a Consumer to receive messages
  1195. // starting with the latest one.
  1196. func DeliverLast() SubOpt {
  1197. return subOptFn(func(opts *subOpts) error {
  1198. opts.cfg.DeliverPolicy = DeliverLastPolicy
  1199. return nil
  1200. })
  1201. }
  1202. // DeliverNew configures a Consumer to receive messages
  1203. // published after the subscription.
  1204. func DeliverNew() SubOpt {
  1205. return subOptFn(func(opts *subOpts) error {
  1206. opts.cfg.DeliverPolicy = DeliverNewPolicy
  1207. return nil
  1208. })
  1209. }
  1210. // StartSequence configures a Consumer to receive
  1211. // messages from a start sequence.
  1212. func StartSequence(seq uint64) SubOpt {
  1213. return subOptFn(func(opts *subOpts) error {
  1214. opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
  1215. opts.cfg.OptStartSeq = seq
  1216. return nil
  1217. })
  1218. }
  1219. // StartTime configures a Consumer to receive
  1220. // messages from a start time.
  1221. func StartTime(startTime time.Time) SubOpt {
  1222. return subOptFn(func(opts *subOpts) error {
  1223. opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
  1224. opts.cfg.OptStartTime = &startTime
  1225. return nil
  1226. })
  1227. }
  1228. // AckNone requires no acks for delivered messages.
  1229. func AckNone() SubOpt {
  1230. return subOptFn(func(opts *subOpts) error {
  1231. opts.cfg.AckPolicy = AckNonePolicy
  1232. return nil
  1233. })
  1234. }
  1235. // AckAll when acking a sequence number, this implicitly acks all sequences
  1236. // below this one as well.
  1237. func AckAll() SubOpt {
  1238. return subOptFn(func(opts *subOpts) error {
  1239. opts.cfg.AckPolicy = AckAllPolicy
  1240. return nil
  1241. })
  1242. }
  1243. // AckExplicit requires ack or nack for all messages.
  1244. func AckExplicit() SubOpt {
  1245. return subOptFn(func(opts *subOpts) error {
  1246. opts.cfg.AckPolicy = AckExplicitPolicy
  1247. return nil
  1248. })
  1249. }
  1250. // MaxDeliver sets the number of redeliveries for a message.
  1251. func MaxDeliver(n int) SubOpt {
  1252. return subOptFn(func(opts *subOpts) error {
  1253. opts.cfg.MaxDeliver = n
  1254. return nil
  1255. })
  1256. }
  1257. // MaxAckPending sets the number of outstanding acks that are allowed before
  1258. // message delivery is halted.
  1259. func MaxAckPending(n int) SubOpt {
  1260. return subOptFn(func(opts *subOpts) error {
  1261. opts.cfg.MaxAckPending = n
  1262. return nil
  1263. })
  1264. }
  1265. // ReplayOriginal replays the messages at the original speed.
  1266. func ReplayOriginal() SubOpt {
  1267. return subOptFn(func(opts *subOpts) error {
  1268. opts.cfg.ReplayPolicy = ReplayOriginalPolicy
  1269. return nil
  1270. })
  1271. }
  1272. // RateLimit is the Bits per sec rate limit applied to a push consumer.
  1273. func RateLimit(n uint64) SubOpt {
  1274. return subOptFn(func(opts *subOpts) error {
  1275. opts.cfg.RateLimit = n
  1276. return nil
  1277. })
  1278. }
  1279. // BindStream binds a consumer to a stream explicitly based on a name.
  1280. func BindStream(name string) SubOpt {
  1281. return subOptFn(func(opts *subOpts) error {
  1282. opts.stream = name
  1283. return nil
  1284. })
  1285. }
  1286. // EnableFlowControl enables flow control for a push based consumer.
  1287. func EnableFlowControl() SubOpt {
  1288. return subOptFn(func(opts *subOpts) error {
  1289. opts.cfg.FlowControl = true
  1290. return nil
  1291. })
  1292. }
  1293. // IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
  1294. func IdleHeartbeat(duration time.Duration) SubOpt {
  1295. return subOptFn(func(opts *subOpts) error {
  1296. opts.cfg.Heartbeat = duration
  1297. return nil
  1298. })
  1299. }
  1300. func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
  1301. sub.mu.Lock()
  1302. // TODO(dlc) - Better way to mark especially if we attach.
  1303. if sub.jsi.consumer == _EMPTY_ {
  1304. sub.mu.Unlock()
  1305. return nil, ErrTypeSubscription
  1306. }
  1307. // Consumer info lookup should fail if in direct mode.
  1308. js := sub.jsi.js
  1309. stream, consumer := sub.jsi.stream, sub.jsi.consumer
  1310. sub.mu.Unlock()
  1311. return js.getConsumerInfo(stream, consumer)
  1312. }
  1313. type pullOpts struct {
  1314. ttl time.Duration
  1315. ctx context.Context
  1316. }
  1317. // PullOpt are the options that can be passed when pulling a batch of messages.
  1318. type PullOpt interface {
  1319. configurePull(opts *pullOpts) error
  1320. }
  1321. // PullMaxWaiting defines the max inflight pull requests.
  1322. func PullMaxWaiting(n int) SubOpt {
  1323. return subOptFn(func(opts *subOpts) error {
  1324. opts.cfg.MaxWaiting = n
  1325. return nil
  1326. })
  1327. }
  1328. var errNoMessages = errors.New("nats: no messages")
  1329. // Fetch pulls a batch of messages from a stream for a pull consumer.
  1330. func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
  1331. if sub == nil {
  1332. return nil, ErrBadSubscription
  1333. }
  1334. var o pullOpts
  1335. for _, opt := range opts {
  1336. if err := opt.configurePull(&o); err != nil {
  1337. return nil, err
  1338. }
  1339. }
  1340. if o.ctx != nil && o.ttl != 0 {
  1341. return nil, ErrContextAndTimeout
  1342. }
  1343. sub.mu.Lock()
  1344. jsi := sub.jsi
  1345. if jsi == nil || sub.typ != PullSubscription {
  1346. sub.mu.Unlock()
  1347. return nil, ErrTypeSubscription
  1348. }
  1349. nc, _ := sub.conn, sub.Subject
  1350. stream, consumer := sub.jsi.stream, sub.jsi.consumer
  1351. js := sub.jsi.js
  1352. ttl := o.ttl
  1353. if ttl == 0 {
  1354. ttl = js.opts.wait
  1355. }
  1356. sub.mu.Unlock()
  1357. // Use the given context or setup a default one for the span
  1358. // of the pull batch request.
  1359. var (
  1360. ctx = o.ctx
  1361. err error
  1362. cancel context.CancelFunc
  1363. )
  1364. if o.ctx == nil {
  1365. ctx, cancel = context.WithTimeout(context.Background(), ttl)
  1366. defer cancel()
  1367. }
  1368. // Check if context not done already before making the request.
  1369. select {
  1370. case <-ctx.Done():
  1371. if ctx.Err() == context.Canceled {
  1372. err = ctx.Err()
  1373. } else {
  1374. err = ErrTimeout
  1375. }
  1376. default:
  1377. }
  1378. if err != nil {
  1379. return nil, err
  1380. }
  1381. // Check for empty payload message and process synchronously
  1382. // any status messages.
  1383. checkMsg := func(msg *Msg) error {
  1384. if len(msg.Data) == 0 {
  1385. switch msg.Header.Get(statusHdr) {
  1386. case noResponders:
  1387. return ErrNoResponders
  1388. case noMessages:
  1389. return errNoMessages
  1390. case "400", "408", "409":
  1391. return fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
  1392. }
  1393. }
  1394. return nil
  1395. }
  1396. checkCtxErr := func(err error) error {
  1397. if o.ctx == nil && err == context.DeadlineExceeded {
  1398. return ErrTimeout
  1399. }
  1400. return err
  1401. }
  1402. var (
  1403. gotNoMessages bool
  1404. nr = &nextRequest{Batch: batch, NoWait: true}
  1405. req, _ = json.Marshal(nr)
  1406. reqNext = js.apiSubj(fmt.Sprintf(apiRequestNextT, stream, consumer))
  1407. expires = ttl - 10*time.Millisecond
  1408. msgs = make([]*Msg, 0)
  1409. )
  1410. if batch == 1 {
  1411. // To optimize single message no wait fetch, we use a shared wildcard
  1412. // subscription per pull subscriber to wait for the response.
  1413. resp, err := jsi.fetchNoWait(ctx, reqNext, req)
  1414. if err != nil {
  1415. return nil, checkCtxErr(err)
  1416. }
  1417. // In case of a no messages instant error, then fallback
  1418. // into longer version of pull batch request.
  1419. err = checkMsg(resp)
  1420. if err != nil {
  1421. if err == errNoMessages {
  1422. // Use old request style for the retry of the pull request
  1423. // in order to use auto UNSUB 1 to prevent the server
  1424. // from delivering a message when there is no more interest.
  1425. nr.NoWait = false
  1426. nr.Expires = expires
  1427. req, _ = json.Marshal(nr)
  1428. resp, err = nc.oldRequestWithContext(ctx, reqNext, nil, req)
  1429. if err != nil {
  1430. return nil, checkCtxErr(err)
  1431. }
  1432. // This next message, could also be an error
  1433. // (e.g. 408 due to request timeout).
  1434. err = checkMsg(resp)
  1435. if err != nil {
  1436. return nil, err
  1437. }
  1438. return []*Msg{resp}, nil
  1439. } else {
  1440. // Hard error
  1441. return nil, checkCtxErr(err)
  1442. }
  1443. }
  1444. return []*Msg{resp}, nil
  1445. }
  1446. // Setup a request where we will wait for the first response
  1447. // in case of errors, then dispatch the rest of the replies
  1448. // to the channel.
  1449. inbox := NewInbox()
  1450. mch := make(chan *Msg, batch)
  1451. s, err := nc.subscribe(inbox, _EMPTY_, nil, mch, true, nil)
  1452. if err != nil {
  1453. return nil, err
  1454. }
  1455. // Remove interest in the subscription at the end so that the
  1456. // this inbox does not get delivered the results intended
  1457. // for another request.
  1458. defer s.Unsubscribe()
  1459. // Make a publish request to get results of the pull.
  1460. err = nc.publish(reqNext, inbox, nil, req)
  1461. if err != nil {
  1462. s.Unsubscribe()
  1463. return nil, err
  1464. }
  1465. // Try to get the first message or error with NoWait.
  1466. var (
  1467. firstMsg *Msg
  1468. ok bool
  1469. )
  1470. select {
  1471. case firstMsg, ok = <-mch:
  1472. if !ok {
  1473. err = s.getNextMsgErr()
  1474. } else {
  1475. err = s.processNextMsgDelivered(firstMsg)
  1476. if err == nil {
  1477. err = checkMsg(firstMsg)
  1478. }
  1479. }
  1480. case <-ctx.Done():
  1481. err = checkCtxErr(ctx.Err())
  1482. }
  1483. // If the first error is 'no more messages', then switch into
  1484. // longer form version of the request that waits for messages.
  1485. if err == errNoMessages {
  1486. gotNoMessages = true
  1487. } else if err != nil {
  1488. // We should be getting the response from the server
  1489. // in case we got a poll error, so stop and cleanup.
  1490. s.Unsubscribe()
  1491. return nil, err
  1492. }
  1493. if gotNoMessages {
  1494. // We started with a 404 response right away, so fallback into
  1495. // second request that waits longer for messages to delivered.
  1496. nr.NoWait = false
  1497. nr.Expires = expires
  1498. req, _ = json.Marshal(nr)
  1499. // Since first message was an error we UNSUB (batch+1)
  1500. // since we are counting it as the first message.
  1501. err = s.AutoUnsubscribe(batch + 1)
  1502. if err != nil {
  1503. return nil, err
  1504. }
  1505. // Make another request and wait for the messages...
  1506. err = nc.publish(reqNext, inbox, nil, req)
  1507. if err != nil {
  1508. s.Unsubscribe()
  1509. return nil, err
  1510. }
  1511. // Try to get the first result again or return the error.
  1512. select {
  1513. case firstMsg, ok = <-mch:
  1514. if !ok {
  1515. err = s.getNextMsgErr()
  1516. } else {
  1517. err = s.processNextMsgDelivered(firstMsg)
  1518. if err == nil {
  1519. err = checkMsg(firstMsg)
  1520. }
  1521. }
  1522. case <-ctx.Done():
  1523. err = checkCtxErr(ctx.Err())
  1524. }
  1525. if err != nil {
  1526. s.Unsubscribe()
  1527. return nil, err
  1528. }
  1529. // Check again if the delivered next message is a status error.
  1530. err = checkMsg(firstMsg)
  1531. if err != nil {
  1532. s.Unsubscribe()
  1533. return nil, err
  1534. }
  1535. } else {
  1536. // We are receiving messages at this point. Send UNSUB to let
  1537. // the server clear interest once enough replies are delivered.
  1538. err = s.AutoUnsubscribe(batch)
  1539. if err != nil {
  1540. return nil, err
  1541. }
  1542. }
  1543. msgs = append(msgs, firstMsg)
  1544. for {
  1545. var (
  1546. msg *Msg
  1547. ok bool
  1548. )
  1549. select {
  1550. case msg, ok = <-mch:
  1551. if !ok {
  1552. err = s.getNextMsgErr()
  1553. } else {
  1554. err = s.processNextMsgDelivered(msg)
  1555. if err == nil {
  1556. err = checkMsg(msg)
  1557. }
  1558. }
  1559. case <-ctx.Done():
  1560. return msgs, checkCtxErr(err)
  1561. }
  1562. if err != nil {
  1563. // Discard the error which may have been a timeout
  1564. // or 408 request timeout status from the server,
  1565. // and just the return delivered messages.
  1566. break
  1567. }
  1568. if msg != nil {
  1569. msgs = append(msgs, msg)
  1570. }
  1571. if len(msgs) == batch {
  1572. // Done!
  1573. break
  1574. }
  1575. }
  1576. return msgs, nil
  1577. }
  1578. func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
  1579. ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
  1580. defer cancel()
  1581. return js.getConsumerInfoContext(ctx, stream, consumer)
  1582. }
  1583. func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
  1584. ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
  1585. resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
  1586. if err != nil {
  1587. if err == ErrNoResponders {
  1588. err = ErrJetStreamNotEnabled
  1589. }
  1590. return nil, err
  1591. }
  1592. var info consumerResponse
  1593. if err := json.Unmarshal(resp.Data, &info); err != nil {
  1594. return nil, err
  1595. }
  1596. if info.Error != nil {
  1597. return nil, fmt.Errorf("nats: %s", info.Error.Description)
  1598. }
  1599. return info.ConsumerInfo, nil
  1600. }
  1601. func (m *Msg) checkReply() (*js, *jsSub, error) {
  1602. if m == nil || m.Sub == nil {
  1603. return nil, nil, ErrMsgNotBound
  1604. }
  1605. if m.Reply == "" {
  1606. return nil, nil, ErrMsgNoReply
  1607. }
  1608. sub := m.Sub
  1609. sub.mu.Lock()
  1610. if sub.jsi == nil {
  1611. sub.mu.Unlock()
  1612. // Not using a JS context.
  1613. return nil, nil, nil
  1614. }
  1615. js := sub.jsi.js
  1616. jsi := sub.jsi
  1617. sub.mu.Unlock()
  1618. return js, jsi, nil
  1619. }
  1620. // ackReply handles all acks. Will do the right thing for pull and sync mode.
  1621. // It ensures that an ack is only sent a single time, regardless of
  1622. // how many times it is being called to avoid duplicated acks.
  1623. func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
  1624. var o ackOpts
  1625. for _, opt := range opts {
  1626. if err := opt.configureAck(&o); err != nil {
  1627. return err
  1628. }
  1629. }
  1630. js, _, err := m.checkReply()
  1631. if err != nil {
  1632. return err
  1633. }
  1634. // Skip if already acked.
  1635. if atomic.LoadUint32(&m.ackd) == 1 {
  1636. return ErrInvalidJSAck
  1637. }
  1638. m.Sub.mu.Lock()
  1639. nc := m.Sub.conn
  1640. m.Sub.mu.Unlock()
  1641. usesCtx := o.ctx != nil
  1642. usesWait := o.ttl > 0
  1643. sync = sync || usesCtx || usesWait
  1644. ctx := o.ctx
  1645. wait := defaultRequestWait
  1646. if usesWait {
  1647. wait = o.ttl
  1648. } else if js != nil {
  1649. wait = js.opts.wait
  1650. }
  1651. if sync {
  1652. if usesCtx {
  1653. _, err = nc.RequestWithContext(ctx, m.Reply, ackType)
  1654. } else {
  1655. _, err = nc.Request(m.Reply, ackType, wait)
  1656. }
  1657. } else {
  1658. err = nc.Publish(m.Reply, ackType)
  1659. }
  1660. // Mark that the message has been acked unless it is AckProgress
  1661. // which can be sent many times.
  1662. if err == nil && !bytes.Equal(ackType, ackProgress) {
  1663. atomic.StoreUint32(&m.ackd, 1)
  1664. }
  1665. return err
  1666. }
  1667. // Ack acknowledges a message. This tells the server that the message was
  1668. // successfully processed and it can move on to the next message.
  1669. func (m *Msg) Ack(opts ...AckOpt) error {
  1670. return m.ackReply(ackAck, false, opts...)
  1671. }
  1672. // Ack is the synchronous version of Ack. This indicates successful message
  1673. // processing.
  1674. func (m *Msg) AckSync(opts ...AckOpt) error {
  1675. return m.ackReply(ackAck, true, opts...)
  1676. }
  1677. // Nak negatively acknowledges a message. This tells the server to redeliver
  1678. // the message. You can configure the number of redeliveries by passing
  1679. // nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
  1680. func (m *Msg) Nak(opts ...AckOpt) error {
  1681. return m.ackReply(ackNak, false, opts...)
  1682. }
  1683. // Term tells the server to not redeliver this message, regardless of the value
  1684. // of nats.MaxDeliver.
  1685. func (m *Msg) Term(opts ...AckOpt) error {
  1686. return m.ackReply(ackTerm, false, opts...)
  1687. }
  1688. // InProgress tells the server that this message is being worked on. It resets
  1689. // the redelivery timer on the server.
  1690. func (m *Msg) InProgress(opts ...AckOpt) error {
  1691. return m.ackReply(ackProgress, false, opts...)
  1692. }
  1693. // MsgMetadata is the JetStream metadata associated with received messages.
  1694. type MsgMetadata struct {
  1695. Sequence SequencePair
  1696. NumDelivered uint64
  1697. NumPending uint64
  1698. Timestamp time.Time
  1699. Stream string
  1700. Consumer string
  1701. }
  1702. func getMetadataFields(subject string) ([]string, error) {
  1703. const expectedTokens = 9
  1704. const btsep = '.'
  1705. tsa := [expectedTokens]string{}
  1706. start, tokens := 0, tsa[:0]
  1707. for i := 0; i < len(subject); i++ {
  1708. if subject[i] == btsep {
  1709. tokens = append(tokens, subject[start:i])
  1710. start = i + 1
  1711. }
  1712. }
  1713. tokens = append(tokens, subject[start:])
  1714. if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
  1715. return nil, ErrNotJSMessage
  1716. }
  1717. return tokens, nil
  1718. }
  1719. // Metadata retrieves the metadata from a JetStream message. This method will
  1720. // return an error for non-JetStream Msgs.
  1721. func (m *Msg) Metadata() (*MsgMetadata, error) {
  1722. if _, _, err := m.checkReply(); err != nil {
  1723. return nil, err
  1724. }
  1725. tokens, err := getMetadataFields(m.Reply)
  1726. if err != nil {
  1727. return nil, err
  1728. }
  1729. meta := &MsgMetadata{
  1730. NumDelivered: uint64(parseNum(tokens[4])),
  1731. NumPending: uint64(parseNum(tokens[8])),
  1732. Timestamp: time.Unix(0, parseNum(tokens[7])),
  1733. Stream: tokens[2],
  1734. Consumer: tokens[3],
  1735. }
  1736. meta.Sequence.Stream = uint64(parseNum(tokens[5]))
  1737. meta.Sequence.Consumer = uint64(parseNum(tokens[6]))
  1738. return meta, nil
  1739. }
  1740. // Quick parser for positive numbers in ack reply encoding.
  1741. func parseNum(d string) (n int64) {
  1742. if len(d) == 0 {
  1743. return -1
  1744. }
  1745. // Ascii numbers 0-9
  1746. const (
  1747. asciiZero = 48
  1748. asciiNine = 57
  1749. )
  1750. for _, dec := range d {
  1751. if dec < asciiZero || dec > asciiNine {
  1752. return -1
  1753. }
  1754. n = n*10 + (int64(dec) - asciiZero)
  1755. }
  1756. return n
  1757. }
  1758. // AckPolicy determines how the consumer should acknowledge delivered messages.
  1759. type AckPolicy int
  1760. const (
  1761. // AckNonePolicy requires no acks for delivered messages.
  1762. AckNonePolicy AckPolicy = iota
  1763. // AckAllPolicy when acking a sequence number, this implicitly acks all
  1764. // sequences below this one as well.
  1765. AckAllPolicy
  1766. // AckExplicitPolicy requires ack or nack for all messages.
  1767. AckExplicitPolicy
  1768. // For setting
  1769. ackPolicyNotSet = 99
  1770. )
  1771. func jsonString(s string) string {
  1772. return "\"" + s + "\""
  1773. }
  1774. func (p *AckPolicy) UnmarshalJSON(data []byte) error {
  1775. switch string(data) {
  1776. case jsonString("none"):
  1777. *p = AckNonePolicy
  1778. case jsonString("all"):
  1779. *p = AckAllPolicy
  1780. case jsonString("explicit"):
  1781. *p = AckExplicitPolicy
  1782. default:
  1783. return fmt.Errorf("nats: can not unmarshal %q", data)
  1784. }
  1785. return nil
  1786. }
  1787. func (p AckPolicy) MarshalJSON() ([]byte, error) {
  1788. switch p {
  1789. case AckNonePolicy:
  1790. return json.Marshal("none")
  1791. case AckAllPolicy:
  1792. return json.Marshal("all")
  1793. case AckExplicitPolicy:
  1794. return json.Marshal("explicit")
  1795. default:
  1796. return nil, fmt.Errorf("nats: unknown acknowlegement policy %v", p)
  1797. }
  1798. }
  1799. func (p AckPolicy) String() string {
  1800. switch p {
  1801. case AckNonePolicy:
  1802. return "AckNone"
  1803. case AckAllPolicy:
  1804. return "AckAll"
  1805. case AckExplicitPolicy:
  1806. return "AckExplicit"
  1807. case ackPolicyNotSet:
  1808. return "Not Initialized"
  1809. default:
  1810. return "Unknown AckPolicy"
  1811. }
  1812. }
  1813. // ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
  1814. type ReplayPolicy int
  1815. const (
  1816. // ReplayInstantPolicy will replay messages as fast as possible.
  1817. ReplayInstantPolicy ReplayPolicy = iota
  1818. // ReplayOriginalPolicy will maintain the same timing as the messages were received.
  1819. ReplayOriginalPolicy
  1820. )
  1821. func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
  1822. switch string(data) {
  1823. case jsonString("instant"):
  1824. *p = ReplayInstantPolicy
  1825. case jsonString("original"):
  1826. *p = ReplayOriginalPolicy
  1827. default:
  1828. return fmt.Errorf("nats: can not unmarshal %q", data)
  1829. }
  1830. return nil
  1831. }
  1832. func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
  1833. switch p {
  1834. case ReplayOriginalPolicy:
  1835. return json.Marshal("original")
  1836. case ReplayInstantPolicy:
  1837. return json.Marshal("instant")
  1838. default:
  1839. return nil, fmt.Errorf("nats: unknown replay policy %v", p)
  1840. }
  1841. }
  1842. var (
  1843. ackAck = []byte("+ACK")
  1844. ackNak = []byte("-NAK")
  1845. ackProgress = []byte("+WPI")
  1846. ackTerm = []byte("+TERM")
  1847. )
  1848. // DeliverPolicy determines how the consumer should select the first message to deliver.
  1849. type DeliverPolicy int
  1850. const (
  1851. // DeliverAllPolicy starts delivering messages from the very beginning of a
  1852. // stream. This is the default.
  1853. DeliverAllPolicy DeliverPolicy = iota
  1854. // DeliverLastPolicy will start the consumer with the last sequence
  1855. // received.
  1856. DeliverLastPolicy
  1857. // DeliverNewPolicy will only deliver new messages that are sent after the
  1858. // consumer is created.
  1859. DeliverNewPolicy
  1860. // DeliverByStartTimePolicy will deliver messages starting from a given
  1861. // sequence.
  1862. DeliverByStartSequencePolicy
  1863. // DeliverByStartTimePolicy will deliver messages starting from a given
  1864. // time.
  1865. DeliverByStartTimePolicy
  1866. )
  1867. func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
  1868. switch string(data) {
  1869. case jsonString("all"), jsonString("undefined"):
  1870. *p = DeliverAllPolicy
  1871. case jsonString("last"):
  1872. *p = DeliverLastPolicy
  1873. case jsonString("new"):
  1874. *p = DeliverNewPolicy
  1875. case jsonString("by_start_sequence"):
  1876. *p = DeliverByStartSequencePolicy
  1877. case jsonString("by_start_time"):
  1878. *p = DeliverByStartTimePolicy
  1879. }
  1880. return nil
  1881. }
  1882. func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
  1883. switch p {
  1884. case DeliverAllPolicy:
  1885. return json.Marshal("all")
  1886. case DeliverLastPolicy:
  1887. return json.Marshal("last")
  1888. case DeliverNewPolicy:
  1889. return json.Marshal("new")
  1890. case DeliverByStartSequencePolicy:
  1891. return json.Marshal("by_start_sequence")
  1892. case DeliverByStartTimePolicy:
  1893. return json.Marshal("by_start_time")
  1894. default:
  1895. return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
  1896. }
  1897. }
  1898. // RetentionPolicy determines how messages in a set are retained.
  1899. type RetentionPolicy int
  1900. const (
  1901. // LimitsPolicy (default) means that messages are retained until any given limit is reached.
  1902. // This could be one of MaxMsgs, MaxBytes, or MaxAge.
  1903. LimitsPolicy RetentionPolicy = iota
  1904. // InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
  1905. InterestPolicy
  1906. // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
  1907. WorkQueuePolicy
  1908. )
  1909. // DiscardPolicy determines how to proceed when limits of messages or bytes are
  1910. // reached.
  1911. type DiscardPolicy int
  1912. const (
  1913. // DiscardOld will remove older messages to return to the limits. This is
  1914. // the default.
  1915. DiscardOld DiscardPolicy = iota
  1916. //DiscardNew will fail to store new messages.
  1917. DiscardNew
  1918. )
  1919. const (
  1920. limitsPolicyString = "limits"
  1921. interestPolicyString = "interest"
  1922. workQueuePolicyString = "workqueue"
  1923. )
  1924. func (rp RetentionPolicy) String() string {
  1925. switch rp {
  1926. case LimitsPolicy:
  1927. return "Limits"
  1928. case InterestPolicy:
  1929. return "Interest"
  1930. case WorkQueuePolicy:
  1931. return "WorkQueue"
  1932. default:
  1933. return "Unknown Retention Policy"
  1934. }
  1935. }
  1936. func (rp RetentionPolicy) MarshalJSON() ([]byte, error) {
  1937. switch rp {
  1938. case LimitsPolicy:
  1939. return json.Marshal(limitsPolicyString)
  1940. case InterestPolicy:
  1941. return json.Marshal(interestPolicyString)
  1942. case WorkQueuePolicy:
  1943. return json.Marshal(workQueuePolicyString)
  1944. default:
  1945. return nil, fmt.Errorf("nats: can not marshal %v", rp)
  1946. }
  1947. }
  1948. func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
  1949. switch string(data) {
  1950. case jsonString(limitsPolicyString):
  1951. *rp = LimitsPolicy
  1952. case jsonString(interestPolicyString):
  1953. *rp = InterestPolicy
  1954. case jsonString(workQueuePolicyString):
  1955. *rp = WorkQueuePolicy
  1956. default:
  1957. return fmt.Errorf("nats: can not unmarshal %q", data)
  1958. }
  1959. return nil
  1960. }
  1961. func (dp DiscardPolicy) String() string {
  1962. switch dp {
  1963. case DiscardOld:
  1964. return "DiscardOld"
  1965. case DiscardNew:
  1966. return "DiscardNew"
  1967. default:
  1968. return "Unknown Discard Policy"
  1969. }
  1970. }
  1971. func (dp DiscardPolicy) MarshalJSON() ([]byte, error) {
  1972. switch dp {
  1973. case DiscardOld:
  1974. return json.Marshal("old")
  1975. case DiscardNew:
  1976. return json.Marshal("new")
  1977. default:
  1978. return nil, fmt.Errorf("nats: can not marshal %v", dp)
  1979. }
  1980. }
  1981. func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
  1982. switch strings.ToLower(string(data)) {
  1983. case jsonString("old"):
  1984. *dp = DiscardOld
  1985. case jsonString("new"):
  1986. *dp = DiscardNew
  1987. default:
  1988. return fmt.Errorf("nats: can not unmarshal %q", data)
  1989. }
  1990. return nil
  1991. }
  1992. // StorageType determines how messages are stored for retention.
  1993. type StorageType int
  1994. const (
  1995. // FileStorage specifies on disk storage. It's the default.
  1996. FileStorage StorageType = iota
  1997. // MemoryStorage specifies in memory only.
  1998. MemoryStorage
  1999. )
  2000. const (
  2001. memoryStorageString = "memory"
  2002. fileStorageString = "file"
  2003. )
  2004. func (st StorageType) String() string {
  2005. switch st {
  2006. case MemoryStorage:
  2007. return strings.Title(memoryStorageString)
  2008. case FileStorage:
  2009. return strings.Title(fileStorageString)
  2010. default:
  2011. return "Unknown Storage Type"
  2012. }
  2013. }
  2014. func (st StorageType) MarshalJSON() ([]byte, error) {
  2015. switch st {
  2016. case MemoryStorage:
  2017. return json.Marshal(memoryStorageString)
  2018. case FileStorage:
  2019. return json.Marshal(fileStorageString)
  2020. default:
  2021. return nil, fmt.Errorf("nats: can not marshal %v", st)
  2022. }
  2023. }
  2024. func (st *StorageType) UnmarshalJSON(data []byte) error {
  2025. switch string(data) {
  2026. case jsonString(memoryStorageString):
  2027. *st = MemoryStorage
  2028. case jsonString(fileStorageString):
  2029. *st = FileStorage
  2030. default:
  2031. return fmt.Errorf("nats: can not unmarshal %q", data)
  2032. }
  2033. return nil
  2034. }