|
- package nats
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "strings"
- "time"
- )
- type JetStreamManager interface {
-
- AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
-
- UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
-
- DeleteStream(name string, opts ...JSOpt) error
-
- StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)
-
- PurgeStream(name string, opts ...JSOpt) error
-
- StreamsInfo(opts ...JSOpt) <-chan *StreamInfo
-
- StreamNames(opts ...JSOpt) <-chan string
-
- GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)
-
- DeleteMsg(name string, seq uint64, opts ...JSOpt) error
-
- AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
-
- DeleteConsumer(stream, consumer string, opts ...JSOpt) error
-
- ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)
-
- ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
-
- ConsumerNames(stream string, opts ...JSOpt) <-chan string
-
- AccountInfo(opts ...JSOpt) (*AccountInfo, error)
- }
- type StreamConfig struct {
- Name string `json:"name"`
- Subjects []string `json:"subjects,omitempty"`
- Retention RetentionPolicy `json:"retention"`
- MaxConsumers int `json:"max_consumers"`
- MaxMsgs int64 `json:"max_msgs"`
- MaxBytes int64 `json:"max_bytes"`
- Discard DiscardPolicy `json:"discard"`
- MaxAge time.Duration `json:"max_age"`
- MaxMsgSize int32 `json:"max_msg_size,omitempty"`
- Storage StorageType `json:"storage"`
- Replicas int `json:"num_replicas"`
- NoAck bool `json:"no_ack,omitempty"`
- Template string `json:"template_owner,omitempty"`
- Duplicates time.Duration `json:"duplicate_window,omitempty"`
- Placement *Placement `json:"placement,omitempty"`
- Mirror *StreamSource `json:"mirror,omitempty"`
- Sources []*StreamSource `json:"sources,omitempty"`
- }
// Placement is the optional "placement" section of a StreamConfig, made of a
// cluster name and a list of tags.
type Placement struct {
	Cluster string   `json:"cluster"`
	Tags    []string `json:"tags,omitempty"`
}
- type StreamSource struct {
- Name string `json:"name"`
- OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
- OptStartTime *time.Time `json:"opt_start_time,omitempty"`
- FilterSubject string `json:"filter_subject,omitempty"`
- External *ExternalStream `json:"external,omitempty"`
- }
// ExternalStream is the optional "external" section of a StreamSource,
// carrying an API prefix and a deliver prefix.
type ExternalStream struct {
	APIPrefix     string `json:"api"`
	DeliverPrefix string `json:"deliver"`
}
type (
	// apiError is the error object carried by a JetStream API response.
	apiError struct {
		Code        int    `json:"code"`
		Description string `json:"description,omitempty"`
	}

	// apiResponse holds the fields common to the JetStream API responses in
	// this file. A non-nil Error means the request failed.
	apiResponse struct {
		Type  string    `json:"type"`
		Error *apiError `json:"error,omitempty"`
	}
)
type (
	// apiPaged is the paging information embedded in list responses. The
	// listers stop once their running offset reaches Total.
	apiPaged struct {
		Total  int `json:"total"`
		Offset int `json:"offset"`
		Limit  int `json:"limit"`
	}

	// apiPagedRequest is embedded in list requests to select the offset at
	// which the requested page starts.
	apiPagedRequest struct {
		Offset int `json:"offset"`
	}
)
- type AccountInfo struct {
- Memory uint64 `json:"memory"`
- Store uint64 `json:"storage"`
- Streams int `json:"streams"`
- Consumers int `json:"consumers"`
- API APIStats `json:"api"`
- Limits AccountLimits `json:"limits"`
- }
// APIStats is the "api" section of AccountInfo: a total counter and an error
// counter.
type APIStats struct {
	Total  uint64 `json:"total"`
	Errors uint64 `json:"errors"`
}
// AccountLimits is the "limits" section of AccountInfo.
type AccountLimits struct {
	MaxMemory    int64 `json:"max_memory"`
	MaxStore     int64 `json:"max_storage"`
	MaxStreams   int   `json:"max_streams"`
	MaxConsumers int   `json:"max_consumers"`
}
- type accountInfoResponse struct {
- apiResponse
- AccountInfo
- }
- func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
- if err != nil {
- return nil, err
- }
- var info accountInfoResponse
- if err := json.Unmarshal(resp.Data, &info); err != nil {
- return nil, err
- }
- if info.Error != nil {
- var err error
- if strings.Contains(info.Error.Description, "not enabled for") {
- err = ErrJetStreamNotEnabled
- } else {
- err = errors.New(info.Error.Description)
- }
- return nil, err
- }
- return &info.AccountInfo, nil
- }
- type createConsumerRequest struct {
- Stream string `json:"stream_name"`
- Config *ConsumerConfig `json:"config"`
- }
- type consumerResponse struct {
- apiResponse
- *ConsumerInfo
- }
- func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if stream == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
- if err != nil {
- return nil, err
- }
- var ccSubj string
- if cfg != nil && cfg.Durable != _EMPTY_ {
- if strings.Contains(cfg.Durable, ".") {
- return nil, ErrInvalidDurableName
- }
- ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
- } else {
- ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
- }
- resp, err := js.nc.RequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
- if err != nil {
- if err == ErrNoResponders {
- err = ErrJetStreamNotEnabled
- }
- return nil, err
- }
- var info consumerResponse
- err = json.Unmarshal(resp.Data, &info)
- if err != nil {
- return nil, err
- }
- if info.Error != nil {
- return nil, errors.New(info.Error.Description)
- }
- return info.ConsumerInfo, nil
- }
- type consumerDeleteResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- }
- func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- if stream == _EMPTY_ {
- return ErrStreamNameRequired
- }
- dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
- r, err := js.nc.RequestWithContext(o.ctx, dcSubj, nil)
- if err != nil {
- return err
- }
- var resp consumerDeleteResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
- func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- return js.getConsumerInfoContext(o.ctx, stream, consumer)
- }
- type consumerLister struct {
- stream string
- js *js
- err error
- offset int
- page []*ConsumerInfo
- pageInfo *apiPaged
- }
- type consumersRequest struct {
- apiPagedRequest
- }
- type consumerListResponse struct {
- apiResponse
- apiPaged
- Consumers []*ConsumerInfo `json:"consumers"`
- }
- func (c *consumerLister) Next() bool {
- if c.err != nil {
- return false
- }
- if c.stream == _EMPTY_ {
- c.err = ErrStreamNameRequired
- return false
- }
- if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
- return false
- }
- req, err := json.Marshal(consumersRequest{
- apiPagedRequest: apiPagedRequest{Offset: c.offset},
- })
- if err != nil {
- c.err = err
- return false
- }
- var cancel context.CancelFunc
- ctx := c.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
- defer cancel()
- }
- clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
- r, err := c.js.nc.RequestWithContext(ctx, clSubj, req)
- if err != nil {
- c.err = err
- return false
- }
- var resp consumerListResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- c.err = err
- return false
- }
- if resp.Error != nil {
- c.err = errors.New(resp.Error.Description)
- return false
- }
- c.pageInfo = &resp.apiPaged
- c.page = resp.Consumers
- c.offset += len(c.page)
- return true
- }
- func (c *consumerLister) Page() []*ConsumerInfo {
- return c.page
- }
- func (c *consumerLister) Err() error {
- return c.err
- }
- func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan *ConsumerInfo)
- l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- type consumerNamesLister struct {
- stream string
- js *js
- err error
- offset int
- page []string
- pageInfo *apiPaged
- }
- type consumerNamesListResponse struct {
- apiResponse
- apiPaged
- Consumers []string `json:"consumers"`
- }
- func (c *consumerNamesLister) Next() bool {
- if c.err != nil {
- return false
- }
- if c.stream == _EMPTY_ {
- c.err = ErrStreamNameRequired
- return false
- }
- if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
- return false
- }
- var cancel context.CancelFunc
- ctx := c.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
- defer cancel()
- }
- clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
- r, err := c.js.nc.RequestWithContext(ctx, clSubj, nil)
- if err != nil {
- c.err = err
- return false
- }
- var resp consumerNamesListResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- c.err = err
- return false
- }
- if resp.Error != nil {
- c.err = errors.New(resp.Error.Description)
- return false
- }
- c.pageInfo = &resp.apiPaged
- c.page = resp.Consumers
- c.offset += len(c.page)
- return true
- }
- func (c *consumerNamesLister) Page() []string {
- return c.page
- }
- func (c *consumerNamesLister) Err() error {
- return c.err
- }
- func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan string)
- l := &consumerNamesLister{stream: stream, js: &js{nc: jsc.nc, opts: o}}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- type streamCreateResponse struct {
- apiResponse
- *StreamInfo
- }
- func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if cfg == nil || cfg.Name == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- if strings.Contains(cfg.Name, ".") {
- return nil, ErrInvalidStreamName
- }
- req, err := json.Marshal(cfg)
- if err != nil {
- return nil, err
- }
- csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
- r, err := js.nc.RequestWithContext(o.ctx, csSubj, req)
- if err != nil {
- return nil, err
- }
- var resp streamCreateResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- return resp.StreamInfo, nil
- }
- type streamInfoResponse = streamCreateResponse
- func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
- if strings.Contains(stream, ".") {
- return nil, ErrInvalidStreamName
- }
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
- r, err := js.nc.RequestWithContext(o.ctx, csSubj, nil)
- if err != nil {
- return nil, err
- }
- var resp streamInfoResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- return resp.StreamInfo, nil
- }
- type StreamInfo struct {
- Config StreamConfig `json:"config"`
- Created time.Time `json:"created"`
- State StreamState `json:"state"`
- Cluster *ClusterInfo `json:"cluster,omitempty"`
- Mirror *StreamSourceInfo `json:"mirror,omitempty"`
- Sources []*StreamSourceInfo `json:"sources,omitempty"`
- }
// StreamSourceInfo is the per-source information reported in StreamInfo for
// the Mirror and for each entry of Sources.
type StreamSourceInfo struct {
	Name   string        `json:"name"`
	Lag    uint64        `json:"lag"`
	Active time.Duration `json:"active"`
}
// StreamState is the "state" section of StreamInfo: message and byte counts,
// first and last sequence with their timestamps, and the consumer count.
type StreamState struct {
	Msgs      uint64    `json:"messages"`
	Bytes     uint64    `json:"bytes"`
	FirstSeq  uint64    `json:"first_seq"`
	FirstTime time.Time `json:"first_ts"`
	LastSeq   uint64    `json:"last_seq"`
	LastTime  time.Time `json:"last_ts"`
	Consumers int       `json:"consumer_count"`
}
- type ClusterInfo struct {
- Name string `json:"name,omitempty"`
- Leader string `json:"leader,omitempty"`
- Replicas []*PeerInfo `json:"replicas,omitempty"`
- }
// PeerInfo describes one entry of ClusterInfo.Replicas.
type PeerInfo struct {
	Name    string        `json:"name"`
	Current bool          `json:"current"`
	Offline bool          `json:"offline,omitempty"`
	Active  time.Duration `json:"active"`
	Lag     uint64        `json:"lag,omitempty"`
}
- func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if cfg == nil || cfg.Name == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- req, err := json.Marshal(cfg)
- if err != nil {
- return nil, err
- }
- usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
- r, err := js.nc.RequestWithContext(o.ctx, usSubj, req)
- if err != nil {
- return nil, err
- }
- var resp streamInfoResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- return resp.StreamInfo, nil
- }
- type streamDeleteResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- }
- func (js *js) DeleteStream(name string, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- if name == _EMPTY_ {
- return ErrStreamNameRequired
- }
- dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
- r, err := js.nc.RequestWithContext(o.ctx, dsSubj, nil)
- if err != nil {
- return err
- }
- var resp streamDeleteResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
// apiMsgGetRequest is the JSON payload GetMsg sends to select a message by
// sequence.
type apiMsgGetRequest struct {
	Seq uint64 `json:"seq"`
}
- type RawStreamMsg struct {
- Subject string
- Sequence uint64
- Header Header
- Data []byte
- Time time.Time
- }
// storedMsg is the wire form of a stored message inside apiMsgGetResponse.
// Header holds the still-encoded headers; GetMsg decodes them.
type storedMsg struct {
	Subject  string    `json:"subject"`
	Sequence uint64    `json:"seq"`
	Header   []byte    `json:"hdrs,omitempty"`
	Data     []byte    `json:"data,omitempty"`
	Time     time.Time `json:"time"`
}
- type apiMsgGetResponse struct {
- apiResponse
- Message *storedMsg `json:"message,omitempty"`
- Success bool `json:"success,omitempty"`
- }
- func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return nil, err
- }
- if cancel != nil {
- defer cancel()
- }
- if name == _EMPTY_ {
- return nil, ErrStreamNameRequired
- }
- req, err := json.Marshal(&apiMsgGetRequest{Seq: seq})
- if err != nil {
- return nil, err
- }
- dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
- r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
- if err != nil {
- return nil, err
- }
- var resp apiMsgGetResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return nil, err
- }
- if resp.Error != nil {
- return nil, errors.New(resp.Error.Description)
- }
- msg := resp.Message
- var hdr Header
- if msg.Header != nil {
- hdr, err = decodeHeadersMsg(msg.Header)
- if err != nil {
- return nil, err
- }
- }
- return &RawStreamMsg{
- Subject: msg.Subject,
- Sequence: msg.Sequence,
- Header: hdr,
- Data: msg.Data,
- Time: msg.Time,
- }, nil
- }
- type msgDeleteRequest struct {
- Seq uint64 `json:"seq"`
- }
- type msgDeleteResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- }
- func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- if name == _EMPTY_ {
- return ErrStreamNameRequired
- }
- req, err := json.Marshal(&msgDeleteRequest{Seq: seq})
- if err != nil {
- return err
- }
- dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
- r, err := js.nc.RequestWithContext(o.ctx, dsSubj, req)
- if err != nil {
- return err
- }
- var resp msgDeleteResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
- type streamPurgeResponse struct {
- apiResponse
- Success bool `json:"success,omitempty"`
- Purged uint64 `json:"purged"`
- }
- func (js *js) PurgeStream(name string, opts ...JSOpt) error {
- o, cancel, err := getJSContextOpts(js.opts, opts...)
- if err != nil {
- return err
- }
- if cancel != nil {
- defer cancel()
- }
- psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name))
- r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil)
- if err != nil {
- return err
- }
- var resp streamPurgeResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- return err
- }
- if resp.Error != nil {
- return errors.New(resp.Error.Description)
- }
- return nil
- }
- type streamLister struct {
- js *js
- page []*StreamInfo
- err error
- offset int
- pageInfo *apiPaged
- }
- type streamListResponse struct {
- apiResponse
- apiPaged
- Streams []*StreamInfo `json:"streams"`
- }
- type streamNamesRequest struct {
- apiPagedRequest
-
- Subject string `json:"subject,omitempty"`
- }
- func (s *streamLister) Next() bool {
- if s.err != nil {
- return false
- }
- if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
- return false
- }
- req, err := json.Marshal(streamNamesRequest{
- apiPagedRequest: apiPagedRequest{Offset: s.offset},
- })
- if err != nil {
- s.err = err
- return false
- }
- var cancel context.CancelFunc
- ctx := s.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), s.js.opts.wait)
- defer cancel()
- }
- slSubj := s.js.apiSubj(apiStreamList)
- r, err := s.js.nc.RequestWithContext(ctx, slSubj, req)
- if err != nil {
- s.err = err
- return false
- }
- var resp streamListResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- s.err = err
- return false
- }
- if resp.Error != nil {
- s.err = errors.New(resp.Error.Description)
- return false
- }
- s.pageInfo = &resp.apiPaged
- s.page = resp.Streams
- s.offset += len(s.page)
- return true
- }
- func (s *streamLister) Page() []*StreamInfo {
- return s.page
- }
- func (s *streamLister) Err() error {
- return s.err
- }
- func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan *StreamInfo)
- l := &streamLister{js: &js{nc: jsc.nc, opts: o}}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- type streamNamesLister struct {
- js *js
- err error
- offset int
- page []string
- pageInfo *apiPaged
- }
- func (l *streamNamesLister) Next() bool {
- if l.err != nil {
- return false
- }
- if l.pageInfo != nil && l.offset >= l.pageInfo.Total {
- return false
- }
- var cancel context.CancelFunc
- ctx := l.js.opts.ctx
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), l.js.opts.wait)
- defer cancel()
- }
- r, err := l.js.nc.RequestWithContext(ctx, l.js.apiSubj(apiStreams), nil)
- if err != nil {
- l.err = err
- return false
- }
- var resp streamNamesResponse
- if err := json.Unmarshal(r.Data, &resp); err != nil {
- l.err = err
- return false
- }
- if resp.Error != nil {
- l.err = errors.New(resp.Error.Description)
- return false
- }
- l.pageInfo = &resp.apiPaged
- l.page = resp.Streams
- l.offset += len(l.page)
- return true
- }
- func (l *streamNamesLister) Page() []string {
- return l.page
- }
- func (l *streamNamesLister) Err() error {
- return l.err
- }
- func (jsc *js) StreamNames(opts ...JSOpt) <-chan string {
- o, cancel, err := getJSContextOpts(jsc.opts, opts...)
- if err != nil {
- return nil
- }
- ch := make(chan string)
- l := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}}
- go func() {
- if cancel != nil {
- defer cancel()
- }
- defer close(ch)
- for l.Next() {
- for _, info := range l.Page() {
- select {
- case ch <- info:
- case <-o.ctx.Done():
- return
- }
- }
- }
- }()
- return ch
- }
- func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
- var o jsOpts
- for _, opt := range opts {
- if err := opt.configureJSContext(&o); err != nil {
- return nil, nil, err
- }
- }
-
- if o.ctx != nil && o.wait != 0 {
- return nil, nil, ErrContextAndTimeout
- }
- if o.wait == 0 && o.ctx == nil {
- o.wait = defs.wait
- }
- var cancel context.CancelFunc
- if o.ctx == nil && o.wait > 0 {
- o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
- }
- if o.pre == "" {
- o.pre = defs.pre
- }
- return &o, cancel, nil
- }
|