12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077 |
- package nats
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "strings"
- "time"
- )
- type JetStreamManager interface {
-
- AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
-
- UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
-
- DeleteStream(name string, opts ...JSOpt) error
-
- StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)
-
- PurgeStream(name string, opts ...JSOpt) error
-
- StreamsInfo(opts ...JSOpt) <-chan *StreamInfo
-
- StreamNames(opts ...JSOpt) <-chan string
-
- GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)
-
- DeleteMsg(name string, seq uint64, opts ...JSOpt) error
-
- AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
-
- DeleteConsumer(stream, consumer string, opts ...JSOpt) error
-
- ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)
-
- ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
-
- ConsumerNames(stream string, opts ...JSOpt) <-chan string
-
- AccountInfo(opts ...JSOpt) (*AccountInfo, error)
- }
- type StreamConfig struct {
- Name string `json:"name"`
- Subjects []string `json:"subjects,omitempty"`
- Retention RetentionPolicy `json:"retention"`
- MaxConsumers int `json:"max_consumers"`
- MaxMsgs int64 `json:"max_msgs"`
- MaxBytes int64 `json:"max_bytes"`
- Discard DiscardPolicy `json:"discard"`
- MaxAge time.Duration `json:"max_age"`
- MaxMsgSize int32 `json:"max_msg_size,omitempty"`
- Storage StorageType `json:"storage"`
- Replicas int `json:"num_replicas"`
- NoAck bool `json:"no_ack,omitempty"`
- Template string `json:"template_owner,omitempty"`
- Duplicates time.Duration `json:"duplicate_window,omitempty"`
- Placement *Placement `json:"placement,omitempty"`
- Mirror *StreamSource `json:"mirror,omitempty"`
- Sources []*StreamSource `json:"sources,omitempty"`
- }
// Placement holds the "cluster" and "tags" values that a StreamConfig
// serializes under its "placement" key. Tags is omitted from the JSON when
// empty.
type Placement struct {
	Cluster string   `json:"cluster"`
	Tags    []string `json:"tags,omitempty"`
}
- type StreamSource struct {
- Name string `json:"name"`
- OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
- OptStartTime *time.Time `json:"opt_start_time,omitempty"`
- FilterSubject string `json:"filter_subject,omitempty"`
- External *ExternalStream `json:"external,omitempty"`
- }
// ExternalStream holds the two prefixes a StreamSource serializes under its
// "external" key: APIPrefix as "api" and DeliverPrefix as "deliver".
type ExternalStream struct {
	APIPrefix     string `json:"api"`
	DeliverPrefix string `json:"deliver"`
}
// apiError is the error object carried in an API reply: a numeric code and an
// optional description.
type apiError struct {
	Code        int    `json:"code"`
	Description string `json:"description,omitempty"`
}
- type apiResponse struct {
- Type string `json:"type"`
- Error *apiError `json:"error,omitempty"`
- }
// apiPaged is the paging information embedded in list replies. The listers in
// this file stop once their running offset reaches Total.
type apiPaged struct {
	Total  int `json:"total"`
	Offset int `json:"offset"`
	Limit  int `json:"limit"`
}
// apiPagedRequest is the paging part of a list request; it carries the offset
// at which the requested page starts.
type apiPagedRequest struct {
	Offset int `json:"offset"`
}
- type AccountInfo struct {
- Memory uint64 `json:"memory"`
- Store uint64 `json:"storage"`
- Streams int `json:"streams"`
- Consumers int `json:"consumers"`
- API APIStats `json:"api"`
- Limits AccountLimits `json:"limits"`
- }
// APIStats holds the "total" and "errors" counters reported under the "api"
// key of AccountInfo.
type APIStats struct {
	Total  uint64 `json:"total"`
	Errors uint64 `json:"errors"`
}
// AccountLimits holds the limit values reported under the "limits" key of
// AccountInfo. Note that MaxStore is serialized as "max_storage".
type AccountLimits struct {
	MaxMemory    int64 `json:"max_memory"`
	MaxStore     int64 `json:"max_storage"`
	MaxStreams   int   `json:"max_streams"`
	MaxConsumers int   `json:"max_consumers"`
}
- type accountInfoResponse struct {
- apiResponse
- AccountInfo
- }
- func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
- if err != nil {
- return nil, err
- }
- var info accountInfoResponse
- if err := json.Unmarshal(resp.Data, &info); err != nil {
- return nil, err
- }
- if info.Error != nil {
- var err error
- if strings.Contains(info.Error.Description, "not enabled for") {
- err = ErrJetStreamNotEnabled
- } else {
- err = errors.New(info.Error.Description)
- }
- return nil, err
- }
- return &info.AccountInfo, nil
- }
- type createConsumerRequest struct {
- Stream string `json:"stream_name"`
- Config *ConsumerConfig `json:"config"`
- }
- type consumerResponse struct {
- apiResponse
- *ConsumerInfo
- }
- func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if stream == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
- if err != nil {
- return nil, err
- }
- var ccSubj string
- if cfg != nil && cfg.Durable != _EMPTY_ {
- if strings.Contains(cfg.Durable, ".") {
- return nil, ErrInvalidDurableName
- }
- ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
- } else {
- ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
- }
- resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
- if err != nil {
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return nil, err
- }
- var info consumerResponse
- err = json.Unmarshal(resp.Data, &info)
- if err != nil {
- return nil, err
- }
- if info.Error != nil {
- return nil, errors.New(info.Error.Description)
- }
- return info.ConsumerInfo, nil
- }
- type consumerDeleteResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- }
- func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- if stream == _EMPTY_ {
- return ErrStreamNameRequired
- }
- dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
- r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil)
- if err != nil {
- return err
- }
- var resp consumerDeleteResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
- func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- return js.getConsumerInfoContext(o.ctx, stream, consumer)
- }
- type consumerLister struct {
- stream string
- js *js
- err error
- offset int
- page []*ConsumerInfo
- pageInfo *apiPaged
- }
- type consumersRequest struct {
- apiPagedRequest
- }
- type consumerListResponse struct {
- apiResponse
- apiPaged
- Consumers []*ConsumerInfo `json:"consumers"`
- }
- func (c *consumerLister) Next() bool {
- if c.err != nil {
- return false
- }
- if c.stream == _EMPTY_ {
- c.err = ErrStreamNameRequired
- return false
- }
- if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
- return false
- }
- req, err := json.Marshal(consumersRequest{
- apiPagedRequest: apiPagedRequest{Offset: c.offset},
- })
- if err != nil {
- c.err = err
- return false
- }
- var cancel context.CancelFunc
- ctx := c.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
- defer cancel()
- }
- clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
- r, err := c.js.nc.RequestWithContext(ctx, clSubj, req)
- if err != nil {
- c.err = err
- return false
- }
- var resp consumerListResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- c.err = err
- return false
- }
- if resp.Error != nil {
- c.err = errors.New(resp.Error.Description)
- return false
- }
- c.pageInfo = &resp.apiPaged
- c.page = resp.Consumers
- c.offset += len(c.page)
- return true
- }
- func (c *consumerLister) Page() []*ConsumerInfo {
- return c.page
- }
- func (c *consumerLister) Err() error {
- return c.err
- }
- func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan *ConsumerInfo)
- l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- type consumerNamesLister struct {
- stream string
- js *js
- err error
- offset int
- page []string
- pageInfo *apiPaged
- }
- type consumerNamesListResponse struct {
- apiResponse
- apiPaged
- Consumers []string `json:"consumers"`
- }
- func (c *consumerNamesLister) Next() bool {
- if c.err != nil {
- return false
- }
- if c.stream == _EMPTY_ {
- c.err = ErrStreamNameRequired
- return false
- }
- if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
- return false
- }
- var cancel context.CancelFunc
- ctx := c.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
- defer cancel()
- }
- clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
- r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil)
- if err != nil {
- c.err = err
- return false
- }
- var resp consumerNamesListResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- c.err = err
- return false
- }
- if resp.Error != nil {
- c.err = errors.New(resp.Error.Description)
- return false
- }
- c.pageInfo = &resp.apiPaged
- c.page = resp.Consumers
- c.offset += len(c.page)
- return true
- }
- func (c *consumerNamesLister) Page() []string {
- return c.page
- }
- func (c *consumerNamesLister) Err() error {
- return c.err
- }
- func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan string)
- l := &consumerNamesLister{stream: stream, js: &js{nc: jsc.nc, opts: o}}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- type streamCreateResponse struct {
- apiResponse
- *StreamInfo
- }
- func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if cfg == nil || cfg.Name == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- if strings.Contains(cfg.Name, ".") {
- return nil, ErrInvalidStreamName
- }
- req, err := json.Marshal(cfg)
- if err != nil {
- return nil, err
- }
- csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
- r, err := js.nc.RequestWithContext(o.ctx, csSubj, req)
- if err != nil {
- return nil, err
- }
- var resp streamCreateResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- return resp.StreamInfo, nil
- }
- type streamInfoResponse = streamCreateResponse
- func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
- if strings.Contains(stream, ".") {
- return nil, ErrInvalidStreamName
- }
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
- r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil)
- if err != nil {
- return nil, err
- }
- var resp streamInfoResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- return resp.StreamInfo, nil
- }
- type StreamInfo struct {
- Config StreamConfig `json:"config"`
- Created time.Time `json:"created"`
- State StreamState `json:"state"`
- Cluster *ClusterInfo `json:"cluster,omitempty"`
- Mirror *StreamSourceInfo `json:"mirror,omitempty"`
- Sources []*StreamSourceInfo `json:"sources,omitempty"`
- }
// StreamSourceInfo is the per-source entry of StreamInfo, used for both its
// Mirror and its Sources. Active is a time.Duration and is therefore encoded
// as an integer in JSON.
type StreamSourceInfo struct {
	Name   string        `json:"name"`
	Lag    uint64        `json:"lag"`
	Active time.Duration `json:"active"`
}
// StreamState holds the counters and first/last sequence and timestamp values
// reported under the "state" key of StreamInfo.
type StreamState struct {
	Msgs      uint64    `json:"messages"`
	Bytes     uint64    `json:"bytes"`
	FirstSeq  uint64    `json:"first_seq"`
	FirstTime time.Time `json:"first_ts"`
	LastSeq   uint64    `json:"last_seq"`
	LastTime  time.Time `json:"last_ts"`
	Consumers int       `json:"consumer_count"`
}
- type ClusterInfo struct {
- Name string `json:"name,omitempty"`
- Leader string `json:"leader,omitempty"`
- Replicas []*PeerInfo `json:"replicas,omitempty"`
- }
// PeerInfo is one entry of ClusterInfo.Replicas. Offline and Lag are omitted
// from the JSON when zero; Active is a time.Duration and is therefore encoded
// as an integer.
type PeerInfo struct {
	Name    string        `json:"name"`
	Current bool          `json:"current"`
	Offline bool          `json:"offline,omitempty"`
	Active  time.Duration `json:"active"`
	Lag     uint64        `json:"lag,omitempty"`
}
- func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if cfg == nil || cfg.Name == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- req, err := json.Marshal(cfg)
- if err != nil {
- return nil, err
- }
- usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
- r, err := js.nc.RequestWithContext(o.ctx, usSubj, req)
- if err != nil {
- return nil, err
- }
- var resp streamInfoResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- return resp.StreamInfo, nil
- }
- type streamDeleteResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- }
- func (js *js) DeleteStream(name string, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- if name == _EMPTY_ {
- return ErrStreamNameRequired
- }
- dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
- r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil)
- if err != nil {
- return err
- }
- var resp streamDeleteResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
// apiMsgGetRequest is the JSON payload GetMsg sends; it carries the sequence
// number of the requested message.
type apiMsgGetRequest struct {
	Seq uint64 `json:"seq"`
}
- type RawStreamMsg struct {
- Subject string
- Sequence uint64
- Header Header
- Data []byte
- Time time.Time
- }
// storedMsg is the wire form of a stored message as decoded from a message
// get reply. Header and Data are []byte, so encoding/json carries them as
// base64 strings; Header holds the still-encoded header block.
type storedMsg struct {
	Subject  string    `json:"subject"`
	Sequence uint64    `json:"seq"`
	Header   []byte    `json:"hdrs,omitempty"`
	Data     []byte    `json:"data,omitempty"`
	Time     time.Time `json:"time"`
}
- type apiMsgGetResponse struct {
- apiResponse
- Message *storedMsg `json:"message,omitempty"`
- Success bool `json:"success,omitempty"`
- }
- func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if name == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- req, err := json.Marshal(&apiMsgGetRequest{Seq: seq})
- if err != nil {
- return nil, err
- }
- dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
- r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
- if err != nil {
- return nil, err
- }
- var resp apiMsgGetResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- msg := resp.Message
- var hdr Header
- if msg.Header != nil {
- hdr, err = decodeHeadersMsg(msg.Header)
- if err != nil {
- return nil, err
- }
- }
- return &RawStreamMsg{
- Subject: msg.Subject,
- Sequence: msg.Sequence,
- Header: hdr,
- Data: msg.Data,
- Time: msg.Time,
- }, nil
- }
// msgDeleteRequest is the JSON payload DeleteMsg sends; it carries the
// sequence number of the message to delete.
type msgDeleteRequest struct {
	Seq uint64 `json:"seq"`
}
- type msgDeleteResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- }
- func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- if name == _EMPTY_ {
- return ErrStreamNameRequired
- }
- req, err := json.Marshal(&msgDeleteRequest{Seq: seq})
- if err != nil {
- return err
- }
- dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
- r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
- if err != nil {
- return err
- }
- var resp msgDeleteResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
- type streamPurgeResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- Purged uint64 `json:"purged"`
- }
- func (js *js) PurgeStream(name string, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name))
- r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil)
- if err != nil {
- return err
- }
- var resp streamPurgeResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
- type streamLister struct {
- js *js
- page []*StreamInfo
- err error
- offset int
- pageInfo *apiPaged
- }
- type streamListResponse struct {
- apiResponse
- apiPaged
- Streams []*StreamInfo `json:"streams"`
- }
- type streamNamesRequest struct {
- apiPagedRequest
-
- Subject string `json:"subject,omitempty"`
- }
- func (s *streamLister) Next() bool {
- if s.err != nil {
- return false
- }
- if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
- return false
- }
- req, err := json.Marshal(streamNamesRequest{
- apiPagedRequest: apiPagedRequest{Offset: s.offset},
- })
- if err != nil {
- s.err = err
- return false
- }
- var cancel context.CancelFunc
- ctx := s.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), s.js.opts.wait)
- defer cancel()
- }
- slSubj := s.js.apiSubj(apiStreamList)
- r, err := s.js.nc.RequestWithContext(ctx, slSubj, req)
- if err != nil {
- s.err = err
- return false
- }
- var resp streamListResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- s.err = err
- return false
- }
- if resp.Error != nil {
- s.err = errors.New(resp.Error.Description)
- return false
- }
- s.pageInfo = &resp.apiPaged
- s.page = resp.Streams
- s.offset += len(s.page)
- return true
- }
- func (s *streamLister) Page() []*StreamInfo {
- return s.page
- }
- func (s *streamLister) Err() error {
- return s.err
- }
- func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan *StreamInfo)
- l := &streamLister{js: &js{nc: jsc.nc, opts: o}}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- type streamNamesLister struct {
- js *js
- err error
- offset int
- page []string
- pageInfo *apiPaged
- }
- func (l *streamNamesLister) Next() bool {
- if l.err != nil {
- return false
- }
- if l.pageInfo != nil && l.offset >= l.pageInfo.Total {
- return false
- }
- var cancel context.CancelFunc
- ctx := l.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), l.js.opts.wait)
- defer cancel()
- }
- r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
- if err != nil {
- l.err = err
- return false
- }
- var resp streamNamesResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- l.err = err
- return false
- }
- if resp.Error != nil {
- l.err = errors.New(resp.Error.Description)
- return false
- }
- l.pageInfo = &resp.apiPaged
- l.page = resp.Streams
- l.offset += len(l.page)
- return true
- }
- func (l *streamNamesLister) Page() []string {
- return l.page
- }
- func (l *streamNamesLister) Err() error {
- return l.err
- }
- func (jsc *js) StreamNames(opts ...JSOpt) <-chan string {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan string)
- l := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
- var o jsOpts
- for _, opt := range opts {
- if err := opt.configureJSContext(&o); err != nil {
- return nil, nil, err
- }
- }
-
- if o.ctx != nil && o.wait != 0 {
- return nil, nil, ErrContextAndTimeout
- }
- if o.wait == 0 && o.ctx == nil {
- o.wait = defs.wait
- }
- var cancel context.CancelFunc
- if o.ctx == nil && o.wait > 0 {
- o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
- }
- if o.pre == "" {
- o.pre = defs.pre
- }
- return &o, cancel, nil
- }
|