|
- package nats
- import (
- "bufio"
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "net"
- "net/http"
- "net/textproto"
- "net/url"
- "os"
- "path/filepath"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nats-io/nats.go/util"
- "github.com/nats-io/nkeys"
- "github.com/nats-io/nuid"
- )
// Default settings and identification constants for the client.
const (
	// Version is the client library version string.
	// NOTE(review): presumably reported to the server through
	// connectInfo.Version — confirm in the CONNECT builder.
	Version = "1.11.0"
	// DefaultURL is the server added to the pool by setupServerPool when
	// neither Options.Url nor Options.Servers provides one.
	DefaultURL = "nats://127.0.0.1:4222"
	// DefaultPort is the numeric form of the default server port.
	DefaultPort = 4222
	// The following values are installed by GetDefaultOptions into the
	// Options fields of the matching name.
	DefaultMaxReconnect       = 60
	DefaultReconnectWait      = 2 * time.Second
	DefaultReconnectJitter    = 100 * time.Millisecond
	DefaultReconnectJitterTLS = time.Second
	DefaultTimeout            = 2 * time.Second
	DefaultPingInterval       = 2 * time.Minute
	DefaultMaxPingOut         = 2
	DefaultMaxChanLen         = 64 * 1024
	DefaultReconnectBufSize   = 8 * 1024 * 1024
	// RequestChanLen is not referenced in this part of the file.
	// NOTE(review): presumably the channel capacity used by requests — confirm.
	RequestChanLen = 8
	// DefaultDrainTimeout is installed as Options.DrainTimeout by GetDefaultOptions.
	DefaultDrainTimeout = 30 * time.Second
	// LangString identifies the client language.
	// NOTE(review): presumably sent as connectInfo.Lang — confirm.
	LangString = "go"
)
// Error description strings.
// NOTE(review): apart from STALE_CONNECTION (used to build
// ErrStaleConnection), these are presumably compared against the text of
// server "-ERR" messages — confirm where they are consumed.
const (
	// STALE_CONNECTION is the text appended to "nats: " to form ErrStaleConnection.
	STALE_CONNECTION = "stale connection"

	// PERMISSIONS_ERR describes a permissions violation.
	PERMISSIONS_ERR = "permissions violation"

	// AUTHORIZATION_ERR describes an authorization violation.
	AUTHORIZATION_ERR = "authorization violation"

	// AUTHENTICATION_EXPIRED_ERR describes expired user authentication.
	AUTHENTICATION_EXPIRED_ERR = "user authentication expired"

	// AUTHENTICATION_REVOKED_ERR describes revoked user authentication.
	AUTHENTICATION_REVOKED_ERR = "user authentication revoked"

	// ACCOUNT_AUTHENTICATION_EXPIRED_ERR describes expired account authentication.
	ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"
)
// Sentinel errors returned by this package. Every message carries the
// "nats: " prefix; callers are expected to compare against these values.
var (
	ErrConnectionClosed = errors.New("nats: connection closed")
	ErrConnectionDraining = errors.New("nats: connection draining")
	ErrDrainTimeout = errors.New("nats: draining connection timed out")
	ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
	ErrSecureConnRequired = errors.New("nats: secure connection required")
	// ErrSecureConnWanted is returned by checkForSecure when the client asked
	// for TLS but the server neither requires nor offers it.
	ErrSecureConnWanted = errors.New("nats: secure connection not available")
	ErrBadSubscription = errors.New("nats: invalid subscription")
	ErrTypeSubscription = errors.New("nats: invalid subscription type")
	ErrBadSubject = errors.New("nats: invalid subject")
	ErrBadQueueName = errors.New("nats: invalid queue name")
	ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
	ErrTimeout = errors.New("nats: timeout")
	// ErrBadTimeout is returned by createConn when Options.Timeout is negative.
	ErrBadTimeout = errors.New("nats: timeout invalid")
	ErrAuthorization = errors.New("nats: authorization violation")
	ErrAuthExpired = errors.New("nats: authentication expired")
	ErrAuthRevoked = errors.New("nats: authentication revoked")
	ErrAccountAuthExpired = errors.New("nats: account authentication expired")
	// ErrNoServers is returned when the server pool is empty or no server
	// in it could be connected to.
	ErrNoServers = errors.New("nats: no servers available for connection")
	ErrJsonParse = errors.New("nats: connect message, json parse error")
	ErrChanArg = errors.New("nats: argument needs to be a channel type")
	ErrMaxPayload = errors.New("nats: maximum payload exceeded")
	ErrMaxMessages = errors.New("nats: maximum messages delivered")
	ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
	// ErrMultipleTLSConfigs is returned by the Secure option when more than
	// one tls.Config is supplied.
	ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
	// ErrNoInfoReceived is returned by processExpectedInfo when the first
	// operation read from the server is not INFO.
	ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
	ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
	ErrInvalidConnection = errors.New("nats: invalid connection")
	ErrInvalidMsg = errors.New("nats: invalid message or message nil")
	ErrInvalidArg = errors.New("nats: invalid argument")
	ErrInvalidContext = errors.New("nats: invalid context")
	ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
	ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
	ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
	// ErrUserButNoSigCB is returned when a user JWT callback is configured
	// without a signature callback.
	ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
	// ErrNkeyButNoSigCB is returned when an nkey is configured without a
	// signature callback.
	ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
	// ErrNoUserCB is returned by the UserJWT option when the user callback is nil.
	ErrNoUserCB = errors.New("nats: user callback not defined")
	// ErrNkeyAndUser is returned when both a user JWT callback and an nkey are set.
	ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
	// ErrNkeysNotSupported is returned by processExpectedInfo when an nkey is
	// configured but the server INFO carries no nonce.
	ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
	ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
	// ErrTokenAlreadySet is returned when both a static token and a token
	// handler are configured.
	ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
	ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
	ErrMsgNoReply = errors.New("nats: message does not have a reply")
	ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
	ErrDisconnected = errors.New("nats: server is disconnected")
	ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
	// ErrBadHeaderMsg is returned by Msg.headerBytes when the header cannot
	// be serialized.
	ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
	ErrNoResponders = errors.New("nats: no responders available for request")
	ErrNoContextOrTimeout = errors.New("nats: no context or timeout given")
	ErrPullModeNotAllowed = errors.New("nats: pull based not supported")
	ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled")
	ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid")
	ErrNoStreamResponse = errors.New("nats: no response from stream")
	ErrNotJSMessage = errors.New("nats: not a jetstream message")
	ErrInvalidStreamName = errors.New("nats: invalid stream name")
	ErrInvalidDurableName = errors.New("nats: invalid durable name")
	ErrNoMatchingStream = errors.New("nats: no stream matches subject")
	ErrSubjectMismatch = errors.New("nats: subject does not match consumer")
	ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set")
	ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response")
	ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported")
	ErrStreamNameRequired = errors.New("nats: stream name is required")
	ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required")
	ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required")
	ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required")
)
// init seeds the global math/rand source from the current wall-clock time
// so that uses of the package-level generator (for example the host shuffle
// in createConn) differ between process runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
- func GetDefaultOptions() Options {
- return Options{
- AllowReconnect: true,
- MaxReconnect: DefaultMaxReconnect,
- ReconnectWait: DefaultReconnectWait,
- ReconnectJitter: DefaultReconnectJitter,
- ReconnectJitterTLS: DefaultReconnectJitterTLS,
- Timeout: DefaultTimeout,
- PingInterval: DefaultPingInterval,
- MaxPingsOut: DefaultMaxPingOut,
- SubChanLen: DefaultMaxChanLen,
- ReconnectBufSize: DefaultReconnectBufSize,
- DrainTimeout: DefaultDrainTimeout,
- }
- }
// DefaultOptions is a package-level Options value initialized once, at
// package load, from GetDefaultOptions. Because it is a mutable variable,
// GetDefaultOptions is the safer way to obtain pristine defaults.
var DefaultOptions = GetDefaultOptions()
// Status represents the state of a connection (see Conn.status).
type Status int

// Connection states. The numeric values are assigned by iota starting at 0.
const (
	// DISCONNECTED is the zero value; connect passes it to nc.close when the
	// initial handshake with a server fails.
	DISCONNECTED = Status(iota)
	// CONNECTED is the state the Connected* accessors require before
	// reporting server details.
	CONNECTED
	CLOSED
	// RECONNECTING is set by connect when RetryOnFailedConnect is enabled
	// and the initial connect attempt failed.
	RECONNECTING
	// CONNECTING is set by processConnectInit while the handshake is running.
	CONNECTING
	DRAINING_SUBS
	DRAINING_PUBS
)
// ConnHandler is a callback that receives the connection it concerns.
type ConnHandler func(*Conn)

// ConnErrHandler is a callback that receives a connection and an error.
type ConnErrHandler func(*Conn, error)

// ErrHandler is the callback type for asynchronous errors; it receives the
// connection, the subscription involved (may be nil, see defaultErrHandler)
// and the error.
type ErrHandler func(*Conn, *Subscription, error)

// UserJWTHandler returns the user JWT to authenticate with, or an error.
type UserJWTHandler func() (string, error)

// SignatureHandler signs the given bytes (the server nonce, see
// connectProto) and returns the raw signature.
type SignatureHandler func([]byte) ([]byte, error)

// AuthTokenHandler returns the authentication token to use.
type AuthTokenHandler func() string

// ReconnectDelayHandler returns how long to wait given the number of
// attempts made so far.
// NOTE(review): the exact meaning of "attempts" is defined by the reconnect
// loop, which is outside this part of the file.
type ReconnectDelayHandler func(attempts int) time.Duration
// asyncCB is one queued callback; next links it into a singly linked list.
type asyncCB struct {
	f    func()
	next *asyncCB
}

// asyncCallbacksHandler holds a linked list of pending callbacks (head/tail)
// together with a mutex and a condition variable. Options.Connect creates
// cond on top of mu and then starts asyncCBDispatcher in its own goroutine.
type asyncCallbacksHandler struct {
	mu   sync.Mutex
	cond *sync.Cond
	head *asyncCB
	tail *asyncCB
}
// Option is a function that mutates an Options value. Connect applies each
// non-nil Option in order and aborts on the first error returned.
type Option func(*Options) error

// CustomDialer is any type able to open a network connection. When
// Options.CustomDialer is set, createConn uses it instead of Options.Dialer.
type CustomDialer interface {
	Dial(network, address string) (net.Conn, error)
}
// Options holds every setting that can be customized for a connection.
// Obtain a populated value from GetDefaultOptions, or let Connect build one
// from Option functions.
type Options struct {
	// Url is a single server URL. When non-empty, setupServerPool adds it to
	// the pool after Servers and swaps it into the first position.
	Url string

	// Servers is the list of server URLs added to the pool. Connect fills it
	// from its comma-separated url argument.
	Servers []string

	// NoRandomize disables shuffling of the server pool in setupServerPool
	// and of the resolved host addresses in createConn.
	NoRandomize bool

	// NoEcho is set by the NoEcho option.
	// NOTE(review): presumably asks the server not to echo the client's own
	// messages back (cf. connectInfo.Echo) — confirm.
	NoEcho bool

	// Name is an optional client name, set by the Name option.
	Name string

	// Verbose is a protocol flag.
	// NOTE(review): presumably forwarded as connectInfo.Verbose — confirm.
	Verbose bool

	// Pedantic is a protocol flag.
	// NOTE(review): presumably forwarded as connectInfo.Pedantic — confirm.
	Pedantic bool

	// Secure requests a TLS connection. It is set by the Secure, RootCAs and
	// ClientCert options, and by setupServerPool when a pool URL uses the
	// "tls" scheme. checkForSecure fails with ErrSecureConnWanted when it is
	// set but the server neither requires nor offers TLS.
	Secure bool

	// TLSConfig is the TLS configuration; makeTLSConn clones it before use.
	TLSConfig *tls.Config

	// AllowReconnect enables reconnection; the NoReconnect option clears it.
	AllowReconnect bool

	// MaxReconnect is the per-server reconnect limit applied by
	// selectNextServer: a server whose reconnect count has reached the limit
	// is dropped from the pool. A negative value never drops servers.
	MaxReconnect int

	// ReconnectWait is set by the ReconnectWait option.
	// NOTE(review): consumed by the reconnect loop, outside this view.
	ReconnectWait time.Duration

	// CustomReconnectDelayCB is set by the CustomReconnectDelay option.
	// NOTE(review): consumed by the reconnect loop, outside this view.
	CustomReconnectDelayCB ReconnectDelayHandler

	// ReconnectJitter is the first argument of the ReconnectJitter option.
	ReconnectJitter time.Duration

	// ReconnectJitterTLS is the second argument of the ReconnectJitter option.
	ReconnectJitterTLS time.Duration

	// Timeout bounds connection establishment: it is the timeout of the
	// default dialer and the deadline applied to the handshake in
	// processConnectInit. Zero is replaced by DefaultTimeout in Connect; a
	// negative value makes createConn fail with ErrBadTimeout.
	Timeout time.Duration

	// DrainTimeout is set by the DrainTimeout option.
	DrainTimeout time.Duration

	// FlusherTimeout, when positive, makes newWriter wrap the connection in
	// a timeoutWriter using this value.
	FlusherTimeout time.Duration

	// PingInterval, when positive, arms the ping timer in processConnectInit.
	PingInterval time.Duration

	// MaxPingsOut is set by the MaxPingsOutstanding option. Zero is replaced
	// by DefaultMaxPingOut in Connect.
	MaxPingsOut int

	// ClosedCB is set by the ClosedHandler option or Conn.SetClosedHandler.
	ClosedCB ConnHandler

	// DisconnectedCB is set by the DisconnectHandler option or
	// Conn.SetDisconnectHandler.
	DisconnectedCB ConnHandler

	// DisconnectedErrCB is set by the DisconnectErrHandler option or
	// Conn.SetDisconnectErrHandler.
	DisconnectedErrCB ConnErrHandler

	// ReconnectedCB is set by the ReconnectHandler option or
	// Conn.SetReconnectHandler.
	ReconnectedCB ConnHandler

	// DiscoveredServersCB is set by the DiscoveredServersHandler option or
	// Conn.SetDiscoveredServersHandler.
	DiscoveredServersCB ConnHandler

	// AsyncErrorCB receives asynchronous errors. When nil, Connect installs
	// defaultErrHandler, which writes to standard error.
	AsyncErrorCB ErrHandler

	// ReconnectBufSize is the limit of the writer's pending buffer (see
	// natsWriter.plimit). Zero is replaced by DefaultReconnectBufSize.
	ReconnectBufSize int

	// SubChanLen is set by the SyncQueueLen option. Zero is replaced by
	// DefaultMaxChanLen in Connect.
	SubChanLen int

	// UserJWT supplies the user JWT. It cannot be combined with Nkey
	// (ErrNkeyAndUser) and requires SignatureCB.
	UserJWT UserJWTHandler

	// Nkey is the public nkey used for authentication. It requires
	// SignatureCB (ErrNkeyButNoSigCB) and is ignored when the server URL
	// carries user info (see connectProto).
	Nkey string

	// SignatureCB signs the server nonce for JWT or nkey authentication.
	SignatureCB SignatureHandler

	// User is the username used when the server URL has no user info.
	User string

	// Password is the password used when the server URL has no user info.
	Password string

	// Token is the auth token used when the server URL has no user info.
	Token string

	// TokenHandler produces the auth token; mutually exclusive with Token.
	TokenHandler AuthTokenHandler

	// Dialer is the dialer used by createConn when CustomDialer is nil.
	// createConn works on a copy whose Timeout is divided by the number of
	// candidate addresses. When nil, Connect installs a net.Dialer with
	// Timeout set from Options.Timeout.
	Dialer *net.Dialer

	// CustomDialer, when set, takes precedence over Dialer in createConn.
	CustomDialer CustomDialer

	// UseOldRequestStyle is set by the UseOldRequestStyle option.
	UseOldRequestStyle bool

	// NoCallbacksAfterClientClose is set by the option of the same name.
	NoCallbacksAfterClientClose bool

	// LameDuckModeHandler is set by the LameDuckModeHandler option.
	// NOTE(review): presumably invoked when the server INFO reports lame
	// duck mode (serverInfo.LameDuckMode) — confirm.
	LameDuckModeHandler ConnHandler

	// RetryOnFailedConnect makes connect swallow an initial connection
	// failure: the connection is put in RECONNECTING state, the writer
	// switches to its pending buffer and doReconnect is started.
	RetryOnFailedConnect bool

	// Compression is set by the Compression option.
	// NOTE(review): not consumed in this part of the file; presumably
	// relevant to websocket connections — confirm.
	Compression bool
}
// Internal sizing constants.
const (
	// scratchSize is the size of the Conn.scratch array.
	scratchSize = 512

	// defaultBufSize is the read buffer size and the write buffer flush
	// threshold configured by newReaderWriter.
	defaultBufSize = 32768

	// flushChanSize is the capacity of the Conn.fch channel (see setup).
	flushChanSize = 1

	// srvPoolSize is the initial capacity of the server pool and URL map.
	srvPoolSize = 4

	// nuidSize is not referenced in this part of the file.
	// NOTE(review): presumably the length of a generated NUID — confirm.
	nuidSize = 22

	// defaultPortString is appended by addURLToPool to URLs lacking a port.
	defaultPortString = "4222"
)
// Conn represents a connection to a server. It is created by
// Options.Connect and must be used through a pointer because it embeds
// locks.
type Conn struct {
	// Statistics is embedded so its counters are fields of Conn.
	// NOTE(review): keeping it first presumably preserves 64-bit alignment
	// for atomic access on 32-bit platforms — confirm.
	Statistics

	// mu guards the connection state; the Set*Handler and Connected*
	// methods take it, and connect holds it for the whole handshake.
	mu sync.RWMutex

	// Opts holds a copy of the options the connection was created with.
	Opts Options

	wg      sync.WaitGroup      // tracks readLoop and flusher (see processConnectInit)
	srvPool []*srv              // known servers, in connection order
	current *srv                // server currently selected from srvPool
	urls    map[string]struct{} // set of host:port already present in srvPool
	conn    net.Conn            // underlying (possibly TLS-wrapped) connection
	bw      *natsWriter         // outbound buffer
	br      *natsReader         // inbound buffer
	fch     chan struct{}       // signalled non-blockingly by waitForExits; presumably wakes the flusher
	info    serverInfo          // server information; presumably filled by processInfo
	ssid    int64
	subsMu  sync.RWMutex
	subs    map[int64]*Subscription
	ach     *asyncCallbacksHandler // queue drained by asyncCBDispatcher
	pongs   []chan struct{}
	scratch [scratchSize]byte // pre-seeded with the "HPUB " prefix by setup
	status  Status
	initc   bool // true while the initial connect is in progress (see connect)
	err     error
	ps      *parseState
	ptmr    *time.Timer // ping timer, armed in processConnectInit
	pout    int         // reset to 0 in processConnectInit; presumably outstanding pings
	ar      bool
	rqch    chan struct{}
	ws      bool // true when the pool uses websocket URLs (see addURLToPool)

	// Response-multiplexing state; not used in this part of the file.
	respSub   string
	respScanf string
	respMux   *Subscription
	respMap   map[string]chan *Msg
	respRand  *rand.Rand

	jsLastCheck time.Time
}
// natsReader is a minimal buffered reader over r.
//
// buf is the fixed read buffer and n the number of valid bytes in it. off
// is the offset of the first unconsumed byte, or -1 when the buffer holds
// nothing left to consume (see Read and ReadString).
type natsReader struct {
	r   io.Reader
	buf []byte
	off int
	n   int
}
// natsWriter buffers outbound protocol data.
//
// While pending is nil, data accumulates in bufs and is written to w once
// bufs reaches limit bytes. While pending is non-nil (see switchToPending),
// data accumulates there instead and flush is a no-op; plimit is the size
// at which atLimitIfUsingPending reports the pending buffer as full.
type natsWriter struct {
	w       io.Writer
	bufs    []byte
	limit   int
	pending *bytes.Buffer
	plimit  int
}
// Subscription represents interest in a subject. Most of its fields are
// maintained by code outside this part of the file, so only the exported
// ones and those whose role is evident from their type are described.
type Subscription struct {
	mu  sync.Mutex
	sid int64

	// Subject is the subject of the subscription; defaultErrHandler prints
	// it when reporting an asynchronous error.
	Subject string

	// Queue is the queue group name.
	// NOTE(review): presumably empty for non-queue subscriptions — confirm.
	Queue string

	jsi        *jsSub
	delivered  uint64
	max        uint64
	conn       *Conn
	mcb        MsgHandler
	mch        chan *Msg
	closed     bool
	sc         bool
	connClosed bool

	typ SubscriptionType

	// pHead/pTail form a linked list of messages through Msg.next.
	// NOTE(review): presumably the pending queue for async delivery — confirm.
	pHead *Msg
	pTail *Msg
	pCond *sync.Cond

	// NOTE(review): the counters below look like pending message/byte
	// counts, their high-water marks and limits, plus a dropped-message
	// count — confirm against the code that updates them.
	pMsgs       int
	pBytes      int
	pMsgsMax    int
	pBytesMax   int
	pMsgsLimit  int
	pBytesLimit int
	dropped     int
}
// Msg is a message with its subject, optional reply subject, optional
// headers and payload.
type Msg struct {
	Subject string
	Reply   string
	// Header holds the message headers; headerBytes serializes it.
	Header Header
	Data   []byte
	// Sub is the subscription the message belongs to, if any.
	Sub *Subscription
	// next links messages into a list (see Subscription.pHead/pTail).
	next    *Msg
	barrier *barrierInfo
	ackd    uint32
}
- func (m *Msg) headerBytes() ([]byte, error) {
- var hdr []byte
- if len(m.Header) == 0 {
- return hdr, nil
- }
- var b bytes.Buffer
- _, err := b.WriteString(hdrLine)
- if err != nil {
- return nil, ErrBadHeaderMsg
- }
- err = http.Header(m.Header).Write(&b)
- if err != nil {
- return nil, ErrBadHeaderMsg
- }
- _, err = b.WriteString(crlf)
- if err != nil {
- return nil, ErrBadHeaderMsg
- }
- return b.Bytes(), nil
- }
// barrierInfo pairs a reference count with a function.
// NOTE(review): presumably f runs once refs drops to zero — the code that
// uses this type is outside this part of the file; confirm.
type barrierInfo struct {
	refs int64
	f    func()
}
// Statistics holds the traffic counters of a connection: inbound and
// outbound message and byte counts, and the number of reconnects. It is
// embedded in Conn.
type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}
// srv is one entry of the server pool.
type srv struct {
	url        *url.URL // parsed server URL, always with a port when it could be added
	didConnect bool     // set once an initial connect to this server succeeded
	reconnects int      // compared against Options.MaxReconnect in selectNextServer
	lastErr    error    // cleared on successful connect
	isImplicit bool     // value of the "implicit" argument given to addURLToPool
	tlsName    string   // TLS server name override used by makeTLSConn when non-empty
}
// serverInfo mirrors the JSON document describing the server.
// NOTE(review): presumably decoded from the INFO protocol line by
// processInfo, which is outside this part of the file.
type serverInfo struct {
	ID      string `json:"server_id"`
	Name    string `json:"server_name"`
	Proto   int    `json:"proto"`
	Host    string `json:"host"`
	Port    int    `json:"port"`
	Headers bool   `json:"headers"`
	// AuthRequired, TLSRequired and TLSAvailable are server capabilities;
	// the two TLS flags drive checkForSecure.
	AuthRequired bool  `json:"auth_required,omitempty"`
	TLSRequired  bool  `json:"tls_required,omitempty"`
	TLSAvailable bool  `json:"tls_available,omitempty"`
	MaxPayload   int64 `json:"max_payload"`
	// CID is the client ID; defaultErrHandler prints it.
	CID      uint64 `json:"client_id,omitempty"`
	ClientIP string `json:"client_ip,omitempty"`
	// Nonce is the challenge signed by Options.SignatureCB in connectProto.
	Nonce        string   `json:"nonce,omitempty"`
	Cluster      string   `json:"cluster,omitempty"`
	ConnectURLs  []string `json:"connect_urls,omitempty"`
	LameDuckMode bool     `json:"ldm,omitempty"`
}
// Client protocol levels. The blank identifier consumes iota value 0, so
// clientProtoInfo is 1.
// NOTE(review): presumably sent as connectInfo.Protocol to signal support
// for asynchronous INFO updates — confirm.
const (
	_ = iota

	clientProtoInfo
)
// connectInfo is the JSON payload describing the client.
// NOTE(review): presumably marshalled into the connectProto ("CONNECT %s")
// line; the code that fills it is outside this part of the file.
type connectInfo struct {
	Verbose      bool   `json:"verbose"`
	Pedantic     bool   `json:"pedantic"`
	UserJWT      string `json:"jwt,omitempty"`
	Nkey         string `json:"nkey,omitempty"`
	Signature    string `json:"sig,omitempty"`
	User         string `json:"user,omitempty"`
	Pass         string `json:"pass,omitempty"`
	Token        string `json:"auth_token,omitempty"`
	TLS          bool   `json:"tls_required"`
	Name         string `json:"name"`
	Lang         string `json:"lang"`
	Version      string `json:"version"`
	Protocol     int    `json:"protocol"`
	Echo         bool   `json:"echo"`
	Headers      bool   `json:"headers"`
	NoResponders bool   `json:"no_responders"`
}
// MsgHandler is a callback that receives a message (see Subscription.mcb).
type MsgHandler func(msg *Msg)
- func Connect(url string, options ...Option) (*Conn, error) {
- opts := GetDefaultOptions()
- opts.Servers = processUrlString(url)
- for _, opt := range options {
- if opt != nil {
- if err := opt(&opts); err != nil {
- return nil, err
- }
- }
- }
- return opts.Connect()
- }
- func Name(name string) Option {
- return func(o *Options) error {
- o.Name = name
- return nil
- }
- }
- func Secure(tls ...*tls.Config) Option {
- return func(o *Options) error {
- o.Secure = true
-
- if len(tls) > 1 {
- return ErrMultipleTLSConfigs
- }
- if len(tls) == 1 {
- o.TLSConfig = tls[0]
- }
- return nil
- }
- }
- func RootCAs(file ...string) Option {
- return func(o *Options) error {
- pool := x509.NewCertPool()
- for _, f := range file {
- rootPEM, err := ioutil.ReadFile(f)
- if err != nil || rootPEM == nil {
- return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
- }
- ok := pool.AppendCertsFromPEM(rootPEM)
- if !ok {
- return fmt.Errorf("nats: failed to parse root certificate from %q", f)
- }
- }
- if o.TLSConfig == nil {
- o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
- o.TLSConfig.RootCAs = pool
- o.Secure = true
- return nil
- }
- }
- func ClientCert(certFile, keyFile string) Option {
- return func(o *Options) error {
- cert, err := tls.LoadX509KeyPair(certFile, keyFile)
- if err != nil {
- return fmt.Errorf("nats: error loading client certificate: %v", err)
- }
- cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
- if err != nil {
- return fmt.Errorf("nats: error parsing client certificate: %v", err)
- }
- if o.TLSConfig == nil {
- o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
- o.TLSConfig.Certificates = []tls.Certificate{cert}
- o.Secure = true
- return nil
- }
- }
- func NoReconnect() Option {
- return func(o *Options) error {
- o.AllowReconnect = false
- return nil
- }
- }
- func DontRandomize() Option {
- return func(o *Options) error {
- o.NoRandomize = true
- return nil
- }
- }
- func NoEcho() Option {
- return func(o *Options) error {
- o.NoEcho = true
- return nil
- }
- }
- func ReconnectWait(t time.Duration) Option {
- return func(o *Options) error {
- o.ReconnectWait = t
- return nil
- }
- }
- func MaxReconnects(max int) Option {
- return func(o *Options) error {
- o.MaxReconnect = max
- return nil
- }
- }
- func ReconnectJitter(jitter, jitterForTLS time.Duration) Option {
- return func(o *Options) error {
- o.ReconnectJitter = jitter
- o.ReconnectJitterTLS = jitterForTLS
- return nil
- }
- }
- func CustomReconnectDelay(cb ReconnectDelayHandler) Option {
- return func(o *Options) error {
- o.CustomReconnectDelayCB = cb
- return nil
- }
- }
- func PingInterval(t time.Duration) Option {
- return func(o *Options) error {
- o.PingInterval = t
- return nil
- }
- }
- func MaxPingsOutstanding(max int) Option {
- return func(o *Options) error {
- o.MaxPingsOut = max
- return nil
- }
- }
- func ReconnectBufSize(size int) Option {
- return func(o *Options) error {
- o.ReconnectBufSize = size
- return nil
- }
- }
- func Timeout(t time.Duration) Option {
- return func(o *Options) error {
- o.Timeout = t
- return nil
- }
- }
- func FlusherTimeout(t time.Duration) Option {
- return func(o *Options) error {
- o.FlusherTimeout = t
- return nil
- }
- }
- func DrainTimeout(t time.Duration) Option {
- return func(o *Options) error {
- o.DrainTimeout = t
- return nil
- }
- }
- func DisconnectErrHandler(cb ConnErrHandler) Option {
- return func(o *Options) error {
- o.DisconnectedErrCB = cb
- return nil
- }
- }
- func DisconnectHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.DisconnectedCB = cb
- return nil
- }
- }
- func ReconnectHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.ReconnectedCB = cb
- return nil
- }
- }
- func ClosedHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.ClosedCB = cb
- return nil
- }
- }
- func DiscoveredServersHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.DiscoveredServersCB = cb
- return nil
- }
- }
- func ErrorHandler(cb ErrHandler) Option {
- return func(o *Options) error {
- o.AsyncErrorCB = cb
- return nil
- }
- }
- func UserInfo(user, password string) Option {
- return func(o *Options) error {
- o.User = user
- o.Password = password
- return nil
- }
- }
- func Token(token string) Option {
- return func(o *Options) error {
- if o.TokenHandler != nil {
- return ErrTokenAlreadySet
- }
- o.Token = token
- return nil
- }
- }
- func TokenHandler(cb AuthTokenHandler) Option {
- return func(o *Options) error {
- if o.Token != "" {
- return ErrTokenAlreadySet
- }
- o.TokenHandler = cb
- return nil
- }
- }
- func UserCredentials(userOrChainedFile string, seedFiles ...string) Option {
- userCB := func() (string, error) {
- return userFromFile(userOrChainedFile)
- }
- var keyFile string
- if len(seedFiles) > 0 {
- keyFile = seedFiles[0]
- } else {
- keyFile = userOrChainedFile
- }
- sigCB := func(nonce []byte) ([]byte, error) {
- return sigHandler(nonce, keyFile)
- }
- return UserJWT(userCB, sigCB)
- }
- func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option {
- return func(o *Options) error {
- if userCB == nil {
- return ErrNoUserCB
- }
- if sigCB == nil {
- return ErrUserButNoSigCB
- }
- o.UserJWT = userCB
- o.SignatureCB = sigCB
- return nil
- }
- }
- func Nkey(pubKey string, sigCB SignatureHandler) Option {
- return func(o *Options) error {
- o.Nkey = pubKey
- o.SignatureCB = sigCB
- if pubKey != "" && sigCB == nil {
- return ErrNkeyButNoSigCB
- }
- return nil
- }
- }
- func SyncQueueLen(max int) Option {
- return func(o *Options) error {
- o.SubChanLen = max
- return nil
- }
- }
- func Dialer(dialer *net.Dialer) Option {
- return func(o *Options) error {
- o.Dialer = dialer
- return nil
- }
- }
- func SetCustomDialer(dialer CustomDialer) Option {
- return func(o *Options) error {
- o.CustomDialer = dialer
- return nil
- }
- }
- func UseOldRequestStyle() Option {
- return func(o *Options) error {
- o.UseOldRequestStyle = true
- return nil
- }
- }
- func NoCallbacksAfterClientClose() Option {
- return func(o *Options) error {
- o.NoCallbacksAfterClientClose = true
- return nil
- }
- }
- func LameDuckModeHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.LameDuckModeHandler = cb
- return nil
- }
- }
- func RetryOnFailedConnect(retry bool) Option {
- return func(o *Options) error {
- o.RetryOnFailedConnect = retry
- return nil
- }
- }
- func Compression(enabled bool) Option {
- return func(o *Options) error {
- o.Compression = enabled
- return nil
- }
- }
- func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.DisconnectedCB = dcb
- }
- func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.DisconnectedErrCB = dcb
- }
- func (nc *Conn) SetReconnectHandler(rcb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.ReconnectedCB = rcb
- }
- func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.DiscoveredServersCB = dscb
- }
- func (nc *Conn) SetClosedHandler(cb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.ClosedCB = cb
- }
- func (nc *Conn) SetErrorHandler(cb ErrHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.AsyncErrorCB = cb
- }
// processUrlString splits a comma-separated list of server URLs and trims
// surrounding whitespace from every entry. Empty entries are preserved, so
// an empty input yields a slice holding one empty string.
func processUrlString(url string) []string {
	parts := strings.Split(url, ",")
	for idx := range parts {
		parts[idx] = strings.TrimSpace(parts[idx])
	}
	return parts
}
- func (o Options) Connect() (*Conn, error) {
- nc := &Conn{Opts: o}
-
- if nc.Opts.MaxPingsOut == 0 {
- nc.Opts.MaxPingsOut = DefaultMaxPingOut
- }
-
- if nc.Opts.SubChanLen == 0 {
- nc.Opts.SubChanLen = DefaultMaxChanLen
- }
-
- if nc.Opts.ReconnectBufSize == 0 {
- nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
- }
-
- if nc.Opts.Timeout == 0 {
- nc.Opts.Timeout = DefaultTimeout
- }
-
- if nc.Opts.UserJWT != nil && nc.Opts.Nkey != "" {
- return nil, ErrNkeyAndUser
- }
-
- if nc.Opts.Nkey != "" && nc.Opts.SignatureCB == nil {
- return nil, ErrNkeyButNoSigCB
- }
-
- if nc.Opts.Dialer == nil {
- nc.Opts.Dialer = &net.Dialer{
- Timeout: nc.Opts.Timeout,
- }
- }
- if err := nc.setupServerPool(); err != nil {
- return nil, err
- }
-
- nc.ach = &asyncCallbacksHandler{}
- nc.ach.cond = sync.NewCond(&nc.ach.mu)
-
- if nc.Opts.AsyncErrorCB == nil {
- nc.Opts.AsyncErrorCB = defaultErrHandler
- }
-
- nc.newReaderWriter()
- if err := nc.connect(); err != nil {
- return nil, err
- }
-
- go nc.ach.asyncCBDispatcher()
- return nc, nil
- }
- func defaultErrHandler(nc *Conn, sub *Subscription, err error) {
- var cid uint64
- if nc != nil {
- nc.mu.RLock()
- cid = nc.info.CID
- nc.mu.RUnlock()
- }
- var errStr string
- if sub != nil {
- errStr = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", err.Error(), cid, sub.Subject)
- } else {
- errStr = fmt.Sprintf("%s on connection [%d]\n", err.Error(), cid)
- }
- os.Stderr.WriteString(errStr)
- }
// Building blocks of the text protocol.
const (
	_CRLF_   = "\r\n" // line terminator of every protocol line
	_EMPTY_  = ""
	_SPC_    = " "
	_PUB_P_  = "PUB "
	_HPUB_P_ = "HPUB " // copied to the start of Conn.scratch by setup
)

// _CRLF_BYTES_ is _CRLF_ as a byte slice.
var _CRLF_BYTES_ = []byte(_CRLF_)

// Operation names of server protocol lines; processExpectedInfo checks
// the first operation against _INFO_OP_.
const (
	_OK_OP_   = "+OK"
	_ERR_OP_  = "-ERR"
	_PONG_OP_ = "PONG"
	_INFO_OP_ = "INFO"
)

// Protocol lines, as fmt format strings where they take arguments.
// NOTE(review): argument meaning (e.g. subject, queue, sid for subProto)
// is inferred from the verbs and names — confirm at the call sites.
const (
	connectProto = "CONNECT %s" + _CRLF_
	pingProto    = "PING" + _CRLF_
	pongProto    = "PONG" + _CRLF_
	subProto     = "SUB %s %s %d" + _CRLF_
	unsubProto   = "UNSUB %d %s" + _CRLF_
	okProto      = _OK_OP_ + _CRLF_
)
- func (nc *Conn) currentServer() (int, *srv) {
- for i, s := range nc.srvPool {
- if s == nil {
- continue
- }
- if s == nc.current {
- return i, s
- }
- }
- return -1, nil
- }
- func (nc *Conn) selectNextServer() (*srv, error) {
- i, s := nc.currentServer()
- if i < 0 {
- return nil, ErrNoServers
- }
- sp := nc.srvPool
- num := len(sp)
- copy(sp[i:num-1], sp[i+1:num])
- maxReconnect := nc.Opts.MaxReconnect
- if maxReconnect < 0 || s.reconnects < maxReconnect {
- nc.srvPool[num-1] = s
- } else {
- nc.srvPool = sp[0 : num-1]
- }
- if len(nc.srvPool) <= 0 {
- nc.current = nil
- return nil, ErrNoServers
- }
- nc.current = nc.srvPool[0]
- return nc.srvPool[0], nil
- }
- func (nc *Conn) pickServer() error {
- nc.current = nil
- if len(nc.srvPool) <= 0 {
- return ErrNoServers
- }
- for _, s := range nc.srvPool {
- if s != nil {
- nc.current = s
- return nil
- }
- }
- return ErrNoServers
- }
// tlsScheme is the URL scheme that forces a secure connection: a pool URL
// using it makes setupServerPool enable Options.Secure, and connScheme
// returns it for secure non-websocket connections.
const tlsScheme = "tls"
- func (nc *Conn) setupServerPool() error {
- nc.srvPool = make([]*srv, 0, srvPoolSize)
- nc.urls = make(map[string]struct{}, srvPoolSize)
-
-
- for _, urlString := range nc.Opts.Servers {
- if err := nc.addURLToPool(urlString, false, false); err != nil {
- return err
- }
- }
-
- if !nc.Opts.NoRandomize {
- nc.shufflePool(0)
- }
-
-
- if nc.Opts.Url != _EMPTY_ {
-
- if err := nc.addURLToPool(nc.Opts.Url, false, false); err != nil {
- return err
- }
-
- last := len(nc.srvPool) - 1
- if last > 0 {
- nc.srvPool[0], nc.srvPool[last] = nc.srvPool[last], nc.srvPool[0]
- }
- } else if len(nc.srvPool) <= 0 {
-
- if err := nc.addURLToPool(DefaultURL, false, false); err != nil {
- return err
- }
- }
-
- for _, srv := range nc.srvPool {
- if srv.url.Scheme == tlsScheme {
-
- nc.Opts.Secure = true
- if nc.Opts.TLSConfig == nil {
- nc.Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
- }
- }
- return nc.pickServer()
- }
- func (nc *Conn) connScheme() string {
- if nc.ws {
- if nc.Opts.Secure {
- return wsSchemeTLS
- }
- return wsScheme
- }
- if nc.Opts.Secure {
- return tlsScheme
- }
- return "nats"
- }
// hostIsIP reports whether the host part of u (without port or IPv6
// brackets) is a literal IP address rather than a host name.
func hostIsIP(u *url.URL) bool {
	ip := net.ParseIP(u.Hostname())
	return ip != nil
}
- func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
- if !strings.Contains(sURL, "://") {
- sURL = fmt.Sprintf("%s://%s", nc.connScheme(), sURL)
- }
- var (
- u *url.URL
- err error
- )
- for i := 0; i < 2; i++ {
- u, err = url.Parse(sURL)
- if err != nil {
- return err
- }
- if u.Port() != "" {
- break
- }
-
-
- if sURL[len(sURL)-1] != ':' {
- sURL += ":"
- }
- sURL += defaultPortString
- }
- isWS := isWebsocketScheme(u)
-
-
-
- if len(nc.srvPool) == 0 {
- nc.ws = isWS
- } else if isWS && !nc.ws || !isWS && nc.ws {
- return fmt.Errorf("mixing of websocket and non websocket URLs is not allowed")
- }
- var tlsName string
- if implicit {
- curl := nc.current.url
-
-
- if u.User == nil && curl.User != nil {
- u.User = curl.User
- }
-
-
-
- if saveTLSName && hostIsIP(u) {
- tlsName = curl.Hostname()
- }
- }
- s := &srv{url: u, isImplicit: implicit, tlsName: tlsName}
- nc.srvPool = append(nc.srvPool, s)
- nc.urls[u.Host] = struct{}{}
- return nil
- }
- func (nc *Conn) shufflePool(offset int) {
- if len(nc.srvPool) <= offset+1 {
- return
- }
- source := rand.NewSource(time.Now().UnixNano())
- r := rand.New(source)
- for i := offset; i < len(nc.srvPool); i++ {
- j := offset + r.Intn(i+1-offset)
- nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
- }
- }
- func (nc *Conn) newReaderWriter() {
- nc.br = &natsReader{
- buf: make([]byte, defaultBufSize),
- off: -1,
- }
- nc.bw = &natsWriter{
- limit: defaultBufSize,
- plimit: nc.Opts.ReconnectBufSize,
- }
- }
- func (nc *Conn) bindToNewConn() {
- bw := nc.bw
- bw.w, bw.bufs = nc.newWriter(), nil
- br := nc.br
- br.r, br.n, br.off = nc.conn, 0, -1
- }
- func (nc *Conn) newWriter() io.Writer {
- var w io.Writer = nc.conn
- if nc.Opts.FlusherTimeout > 0 {
- w = &timeoutWriter{conn: nc.conn, timeout: nc.Opts.FlusherTimeout}
- }
- return w
- }
- func (w *natsWriter) appendString(str string) error {
- return w.appendBufs([]byte(str))
- }
- func (w *natsWriter) appendBufs(bufs ...[]byte) error {
- for _, buf := range bufs {
- if len(buf) == 0 {
- continue
- }
- if w.pending != nil {
- w.pending.Write(buf)
- } else {
- w.bufs = append(w.bufs, buf...)
- }
- }
- if w.pending == nil && len(w.bufs) >= w.limit {
- return w.flush()
- }
- return nil
- }
- func (w *natsWriter) writeDirect(strs ...string) error {
- for _, str := range strs {
- if _, err := w.w.Write([]byte(str)); err != nil {
- return err
- }
- }
- return nil
- }
- func (w *natsWriter) flush() error {
-
-
-
- if w.pending != nil {
- return nil
- }
-
-
-
- _, err := w.w.Write(w.bufs)
- w.bufs = w.bufs[:0]
- return err
- }
- func (w *natsWriter) buffered() int {
- if w.pending != nil {
- return w.pending.Len()
- }
- return len(w.bufs)
- }
- func (w *natsWriter) switchToPending() {
- w.pending = new(bytes.Buffer)
- }
- func (w *natsWriter) flushPendingBuffer() error {
- if w.pending == nil || w.pending.Len() == 0 {
- return nil
- }
- _, err := w.w.Write(w.pending.Bytes())
-
-
- w.pending.Reset()
- return err
- }
- func (w *natsWriter) atLimitIfUsingPending() bool {
- if w.pending == nil {
- return false
- }
- return w.pending.Len() >= w.plimit
- }
- func (w *natsWriter) doneWithPending() {
- w.pending = nil
- }
- func (r *natsReader) Read() ([]byte, error) {
- if r.off >= 0 {
- off := r.off
- r.off = -1
- return r.buf[off:r.n], nil
- }
- var err error
- r.n, err = r.r.Read(r.buf)
- return r.buf[:r.n], err
- }
- func (r *natsReader) ReadString(delim byte) (string, error) {
- var s string
- build_string:
-
- if r.off >= 0 {
- i := bytes.IndexByte(r.buf[r.off:r.n], delim)
- if i >= 0 {
- end := r.off + i + 1
- s += string(r.buf[r.off:end])
- r.off = end
- if r.off >= r.n {
- r.off = -1
- }
- return s, nil
- }
-
- s += string(r.buf[r.off:r.n])
- r.off = -1
- }
- if _, err := r.Read(); err != nil {
- return s, err
- }
- r.off = 0
- goto build_string
- }
- func (nc *Conn) createConn() (err error) {
- if nc.Opts.Timeout < 0 {
- return ErrBadTimeout
- }
- if _, cur := nc.currentServer(); cur == nil {
- return ErrNoServers
- }
-
- hosts := []string{}
- u := nc.current.url
- if net.ParseIP(u.Hostname()) == nil {
- addrs, _ := net.LookupHost(u.Hostname())
- for _, addr := range addrs {
- hosts = append(hosts, net.JoinHostPort(addr, u.Port()))
- }
- }
-
- if len(hosts) == 0 {
- hosts = append(hosts, u.Host)
- }
-
-
-
- dialer := nc.Opts.CustomDialer
- if dialer == nil {
-
- copyDialer := *nc.Opts.Dialer
- copyDialer.Timeout = copyDialer.Timeout / time.Duration(len(hosts))
- dialer = ©Dialer
- }
- if len(hosts) > 1 && !nc.Opts.NoRandomize {
- rand.Shuffle(len(hosts), func(i, j int) {
- hosts[i], hosts[j] = hosts[j], hosts[i]
- })
- }
- for _, host := range hosts {
- nc.conn, err = dialer.Dial("tcp", host)
- if err == nil {
- break
- }
- }
- if err != nil {
- return err
- }
-
- if isWebsocketScheme(u) {
- return nc.wsInitHandshake(u)
- }
-
- nc.bindToNewConn()
- return nil
- }
- func (nc *Conn) makeTLSConn() error {
-
- var tlsCopy *tls.Config
- if nc.Opts.TLSConfig != nil {
- tlsCopy = util.CloneTLSConfig(nc.Opts.TLSConfig)
- } else {
- tlsCopy = &tls.Config{}
- }
-
- if tlsCopy.ServerName == _EMPTY_ {
- if nc.current.tlsName != _EMPTY_ {
- tlsCopy.ServerName = nc.current.tlsName
- } else {
- h, _, _ := net.SplitHostPort(nc.current.url.Host)
- tlsCopy.ServerName = h
- }
- }
- nc.conn = tls.Client(nc.conn, tlsCopy)
- conn := nc.conn.(*tls.Conn)
- if err := conn.Handshake(); err != nil {
- return err
- }
- nc.bindToNewConn()
- return nil
- }
- func (nc *Conn) waitForExits() {
-
- select {
- case nc.fch <- struct{}{}:
- default:
- }
-
- nc.wg.Wait()
- }
- func (nc *Conn) ConnectedUrl() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.current.url.String()
- }
- func (nc *Conn) ConnectedAddr() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.conn.RemoteAddr().String()
- }
- func (nc *Conn) ConnectedServerId() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.info.ID
- }
- func (nc *Conn) ConnectedServerName() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.info.Name
- }
- func (nc *Conn) ConnectedClusterName() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.info.Cluster
- }
- func (nc *Conn) setup() {
- nc.subs = make(map[int64]*Subscription)
- nc.pongs = make([]chan struct{}, 0, 8)
- nc.fch = make(chan struct{}, flushChanSize)
- nc.rqch = make(chan struct{})
-
- pub := nc.scratch[:len(_HPUB_P_)]
- copy(pub, _HPUB_P_)
- }
- func (nc *Conn) processConnectInit() error {
-
- nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
- defer nc.conn.SetDeadline(time.Time{})
-
- nc.status = CONNECTING
-
- err := nc.processExpectedInfo()
- if err != nil {
- return err
- }
-
-
- err = nc.sendConnect()
- if err != nil {
- return err
- }
-
- nc.pout = 0
-
- if nc.Opts.PingInterval > 0 {
- if nc.ptmr == nil {
- nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
- } else {
- nc.ptmr.Reset(nc.Opts.PingInterval)
- }
- }
-
- nc.wg.Add(2)
- go nc.readLoop()
- go nc.flusher()
- return nil
- }
// connect performs the initial connection attempt.
//
// Each server of the pool is tried in order until one completes both the
// network connection and the protocol handshake. When no server works the
// function returns the last error (ErrNoServers if there is none, for
// instance when every failure was a refused connection) — unless
// RetryOnFailedConnect is set, in which case the connection is switched to
// the RECONNECTING state, doReconnect is started in the background and nil
// is returned. The connection lock is held for the whole call, except
// around nc.close.
func (nc *Conn) connect() error {
	var err error

	nc.mu.Lock()
	defer nc.mu.Unlock()
	// Mark the initial connect as in progress.
	nc.initc = true

	for i := 0; i < len(nc.srvPool); i++ {
		nc.current = nc.srvPool[i]
		if err = nc.createConn(); err == nil {
			// Network connection is up; set up state and run the handshake.
			nc.setup()
			err = nc.processConnectInit()
			if err == nil {
				nc.current.didConnect = true
				nc.current.reconnects = 0
				nc.current.lastErr = nil
				break
			} else {
				// nc.close is called without the lock held; the lock is
				// re-acquired afterwards so the deferred Unlock stays
				// balanced.
				nc.mu.Unlock()
				nc.close(DISCONNECTED, false, err)
				nc.mu.Lock()
			}
		} else {
			// A refused connection is not remembered as the error to
			// report; the next server is simply tried.
			if strings.Contains(err.Error(), "connection refused") {
				err = nil
			}
		}
	}
	// No error recorded but still not connected: nothing was reachable.
	if err == nil && nc.status != CONNECTED {
		err = ErrNoServers
	}
	if err == nil {
		nc.initc = false
	} else if nc.Opts.RetryOnFailedConnect {
		// Keep the connection object alive and let the reconnect logic take
		// over; outbound data is collected in the pending buffer meanwhile.
		nc.setup()
		nc.status = RECONNECTING
		nc.bw.switchToPending()
		go nc.doReconnect(ErrNoServers)
		err = nil
	} else {
		nc.current = nil
	}
	return err
}
- func (nc *Conn) checkForSecure() error {
-
- o := nc.Opts
-
- if o.Secure && !nc.info.TLSRequired && !nc.info.TLSAvailable {
- return ErrSecureConnWanted
- } else if nc.info.TLSRequired && !o.Secure {
-
- o.Secure = true
- }
-
- if o.Secure {
- if err := nc.makeTLSConn(); err != nil {
- return err
- }
- }
- return nil
- }
- func (nc *Conn) processExpectedInfo() error {
- c := &control{}
-
- err := nc.readOp(c)
- if err != nil {
- return err
- }
-
- if c.op != _INFO_OP_ {
- return ErrNoInfoReceived
- }
-
- if err := nc.processInfo(c.args); err != nil {
- return err
- }
- if nc.Opts.Nkey != "" && nc.info.Nonce == "" {
- return ErrNkeysNotSupported
- }
-
-
- if nc.ws {
- return nil
- }
- return nc.checkForSecure()
- }
- func (nc *Conn) sendProto(proto string) {
- nc.mu.Lock()
- nc.bw.appendString(proto)
- nc.kickFlusher()
- nc.mu.Unlock()
- }
- func (nc *Conn) connectProto() (string, error) {
- o := nc.Opts
- var nkey, sig, user, pass, token, ujwt string
- u := nc.current.url.User
- if u != nil {
-
- if _, ok := u.Password(); !ok {
- token = u.Username()
- } else {
- user = u.Username()
- pass, _ = u.Password()
- }
- } else {
-
- user = o.User
- pass = o.Password
- token = o.Token
- nkey = o.Nkey
- }
-
- if o.UserJWT != nil {
- if jwt, err := o.UserJWT(); err != nil {
- return _EMPTY_, err
- } else {
- ujwt = jwt
- }
- if nkey != _EMPTY_ {
- return _EMPTY_, ErrNkeyAndUser
- }
- }
- if ujwt != _EMPTY_ || nkey != _EMPTY_ {
- if o.SignatureCB == nil {
- if ujwt == _EMPTY_ {
- return _EMPTY_, ErrNkeyButNoSigCB
- }
- return _EMPTY_, ErrUserButNoSigCB
- }
- sigraw, err := o.SignatureCB([]byte(nc.info.Nonce))
- if err != nil {
- return _EMPTY_, err
- }
- sig = base64.RawURLEncoding.EncodeToString(sigraw)
- }
- if nc.Opts.TokenHandler != nil {
- if token != _EMPTY_ {
- return _EMPTY_, ErrTokenAlreadySet
- }
- token = nc.Opts.TokenHandler()
- }
-
- hdrs := nc.info.Headers
- cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
- o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}
- b, err := json.Marshal(cinfo)
- if err != nil {
- return _EMPTY_, ErrJsonParse
- }
-
- if o.NoEcho && nc.info.Proto < 1 {
- return _EMPTY_, ErrNoEchoNotSupported
- }
- return fmt.Sprintf(connectProto, b), nil
- }
- func normalizeErr(line string) string {
- s := strings.TrimSpace(strings.TrimPrefix(line, _ERR_OP_))
- s = strings.TrimLeft(strings.TrimRight(s, "'"), "'")
- return s
- }
- func (nc *Conn) sendConnect() error {
-
- cProto, err := nc.connectProto()
- if err != nil {
- return err
- }
-
- if err := nc.bw.writeDirect(cProto, pingProto); err != nil {
- return err
- }
-
-
-
-
- proto, err := nc.readProto()
- if err != nil {
- return err
- }
-
- if nc.Opts.Verbose && proto == okProto {
-
- proto, err = nc.readProto()
- if err != nil {
- return err
- }
- }
-
- if proto != pongProto {
-
-
- proto = strings.TrimRight(proto, "\r\n")
-
- if strings.HasPrefix(proto, _ERR_OP_) {
-
- proto = normalizeErr(proto)
-
- if authErr := checkAuthError(strings.ToLower(proto)); authErr != nil {
-
-
-
-
-
- nc.processAuthError(authErr)
- }
- return errors.New("nats: " + proto)
- }
-
- return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, proto)
- }
-
- nc.status = CONNECTED
- return nil
- }
- func (nc *Conn) readProto() (string, error) {
- return nc.br.ReadString('\n')
- }
- type control struct { // control is one parsed protocol line, as filled in by parseControl.
- op, args string // op is the first token of the line, args the trimmed remainder
- }
- func (nc *Conn) readOp(c *control) error {
- line, err := nc.readProto()
- if err != nil {
- return err
- }
- parseControl(line, c)
- return nil
- }
- func parseControl(line string, c *control) {
- toks := strings.SplitN(line, _SPC_, 2)
- if len(toks) == 1 {
- c.op = strings.TrimSpace(toks[0])
- c.args = _EMPTY_
- } else if len(toks) == 2 {
- c.op, c.args = strings.TrimSpace(toks[0]), strings.TrimSpace(toks[1])
- } else {
- c.op = _EMPTY_
- }
- }
- func (nc *Conn) flushReconnectPendingItems() error {
- return nc.bw.flushPendingBuffer()
- }
- func (nc *Conn) stopPingTimer() {
- if nc.ptmr != nil {
- nc.ptmr.Stop()
- }
- }
- func (nc *Conn) doReconnect(err error) {
- // doReconnect runs the reconnect loop; connect and processOpErr start it with "go".
- // It first waits, through waitForExits, before touching any shared state.
- nc.waitForExits()
- // From here on nc.mu is held, except while sleeping between attempts and
- // while waiting for goroutines inside the loop.
- // err is the error that caused the reconnect; it is only passed on to
- // DisconnectedErrCB. Inside the loop a new err shadows it and holds the
- // result of the current attempt.
- nc.mu.Lock()
-
- nc.err = nil
- // Queue the disconnected callback (the error-aware variant wins when both are
- // set), but not while the initial connect is still pending (nc.initc).
- if !nc.initc {
- if nc.Opts.DisconnectedErrCB != nil {
- nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
- } else if nc.Opts.DisconnectedCB != nil {
- nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
- }
- }
- // waitForGoRoutines is set after a failure that happens once processConnectInit
- // has started the read loop and flusher; the next iteration then calls waitForExits.
- waitForGoRoutines := false
- var rt *time.Timer
- // Snapshot of the channel that can cut a sleep short (see the select below).
- rqch := nc.rqch
-
- var wlf int
- var jitter time.Duration
- var rw time.Duration
-
- crd := nc.Opts.CustomReconnectDelayCB
- if crd == nil {
- rw = nc.Opts.ReconnectWait
- // Without a custom delay callback the sleep is ReconnectWait plus a random
- // jitter; the TLS jitter setting is used when Opts.Secure is set or a
- // TLSConfig is present.
- jitter = nc.Opts.ReconnectJitter
- if nc.Opts.Secure || nc.Opts.TLSConfig != nil {
- jitter = nc.Opts.ReconnectJitterTLS
- }
- }
- for i := 0; len(nc.srvPool) > 0; {
- cur, err := nc.selectNextServer()
- if err != nil {
- nc.err = err
- break
- }
- doSleep := i+1 >= len(nc.srvPool)
- nc.mu.Unlock()
- if !doSleep {
- i++
- // Not yet through the whole pool: just yield and try the next server.
- runtime.Gosched()
- } else {
- i = 0
- var st time.Duration
- if crd != nil {
- wlf++
- st = crd(wlf)
- } else {
- st = rw
- if jitter > 0 {
- st += time.Duration(rand.Int63n(int64(jitter)))
- }
- }
- if rt == nil {
- rt = time.NewTimer(st)
- } else {
- rt.Reset(st)
- }
- select {
- case <-rqch:
- rt.Stop()
- case <-rt.C:
- }
- }
- // Still unlocked here: waitForExits is called without holding nc.mu.
- if waitForGoRoutines {
- nc.waitForExits()
- waitForGoRoutines = false
- }
- nc.mu.Lock()
- // The connection may have been closed while the lock was released.
- if nc.isClosed() {
- break
- }
- // Count this attempt against the selected server.
- cur.reconnects++
- // Try to create the new connection.
- err = nc.createConn()
- // A failed createConn is not kept in nc.err; simply move on to the
- // next server.
- if err != nil {
- nc.err = nil
- continue
- }
- // A connection was created: count the reconnect on the Conn.
- nc.Reconnects++
- // Run the connect handshake on the new connection.
- if nc.err = nc.processConnectInit(); nc.err != nil {
- // nc.ar is set by processAuthError when the same auth error repeats for
- // a server; in that case reconnecting is abandoned altogether.
- if nc.ar {
- break
- }
- nc.status = RECONNECTING
- continue
- }
- // Handshake succeeded: clear the remembered error and reset the
- // per-server reconnect bookkeeping.
- nc.current.lastErr = nil
-
- cur.didConnect = true
- cur.reconnects = 0
- // Send the existing subscriptions to the new server.
- nc.resendSubscriptions()
- // Then flush the pending buffer.
- nc.err = nc.flushReconnectPendingItems()
- if nc.err != nil {
- nc.status = RECONNECTING
- // The flush failed: stop the ping timer armed by processConnectInit.
- nc.stopPingTimer()
- // processConnectInit already started readLoop and flusher for this
- // connection, so ask the next iteration to call waitForExits before
- // trying again.
- waitForGoRoutines = true
- continue
- }
- // The pending buffer has been flushed and is no longer needed.
- nc.bw.doneWithPending()
-
- nc.status = CONNECTED
- // A successful reconnect also ends the initial-connect phase (relevant
- // when connect handed over because of RetryOnFailedConnect).
- nc.initc = false
- // Queue the reconnected callback, if any.
- if nc.Opts.ReconnectedCB != nil {
- nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
- }
- // Release the lock before the final Flush.
- nc.mu.Unlock()
- // NOTE(review): the result of Flush is ignored.
- nc.Flush()
- return
- }
- // The loop ended without reconnecting: record ErrNoServers unless a more specific error is set, then close.
- if nc.err == nil {
- nc.err = ErrNoServers
- }
- nc.mu.Unlock()
- nc.close(CLOSED, true, nil)
- }
- func (nc *Conn) processOpErr(err error) {
- nc.mu.Lock()
- if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
- nc.mu.Unlock()
- return
- }
- if nc.Opts.AllowReconnect && nc.status == CONNECTED {
-
- nc.status = RECONNECTING
-
- nc.stopPingTimer()
- if nc.conn != nil {
- nc.conn.Close()
- nc.conn = nil
- }
-
- nc.bw.switchToPending()
-
- nc.clearPendingFlushCalls()
- go nc.doReconnect(err)
- nc.mu.Unlock()
- return
- }
- nc.status = DISCONNECTED
- nc.err = err
- nc.mu.Unlock()
- nc.close(CLOSED, true, nil)
- }
- func (ac *asyncCallbacksHandler) asyncCBDispatcher() {
- for {
- ac.mu.Lock()
-
-
- for ac.head == nil {
- ac.cond.Wait()
- }
- cur := ac.head
- ac.head = cur.next
- if cur == ac.tail {
- ac.tail = nil
- }
- ac.mu.Unlock()
-
-
- if cur.f == nil {
- return
- }
-
- cur.f()
- }
- }
- func (ac *asyncCallbacksHandler) push(f func()) {
- ac.pushOrClose(f, false)
- }
- func (ac *asyncCallbacksHandler) close() {
- ac.pushOrClose(nil, true)
- }
- func (ac *asyncCallbacksHandler) pushOrClose(f func(), close bool) {
- ac.mu.Lock()
- defer ac.mu.Unlock()
-
-
- if !close && f == nil {
- panic("pushing a nil callback")
- }
- cb := &asyncCB{f: f}
- if ac.tail != nil {
- ac.tail.next = cb
- } else {
- ac.head = cb
- }
- ac.tail = cb
- if close {
- ac.cond.Broadcast()
- } else {
- ac.cond.Signal()
- }
- }
- func (nc *Conn) readLoop() {
-
- defer nc.wg.Done()
-
- nc.mu.Lock()
- if nc.ps == nil {
- nc.ps = &parseState{}
- }
- conn := nc.conn
- br := nc.br
- nc.mu.Unlock()
- if conn == nil {
- return
- }
- for {
- buf, err := br.Read()
- if err == nil {
- err = nc.parse(buf)
- }
- if err != nil {
- nc.processOpErr(err)
- break
- }
- }
-
- nc.mu.Lock()
- nc.ps = nil
- nc.mu.Unlock()
- }
- func (nc *Conn) waitForMsgs(s *Subscription) {
- var closed bool
- var delivered, max uint64
- // waitForMsgs is the delivery loop that subscribeLocked starts for subscriptions created with a callback.
- msgLen := -1
- for {
- s.mu.Lock()
- // Account for the message handled in the previous iteration: msgLen >= 0
- // means one was dequeued, so take it out of the pending counters now.
- if msgLen >= 0 {
- s.pMsgs--
- s.pBytes -= msgLen
- msgLen = -1
- }
- if s.pHead == nil && !s.closed {
- s.pCond.Wait()
- }
- // Pop the head of the pending list, if there is one.
- m := s.pHead
- if m != nil {
- s.pHead = m.next
- if s.pHead == nil {
- s.pTail = nil
- }
- if m.barrier != nil {
- s.mu.Unlock()
- if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
- m.barrier.f()
- }
- continue
- }
- msgLen = len(m.Data)
- }
- mcb := s.mcb
- max = s.max
- closed = s.closed
- if !s.closed {
- s.delivered++
- delivered = s.delivered
- if s.jsi != nil && s.jsi.fc && len(s.jsi.fcs) > 0 {
- s.checkForFlowControlResponse(delivered)
- }
- }
- s.mu.Unlock()
- if closed {
- break
- }
- // The callback runs outside the lock, and only while the delivery count has not gone past max.
- if m != nil && (max == 0 || delivered <= max) {
- mcb(m)
- }
- // Once max deliveries have been reached the subscription is removed and the loop ends.
- if max > 0 && delivered >= max {
- nc.mu.Lock()
- nc.removeSub(s)
- nc.mu.Unlock()
- break
- }
- }
- // Walk what is left in the pending list so that barrier entries still get their reference dropped.
- s.mu.Lock()
- for m := s.pHead; m != nil; m = s.pHead {
- if m.barrier != nil {
- s.mu.Unlock()
- if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
- m.barrier.f()
- }
- s.mu.Lock()
- }
- s.pHead = m.next
- }
- s.mu.Unlock()
- }
- func (nc *Conn) processMsg(data []byte) {
- // processMsg handles one inbound message described by nc.ps.ma: count it, find its subscription, then queue or deliver it.
- atomic.AddUint64(&nc.InMsgs, 1)
- atomic.AddUint64(&nc.InBytes, uint64(len(data)))
- // Look up the subscription by the sid from the parser state (read lock on
- // subsMu only). A message for an unknown sid is dropped silently.
-
- nc.subsMu.RLock()
- sub := nc.subs[nc.ps.ma.sid]
- nc.subsMu.RUnlock()
- if sub == nil {
- return
- }
- // Subject and reply are copied out of the parser state as strings.
- subj := string(nc.ps.ma.subject)
- reply := string(nc.ps.ma.reply)
- // The payload is copied into its own slice instead of aliasing data.
- // NOTE(review): presumably because the caller reuses its read buffer; confirm.
-
- msgPayload := make([]byte, len(data))
- copy(msgPayload, data)
-
- var h Header
- var err error
- var ctrlMsg bool
- var hasFC bool
- var hasHBs bool
- if nc.ps.ma.hdr > 0 {
- hbuf := msgPayload[:nc.ps.ma.hdr]
- msgPayload = msgPayload[nc.ps.ma.hdr:]
- h, err = decodeHeadersMsg(hbuf)
- if err != nil {
- // Headers could not be decoded: record ErrBadHeaderMsg and notify AsyncErrorCB. Processing continues with h as returned.
- nc.mu.Lock()
- nc.err = ErrBadHeaderMsg
- if nc.Opts.AsyncErrorCB != nil {
- nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrBadHeaderMsg) })
- }
- nc.mu.Unlock()
- }
- }
-
- m := &Msg{Header: h, Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
- sub.mu.Lock()
- // Nothing to do when the subscription has been closed in the meantime.
- if sub.closed {
- sub.mu.Unlock()
- return
- }
- // When the subscription carries jsi state, classify control messages and pick up its hbs and fc flags.
- jsi := sub.jsi
- if jsi != nil {
- ctrlMsg, hasHBs, hasFC = isControlMessage(m), jsi.hbs, jsi.fc
- }
-
- if !ctrlMsg {
- // Pending accounting and limits apply to every subscription type except ChanSubscription.
- if sub.typ != ChanSubscription {
- sub.pMsgs++
- if sub.pMsgs > sub.pMsgsMax {
- sub.pMsgsMax = sub.pMsgs
- }
- sub.pBytes += len(m.Data)
- if sub.pBytes > sub.pBytesMax {
- sub.pBytesMax = sub.pBytes
- }
- // Going over either pending limit (when set) makes this a slow consumer.
- if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
- (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
- goto slowConsumer
- }
- }
- // Channel-backed subscriptions get a non-blocking send (a full channel
- // means slow consumer); otherwise the message joins the pending list.
- if sub.mch != nil {
- select {
- case sub.mch <- m:
- default:
- goto slowConsumer
- }
- } else {
- // Append to the pending list; the condition variable is signalled only when the list was empty.
- if sub.pHead == nil {
- sub.pHead = m
- sub.pTail = m
- if sub.pCond != nil {
- sub.pCond.Signal()
- }
- } else {
- sub.pTail.next = m
- sub.pTail = m
- }
- }
- if jsi != nil && hasHBs {
- // hbs is set: hand the reply subject to trackSequences.
- // NOTE(review): what it extracts from the reply is not visible here.
- jsi.trackSequences(m.Reply)
- }
- } else if hasFC && m.Reply != _EMPTY_ {
- // Control message with a reply subject while fc is set: answer right away
- // when nothing is pending, otherwise schedule the response (else branch).
- if sub.pMsgs == 0 {
- nc.Publish(m.Reply, nil)
- } else {
- // The response is keyed on the delivered count that will be reached once the currently pending messages are delivered.
- jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply)
- }
- }
- // The message was accepted, so clear the slow consumer flag.
- sub.sc = false
- sub.mu.Unlock()
- // A control message without reply while hbs is set triggers the sequence mismatch check, outside sub.mu.
- if ctrlMsg && hasHBs && m.Reply == _EMPTY_ {
- nc.processSequenceMismatch(m, sub, jsi)
- }
- return
- slowConsumer:
- sub.dropped++
- sc := !sub.sc
- sub.sc = true
- // Undo the pending accounting done above for the dropped message.
- if sub.typ != ChanSubscription {
- sub.pMsgs--
- sub.pBytes -= len(m.Data)
- }
- sub.mu.Unlock()
- if sc {
- // Only the first drop after the flag was clear is reported: record
- // ErrSlowConsumer on the connection and notify AsyncErrorCB. Further
- // drops just bump sub.dropped until a message is accepted again.
- nc.mu.Lock()
- nc.err = ErrSlowConsumer
- if nc.Opts.AsyncErrorCB != nil {
- nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) })
- }
- nc.mu.Unlock()
- }
- }
- func (nc *Conn) processPermissionsViolation(err string) {
- nc.mu.Lock()
-
- e := errors.New("nats: " + err)
- nc.err = e
- if nc.Opts.AsyncErrorCB != nil {
- nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
- }
- nc.mu.Unlock()
- }
- func (nc *Conn) processAuthError(err error) bool {
- nc.err = err
- if !nc.initc && nc.Opts.AsyncErrorCB != nil {
- nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
- }
-
-
- if nc.current.lastErr == err {
- nc.ar = true
- } else {
- nc.current.lastErr = err
- }
- return nc.ar
- }
- func (nc *Conn) flusher() {
-
- defer nc.wg.Done()
-
- nc.mu.Lock()
- bw := nc.bw
- conn := nc.conn
- fch := nc.fch
- nc.mu.Unlock()
- if conn == nil || bw == nil {
- return
- }
- for {
- if _, ok := <-fch; !ok {
- return
- }
- nc.mu.Lock()
-
- if !nc.isConnected() || nc.isConnecting() || conn != nc.conn {
- nc.mu.Unlock()
- return
- }
- if bw.buffered() > 0 {
- if err := bw.flush(); err != nil {
- if nc.err == nil {
- nc.err = err
- }
- }
- }
- nc.mu.Unlock()
- }
- }
- func (nc *Conn) processPing() {
- nc.sendProto(pongProto)
- }
- func (nc *Conn) processPong() {
- var ch chan struct{}
- nc.mu.Lock()
- if len(nc.pongs) > 0 {
- ch = nc.pongs[0]
- nc.pongs = append(nc.pongs[:0], nc.pongs[1:]...)
- }
- nc.pout = 0
- nc.mu.Unlock()
- if ch != nil {
- ch <- struct{}{}
- }
- }
- func (nc *Conn) processOK() {
-
- }
- func (nc *Conn) processInfo(info string) error {
- if info == _EMPTY_ {
- return nil
- }
- ncInfo := serverInfo{}
- if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
- return err
- }
- // processInfo stores the decoded INFO, then reconciles the server pool with its ConnectURLs.
- nc.info = ncInfo
- // With no ConnectURLs in the INFO there is nothing to reconcile: only the
- // lame duck notification is queued (never during the initial connect,
- // nc.initc) and the server pool is left exactly as it is.
-
- if len(nc.info.ConnectURLs) == 0 {
- if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
- nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
- }
- return nil
- }
- // hasNew records whether the INFO advertised a URL that is not yet in nc.urls.
- // tmp is the set of advertised URLs; entries matching a server already in
- // the pool are removed from it below, so whatever is left afterwards is
- // what has to be added to the pool.
- hasNew := false
-
- urls := nc.info.ConnectURLs
-
- tmp := make(map[string]struct{}, len(urls))
- for _, curl := range urls {
- tmp[curl] = struct{}{}
- }
- // Walk the pool and drop the implicit servers that are no longer advertised.
- // Servers that are not implicit, and the current server, are always kept.
- sp := nc.srvPool
- for i := 0; i < len(sp); i++ {
- srv := sp[i]
- curl := srv.url.Host
- // The pool is matched against the advertised URLs by host.
- _, inInfo := tmp[curl]
- // Whether or not it was advertised, this host is handled: take it out of
- // tmp so it is not added again below.
- delete(tmp, curl)
- // NOTE(review): nc.current is dereferenced here without the nil check
- // that is made before saveTLS below; confirm it cannot be nil on this path.
- if !srv.isImplicit || srv.url == nc.current.url {
- continue
- }
- if !inInfo {
- // Remove entry i in place and look at the same index again.
- copy(sp[i:], sp[i+1:])
- nc.srvPool = sp[:len(sp)-1]
- sp = nc.srvPool
- i--
- }
- }
- // saveTLS is passed on to addURLToPool; it is true when the current server's URL host is not an IP.
- saveTLS := nc.current != nil && !hostIsIP(nc.current.url)
- // Add what is left in tmp to the pool, as implicit entries (second
- // argument true), using the scheme of the current connection.
- for curl := range tmp {
- // Only URLs not already present in nc.urls count as newly discovered;
- // addURLToPool is called for every remaining entry regardless.
- if _, present := nc.urls[curl]; !present {
- hasNew = true
- }
- nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS)
- }
- if hasNew {
- // Shuffle the pool unless randomization is turned off. NOTE(review): the argument 1 presumably leaves the first entry in place; confirm in shufflePool.
- if !nc.Opts.NoRandomize {
- nc.shufflePool(1)
- }
- if !nc.initc && nc.Opts.DiscoveredServersCB != nil {
- nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
- }
- }
- if !nc.initc && ncInfo.LameDuckMode && nc.Opts.LameDuckModeHandler != nil {
- nc.ach.push(func() { nc.Opts.LameDuckModeHandler(nc) })
- }
- return nil
- }
- func (nc *Conn) processAsyncInfo(info []byte) {
- nc.mu.Lock()
-
- nc.processInfo(string(info))
- nc.mu.Unlock()
- }
- func (nc *Conn) LastError() error {
- if nc == nil {
- return ErrInvalidConnection
- }
- nc.mu.RLock()
- err := nc.err
- nc.mu.RUnlock()
- return err
- }
- func checkAuthError(e string) error {
- if strings.HasPrefix(e, AUTHORIZATION_ERR) {
- return ErrAuthorization
- }
- if strings.HasPrefix(e, AUTHENTICATION_EXPIRED_ERR) {
- return ErrAuthExpired
- }
- if strings.HasPrefix(e, AUTHENTICATION_REVOKED_ERR) {
- return ErrAuthRevoked
- }
- if strings.HasPrefix(e, ACCOUNT_AUTHENTICATION_EXPIRED_ERR) {
- return ErrAccountAuthExpired
- }
- return nil
- }
- func (nc *Conn) processErr(ie string) {
-
- ne := normalizeErr(ie)
-
- e := strings.ToLower(ne)
- close := false
-
- if e == STALE_CONNECTION {
- nc.processOpErr(ErrStaleConnection)
- } else if strings.HasPrefix(e, PERMISSIONS_ERR) {
- nc.processPermissionsViolation(ne)
- } else if authErr := checkAuthError(e); authErr != nil {
- nc.mu.Lock()
- close = nc.processAuthError(authErr)
- nc.mu.Unlock()
- } else {
- close = true
- nc.mu.Lock()
- nc.err = errors.New("nats: " + ne)
- nc.mu.Unlock()
- }
- if close {
- nc.close(CLOSED, true, nil)
- }
- }
- func (nc *Conn) kickFlusher() {
- if nc.bw != nil {
- select {
- case nc.fch <- struct{}{}:
- default:
- }
- }
- }
- func (nc *Conn) Publish(subj string, data []byte) error {
- return nc.publish(subj, _EMPTY_, nil, data)
- }
// Header holds the headers of a message: each key maps to the list of
// values recorded for it. Keys are used exactly as given; no
// canonicalization is applied by these methods.
type Header map[string][]string

// Add appends value to the values stored under key.
// The Header must be non-nil (see NewMsg), as with any map write.
func (h Header) Add(key, value string) {
	h[key] = append(h[key], value)
}

// Set replaces whatever is stored under key with the single value.
// The Header must be non-nil, as with any map write.
func (h Header) Set(key, value string) {
	h[key] = []string{value}
}

// Get returns the first value stored under key, or the empty string when
// there is none. It is safe on a nil Header and on a key that maps to an
// empty value list.
func (h Header) Get(key string) string {
	// len covers nil Header, missing key and empty slice alike; indexing
	// an empty, non-nil slice would panic.
	if v := h[key]; len(v) > 0 {
		return v[0]
	}
	return ""
}

// Values returns all values stored under key. The returned slice is the
// stored one, not a copy. It is nil when the key is absent.
func (h Header) Values(key string) []string {
	return h[key]
}

// Del removes key and all of its values.
func (h Header) Del(key string) {
	delete(h, key)
}
- func NewMsg(subject string) *Msg {
- return &Msg{
- Subject: subject,
- Header: make(Header),
- }
- }
- const (
- hdrLine = "NATS/1.0\r\n" // first line expected at the start of an encoded header block
- crlf = "\r\n"
- hdrPreEnd = len(hdrLine) - len(crlf) // length of hdrLine without its trailing CRLF
- statusHdr = "Status" // key under which decodeHeadersMsg stores an inline status
- descrHdr = "Description" // key under which decodeHeadersMsg stores the text after an inline status
- lastConsumerSeqHdr = "Nats-Last-Consumer"
- lastStreamSeqHdr = "Nats-Last-Stream"
- noResponders = "503" // status that, on an empty payload, is turned into ErrNoResponders
- noMessages = "404"
- controlMsg = "100"
- statusLen = 3 // number of characters decodeHeadersMsg takes as the status
- )
- func decodeHeadersMsg(data []byte) (Header, error) {
- tp := textproto.NewReader(bufio.NewReader(bytes.NewReader(data)))
- l, err := tp.ReadLine()
- if err != nil || len(l) < hdrPreEnd || l[:hdrPreEnd] != hdrLine[:hdrPreEnd] {
- return nil, ErrBadHeaderMsg
- }
- mh, err := readMIMEHeader(tp)
- if err != nil {
- return nil, err
- }
-
- if len(l) > hdrPreEnd {
- var description string
- status := strings.TrimSpace(l[hdrPreEnd:])
- if len(status) != statusLen {
- description = strings.TrimSpace(status[statusLen:])
- status = status[:statusLen]
- }
- mh.Add(statusHdr, status)
- if len(description) > 0 {
- mh.Add(descrHdr, description)
- }
- }
- return Header(mh), nil
- }
- func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
- var (
- m = make(textproto.MIMEHeader)
- strs []string
- )
- for {
- kv, err := tp.ReadLine()
- if len(kv) == 0 {
- return m, err
- }
-
- i := bytes.IndexByte([]byte(kv), ':')
- if i < 0 {
- return nil, ErrBadHeaderMsg
- }
- key := kv[:i]
- if key == "" {
-
- continue
- }
- i++
- for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') {
- i++
- }
- value := string(kv[i:])
- vv := m[key]
- if vv == nil && len(strs) > 0 {
-
- vv, strs = strs[:1:1], strs[1:]
- vv[0] = value
- m[key] = vv
- } else {
-
- m[key] = append(vv, value)
- }
- if err != nil {
- return m, err
- }
- }
- }
- func (nc *Conn) PublishMsg(m *Msg) error {
- if m == nil {
- return ErrInvalidMsg
- }
- var hdr []byte
- var err error
- if len(m.Header) > 0 {
- if !nc.info.Headers {
- return ErrHeadersNotSupported
- }
- hdr, err = m.headerBytes()
- if err != nil {
- return err
- }
- }
- return nc.publish(m.Subject, m.Reply, hdr, m.Data)
- }
- func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
- return nc.publish(subj, reply, nil, data)
- }
- const digits = "0123456789" // lookup table publish uses to render sizes in decimal
- func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
- if nc == nil {
- return ErrInvalidConnection
- }
- if subj == "" {
- return ErrBadSubject
- }
- nc.mu.Lock()
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- if nc.isDrainingPubs() {
- nc.mu.Unlock()
- return ErrConnectionDraining
- }
- // publish is the common path behind Publish, PublishMsg and PublishRequest. The size that counts is headers plus payload.
- msgSize := int64(len(data) + len(hdr))
- // The max payload check is skipped while the initial connect is still pending (nc.initc).
- if !nc.initc && msgSize > nc.info.MaxPayload {
- nc.mu.Unlock()
- return ErrMaxPayload
- }
- // Refuse the message when the writer reports that it is at its limit.
- // NOTE(review): presumably the reconnect buffer limit, given the error returned; confirm.
- if nc.bw.atLimitIfUsingPending() {
- nc.mu.Unlock()
- return ErrReconnectBufExceeded
- }
- var mh []byte // protocol line, built in place inside nc.scratch, which setup pre-loads with _HPUB_P_
- if hdr != nil {
- mh = nc.scratch[:len(_HPUB_P_)]
- } else {
- mh = nc.scratch[1:len(_HPUB_P_)]
- }
- mh = append(mh, subj...)
- mh = append(mh, ' ')
- if reply != "" {
- mh = append(mh, reply...)
- mh = append(mh, ' ')
- }
- // The sizes are rendered in decimal by hand: digits are written into b
- // from the end towards the front, and b[i:] is then appended to mh.
- // With headers (hdr != nil) the header length is written first, followed
- // by a space and then the total size. Without headers only the total size
- // is written. A zero size is written as a single "0".
- var b [12]byte
- var i = len(b)
- if hdr != nil {
- if len(hdr) > 0 {
- for l := len(hdr); l > 0; l /= 10 {
- i--
- b[i] = digits[l%10]
- }
- } else {
- i--
- b[i] = digits[0]
- }
- mh = append(mh, b[i:]...)
- mh = append(mh, ' ')
- // Rewind the cursor so b can be reused for the total size.
- i = len(b)
- }
- if msgSize > 0 {
- for l := msgSize; l > 0; l /= 10 {
- i--
- b[i] = digits[l%10]
- }
- } else {
- i--
- b[i] = digits[0]
- }
- mh = append(mh, b[i:]...)
- mh = append(mh, _CRLF_...)
- if err := nc.bw.appendBufs(mh, hdr, data, _CRLF_BYTES_); err != nil {
- nc.mu.Unlock()
- return err
- }
- nc.OutMsgs++
- nc.OutBytes += uint64(len(data) + len(hdr))
- if len(nc.fch) == 0 {
- nc.kickFlusher()
- }
- nc.mu.Unlock()
- return nil
- }
- func (nc *Conn) respHandler(m *Msg) {
- nc.mu.Lock()
-
- if nc.isClosed() {
- nc.mu.Unlock()
- return
- }
- var mch chan *Msg
-
- rt := nc.respToken(m.Subject)
- if rt != _EMPTY_ {
- mch = nc.respMap[rt]
-
- delete(nc.respMap, rt)
- } else if len(nc.respMap) == 1 {
-
-
-
- for k, v := range nc.respMap {
- mch = v
- delete(nc.respMap, k)
- break
- }
- }
- nc.mu.Unlock()
-
-
-
- select {
- case mch <- m:
- default:
- return
- }
- }
- func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
- nc.mu.Lock()
-
- if nc.respMap == nil {
- nc.initNewResp()
- }
-
- mch := make(chan *Msg, RequestChanLen)
- respInbox := nc.newRespInbox()
- token := respInbox[respInboxPrefixLen:]
- nc.respMap[token] = mch
- if nc.respMux == nil {
-
-
-
- s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil)
- if err != nil {
- nc.mu.Unlock()
- return nil, token, err
- }
- nc.respScanf = strings.Replace(nc.respSub, "*", "%s", -1)
- nc.respMux = s
- }
- nc.mu.Unlock()
- if err := nc.publish(subj, respInbox, hdr, data); err != nil {
- return nil, token, err
- }
- return mch, token, nil
- }
- func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) {
- var hdr []byte
- var err error
- if len(msg.Header) > 0 {
- if !nc.info.Headers {
- return nil, ErrHeadersNotSupported
- }
- hdr, err = msg.headerBytes()
- if err != nil {
- return nil, err
- }
- }
- return nc.request(msg.Subject, hdr, msg.Data, timeout)
- }
- func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
- return nc.request(subj, nil, data, timeout)
- }
- func (nc *Conn) useOldRequestStyle() bool {
- nc.mu.RLock()
- r := nc.Opts.UseOldRequestStyle
- nc.mu.RUnlock()
- return r
- }
- func (nc *Conn) request(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- var m *Msg
- var err error
- if nc.useOldRequestStyle() {
- m, err = nc.oldRequest(subj, hdr, data, timeout)
- } else {
- m, err = nc.newRequest(subj, hdr, data, timeout)
- }
-
- if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
- m, err = nil, ErrNoResponders
- }
- return m, err
- }
- func (nc *Conn) newRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
- mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
- if err != nil {
- return nil, err
- }
- t := globalTimerPool.Get(timeout)
- defer globalTimerPool.Put(t)
- var ok bool
- var msg *Msg
- select {
- case msg, ok = <-mch:
- if !ok {
- return nil, ErrConnectionClosed
- }
- case <-t.C:
- nc.mu.Lock()
- delete(nc.respMap, token)
- nc.mu.Unlock()
- return nil, ErrTimeout
- }
- return msg, nil
- }
- func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration) (*Msg, error) {
- inbox := NewInbox()
- ch := make(chan *Msg, RequestChanLen)
- s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
- if err != nil {
- return nil, err
- }
- s.AutoUnsubscribe(1)
- defer s.Unsubscribe()
- err = nc.publish(subj, inbox, hdr, data)
- if err != nil {
- return nil, err
- }
- return s.NextMsg(timeout)
- }
- const (
- InboxPrefix = "_INBOX." // prefix of every subject returned by NewInbox
- inboxPrefixLen = len(InboxPrefix)
- respInboxPrefixLen = inboxPrefixLen + nuidSize + 1 // inbox prefix + nuid + the "." that initNewResp puts after the inbox
- replySuffixLen = 8 // number of random characters newRespInbox appends
- rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" // alphabet for the random suffix
- base = 62 // modulus newRespInbox uses to index rdigits (62 characters)
- )
- func NewInbox() string {
- var b [inboxPrefixLen + nuidSize]byte
- pres := b[:inboxPrefixLen]
- copy(pres, InboxPrefix)
- ns := b[inboxPrefixLen:]
- copy(ns, nuid.Next())
- return string(b[:])
- }
- func (nc *Conn) initNewResp() {
-
- nc.respSub = fmt.Sprintf("%s.*", NewInbox())
- nc.respMap = make(map[string]chan *Msg)
- nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
- }
- func (nc *Conn) newRespInbox() string {
- if nc.respMap == nil {
- nc.initNewResp()
- }
- var b [respInboxPrefixLen + replySuffixLen]byte
- pres := b[:respInboxPrefixLen]
- copy(pres, nc.respSub)
- rn := nc.respRand.Int63()
- for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
- b[i] = rdigits[l%base]
- l /= base
- }
- return string(b[:])
- }
- func (nc *Conn) NewRespInbox() string {
- nc.mu.Lock()
- s := nc.newRespInbox()
- nc.mu.Unlock()
- return s
- }
- func (nc *Conn) respToken(respInbox string) string {
- var token string
- n, err := fmt.Sscanf(respInbox, nc.respScanf, &token)
- if err != nil || n != 1 {
- return ""
- }
- return token
- }
- func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
- return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil)
- }
- func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
- return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil)
- }
- func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
- return nc.subscribe(subj, group, nil, ch, false, nil)
- }
- func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- mch := make(chan *Msg, nc.Opts.SubChanLen)
- return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
- }
- func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
- return nc.subscribe(subj, queue, cb, nil, false, nil)
- }
- func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
- mch := make(chan *Msg, nc.Opts.SubChanLen)
- return nc.subscribe(subj, queue, nil, mch, true, nil)
- }
- func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
- return nc.subscribe(subj, queue, nil, ch, false, nil)
- }
// badSubject reports whether subj is unusable as a subject: it is empty,
// contains white space (space, tab, CR or LF), or has an empty token, i.e.
// it starts or ends with "." or contains "..".
//
// The empty-token test is done directly on the string instead of splitting
// it, so validating a subject does not allocate.
func badSubject(subj string) bool {
	if subj == "" || strings.ContainsAny(subj, " \t\r\n") {
		return true
	}
	return subj[0] == '.' || subj[len(subj)-1] == '.' || strings.Contains(subj, "..")
}
// badQueue reports whether qname contains white space (space, tab, CR or
// LF), which makes it unusable as a queue group name.
func badQueue(qname string) bool {
	const whitespace = " \t\r\n"
	return strings.ContainsAny(qname, whitespace)
}
- func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.subscribeLocked(subj, queue, cb, ch, isSync, js)
- }
- func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- if badSubject(subj) {
- return nil, ErrBadSubject
- }
- if queue != "" && badQueue(queue) {
- return nil, ErrBadQueueName
- }
-
- if nc.isClosed() {
- return nil, ErrConnectionClosed
- }
- if nc.isDraining() {
- return nil, ErrConnectionDraining
- }
- if cb == nil && ch == nil {
- return nil, ErrBadSubscription
- }
- sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc, jsi: js}
-
- if ch != nil {
- sub.pMsgsLimit = cap(ch)
- } else {
- sub.pMsgsLimit = DefaultSubPendingMsgsLimit
- }
- sub.pBytesLimit = DefaultSubPendingBytesLimit
-
-
- var sr bool
- if cb != nil {
- sub.typ = AsyncSubscription
- sub.pCond = sync.NewCond(&sub.mu)
- sr = true
- } else if !isSync {
- sub.typ = ChanSubscription
- sub.mch = ch
- } else {
- sub.typ = SyncSubscription
- sub.mch = ch
- }
- nc.subsMu.Lock()
- nc.ssid++
- sub.sid = nc.ssid
- nc.subs[sub.sid] = sub
- nc.subsMu.Unlock()
-
- if sr {
- go nc.waitForMsgs(sub)
- }
-
-
- if !nc.isReconnecting() {
- nc.bw.appendString(fmt.Sprintf(subProto, subj, queue, sub.sid))
- nc.kickFlusher()
- }
- return sub, nil
- }
- func (nc *Conn) NumSubscriptions() int {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return len(nc.subs)
- }
- func (nc *Conn) removeSub(s *Subscription) {
- nc.subsMu.Lock()
- delete(nc.subs, s.sid)
- nc.subsMu.Unlock()
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if s.mch != nil && s.typ == SyncSubscription {
- close(s.mch)
- }
- s.mch = nil
-
- s.closed = true
- if s.pCond != nil {
- s.pCond.Broadcast()
- }
- }
- type SubscriptionType int // SubscriptionType identifies the kind of a Subscription (see Subscription.Type).
- const (
- AsyncSubscription = SubscriptionType(iota) // assigned by subscribeLocked when a callback is supplied
- SyncSubscription // channel based, created with isSync set
- ChanSubscription // channel based, created without isSync
- NilSubscription // what Type returns for a nil *Subscription
- PullSubscription
- )
- func (s *Subscription) Type() SubscriptionType {
- if s == nil {
- return NilSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- return s.typ
- }
- func (s *Subscription) IsValid() bool {
- if s == nil {
- return false
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- return s.conn != nil && !s.closed
- }
- func (s *Subscription) Drain() error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- conn := s.conn
- s.mu.Unlock()
- if conn == nil {
- return ErrBadSubscription
- }
- return conn.unsubscribe(s, 0, true)
- }
- func (s *Subscription) Unsubscribe() error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- conn := s.conn
- closed := s.closed
- s.mu.Unlock()
- if conn == nil || conn.IsClosed() {
- return ErrConnectionClosed
- }
- if closed {
- return ErrBadSubscription
- }
- if conn.IsDraining() {
- return ErrConnectionDraining
- }
- return conn.unsubscribe(s, 0, false)
- }
- func (nc *Conn) checkDrained(sub *Subscription) {
- if nc == nil || sub == nil {
- return
- }
-
-
- nc.Flush()
-
-
- for {
-
- if nc.IsClosed() {
- return
- }
-
- sub.mu.Lock()
- conn := sub.conn
- closed := sub.closed
- pMsgs := sub.pMsgs
- sub.mu.Unlock()
- if conn == nil || closed || pMsgs == 0 {
- nc.mu.Lock()
- nc.removeSub(sub)
- nc.mu.Unlock()
- return
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
- func (s *Subscription) AutoUnsubscribe(max int) error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- conn := s.conn
- closed := s.closed
- s.mu.Unlock()
- if conn == nil || closed {
- return ErrBadSubscription
- }
- return conn.unsubscribe(s, max, false)
- }
- func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
-
-
- sub.mu.Lock()
- jsi := sub.jsi
- sub.mu.Unlock()
- if jsi != nil {
- err := jsi.unsubscribe(drainMode)
- if err != nil {
- return err
- }
- }
- nc.mu.Lock()
-
- defer nc.mu.Unlock()
- defer nc.kickFlusher()
- if nc.isClosed() {
- return ErrConnectionClosed
- }
- nc.subsMu.RLock()
- s := nc.subs[sub.sid]
- nc.subsMu.RUnlock()
-
- if s == nil {
- return nil
- }
- maxStr := _EMPTY_
- if max > 0 {
- s.mu.Lock()
- s.max = uint64(max)
- s.mu.Unlock()
- maxStr = strconv.Itoa(max)
- } else if !drainMode {
- nc.removeSub(s)
- }
- if drainMode {
- go nc.checkDrained(sub)
- }
-
-
- if !nc.isReconnecting() {
- nc.bw.appendString(fmt.Sprintf(unsubProto, s.sid, maxStr))
- nc.kickFlusher()
- }
- return nil
- }
- func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
- if s == nil {
- return nil, ErrBadSubscription
- }
- s.mu.Lock()
- err := s.validateNextMsgState()
- if err != nil {
- s.mu.Unlock()
- return nil, err
- }
-
- mch := s.mch
- s.mu.Unlock()
- var ok bool
- var msg *Msg
-
- select {
- case msg, ok = <-mch:
- if !ok {
- return nil, s.getNextMsgErr()
- }
- if err := s.processNextMsgDelivered(msg); err != nil {
- return nil, err
- } else {
- return msg, nil
- }
- default:
- }
-
-
- t := globalTimerPool.Get(timeout)
- defer globalTimerPool.Put(t)
- select {
- case msg, ok = <-mch:
- if !ok {
- return nil, s.getNextMsgErr()
- }
- if err := s.processNextMsgDelivered(msg); err != nil {
- return nil, err
- }
- case <-t.C:
- return nil, ErrTimeout
- }
- return msg, nil
- }
- func (s *Subscription) validateNextMsgState() error {
- if s.connClosed {
- return ErrConnectionClosed
- }
- if s.mch == nil {
- if s.max > 0 && s.delivered >= s.max {
- return ErrMaxMessages
- } else if s.closed {
- return ErrBadSubscription
- }
- }
- if s.mcb != nil {
- return ErrSyncSubRequired
- }
- if s.sc {
- s.sc = false
- return ErrSlowConsumer
- }
- return nil
- }
- func (s *Subscription) getNextMsgErr() error {
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.connClosed {
- return ErrConnectionClosed
- }
- return ErrBadSubscription
- }
- func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
- s.mu.Lock()
- nc := s.conn
- max := s.max
-
- s.delivered++
- delivered := s.delivered
- if s.jsi != nil && s.jsi.fc && len(s.jsi.fcs) > 0 {
- s.checkForFlowControlResponse(delivered)
- }
- if s.typ == SyncSubscription {
- s.pMsgs--
- s.pBytes -= len(msg.Data)
- }
- s.mu.Unlock()
- if max > 0 {
- if delivered > max {
- return ErrMaxMessages
- }
-
- if delivered == max {
- nc.mu.Lock()
- nc.removeSub(s)
- nc.mu.Unlock()
- }
- }
- if len(msg.Data) == 0 && msg.Header.Get(statusHdr) == noResponders {
- return ErrNoResponders
- }
- return nil
- }
- func (s *Subscription) QueuedMsgs() (int, error) {
- m, _, err := s.Pending()
- return int(m), err
- }
- func (s *Subscription) Pending() (int, int, error) {
- if s == nil {
- return -1, -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return -1, -1, ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return -1, -1, ErrTypeSubscription
- }
- return s.pMsgs, s.pBytes, nil
- }
- func (s *Subscription) MaxPending() (int, int, error) {
- if s == nil {
- return -1, -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return -1, -1, ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return -1, -1, ErrTypeSubscription
- }
- return s.pMsgsMax, s.pBytesMax, nil
- }
- func (s *Subscription) ClearMaxPending() error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return ErrTypeSubscription
- }
- s.pMsgsMax, s.pBytesMax = 0, 0
- return nil
- }
// Default pending limits for a subscription. Where they are applied is
// outside this part of the file.
const (
	// DefaultSubPendingMsgsLimit is the default pending message count limit
	// (512 Ki messages).
	DefaultSubPendingMsgsLimit = 512 * 1024

	// DefaultSubPendingBytesLimit is the default pending byte limit (64 MiB).
	DefaultSubPendingBytesLimit = 64 * 1024 * 1024
)
- func (s *Subscription) PendingLimits() (int, int, error) {
- if s == nil {
- return -1, -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return -1, -1, ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return -1, -1, ErrTypeSubscription
- }
- return s.pMsgsLimit, s.pBytesLimit, nil
- }
- func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return ErrTypeSubscription
- }
- if msgLimit == 0 || bytesLimit == 0 {
- return ErrInvalidArg
- }
- s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
- return nil
- }
- func (s *Subscription) Delivered() (int64, error) {
- if s == nil {
- return -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return -1, ErrBadSubscription
- }
- return int64(s.delivered), nil
- }
- func (s *Subscription) Dropped() (int, error) {
- if s == nil {
- return -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil || s.closed {
- return -1, ErrBadSubscription
- }
- return s.dropped, nil
- }
- func (m *Msg) Respond(data []byte) error {
- if m == nil || m.Sub == nil {
- return ErrMsgNotBound
- }
- if m.Reply == "" {
- return ErrMsgNoReply
- }
- m.Sub.mu.Lock()
- nc := m.Sub.conn
- m.Sub.mu.Unlock()
-
- return nc.Publish(m.Reply, data)
- }
- func (m *Msg) RespondMsg(msg *Msg) error {
- if m == nil || m.Sub == nil {
- return ErrMsgNotBound
- }
- if m.Reply == "" {
- return ErrMsgNoReply
- }
- msg.Subject = m.Reply
- m.Sub.mu.Lock()
- nc := m.Sub.conn
- m.Sub.mu.Unlock()
-
- return nc.PublishMsg(msg)
- }
- func (nc *Conn) removeFlushEntry(ch chan struct{}) bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- if nc.pongs == nil {
- return false
- }
- for i, c := range nc.pongs {
- if c == ch {
- nc.pongs[i] = nil
- return true
- }
- }
- return false
- }
- func (nc *Conn) sendPing(ch chan struct{}) {
- nc.pongs = append(nc.pongs, ch)
- nc.bw.appendString(pingProto)
-
- nc.bw.flush()
- }
- func (nc *Conn) processPingTimer() {
- nc.mu.Lock()
- if nc.status != CONNECTED {
- nc.mu.Unlock()
- return
- }
-
- nc.pout++
- if nc.pout > nc.Opts.MaxPingsOut {
- nc.mu.Unlock()
- nc.processOpErr(ErrStaleConnection)
- return
- }
- nc.sendPing(nil)
- nc.ptmr.Reset(nc.Opts.PingInterval)
- nc.mu.Unlock()
- }
- func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
- if nc == nil {
- return ErrInvalidConnection
- }
- if timeout <= 0 {
- return ErrBadTimeout
- }
- nc.mu.Lock()
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- t := globalTimerPool.Get(timeout)
- defer globalTimerPool.Put(t)
-
-
-
- ch := make(chan struct{}, 1)
- nc.sendPing(ch)
- nc.mu.Unlock()
- select {
- case _, ok := <-ch:
- if !ok {
- err = ErrConnectionClosed
- } else {
- close(ch)
- }
- case <-t.C:
- err = ErrTimeout
- }
- if err != nil {
- nc.removeFlushEntry(ch)
- }
- return
- }
- func (nc *Conn) RTT() (time.Duration, error) {
- if nc.IsClosed() {
- return 0, ErrConnectionClosed
- }
- if nc.IsReconnecting() {
- return 0, ErrDisconnected
- }
- start := time.Now()
- if err := nc.FlushTimeout(10 * time.Second); err != nil {
- return 0, err
- }
- return time.Since(start), nil
- }
- func (nc *Conn) Flush() error {
- return nc.FlushTimeout(10 * time.Second)
- }
- func (nc *Conn) Buffered() (int, error) {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.isClosed() || nc.bw == nil {
- return -1, ErrConnectionClosed
- }
- return nc.bw.buffered(), nil
- }
- func (nc *Conn) resendSubscriptions() {
-
-
-
- nc.subsMu.RLock()
- subs := make([]*Subscription, 0, len(nc.subs))
- for _, s := range nc.subs {
- subs = append(subs, s)
- }
- nc.subsMu.RUnlock()
- for _, s := range subs {
- adjustedMax := uint64(0)
- s.mu.Lock()
- if s.max > 0 {
- if s.delivered < s.max {
- adjustedMax = s.max - s.delivered
- }
-
-
- if adjustedMax == 0 {
- s.mu.Unlock()
- nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, _EMPTY_))
- continue
- }
- }
- s.mu.Unlock()
- nc.bw.writeDirect(fmt.Sprintf(subProto, s.Subject, s.Queue, s.sid))
- if adjustedMax > 0 {
- maxStr := strconv.Itoa(int(adjustedMax))
- nc.bw.writeDirect(fmt.Sprintf(unsubProto, s.sid, maxStr))
- }
- }
- }
- func (nc *Conn) clearPendingFlushCalls() {
-
- for _, ch := range nc.pongs {
- if ch != nil {
- close(ch)
- }
- }
- nc.pongs = nil
- }
- func (nc *Conn) clearPendingRequestCalls() {
- if nc.respMap == nil {
- return
- }
- for key, ch := range nc.respMap {
- if ch != nil {
- close(ch)
- delete(nc.respMap, key)
- }
- }
- }
- func (nc *Conn) close(status Status, doCBs bool, err error) {
- nc.mu.Lock()
- if nc.isClosed() {
- nc.status = status
- nc.mu.Unlock()
- return
- }
- nc.status = CLOSED
-
- nc.kickFlusher()
-
-
- if nc.rqch != nil {
- close(nc.rqch)
- nc.rqch = nil
- }
-
- nc.clearPendingFlushCalls()
-
- nc.clearPendingRequestCalls()
-
- nc.stopPingTimer()
- nc.ptmr = nil
-
-
-
- if nc.ar && nc.conn != nil {
- nc.conn.Close()
- nc.conn = nil
- } else if nc.conn != nil {
-
- nc.bw.flush()
- defer nc.conn.Close()
- }
-
-
- nc.subsMu.Lock()
- for _, s := range nc.subs {
- s.mu.Lock()
-
- if s.mch != nil && s.typ == SyncSubscription {
- close(s.mch)
- }
- s.mch = nil
-
- s.closed = true
-
- s.connClosed = true
-
- if s.typ == AsyncSubscription && s.pCond != nil {
- s.pCond.Signal()
- }
- s.mu.Unlock()
- }
- nc.subs = nil
- nc.subsMu.Unlock()
- nc.status = status
-
- if doCBs {
- if nc.conn != nil {
- if nc.Opts.DisconnectedErrCB != nil {
- nc.ach.push(func() { nc.Opts.DisconnectedErrCB(nc, err) })
- } else if nc.Opts.DisconnectedCB != nil {
- nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
- }
- }
- if nc.Opts.ClosedCB != nil {
- nc.ach.push(func() { nc.Opts.ClosedCB(nc) })
- }
- }
-
-
- if status == CLOSED {
- nc.ach.close()
- }
- nc.mu.Unlock()
- }
- func (nc *Conn) Close() {
- if nc != nil {
-
-
-
-
-
- nc.wsClose()
- nc.close(CLOSED, !nc.Opts.NoCallbacksAfterClientClose, nil)
- }
- }
- func (nc *Conn) IsClosed() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.isClosed()
- }
- func (nc *Conn) IsReconnecting() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.isReconnecting()
- }
- func (nc *Conn) IsConnected() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.isConnected()
- }
- func (nc *Conn) drainConnection() {
-
- nc.mu.Lock()
-
- if nc.isClosed() {
- nc.mu.Unlock()
- return
- }
- if nc.isConnecting() || nc.isReconnecting() {
- nc.mu.Unlock()
-
- nc.Close()
- return
- }
- subs := make([]*Subscription, 0, len(nc.subs))
- for _, s := range nc.subs {
- if s == nc.respMux {
-
-
- continue
- }
- subs = append(subs, s)
- }
- errCB := nc.Opts.AsyncErrorCB
- drainWait := nc.Opts.DrainTimeout
- respMux := nc.respMux
- nc.mu.Unlock()
-
- pushErr := func(err error) {
- nc.mu.Lock()
- nc.err = err
- if errCB != nil {
- nc.ach.push(func() { errCB(nc, nil, err) })
- }
- nc.mu.Unlock()
- }
-
- for _, s := range subs {
- if err := s.Drain(); err != nil {
-
- pushErr(err)
- }
- }
-
- timeout := time.Now().Add(drainWait)
- var min int
- if respMux != nil {
- min = 1
- } else {
- min = 0
- }
- for time.Now().Before(timeout) {
- if nc.NumSubscriptions() == min {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
-
-
- if respMux != nil {
- if err := respMux.Drain(); err != nil {
-
- pushErr(err)
- }
- for time.Now().Before(timeout) {
- if nc.NumSubscriptions() == 0 {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
-
- if nc.NumSubscriptions() != 0 {
- pushErr(ErrDrainTimeout)
- }
-
- nc.mu.Lock()
- nc.status = DRAINING_PUBS
- nc.mu.Unlock()
-
- err := nc.FlushTimeout(5 * time.Second)
- if err != nil {
- pushErr(err)
- }
-
- nc.Close()
- }
- func (nc *Conn) Drain() error {
- nc.mu.Lock()
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- if nc.isConnecting() || nc.isReconnecting() {
- nc.mu.Unlock()
- nc.Close()
- return ErrConnectionReconnecting
- }
- if nc.isDraining() {
- nc.mu.Unlock()
- return nil
- }
- nc.status = DRAINING_SUBS
- go nc.drainConnection()
- nc.mu.Unlock()
- return nil
- }
- func (nc *Conn) IsDraining() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.isDraining()
- }
- func (nc *Conn) getServers(implicitOnly bool) []string {
- poolSize := len(nc.srvPool)
- var servers = make([]string, 0)
- for i := 0; i < poolSize; i++ {
- if implicitOnly && !nc.srvPool[i].isImplicit {
- continue
- }
- url := nc.srvPool[i].url
- servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
- }
- return servers
- }
- func (nc *Conn) Servers() []string {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.getServers(false)
- }
- func (nc *Conn) DiscoveredServers() []string {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.getServers(true)
- }
- func (nc *Conn) Status() Status {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.status
- }
- func (nc *Conn) isClosed() bool {
- return nc.status == CLOSED
- }
- func (nc *Conn) isConnecting() bool {
- return nc.status == CONNECTING
- }
- func (nc *Conn) isReconnecting() bool {
- return nc.status == RECONNECTING
- }
- func (nc *Conn) isConnected() bool {
- return nc.status == CONNECTED || nc.isDraining()
- }
- func (nc *Conn) isDraining() bool {
- return nc.status == DRAINING_SUBS || nc.status == DRAINING_PUBS
- }
- func (nc *Conn) isDrainingPubs() bool {
- return nc.status == DRAINING_PUBS
- }
- func (nc *Conn) Stats() Statistics {
-
-
- nc.mu.Lock()
- stats := Statistics{
- InMsgs: atomic.LoadUint64(&nc.InMsgs),
- InBytes: atomic.LoadUint64(&nc.InBytes),
- OutMsgs: nc.OutMsgs,
- OutBytes: nc.OutBytes,
- Reconnects: nc.Reconnects,
- }
- nc.mu.Unlock()
- return stats
- }
- func (nc *Conn) MaxPayload() int64 {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.info.MaxPayload
- }
- func (nc *Conn) HeadersSupported() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.info.Headers
- }
- func (nc *Conn) AuthRequired() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.info.AuthRequired
- }
- func (nc *Conn) TLSRequired() bool {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- return nc.info.TLSRequired
- }
- func (nc *Conn) Barrier(f func()) error {
- nc.mu.Lock()
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- nc.subsMu.Lock()
-
- numSubs := 0
- for _, sub := range nc.subs {
- if sub.typ == AsyncSubscription {
- numSubs++
- }
- }
- if numSubs == 0 {
- nc.subsMu.Unlock()
- nc.mu.Unlock()
- f()
- return nil
- }
- barrier := &barrierInfo{refs: int64(numSubs), f: f}
- for _, sub := range nc.subs {
- sub.mu.Lock()
- if sub.mch == nil {
- msg := &Msg{barrier: barrier}
-
- if sub.pTail != nil {
- sub.pTail.next = msg
- } else {
- sub.pHead = msg
- sub.pCond.Signal()
- }
- sub.pTail = msg
- }
- sub.mu.Unlock()
- }
- nc.subsMu.Unlock()
- nc.mu.Unlock()
- return nil
- }
- func (nc *Conn) GetClientIP() (net.IP, error) {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.isClosed() {
- return nil, ErrConnectionClosed
- }
- if nc.info.ClientIP == "" {
- return nil, ErrClientIPNotSupported
- }
- ip := net.ParseIP(nc.info.ClientIP)
- return ip, nil
- }
- func (nc *Conn) GetClientID() (uint64, error) {
- nc.mu.RLock()
- defer nc.mu.RUnlock()
- if nc.isClosed() {
- return 0, ErrConnectionClosed
- }
- if nc.info.CID == 0 {
- return 0, ErrClientIDNotSupported
- }
- return nc.info.CID, nil
- }
- func NkeyOptionFromSeed(seedFile string) (Option, error) {
- kp, err := nkeyPairFromSeedFile(seedFile)
- if err != nil {
- return nil, err
- }
-
- defer kp.Wipe()
- pub, err := kp.PublicKey()
- if err != nil {
- return nil, err
- }
- if !nkeys.IsValidPublicUserKey(pub) {
- return nil, fmt.Errorf("nats: Not a valid nkey user seed")
- }
- sigCB := func(nonce []byte) ([]byte, error) {
- return sigHandler(nonce, seedFile)
- }
- return Nkey(string(pub), sigCB), nil
- }
// wipeSlice overwrites every byte of buf with 'x'. The callers in this file
// use it to scrub file contents once they have been parsed.
func wipeSlice(buf []byte) {
	for i := 0; i < len(buf); i++ {
		buf[i] = 'x'
	}
}
- func userFromFile(userFile string) (string, error) {
- path, err := expandPath(userFile)
- if err != nil {
- return _EMPTY_, fmt.Errorf("nats: %v", err)
- }
- contents, err := ioutil.ReadFile(path)
- if err != nil {
- return _EMPTY_, fmt.Errorf("nats: %v", err)
- }
- defer wipeSlice(contents)
- return nkeys.ParseDecoratedJWT(contents)
- }
- func homeDir() (string, error) {
- if runtime.GOOS == "windows" {
- homeDrive, homePath := os.Getenv("HOMEDRIVE"), os.Getenv("HOMEPATH")
- userProfile := os.Getenv("USERPROFILE")
- var home string
- if homeDrive == "" || homePath == "" {
- if userProfile == "" {
- return _EMPTY_, errors.New("nats: failed to get home dir, require %HOMEDRIVE% and %HOMEPATH% or %USERPROFILE%")
- }
- home = userProfile
- } else {
- home = filepath.Join(homeDrive, homePath)
- }
- return home, nil
- }
- home := os.Getenv("HOME")
- if home == "" {
- return _EMPTY_, errors.New("nats: failed to get home dir, require $HOME")
- }
- return home, nil
- }
- func expandPath(p string) (string, error) {
- p = os.ExpandEnv(p)
- if !strings.HasPrefix(p, "~") {
- return p, nil
- }
- home, err := homeDir()
- if err != nil {
- return _EMPTY_, err
- }
- return filepath.Join(home, p[1:]), nil
- }
- func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) {
- contents, err := ioutil.ReadFile(seedFile)
- if err != nil {
- return nil, fmt.Errorf("nats: %v", err)
- }
- defer wipeSlice(contents)
- return nkeys.ParseDecoratedNKey(contents)
- }
- func sigHandler(nonce []byte, seedFile string) ([]byte, error) {
- kp, err := nkeyPairFromSeedFile(seedFile)
- if err != nil {
- return nil, err
- }
-
- defer kp.Wipe()
- sig, _ := kp.Sign(nonce)
- return sig, nil
- }
// timeoutWriter writes to conn with a write deadline applied to each call.
// The first write error is remembered and returned by every later call.
type timeoutWriter struct {
	timeout time.Duration // deadline applied to each Write, relative to now
	conn    net.Conn      // destination connection
	err     error         // first write error seen; non-nil disables further writes
}

// Write sends p on the connection with a deadline of now plus tw.timeout and
// clears the deadline afterwards. Once a write has failed, every later call
// returns 0 and that same error without touching the connection. Errors from
// setting the deadline are not checked.
func (tw *timeoutWriter) Write(p []byte) (int, error) {
	if tw.err != nil {
		return 0, tw.err
	}

	deadline := time.Now().Add(tw.timeout)
	tw.conn.SetWriteDeadline(deadline)

	written, err := tw.conn.Write(p)
	tw.err = err

	// The zero time removes the deadline again.
	tw.conn.SetWriteDeadline(time.Time{})
	return written, err
}
|