jsm.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077
  1. // Copyright 2021 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "context"
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "strings"
  20. "time"
  21. )
  22. // JetStreamManager manages JetStream Streams and Consumers.
  23. type JetStreamManager interface {
  24. // AddStream creates a stream.
  25. AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
  26. // UpdateStream updates a stream.
  27. UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
  28. // DeleteStream deletes a stream.
  29. DeleteStream(name string, opts ...JSOpt) error
  30. // StreamInfo retrieves information from a stream.
  31. StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)
  32. // PurgeStream purges a stream messages.
  33. PurgeStream(name string, opts ...JSOpt) error
  34. // StreamsInfo can be used to retrieve a list of StreamInfo objects.
  35. StreamsInfo(opts ...JSOpt) <-chan *StreamInfo
  36. // StreamNames is used to retrieve a list of Stream names.
  37. StreamNames(opts ...JSOpt) <-chan string
  38. // GetMsg retrieves a raw stream message stored in JetStream by sequence number.
  39. GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)
  40. // DeleteMsg erases a message from a stream.
  41. DeleteMsg(name string, seq uint64, opts ...JSOpt) error
  42. // AddConsumer adds a consumer to a stream.
  43. AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
  44. // DeleteConsumer deletes a consumer.
  45. DeleteConsumer(stream, consumer string, opts ...JSOpt) error
  46. // ConsumerInfo retrieves information of a consumer from a stream.
  47. ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)
  48. // ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
  49. ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
  50. // ConsumerNames is used to retrieve a list of Consumer names.
  51. ConsumerNames(stream string, opts ...JSOpt) <-chan string
  52. // AccountInfo retrieves info about the JetStream usage from an account.
  53. AccountInfo(opts ...JSOpt) (*AccountInfo, error)
  54. }
  55. // StreamConfig will determine the properties for a stream.
  56. // There are sensible defaults for most. If no subjects are
  57. // given the name will be used as the only subject.
  58. type StreamConfig struct {
  59. Name string `json:"name"`
  60. Subjects []string `json:"subjects,omitempty"`
  61. Retention RetentionPolicy `json:"retention"`
  62. MaxConsumers int `json:"max_consumers"`
  63. MaxMsgs int64 `json:"max_msgs"`
  64. MaxBytes int64 `json:"max_bytes"`
  65. Discard DiscardPolicy `json:"discard"`
  66. MaxAge time.Duration `json:"max_age"`
  67. MaxMsgSize int32 `json:"max_msg_size,omitempty"`
  68. Storage StorageType `json:"storage"`
  69. Replicas int `json:"num_replicas"`
  70. NoAck bool `json:"no_ack,omitempty"`
  71. Template string `json:"template_owner,omitempty"`
  72. Duplicates time.Duration `json:"duplicate_window,omitempty"`
  73. Placement *Placement `json:"placement,omitempty"`
  74. Mirror *StreamSource `json:"mirror,omitempty"`
  75. Sources []*StreamSource `json:"sources,omitempty"`
  76. }
  77. // Placement is used to guide placement of streams in clustered JetStream.
  78. type Placement struct {
  79. Cluster string `json:"cluster"`
  80. Tags []string `json:"tags,omitempty"`
  81. }
  82. // StreamSource dictates how streams can source from other streams.
  83. type StreamSource struct {
  84. Name string `json:"name"`
  85. OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
  86. OptStartTime *time.Time `json:"opt_start_time,omitempty"`
  87. FilterSubject string `json:"filter_subject,omitempty"`
  88. External *ExternalStream `json:"external,omitempty"`
  89. }
  90. // ExternalStream allows you to qualify access to a stream source in another
  91. // account.
  92. type ExternalStream struct {
  93. APIPrefix string `json:"api"`
  94. DeliverPrefix string `json:"deliver"`
  95. }
  96. // apiError is included in all API responses if there was an error.
  97. type apiError struct {
  98. Code int `json:"code"`
  99. Description string `json:"description,omitempty"`
  100. }
  101. // apiResponse is a standard response from the JetStream JSON API
  102. type apiResponse struct {
  103. Type string `json:"type"`
  104. Error *apiError `json:"error,omitempty"`
  105. }
  106. // apiPaged includes variables used to create paged responses from the JSON API
  107. type apiPaged struct {
  108. Total int `json:"total"`
  109. Offset int `json:"offset"`
  110. Limit int `json:"limit"`
  111. }
  112. // apiPagedRequest includes parameters allowing specific pages to be requested
  113. // from APIs responding with apiPaged.
  114. type apiPagedRequest struct {
  115. Offset int `json:"offset"`
  116. }
  117. // AccountInfo contains info about the JetStream usage from the current account.
  118. type AccountInfo struct {
  119. Memory uint64 `json:"memory"`
  120. Store uint64 `json:"storage"`
  121. Streams int `json:"streams"`
  122. Consumers int `json:"consumers"`
  123. API APIStats `json:"api"`
  124. Limits AccountLimits `json:"limits"`
  125. }
  126. // APIStats reports on API calls to JetStream for this account.
  127. type APIStats struct {
  128. Total uint64 `json:"total"`
  129. Errors uint64 `json:"errors"`
  130. }
  131. // AccountLimits includes the JetStream limits of the current account.
  132. type AccountLimits struct {
  133. MaxMemory int64 `json:"max_memory"`
  134. MaxStore int64 `json:"max_storage"`
  135. MaxStreams int `json:"max_streams"`
  136. MaxConsumers int `json:"max_consumers"`
  137. }
  138. type accountInfoResponse struct {
  139. apiResponse
  140. AccountInfo
  141. }
  142. // AccountInfo retrieves info about the JetStream usage from the current account.
  143. func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
  144. o, cancel, err := getJSContextOpts(js.opts, opts...)
  145. if err != nil {
  146. return nil, err
  147. }
  148. if cancel != nil {
  149. defer cancel()
  150. }
  151. resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
  152. if err != nil {
  153. return nil, err
  154. }
  155. var info accountInfoResponse
  156. if err := json.Unmarshal(resp.Data, &info); err != nil {
  157. return nil, err
  158. }
  159. if info.Error != nil {
  160. var err error
  161. if strings.Contains(info.Error.Description, "not enabled for") {
  162. err = ErrJetStreamNotEnabled
  163. } else {
  164. err = errors.New(info.Error.Description)
  165. }
  166. return nil, err
  167. }
  168. return &info.AccountInfo, nil
  169. }
  170. type createConsumerRequest struct {
  171. Stream string `json:"stream_name"`
  172. Config *ConsumerConfig `json:"config"`
  173. }
  174. type consumerResponse struct {
  175. apiResponse
  176. *ConsumerInfo
  177. }
  178. // AddConsumer will add a JetStream consumer.
  179. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
  180. o, cancel, err := getJSContextOpts(js.opts, opts...)
  181. if err != nil {
  182. return nil, err
  183. }
  184. if cancel != nil {
  185. defer cancel()
  186. }
  187. if stream == _EMPTY_ {
  188. return nil, ErrStreamNameRequired
  189. }
  190. req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
  191. if err != nil {
  192. return nil, err
  193. }
  194. var ccSubj string
  195. if cfg != nil && cfg.Durable != _EMPTY_ {
  196. if strings.Contains(cfg.Durable, ".") {
  197. return nil, ErrInvalidDurableName
  198. }
  199. ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
  200. } else {
  201. ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
  202. }
  203. resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
  204. if err != nil {
  205. if err == ErrNoResponders {
  206. err = ErrJetStreamNotEnabled
  207. }
  208. return nil, err
  209. }
  210. var info consumerResponse
  211. err = json.Unmarshal(resp.Data, &info)
  212. if err != nil {
  213. return nil, err
  214. }
  215. if info.Error != nil {
  216. return nil, errors.New(info.Error.Description)
  217. }
  218. return info.ConsumerInfo, nil
  219. }
  220. // consumerDeleteResponse is the response for a Consumer delete request.
  221. type consumerDeleteResponse struct {
  222. apiResponse
  223. Success bool `json:"success,omitempty"`
  224. }
  225. // DeleteConsumer deletes a Consumer.
  226. func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
  227. o, cancel, err := getJSContextOpts(js.opts, opts...)
  228. if err != nil {
  229. return err
  230. }
  231. if cancel != nil {
  232. defer cancel()
  233. }
  234. if stream == _EMPTY_ {
  235. return ErrStreamNameRequired
  236. }
  237. dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
  238. r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil)
  239. if err != nil {
  240. return err
  241. }
  242. var resp consumerDeleteResponse
  243. if err := json.Unmarshal(r.Data, &resp); err != nil {
  244. return err
  245. }
  246. if resp.Error != nil {
  247. return errors.New(resp.Error.Description)
  248. }
  249. return nil
  250. }
  251. // ConsumerInfo returns information about a Consumer.
  252. func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) {
  253. o, cancel, err := getJSContextOpts(js.opts, opts...)
  254. if err != nil {
  255. return nil, err
  256. }
  257. if cancel != nil {
  258. defer cancel()
  259. }
  260. return js.getConsumerInfoContext(o.ctx, stream, consumer)
  261. }
  262. // consumerLister fetches pages of ConsumerInfo objects. This object is not
  263. // safe to use for multiple threads.
  264. type consumerLister struct {
  265. stream string
  266. js *js
  267. err error
  268. offset int
  269. page []*ConsumerInfo
  270. pageInfo *apiPaged
  271. }
  272. // consumersRequest is the type used for Consumers requests.
  273. type consumersRequest struct {
  274. apiPagedRequest
  275. }
  276. // consumerListResponse is the response for a Consumers List request.
  277. type consumerListResponse struct {
  278. apiResponse
  279. apiPaged
  280. Consumers []*ConsumerInfo `json:"consumers"`
  281. }
  282. // Next fetches the next ConsumerInfo page.
  283. func (c *consumerLister) Next() bool {
  284. if c.err != nil {
  285. return false
  286. }
  287. if c.stream == _EMPTY_ {
  288. c.err = ErrStreamNameRequired
  289. return false
  290. }
  291. if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
  292. return false
  293. }
  294. req, err := json.Marshal(consumersRequest{
  295. apiPagedRequest: apiPagedRequest{Offset: c.offset},
  296. })
  297. if err != nil {
  298. c.err = err
  299. return false
  300. }
  301. var cancel context.CancelFunc
  302. ctx := c.js.opts.ctx
  303. if ctx == nil {
  304. ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
  305. defer cancel()
  306. }
  307. clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
  308. r, err := c.js.nc.RequestWithContext(ctx, clSubj, req)
  309. if err != nil {
  310. c.err = err
  311. return false
  312. }
  313. var resp consumerListResponse
  314. if err := json.Unmarshal(r.Data, &resp); err != nil {
  315. c.err = err
  316. return false
  317. }
  318. if resp.Error != nil {
  319. c.err = errors.New(resp.Error.Description)
  320. return false
  321. }
  322. c.pageInfo = &resp.apiPaged
  323. c.page = resp.Consumers
  324. c.offset += len(c.page)
  325. return true
  326. }
  327. // Page returns the current ConsumerInfo page.
  328. func (c *consumerLister) Page() []*ConsumerInfo {
  329. return c.page
  330. }
  331. // Err returns any errors found while fetching pages.
  332. func (c *consumerLister) Err() error {
  333. return c.err
  334. }
  335. // ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
  336. func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
  337. o, cancel, err := getJSContextOpts(jsc.opts, opts...)
  338. if err != nil {
  339. return nil
  340. }
  341. ch := make(chan *ConsumerInfo)
  342. l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream}
  343. go func() {
  344. if cancel != nil {
  345. defer cancel()
  346. }
  347. defer close(ch)
  348. for l.Next() {
  349. for _, info := range l.Page() {
  350. select {
  351. case ch <- info:
  352. case <-o.ctx.Done():
  353. return
  354. }
  355. }
  356. }
  357. }()
  358. return ch
  359. }
  360. type consumerNamesLister struct {
  361. stream string
  362. js *js
  363. err error
  364. offset int
  365. page []string
  366. pageInfo *apiPaged
  367. }
  368. // consumerNamesListResponse is the response for a Consumers Names List request.
  369. type consumerNamesListResponse struct {
  370. apiResponse
  371. apiPaged
  372. Consumers []string `json:"consumers"`
  373. }
  374. // Next fetches the next ConsumerInfo page.
  375. func (c *consumerNamesLister) Next() bool {
  376. if c.err != nil {
  377. return false
  378. }
  379. if c.stream == _EMPTY_ {
  380. c.err = ErrStreamNameRequired
  381. return false
  382. }
  383. if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
  384. return false
  385. }
  386. var cancel context.CancelFunc
  387. ctx := c.js.opts.ctx
  388. if ctx == nil {
  389. ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
  390. defer cancel()
  391. }
  392. clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
  393. r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil)
  394. if err != nil {
  395. c.err = err
  396. return false
  397. }
  398. var resp consumerNamesListResponse
  399. if err := json.Unmarshal(r.Data, &resp); err != nil {
  400. c.err = err
  401. return false
  402. }
  403. if resp.Error != nil {
  404. c.err = errors.New(resp.Error.Description)
  405. return false
  406. }
  407. c.pageInfo = &resp.apiPaged
  408. c.page = resp.Consumers
  409. c.offset += len(c.page)
  410. return true
  411. }
  412. // Page returns the current ConsumerInfo page.
  413. func (c *consumerNamesLister) Page() []string {
  414. return c.page
  415. }
  416. // Err returns any errors found while fetching pages.
  417. func (c *consumerNamesLister) Err() error {
  418. return c.err
  419. }
  420. // ConsumerNames is used to retrieve a list of Consumer names.
  421. func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string {
  422. o, cancel, err := getJSContextOpts(jsc.opts, opts...)
  423. if err != nil {
  424. return nil
  425. }
  426. ch := make(chan string)
  427. l := &consumerNamesLister{stream: stream, js: &js{nc: jsc.nc, opts: o}}
  428. go func() {
  429. if cancel != nil {
  430. defer cancel()
  431. }
  432. defer close(ch)
  433. for l.Next() {
  434. for _, info := range l.Page() {
  435. select {
  436. case ch <- info:
  437. case <-o.ctx.Done():
  438. return
  439. }
  440. }
  441. }
  442. }()
  443. return ch
  444. }
  445. // streamCreateResponse stream creation.
  446. type streamCreateResponse struct {
  447. apiResponse
  448. *StreamInfo
  449. }
  450. func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
  451. o, cancel, err := getJSContextOpts(js.opts, opts...)
  452. if err != nil {
  453. return nil, err
  454. }
  455. if cancel != nil {
  456. defer cancel()
  457. }
  458. if cfg == nil || cfg.Name == _EMPTY_ {
  459. return nil, ErrStreamNameRequired
  460. }
  461. if strings.Contains(cfg.Name, ".") {
  462. return nil, ErrInvalidStreamName
  463. }
  464. req, err := json.Marshal(cfg)
  465. if err != nil {
  466. return nil, err
  467. }
  468. csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
  469. r, err := js.nc.RequestWithContext(o.ctx, csSubj, req)
  470. if err != nil {
  471. return nil, err
  472. }
  473. var resp streamCreateResponse
  474. if err := json.Unmarshal(r.Data, &resp); err != nil {
  475. return nil, err
  476. }
  477. if resp.Error != nil {
  478. return nil, errors.New(resp.Error.Description)
  479. }
  480. return resp.StreamInfo, nil
  481. }
  482. type streamInfoResponse = streamCreateResponse
  483. func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
  484. if strings.Contains(stream, ".") {
  485. return nil, ErrInvalidStreamName
  486. }
  487. o, cancel, err := getJSContextOpts(js.opts, opts...)
  488. if err != nil {
  489. return nil, err
  490. }
  491. if cancel != nil {
  492. defer cancel()
  493. }
  494. csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
  495. r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil)
  496. if err != nil {
  497. return nil, err
  498. }
  499. var resp streamInfoResponse
  500. if err := json.Unmarshal(r.Data, &resp); err != nil {
  501. return nil, err
  502. }
  503. if resp.Error != nil {
  504. return nil, errors.New(resp.Error.Description)
  505. }
  506. return resp.StreamInfo, nil
  507. }
  508. // StreamInfo shows config and current state for this stream.
  509. type StreamInfo struct {
  510. Config StreamConfig `json:"config"`
  511. Created time.Time `json:"created"`
  512. State StreamState `json:"state"`
  513. Cluster *ClusterInfo `json:"cluster,omitempty"`
  514. Mirror *StreamSourceInfo `json:"mirror,omitempty"`
  515. Sources []*StreamSourceInfo `json:"sources,omitempty"`
  516. }
  517. // StreamSourceInfo shows information about an upstream stream source.
  518. type StreamSourceInfo struct {
  519. Name string `json:"name"`
  520. Lag uint64 `json:"lag"`
  521. Active time.Duration `json:"active"`
  522. }
  523. // StreamState is information about the given stream.
  524. type StreamState struct {
  525. Msgs uint64 `json:"messages"`
  526. Bytes uint64 `json:"bytes"`
  527. FirstSeq uint64 `json:"first_seq"`
  528. FirstTime time.Time `json:"first_ts"`
  529. LastSeq uint64 `json:"last_seq"`
  530. LastTime time.Time `json:"last_ts"`
  531. Consumers int `json:"consumer_count"`
  532. }
  533. // ClusterInfo shows information about the underlying set of servers
  534. // that make up the stream or consumer.
  535. type ClusterInfo struct {
  536. Name string `json:"name,omitempty"`
  537. Leader string `json:"leader,omitempty"`
  538. Replicas []*PeerInfo `json:"replicas,omitempty"`
  539. }
  540. // PeerInfo shows information about all the peers in the cluster that
  541. // are supporting the stream or consumer.
  542. type PeerInfo struct {
  543. Name string `json:"name"`
  544. Current bool `json:"current"`
  545. Offline bool `json:"offline,omitempty"`
  546. Active time.Duration `json:"active"`
  547. Lag uint64 `json:"lag,omitempty"`
  548. }
  549. // UpdateStream updates a Stream.
  550. func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
  551. o, cancel, err := getJSContextOpts(js.opts, opts...)
  552. if err != nil {
  553. return nil, err
  554. }
  555. if cancel != nil {
  556. defer cancel()
  557. }
  558. if cfg == nil || cfg.Name == _EMPTY_ {
  559. return nil, ErrStreamNameRequired
  560. }
  561. req, err := json.Marshal(cfg)
  562. if err != nil {
  563. return nil, err
  564. }
  565. usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
  566. r, err := js.nc.RequestWithContext(o.ctx, usSubj, req)
  567. if err != nil {
  568. return nil, err
  569. }
  570. var resp streamInfoResponse
  571. if err := json.Unmarshal(r.Data, &resp); err != nil {
  572. return nil, err
  573. }
  574. if resp.Error != nil {
  575. return nil, errors.New(resp.Error.Description)
  576. }
  577. return resp.StreamInfo, nil
  578. }
  579. // streamDeleteResponse is the response for a Stream delete request.
  580. type streamDeleteResponse struct {
  581. apiResponse
  582. Success bool `json:"success,omitempty"`
  583. }
  584. // DeleteStream deletes a Stream.
  585. func (js *js) DeleteStream(name string, opts ...JSOpt) error {
  586. o, cancel, err := getJSContextOpts(js.opts, opts...)
  587. if err != nil {
  588. return err
  589. }
  590. if cancel != nil {
  591. defer cancel()
  592. }
  593. if name == _EMPTY_ {
  594. return ErrStreamNameRequired
  595. }
  596. dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
  597. r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil)
  598. if err != nil {
  599. return err
  600. }
  601. var resp streamDeleteResponse
  602. if err := json.Unmarshal(r.Data, &resp); err != nil {
  603. return err
  604. }
  605. if resp.Error != nil {
  606. return errors.New(resp.Error.Description)
  607. }
  608. return nil
  609. }
  610. type apiMsgGetRequest struct {
  611. Seq uint64 `json:"seq"`
  612. }
  613. // RawStreamMsg is a raw message stored in JetStream.
  614. type RawStreamMsg struct {
  615. Subject string
  616. Sequence uint64
  617. Header Header
  618. Data []byte
  619. Time time.Time
  620. }
  621. // storedMsg is a raw message stored in JetStream.
  622. type storedMsg struct {
  623. Subject string `json:"subject"`
  624. Sequence uint64 `json:"seq"`
  625. Header []byte `json:"hdrs,omitempty"`
  626. Data []byte `json:"data,omitempty"`
  627. Time time.Time `json:"time"`
  628. }
  629. // apiMsgGetResponse is the response for a Stream get request.
  630. type apiMsgGetResponse struct {
  631. apiResponse
  632. Message *storedMsg `json:"message,omitempty"`
  633. Success bool `json:"success,omitempty"`
  634. }
  635. // GetMsg retrieves a raw stream message stored in JetStream by sequence number.
  636. func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) {
  637. o, cancel, err := getJSContextOpts(js.opts, opts...)
  638. if err != nil {
  639. return nil, err
  640. }
  641. if cancel != nil {
  642. defer cancel()
  643. }
  644. if name == _EMPTY_ {
  645. return nil, ErrStreamNameRequired
  646. }
  647. req, err := json.Marshal(&apiMsgGetRequest{Seq: seq})
  648. if err != nil {
  649. return nil, err
  650. }
  651. dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
  652. r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
  653. if err != nil {
  654. return nil, err
  655. }
  656. var resp apiMsgGetResponse
  657. if err := json.Unmarshal(r.Data, &resp); err != nil {
  658. return nil, err
  659. }
  660. if resp.Error != nil {
  661. return nil, errors.New(resp.Error.Description)
  662. }
  663. msg := resp.Message
  664. var hdr Header
  665. if msg.Header != nil {
  666. hdr, err = decodeHeadersMsg(msg.Header)
  667. if err != nil {
  668. return nil, err
  669. }
  670. }
  671. return &RawStreamMsg{
  672. Subject: msg.Subject,
  673. Sequence: msg.Sequence,
  674. Header: hdr,
  675. Data: msg.Data,
  676. Time: msg.Time,
  677. }, nil
  678. }
  679. type msgDeleteRequest struct {
  680. Seq uint64 `json:"seq"`
  681. }
  682. // msgDeleteResponse is the response for a Stream delete request.
  683. type msgDeleteResponse struct {
  684. apiResponse
  685. Success bool `json:"success,omitempty"`
  686. }
  687. // DeleteMsg deletes a message from a stream.
  688. func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
  689. o, cancel, err := getJSContextOpts(js.opts, opts...)
  690. if err != nil {
  691. return err
  692. }
  693. if cancel != nil {
  694. defer cancel()
  695. }
  696. if name == _EMPTY_ {
  697. return ErrStreamNameRequired
  698. }
  699. req, err := json.Marshal(&msgDeleteRequest{Seq: seq})
  700. if err != nil {
  701. return err
  702. }
  703. dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
  704. r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
  705. if err != nil {
  706. return err
  707. }
  708. var resp msgDeleteResponse
  709. if err := json.Unmarshal(r.Data, &resp); err != nil {
  710. return err
  711. }
  712. if resp.Error != nil {
  713. return errors.New(resp.Error.Description)
  714. }
  715. return nil
  716. }
  717. type streamPurgeResponse struct {
  718. apiResponse
  719. Success bool `json:"success,omitempty"`
  720. Purged uint64 `json:"purged"`
  721. }
  722. // PurgeStream purges messages on a Stream.
  723. func (js *js) PurgeStream(name string, opts ...JSOpt) error {
  724. o, cancel, err := getJSContextOpts(js.opts, opts...)
  725. if err != nil {
  726. return err
  727. }
  728. if cancel != nil {
  729. defer cancel()
  730. }
  731. psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name))
  732. r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil)
  733. if err != nil {
  734. return err
  735. }
  736. var resp streamPurgeResponse
  737. if err := json.Unmarshal(r.Data, &resp); err != nil {
  738. return err
  739. }
  740. if resp.Error != nil {
  741. return errors.New(resp.Error.Description)
  742. }
  743. return nil
  744. }
  745. // streamLister fetches pages of StreamInfo objects. This object is not safe
  746. // to use for multiple threads.
  747. type streamLister struct {
  748. js *js
  749. page []*StreamInfo
  750. err error
  751. offset int
  752. pageInfo *apiPaged
  753. }
  754. // streamListResponse list of detailed stream information.
  755. // A nil request is valid and means all streams.
  756. type streamListResponse struct {
  757. apiResponse
  758. apiPaged
  759. Streams []*StreamInfo `json:"streams"`
  760. }
  761. // streamNamesRequest is used for Stream Name requests.
  762. type streamNamesRequest struct {
  763. apiPagedRequest
  764. // These are filters that can be applied to the list.
  765. Subject string `json:"subject,omitempty"`
  766. }
  767. // Next fetches the next StreamInfo page.
  768. func (s *streamLister) Next() bool {
  769. if s.err != nil {
  770. return false
  771. }
  772. if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
  773. return false
  774. }
  775. req, err := json.Marshal(streamNamesRequest{
  776. apiPagedRequest: apiPagedRequest{Offset: s.offset},
  777. })
  778. if err != nil {
  779. s.err = err
  780. return false
  781. }
  782. var cancel context.CancelFunc
  783. ctx := s.js.opts.ctx
  784. if ctx == nil {
  785. ctx, cancel = context.WithTimeout(context.Background(), s.js.opts.wait)
  786. defer cancel()
  787. }
  788. slSubj := s.js.apiSubj(apiStreamList)
  789. r, err := s.js.nc.RequestWithContext(ctx, slSubj, req)
  790. if err != nil {
  791. s.err = err
  792. return false
  793. }
  794. var resp streamListResponse
  795. if err := json.Unmarshal(r.Data, &resp); err != nil {
  796. s.err = err
  797. return false
  798. }
  799. if resp.Error != nil {
  800. s.err = errors.New(resp.Error.Description)
  801. return false
  802. }
  803. s.pageInfo = &resp.apiPaged
  804. s.page = resp.Streams
  805. s.offset += len(s.page)
  806. return true
  807. }
  808. // Page returns the current StreamInfo page.
  809. func (s *streamLister) Page() []*StreamInfo {
  810. return s.page
  811. }
  812. // Err returns any errors found while fetching pages.
  813. func (s *streamLister) Err() error {
  814. return s.err
  815. }
  816. // StreamsInfo can be used to retrieve a list of StreamInfo objects.
  817. func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
  818. o, cancel, err := getJSContextOpts(jsc.opts, opts...)
  819. if err != nil {
  820. return nil
  821. }
  822. ch := make(chan *StreamInfo)
  823. l := &streamLister{js: &js{nc: jsc.nc, opts: o}}
  824. go func() {
  825. if cancel != nil {
  826. defer cancel()
  827. }
  828. defer close(ch)
  829. for l.Next() {
  830. for _, info := range l.Page() {
  831. select {
  832. case ch <- info:
  833. case <-o.ctx.Done():
  834. return
  835. }
  836. }
  837. }
  838. }()
  839. return ch
  840. }
  841. type streamNamesLister struct {
  842. js *js
  843. err error
  844. offset int
  845. page []string
  846. pageInfo *apiPaged
  847. }
  848. // Next fetches the next ConsumerInfo page.
  849. func (l *streamNamesLister) Next() bool {
  850. if l.err != nil {
  851. return false
  852. }
  853. if l.pageInfo != nil && l.offset >= l.pageInfo.Total {
  854. return false
  855. }
  856. var cancel context.CancelFunc
  857. ctx := l.js.opts.ctx
  858. if ctx == nil {
  859. ctx, cancel = context.WithTimeout(context.Background(), l.js.opts.wait)
  860. defer cancel()
  861. }
  862. r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
  863. if err != nil {
  864. l.err = err
  865. return false
  866. }
  867. var resp streamNamesResponse
  868. if err := json.Unmarshal(r.Data, &resp); err != nil {
  869. l.err = err
  870. return false
  871. }
  872. if resp.Error != nil {
  873. l.err = errors.New(resp.Error.Description)
  874. return false
  875. }
  876. l.pageInfo = &resp.apiPaged
  877. l.page = resp.Streams
  878. l.offset += len(l.page)
  879. return true
  880. }
  881. // Page returns the current ConsumerInfo page.
  882. func (l *streamNamesLister) Page() []string {
  883. return l.page
  884. }
  885. // Err returns any errors found while fetching pages.
  886. func (l *streamNamesLister) Err() error {
  887. return l.err
  888. }
  889. // StreamNames is used to retrieve a list of Stream names.
  890. func (jsc *js) StreamNames(opts ...JSOpt) <-chan string {
  891. o, cancel, err := getJSContextOpts(jsc.opts, opts...)
  892. if err != nil {
  893. return nil
  894. }
  895. ch := make(chan string)
  896. l := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}}
  897. go func() {
  898. if cancel != nil {
  899. defer cancel()
  900. }
  901. defer close(ch)
  902. for l.Next() {
  903. for _, info := range l.Page() {
  904. select {
  905. case ch <- info:
  906. case <-o.ctx.Done():
  907. return
  908. }
  909. }
  910. }
  911. }()
  912. return ch
  913. }
  914. func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
  915. var o jsOpts
  916. for _, opt := range opts {
  917. if err := opt.configureJSContext(&o); err != nil {
  918. return nil, nil, err
  919. }
  920. }
  921. // Check for option collisions. Right now just timeout and context.
  922. if o.ctx != nil && o.wait != 0 {
  923. return nil, nil, ErrContextAndTimeout
  924. }
  925. if o.wait == 0 && o.ctx == nil {
  926. o.wait = defs.wait
  927. }
  928. var cancel context.CancelFunc
  929. if o.ctx == nil && o.wait > 0 {
  930. o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
  931. }
  932. if o.pre == "" {
  933. o.pre = defs.pre
  934. }
  935. return &o, cancel, nil
  936. }