nats.go 99 KB


  1. // Copyright 2012-2019 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.

// Package nats is a Go client for the NATS messaging system (https://nats.io).
  14. package nats
  15. import (
  16. "bufio"
  17. "bytes"
  18. "crypto/tls"
  19. "crypto/x509"
  20. "encoding/base64"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "io/ioutil"
  26. "math/rand"
  27. "net"
  28. "net/url"
  29. "regexp"
  30. "runtime"
  31. "strconv"
  32. "strings"
  33. "sync"
  34. "sync/atomic"
  35. "time"
  36. "github.com/nats-io/go-nats/util"
  37. "github.com/nats-io/nkeys"
  38. "github.com/nats-io/nuid"
  39. )
// Default Constants
//
// Most Default* values below seed GetDefaultOptions. DefaultMaxPingOut,
// DefaultMaxChanLen, DefaultReconnectBufSize and DefaultTimeout are also
// re-applied by Options.Connect when the corresponding option is zero.
const (
	Version                 = "1.7.2"
	DefaultURL              = "nats://localhost:4222"
	DefaultPort             = 4222
	DefaultMaxReconnect     = 60
	DefaultReconnectWait    = 2 * time.Second
	DefaultTimeout          = 2 * time.Second
	DefaultPingInterval     = 2 * time.Minute
	DefaultMaxPingOut       = 2
	DefaultMaxChanLen       = 8192            // 8k
	DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
	RequestChanLen          = 8
	DefaultDrainTimeout     = 30 * time.Second
	LangString              = "go"
)
// Error text fragments used when recognizing server error conditions.
// ErrStaleConnection is built from STALE_CONNECTION.
const (
	// STALE_CONNECTION is for detection and proper handling of stale connections.
	STALE_CONNECTION = "stale connection"
	// PERMISSIONS_ERR is for when nats server subject authorization has failed.
	PERMISSIONS_ERR = "permissions violation"
	// AUTHORIZATION_ERR is for when nats server user authorization has failed.
	AUTHORIZATION_ERR = "authorization violation"
)
// Errors
//
// Sentinel errors returned by this package. Each is a single errors.New
// value, so callers can compare a returned error against them directly.
var (
	ErrConnectionClosed       = errors.New("nats: connection closed")
	ErrConnectionDraining     = errors.New("nats: connection draining")
	ErrDrainTimeout           = errors.New("nats: draining connection timed out")
	ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
	ErrSecureConnRequired     = errors.New("nats: secure connection required")
	ErrSecureConnWanted       = errors.New("nats: secure connection not available")
	ErrBadSubscription        = errors.New("nats: invalid subscription")
	ErrTypeSubscription       = errors.New("nats: invalid subscription type")
	ErrBadSubject             = errors.New("nats: invalid subject")
	ErrSlowConsumer           = errors.New("nats: slow consumer, messages dropped")
	ErrTimeout                = errors.New("nats: timeout")
	ErrBadTimeout             = errors.New("nats: timeout invalid")
	ErrAuthorization          = errors.New("nats: authorization violation")
	ErrNoServers              = errors.New("nats: no servers available for connection")
	ErrJsonParse              = errors.New("nats: connect message, json parse error")
	ErrChanArg                = errors.New("nats: argument needs to be a channel type")
	ErrMaxPayload             = errors.New("nats: maximum payload exceeded")
	ErrMaxMessages            = errors.New("nats: maximum messages delivered")
	ErrSyncSubRequired        = errors.New("nats: illegal call on an async subscription")
	ErrMultipleTLSConfigs     = errors.New("nats: multiple tls.Configs not allowed")
	ErrNoInfoReceived         = errors.New("nats: protocol exception, INFO not received")
	ErrReconnectBufExceeded   = errors.New("nats: outbound buffer limit exceeded")
	ErrInvalidConnection      = errors.New("nats: invalid connection")
	ErrInvalidMsg             = errors.New("nats: invalid message or message nil")
	ErrInvalidArg             = errors.New("nats: invalid argument")
	ErrInvalidContext         = errors.New("nats: invalid context")
	ErrNoDeadlineContext      = errors.New("nats: context requires a deadline")
	ErrNoEchoNotSupported     = errors.New("nats: no echo option not supported by this server")
	ErrClientIDNotSupported   = errors.New("nats: client ID not supported by this server")
	ErrUserButNoSigCB         = errors.New("nats: user callback defined without a signature handler")
	ErrNkeyButNoSigCB         = errors.New("nats: nkey defined without a signature handler")
	ErrNoUserCB               = errors.New("nats: user callback not defined")
	ErrNkeyAndUser            = errors.New("nats: user callback and nkey defined")
	ErrNkeysNotSupported      = errors.New("nats: nkeys not supported by the server")
	ErrStaleConnection        = errors.New("nats: " + STALE_CONNECTION)
	ErrTokenAlreadySet        = errors.New("nats: token and token handler both set")
)
  103. // GetDefaultOptions returns default configuration options for the client.
  104. func GetDefaultOptions() Options {
  105. return Options{
  106. AllowReconnect: true,
  107. MaxReconnect: DefaultMaxReconnect,
  108. ReconnectWait: DefaultReconnectWait,
  109. Timeout: DefaultTimeout,
  110. PingInterval: DefaultPingInterval,
  111. MaxPingsOut: DefaultMaxPingOut,
  112. SubChanLen: DefaultMaxChanLen,
  113. ReconnectBufSize: DefaultReconnectBufSize,
  114. DrainTimeout: DefaultDrainTimeout,
  115. }
  116. }
// DefaultOptions is a package-level Options value initialized from
// GetDefaultOptions.
//
// Deprecated: Use GetDefaultOptions() instead.
// DefaultOptions is not safe for use by multiple clients.
// For details see #308.
var DefaultOptions = GetDefaultOptions()
// Status represents the state of the connection.
type Status int

// Connection states. The values are assigned with iota in declaration order,
// so DISCONNECTED is the zero value of Status.
const (
	DISCONNECTED = Status(iota)
	CONNECTED
	CLOSED
	RECONNECTING
	CONNECTING
	DRAINING_SUBS
	DRAINING_PUBS
)
// ConnHandler is used for asynchronous events such as
// disconnected and closed connections.
type ConnHandler func(*Conn)

// ErrHandler is used to process asynchronous errors encountered
// while processing inbound messages. The *Subscription argument
// identifies the subscription involved; it may presumably be nil for
// connection-level errors — TODO confirm against the callers.
type ErrHandler func(*Conn, *Subscription, error)

// UserJWTHandler is used to fetch and return the account signed
// JWT for this user.
type UserJWTHandler func() (string, error)

// SignatureHandler is used to sign a nonce from the server while
// authenticating with nkeys. The user should sign the nonce and
// return the base64 encoded signature.
type SignatureHandler func([]byte) ([]byte, error)

// AuthTokenHandler is used to generate a new token to be used when
// connecting to a server (see Options.TokenHandler).
type AuthTokenHandler func() string
// asyncCB is used to preserve order for async callbacks. Entries are
// chained into a singly linked list through next.
type asyncCB struct {
	f    func()
	next *asyncCB
}

// asyncCallbacksHandler holds the list of pending asyncCB entries
// (head/tail) guarded by mu. Options.Connect creates cond over mu and starts
// the asyncCBDispatcher goroutine (defined elsewhere in the package).
type asyncCallbacksHandler struct {
	mu   sync.Mutex
	cond *sync.Cond
	head *asyncCB
	tail *asyncCB
}
// Option is a function on the options for a connection. It mutates the
// given Options in place; a non-nil error aborts Connect.
type Option func(*Options) error

// CustomDialer can be used to specify any dialer, not necessarily
// a *net.Dialer.
type CustomDialer interface {
	Dial(network, address string) (net.Conn, error)
}
// Options can be used to create a customized connection.
//
// GetDefaultOptions returns a populated value. Several zero-valued fields
// (MaxPingsOut, SubChanLen, ReconnectBufSize, Timeout, Dialer) are also
// replaced with defaults by Options.Connect.
type Options struct {
	// Url represents a single NATS server url to which the client
	// will be connecting. If the Servers option is also set, it
	// then becomes the first server in the Servers array.
	Url string

	// Servers is a configured set of servers which this client
	// will use when attempting to connect.
	Servers []string

	// NoRandomize configures whether we will randomize the
	// server pool.
	NoRandomize bool

	// NoEcho configures whether the server will echo back messages
	// that are sent on this connection if we also have matching subscriptions.
	// Note this is supported on servers >= version 1.2. Proto 1 or greater.
	NoEcho bool

	// Name is an optional name label which will be sent to the server
	// on CONNECT to identify the client.
	Name string

	// Verbose signals the server to send an OK ack for commands
	// successfully processed by the server.
	Verbose bool

	// Pedantic signals the server whether it should be doing further
	// validation of subjects.
	Pedantic bool

	// Secure enables TLS secure connections that skip server
	// verification by default. NOT RECOMMENDED.
	Secure bool

	// TLSConfig is a custom TLS configuration to use for secure
	// transports.
	TLSConfig *tls.Config

	// AllowReconnect enables reconnection logic to be used when we
	// encounter a disconnect from the current server.
	AllowReconnect bool

	// MaxReconnect sets the number of reconnect attempts that will be
	// tried before giving up. If negative, then it will never give up
	// trying to reconnect.
	MaxReconnect int

	// ReconnectWait sets the time to backoff after attempting a reconnect
	// to a server that we were already connected to previously.
	ReconnectWait time.Duration

	// Timeout sets the timeout for a Dial operation on a connection.
	// A zero value is replaced by DefaultTimeout in Options.Connect.
	Timeout time.Duration

	// DrainTimeout sets the timeout for a Drain Operation to complete.
	DrainTimeout time.Duration

	// FlusherTimeout is the maximum time to wait for write operations
	// to the underlying connection to complete (including the flusher loop).
	FlusherTimeout time.Duration

	// PingInterval is the period at which the client will be sending ping
	// commands to the server, disabled if 0 or negative.
	PingInterval time.Duration

	// MaxPingsOut is the maximum number of pending ping commands that can
	// be awaiting a response before raising an ErrStaleConnection error.
	// A zero value is replaced by DefaultMaxPingOut in Options.Connect.
	MaxPingsOut int

	// ClosedCB sets the closed handler that is called when a client will
	// no longer be connected.
	ClosedCB ConnHandler

	// DisconnectedCB sets the disconnected handler that is called
	// whenever the connection is disconnected.
	DisconnectedCB ConnHandler

	// ReconnectedCB sets the reconnected handler called whenever
	// the connection is successfully reconnected.
	ReconnectedCB ConnHandler

	// DiscoveredServersCB sets the callback that is invoked whenever a new
	// server has joined the cluster.
	DiscoveredServersCB ConnHandler

	// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
	AsyncErrorCB ErrHandler

	// ReconnectBufSize is the size of the backing bufio during reconnect.
	// Once this has been exhausted publish operations will return an error.
	// A zero value is replaced by DefaultReconnectBufSize in Options.Connect.
	ReconnectBufSize int

	// SubChanLen is the size of the buffered channel used between the socket
	// Go routine and the message delivery for SyncSubscriptions.
	// NOTE: This does not affect AsyncSubscriptions which are
	// dictated by PendingLimits()
	// A zero value is replaced by DefaultMaxChanLen in Options.Connect.
	SubChanLen int

	// UserJWT sets the callback handler that will fetch a user's JWT.
	UserJWT UserJWTHandler

	// Nkey sets the public nkey that will be used to authenticate
	// when connecting to the server. UserJWT and Nkey are mutually
	// exclusive: Options.Connect returns ErrNkeyAndUser if both are set,
	// and ErrNkeyButNoSigCB if Nkey is set without a SignatureCB.
	Nkey string

	// SignatureCB designates the function used to sign the nonce
	// presented from the server.
	SignatureCB SignatureHandler

	// User sets the username to be used when connecting to the server.
	User string

	// Password sets the password to be used when connecting to a server.
	Password string

	// Token sets the token to be used when connecting to a server.
	Token string

	// TokenHandler designates the function used to generate the token to be used when connecting to a server.
	TokenHandler AuthTokenHandler

	// Dialer allows a custom net.Dialer when forming connections.
	// If nil, Options.Connect installs a net.Dialer whose Timeout is Timeout.
	//
	// Deprecated: should use CustomDialer instead.
	Dialer *net.Dialer

	// CustomDialer allows to specify a custom dialer (not necessarily
	// a *net.Dialer).
	CustomDialer CustomDialer

	// UseOldRequestStyle forces the old method of Requests that utilize
	// a new Inbox and a new Subscription for each request.
	UseOldRequestStyle bool
}
// Internal sizing and default values.
const (
	// Scratch storage for assembling protocol headers
	// (sizes Conn.scratch).
	scratchSize = 512

	// The size of the bufio reader/writer on top of the socket.
	defaultBufSize = 32768

	// The buffered size of the flush "kick" channel
	flushChanSize = 1024

	// Default server pool size
	srvPoolSize = 4

	// NUID size
	nuidSize = 22

	// Default port used if none is specified in given URL(s)
	defaultPortString = "4222"
)
// A Conn represents a bare connection to a nats-server.
// It can send and receive []byte payloads.
type Conn struct {
	// Keep all members for which we use atomic at the beginning of the
	// struct and make sure they are all 64bits (or use padding if necessary).
	// atomic.* functions crash on 32bit machines if operand is not aligned
	// at 64bit. See https://github.com/golang/go/issues/599
	Statistics
	// mu guards the connection state; the Set*Handler methods take it
	// before touching Opts.
	mu sync.Mutex
	// Opts holds the configuration of the Conn.
	// Modifying the configuration of a running Conn is a race.
	Opts Options
	wg   sync.WaitGroup
	// srvPool is the ordered server pool; current points at the selected
	// entry (see currentServer and selectNextServer).
	srvPool []*srv
	current *srv
	urls    map[string]struct{} // Keep track of all known URLs (used by processInfo)
	conn    net.Conn
	bw      *bufio.Writer
	// NOTE(review): presumably the buffer used while reconnecting, bounded
	// by Opts.ReconnectBufSize — TODO confirm.
	pending *bytes.Buffer
	// NOTE(review): presumably the flusher "kick" channel sized by
	// flushChanSize — TODO confirm.
	fch    chan struct{}
	info   serverInfo
	ssid   int64
	subsMu sync.RWMutex // presumably guards subs — verify against callers
	subs   map[int64]*Subscription
	// ach is the async callback queue created by Options.Connect.
	ach     *asyncCallbacksHandler
	pongs   []chan struct{}
	scratch [scratchSize]byte
	status  Status
	initc   bool // true if the connection is performing the initial connect
	err     error
	ps      *parseState
	// NOTE(review): presumably the ping timer and the count of outstanding
	// pings compared against Opts.MaxPingsOut — TODO confirm.
	ptmr *time.Timer
	pout int

	// New style response handler
	respSub   string               // The wildcard subject
	respMux   *Subscription        // A single response subscription
	respMap   map[string]chan *Msg // Request map for the response msg channels
	respSetup sync.Once            // Ensures response subscription occurs once
	respRand  *rand.Rand           // Used for generating suffix.
}
// A Subscription represents interest in a given subject.
type Subscription struct {
	mu  sync.Mutex
	sid int64

	// Subject that represents this subscription. This can be different
	// than the received subject inside a Msg if this is a wildcard.
	Subject string

	// Optional queue group name. If present, all subscriptions with the
	// same name will form a distributed queue, and each message will
	// only be processed by one member of the group.
	Queue string

	delivered  uint64
	max        uint64
	conn       *Conn
	mcb        MsgHandler
	mch        chan *Msg
	closed     bool
	sc         bool
	connClosed bool

	// Type of Subscription
	typ SubscriptionType

	// Async linked list of pending messages, chained through Msg.next.
	pHead *Msg
	pTail *Msg
	pCond *sync.Cond

	// Pending stats, async subscriptions, high-speed etc.
	pMsgs       int
	pBytes      int
	pMsgsMax    int
	pBytesMax   int
	pMsgsLimit  int
	pBytesLimit int
	dropped     int
}
// Msg is a structure used by Subscribers and PublishMsg().
type Msg struct {
	Subject string
	Reply   string
	Data    []byte
	Sub     *Subscription
	next    *Msg // link in a Subscription's pending list (pHead/pTail)
	barrier *barrierInfo
}

// barrierInfo pairs a reference count with a callback. NOTE(review):
// presumably f runs once refs drops to zero — TODO confirm against users.
type barrierInfo struct {
	refs int64
	f    func()
}
// Statistics tracks various stats received and sent on this connection,
// including counts for messages and bytes. It is embedded first in Conn so
// its 64-bit counters stay aligned for atomic access on 32-bit platforms.
type Statistics struct {
	InMsgs     uint64
	OutMsgs    uint64
	InBytes    uint64
	OutBytes   uint64
	Reconnects uint64
}
// srv tracks an individual backend server in a Conn's server pool.
type srv struct {
	url         *url.URL
	didConnect  bool
	reconnects  int // compared against Options.MaxReconnect by selectNextServer
	lastAttempt time.Time
	isImplicit  bool
	tlsName     string
}
// serverInfo holds server details decoded from JSON according to the field
// tags below. NOTE(review): presumably this is the payload of the server's
// INFO protocol message (see _INFO_OP_) — TODO confirm.
type serverInfo struct {
	Id           string   `json:"server_id"`
	Host         string   `json:"host"`
	Port         uint     `json:"port"`
	Version      string   `json:"version"`
	AuthRequired bool     `json:"auth_required"`
	TLSRequired  bool     `json:"tls_required"`
	MaxPayload   int64    `json:"max_payload"`
	ConnectURLs  []string `json:"connect_urls,omitempty"`
	Proto        int      `json:"proto,omitempty"`
	CID          uint64   `json:"client_id,omitempty"`
	Nonce        string   `json:"nonce,omitempty"`
}
// Client protocol levels. The blank identifier takes iota 0
// (clientProtoZero), so clientProtoInfo is 1.
const (
	// clientProtoZero is the original client protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	/* clientProtoZero */ _ = iota
	// clientProtoInfo signals a client can receive more then the original INFO block.
	// This can be used to update clients on other cluster members, etc.
	clientProtoInfo
)
// connectInfo is the JSON-tagged set of connection parameters the client
// reports to the server. NOTE(review): presumably it is marshaled into
// conProto ("CONNECT %s") — TODO confirm.
type connectInfo struct {
	Verbose   bool   `json:"verbose"`
	Pedantic  bool   `json:"pedantic"`
	UserJWT   string `json:"jwt,omitempty"`
	Nkey      string `json:"nkey,omitempty"`
	Signature string `json:"sig,omitempty"`
	User      string `json:"user,omitempty"`
	Pass      string `json:"pass,omitempty"`
	Token     string `json:"auth_token,omitempty"`
	TLS       bool   `json:"tls_required"`
	Name      string `json:"name"`
	Lang      string `json:"lang"`
	Version   string `json:"version"`
	Protocol  int    `json:"protocol"`
	Echo      bool   `json:"echo"`
}
// MsgHandler is a callback function that processes messages delivered to
// asynchronous subscribers. Each invocation receives one *Msg.
type MsgHandler func(msg *Msg)
  427. // Connect will attempt to connect to the NATS system.
  428. // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
  429. // Comma separated arrays are also supported, e.g. urlA, urlB.
  430. // Options start with the defaults but can be overridden.
  431. func Connect(url string, options ...Option) (*Conn, error) {
  432. opts := GetDefaultOptions()
  433. opts.Servers = processUrlString(url)
  434. for _, opt := range options {
  435. if opt != nil {
  436. if err := opt(&opts); err != nil {
  437. return nil, err
  438. }
  439. }
  440. }
  441. return opts.Connect()
  442. }
  443. // Options that can be passed to Connect.
  444. // Name is an Option to set the client name.
  445. func Name(name string) Option {
  446. return func(o *Options) error {
  447. o.Name = name
  448. return nil
  449. }
  450. }
  451. // Secure is an Option to enable TLS secure connections that skip server verification by default.
  452. // Pass a TLS Configuration for proper TLS.
  453. // NOTE: This should NOT be used in a production setting.
  454. func Secure(tls ...*tls.Config) Option {
  455. return func(o *Options) error {
  456. o.Secure = true
  457. // Use of variadic just simplifies testing scenarios. We only take the first one.
  458. if len(tls) > 1 {
  459. return ErrMultipleTLSConfigs
  460. }
  461. if len(tls) == 1 {
  462. o.TLSConfig = tls[0]
  463. }
  464. return nil
  465. }
  466. }
  467. // RootCAs is a helper option to provide the RootCAs pool from a list of filenames.
  468. // If Secure is not already set this will set it as well.
  469. func RootCAs(file ...string) Option {
  470. return func(o *Options) error {
  471. pool := x509.NewCertPool()
  472. for _, f := range file {
  473. rootPEM, err := ioutil.ReadFile(f)
  474. if err != nil || rootPEM == nil {
  475. return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
  476. }
  477. ok := pool.AppendCertsFromPEM(rootPEM)
  478. if !ok {
  479. return fmt.Errorf("nats: failed to parse root certificate from %q", f)
  480. }
  481. }
  482. if o.TLSConfig == nil {
  483. o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  484. }
  485. o.TLSConfig.RootCAs = pool
  486. o.Secure = true
  487. return nil
  488. }
  489. }
  490. // ClientCert is a helper option to provide the client certificate from a file.
  491. // If Secure is not already set this will set it as well.
  492. func ClientCert(certFile, keyFile string) Option {
  493. return func(o *Options) error {
  494. cert, err := tls.LoadX509KeyPair(certFile, keyFile)
  495. if err != nil {
  496. return fmt.Errorf("nats: error loading client certificate: %v", err)
  497. }
  498. cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
  499. if err != nil {
  500. return fmt.Errorf("nats: error parsing client certificate: %v", err)
  501. }
  502. if o.TLSConfig == nil {
  503. o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  504. }
  505. o.TLSConfig.Certificates = []tls.Certificate{cert}
  506. o.Secure = true
  507. return nil
  508. }
  509. }
  510. // NoReconnect is an Option to turn off reconnect behavior.
  511. func NoReconnect() Option {
  512. return func(o *Options) error {
  513. o.AllowReconnect = false
  514. return nil
  515. }
  516. }
  517. // DontRandomize is an Option to turn off randomizing the server pool.
  518. func DontRandomize() Option {
  519. return func(o *Options) error {
  520. o.NoRandomize = true
  521. return nil
  522. }
  523. }
  524. // NoEcho is an Option to turn off messages echoing back from a server.
  525. // Note this is supported on servers >= version 1.2. Proto 1 or greater.
  526. func NoEcho() Option {
  527. return func(o *Options) error {
  528. o.NoEcho = true
  529. return nil
  530. }
  531. }
  532. // ReconnectWait is an Option to set the wait time between reconnect attempts.
  533. func ReconnectWait(t time.Duration) Option {
  534. return func(o *Options) error {
  535. o.ReconnectWait = t
  536. return nil
  537. }
  538. }
  539. // MaxReconnects is an Option to set the maximum number of reconnect attempts.
  540. func MaxReconnects(max int) Option {
  541. return func(o *Options) error {
  542. o.MaxReconnect = max
  543. return nil
  544. }
  545. }
  546. // PingInterval is an Option to set the period for client ping commands.
  547. func PingInterval(t time.Duration) Option {
  548. return func(o *Options) error {
  549. o.PingInterval = t
  550. return nil
  551. }
  552. }
  553. // MaxPingsOutstanding is an Option to set the maximum number of ping requests
  554. // that can go un-answered by the server before closing the connection.
  555. func MaxPingsOutstanding(max int) Option {
  556. return func(o *Options) error {
  557. o.MaxPingsOut = max
  558. return nil
  559. }
  560. }
  561. // ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.
  562. func ReconnectBufSize(size int) Option {
  563. return func(o *Options) error {
  564. o.ReconnectBufSize = size
  565. return nil
  566. }
  567. }
  568. // Timeout is an Option to set the timeout for Dial on a connection.
  569. func Timeout(t time.Duration) Option {
  570. return func(o *Options) error {
  571. o.Timeout = t
  572. return nil
  573. }
  574. }
  575. // FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
  576. func FlusherTimeout(t time.Duration) Option {
  577. return func(o *Options) error {
  578. o.FlusherTimeout = t
  579. return nil
  580. }
  581. }
  582. // DrainTimeout is an Option to set the timeout for draining a connection.
  583. func DrainTimeout(t time.Duration) Option {
  584. return func(o *Options) error {
  585. o.DrainTimeout = t
  586. return nil
  587. }
  588. }
  589. // DisconnectHandler is an Option to set the disconnected handler.
  590. func DisconnectHandler(cb ConnHandler) Option {
  591. return func(o *Options) error {
  592. o.DisconnectedCB = cb
  593. return nil
  594. }
  595. }
  596. // ReconnectHandler is an Option to set the reconnected handler.
  597. func ReconnectHandler(cb ConnHandler) Option {
  598. return func(o *Options) error {
  599. o.ReconnectedCB = cb
  600. return nil
  601. }
  602. }
  603. // ClosedHandler is an Option to set the closed handler.
  604. func ClosedHandler(cb ConnHandler) Option {
  605. return func(o *Options) error {
  606. o.ClosedCB = cb
  607. return nil
  608. }
  609. }
  610. // DiscoveredServersHandler is an Option to set the new servers handler.
  611. func DiscoveredServersHandler(cb ConnHandler) Option {
  612. return func(o *Options) error {
  613. o.DiscoveredServersCB = cb
  614. return nil
  615. }
  616. }
  617. // ErrorHandler is an Option to set the async error handler.
  618. func ErrorHandler(cb ErrHandler) Option {
  619. return func(o *Options) error {
  620. o.AsyncErrorCB = cb
  621. return nil
  622. }
  623. }
  624. // UserInfo is an Option to set the username and password to
  625. // use when not included directly in the URLs.
  626. func UserInfo(user, password string) Option {
  627. return func(o *Options) error {
  628. o.User = user
  629. o.Password = password
  630. return nil
  631. }
  632. }
  633. // Token is an Option to set the token to use
  634. // when a token is not included directly in the URLs
  635. // and when a token handler is not provided.
  636. func Token(token string) Option {
  637. return func(o *Options) error {
  638. if o.TokenHandler != nil {
  639. return ErrTokenAlreadySet
  640. }
  641. o.Token = token
  642. return nil
  643. }
  644. }
  645. // TokenHandler is an Option to set the token handler to use
  646. // when a token is not included directly in the URLs
  647. // and when a token is not set.
  648. func TokenHandler(cb AuthTokenHandler) Option {
  649. return func(o *Options) error {
  650. if o.Token != "" {
  651. return ErrTokenAlreadySet
  652. }
  653. o.TokenHandler = cb
  654. return nil
  655. }
  656. }
  657. // UserCredentials is a convenience function that takes a filename
  658. // for a user's JWT and a filename for the user's private Nkey seed.
  659. func UserCredentials(userOrChainedFile string, seedFiles ...string) Option {
  660. userCB := func() (string, error) {
  661. return userFromFile(userOrChainedFile)
  662. }
  663. var keyFile string
  664. if len(seedFiles) > 0 {
  665. keyFile = seedFiles[0]
  666. } else {
  667. keyFile = userOrChainedFile
  668. }
  669. sigCB := func(nonce []byte) ([]byte, error) {
  670. return sigHandler(nonce, keyFile)
  671. }
  672. return UserJWT(userCB, sigCB)
  673. }
  674. // UserJWT will set the callbacks to retrieve the user's JWT and
  675. // the signature callback to sign the server nonce. This an the Nkey
  676. // option are mutually exclusive.
  677. func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option {
  678. return func(o *Options) error {
  679. if userCB == nil {
  680. return ErrNoUserCB
  681. }
  682. if sigCB == nil {
  683. return ErrUserButNoSigCB
  684. }
  685. o.UserJWT = userCB
  686. o.SignatureCB = sigCB
  687. return nil
  688. }
  689. }
  690. // Nkey will set the public Nkey and the signature callback to
  691. // sign the server nonce.
  692. func Nkey(pubKey string, sigCB SignatureHandler) Option {
  693. return func(o *Options) error {
  694. o.Nkey = pubKey
  695. o.SignatureCB = sigCB
  696. if pubKey != "" && sigCB == nil {
  697. return ErrNkeyButNoSigCB
  698. }
  699. return nil
  700. }
  701. }
  702. // SyncQueueLen will set the maximum queue len for the internal
  703. // channel used for SubscribeSync().
  704. func SyncQueueLen(max int) Option {
  705. return func(o *Options) error {
  706. o.SubChanLen = max
  707. return nil
  708. }
  709. }
  710. // Dialer is an Option to set the dialer which will be used when
  711. // attempting to establish a connection.
  712. // DEPRECATED: Should use CustomDialer instead.
  713. func Dialer(dialer *net.Dialer) Option {
  714. return func(o *Options) error {
  715. o.Dialer = dialer
  716. return nil
  717. }
  718. }
  719. // SetCustomDialer is an Option to set a custom dialer which will be
  720. // used when attempting to establish a connection. If both Dialer
  721. // and CustomDialer are specified, CustomDialer takes precedence.
  722. func SetCustomDialer(dialer CustomDialer) Option {
  723. return func(o *Options) error {
  724. o.CustomDialer = dialer
  725. return nil
  726. }
  727. }
  728. // UseOldRequestStyle is an Option to force usage of the old Request style.
  729. func UseOldRequestStyle() Option {
  730. return func(o *Options) error {
  731. o.UseOldRequestStyle = true
  732. return nil
  733. }
  734. }
  735. // Handler processing
  736. // SetDisconnectHandler will set the disconnect event handler.
  737. func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
  738. if nc == nil {
  739. return
  740. }
  741. nc.mu.Lock()
  742. defer nc.mu.Unlock()
  743. nc.Opts.DisconnectedCB = dcb
  744. }
  745. // SetReconnectHandler will set the reconnect event handler.
  746. func (nc *Conn) SetReconnectHandler(rcb ConnHandler) {
  747. if nc == nil {
  748. return
  749. }
  750. nc.mu.Lock()
  751. defer nc.mu.Unlock()
  752. nc.Opts.ReconnectedCB = rcb
  753. }
  754. // SetDiscoveredServersHandler will set the discovered servers handler.
  755. func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
  756. if nc == nil {
  757. return
  758. }
  759. nc.mu.Lock()
  760. defer nc.mu.Unlock()
  761. nc.Opts.DiscoveredServersCB = dscb
  762. }
  763. // SetClosedHandler will set the reconnect event handler.
  764. func (nc *Conn) SetClosedHandler(cb ConnHandler) {
  765. if nc == nil {
  766. return
  767. }
  768. nc.mu.Lock()
  769. defer nc.mu.Unlock()
  770. nc.Opts.ClosedCB = cb
  771. }
  772. // SetErrorHandler will set the async error handler.
  773. func (nc *Conn) SetErrorHandler(cb ErrHandler) {
  774. if nc == nil {
  775. return
  776. }
  777. nc.mu.Lock()
  778. defer nc.mu.Unlock()
  779. nc.Opts.AsyncErrorCB = cb
  780. }
  781. // Process the url string argument to Connect.
  782. // Return an array of urls, even if only one.
  783. func processUrlString(url string) []string {
  784. urls := strings.Split(url, ",")
  785. for i, s := range urls {
  786. urls[i] = strings.TrimSpace(s)
  787. }
  788. return urls
  789. }
  790. // Connect will attempt to connect to a NATS server with multiple options.
  791. func (o Options) Connect() (*Conn, error) {
  792. nc := &Conn{Opts: o}
  793. // Some default options processing.
  794. if nc.Opts.MaxPingsOut == 0 {
  795. nc.Opts.MaxPingsOut = DefaultMaxPingOut
  796. }
  797. // Allow old default for channel length to work correctly.
  798. if nc.Opts.SubChanLen == 0 {
  799. nc.Opts.SubChanLen = DefaultMaxChanLen
  800. }
  801. // Default ReconnectBufSize
  802. if nc.Opts.ReconnectBufSize == 0 {
  803. nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
  804. }
  805. // Ensure that Timeout is not 0
  806. if nc.Opts.Timeout == 0 {
  807. nc.Opts.Timeout = DefaultTimeout
  808. }
  809. // Check first for user jwt callback being defined and nkey.
  810. if nc.Opts.UserJWT != nil && nc.Opts.Nkey != "" {
  811. return nil, ErrNkeyAndUser
  812. }
  813. // Check if we have an nkey but no signature callback defined.
  814. if nc.Opts.Nkey != "" && nc.Opts.SignatureCB == nil {
  815. return nil, ErrNkeyButNoSigCB
  816. }
  817. // Allow custom Dialer for connecting using DialTimeout by default
  818. if nc.Opts.Dialer == nil {
  819. nc.Opts.Dialer = &net.Dialer{
  820. Timeout: nc.Opts.Timeout,
  821. }
  822. }
  823. if err := nc.setupServerPool(); err != nil {
  824. return nil, err
  825. }
  826. // Create the async callback handler.
  827. nc.ach = &asyncCallbacksHandler{}
  828. nc.ach.cond = sync.NewCond(&nc.ach.mu)
  829. if err := nc.connect(); err != nil {
  830. return nil, err
  831. }
  832. // Spin up the async cb dispatcher on success
  833. go nc.ach.asyncCBDispatcher()
  834. return nc, nil
  835. }
  // Protocol tokens and line templates used by the client.
const (
	// Basic tokens.
	_CRLF_  = "\r\n"
	_EMPTY_ = ""
	_SPC_   = " "
	_PUB_P_ = "PUB "

	// Operation names that appear at the start of server protocol lines.
	_OK_OP_   = "+OK"
	_ERR_OP_  = "-ERR"
	_PONG_OP_ = "PONG"
	_INFO_OP_ = "INFO"

	// Complete protocol lines and fmt templates, each terminated by CRLF.
	conProto   = "CONNECT %s" + _CRLF_
	pingProto  = "PING" + _CRLF_
	pongProto  = "PONG" + _CRLF_
	subProto   = "SUB %s %s %d" + _CRLF_
	unsubProto = "UNSUB %d %s" + _CRLF_
	okProto    = _OK_OP_ + _CRLF_
)
  856. // Return the currently selected server
  857. func (nc *Conn) currentServer() (int, *srv) {
  858. for i, s := range nc.srvPool {
  859. if s == nil {
  860. continue
  861. }
  862. if s == nc.current {
  863. return i, s
  864. }
  865. }
  866. return -1, nil
  867. }
  868. // Pop the current server and put onto the end of the list. Select head of list as long
  869. // as number of reconnect attempts under MaxReconnect.
  870. func (nc *Conn) selectNextServer() (*srv, error) {
  871. i, s := nc.currentServer()
  872. if i < 0 {
  873. return nil, ErrNoServers
  874. }
  875. sp := nc.srvPool
  876. num := len(sp)
  877. copy(sp[i:num-1], sp[i+1:num])
  878. maxReconnect := nc.Opts.MaxReconnect
  879. if maxReconnect < 0 || s.reconnects < maxReconnect {
  880. nc.srvPool[num-1] = s
  881. } else {
  882. nc.srvPool = sp[0 : num-1]
  883. }
  884. if len(nc.srvPool) <= 0 {
  885. nc.current = nil
  886. return nil, ErrNoServers
  887. }
  888. nc.current = nc.srvPool[0]
  889. return nc.srvPool[0], nil
  890. }
  891. // Will assign the correct server to nc.current
  892. func (nc *Conn) pickServer() error {
  893. nc.current = nil
  894. if len(nc.srvPool) <= 0 {
  895. return ErrNoServers
  896. }
  897. for _, s := range nc.srvPool {
  898. if s != nil {
  899. nc.current = s
  900. return nil
  901. }
  902. }
  903. return ErrNoServers
  904. }
  905. const tlsScheme = "tls"
  906. // Create the server pool using the options given.
  907. // We will place a Url option first, followed by any
  908. // Server Options. We will randomize the server pool unless
  909. // the NoRandomize flag is set.
  910. func (nc *Conn) setupServerPool() error {
  911. nc.srvPool = make([]*srv, 0, srvPoolSize)
  912. nc.urls = make(map[string]struct{}, srvPoolSize)
  913. // Create srv objects from each url string in nc.Opts.Servers
  914. // and add them to the pool.
  915. for _, urlString := range nc.Opts.Servers {
  916. if err := nc.addURLToPool(urlString, false, false); err != nil {
  917. return err
  918. }
  919. }
  920. // Randomize if allowed to
  921. if !nc.Opts.NoRandomize {
  922. nc.shufflePool()
  923. }
  924. // Normally, if this one is set, Options.Servers should not be,
  925. // but we always allowed that, so continue to do so.
  926. if nc.Opts.Url != _EMPTY_ {
  927. // Add to the end of the array
  928. if err := nc.addURLToPool(nc.Opts.Url, false, false); err != nil {
  929. return err
  930. }
  931. // Then swap it with first to guarantee that Options.Url is tried first.
  932. last := len(nc.srvPool) - 1
  933. if last > 0 {
  934. nc.srvPool[0], nc.srvPool[last] = nc.srvPool[last], nc.srvPool[0]
  935. }
  936. } else if len(nc.srvPool) <= 0 {
  937. // Place default URL if pool is empty.
  938. if err := nc.addURLToPool(DefaultURL, false, false); err != nil {
  939. return err
  940. }
  941. }
  942. // Check for Scheme hint to move to TLS mode.
  943. for _, srv := range nc.srvPool {
  944. if srv.url.Scheme == tlsScheme {
  945. // FIXME(dlc), this is for all in the pool, should be case by case.
  946. nc.Opts.Secure = true
  947. if nc.Opts.TLSConfig == nil {
  948. nc.Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  949. }
  950. }
  951. }
  952. return nc.pickServer()
  953. }
  954. // Helper function to return scheme
  955. func (nc *Conn) connScheme() string {
  956. if nc.Opts.Secure {
  957. return tlsScheme
  958. }
  959. return "nats"
  960. }
  // hostIsIP reports whether the host part of u (port and IPv6 brackets
// stripped via u.Hostname()) parses as an IP address literal.
func hostIsIP(u *url.URL) bool {
	host := u.Hostname()
	ip := net.ParseIP(host)
	return ip != nil
}
  965. // addURLToPool adds an entry to the server pool
  966. func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
  967. if !strings.Contains(sURL, "://") {
  968. sURL = fmt.Sprintf("%s://%s", nc.connScheme(), sURL)
  969. }
  970. var (
  971. u *url.URL
  972. err error
  973. )
  974. for i := 0; i < 2; i++ {
  975. u, err = url.Parse(sURL)
  976. if err != nil {
  977. return err
  978. }
  979. if u.Port() != "" {
  980. break
  981. }
  982. // In case given URL is of the form "localhost:", just add
  983. // the port number at the end, otherwise, add ":4222".
  984. if sURL[len(sURL)-1] != ':' {
  985. sURL += ":"
  986. }
  987. sURL += defaultPortString
  988. }
  989. var tlsName string
  990. if implicit {
  991. curl := nc.current.url
  992. // Check to see if we do not have a url.User but current connected
  993. // url does. If so copy over.
  994. if u.User == nil && curl.User != nil {
  995. u.User = curl.User
  996. }
  997. // We are checking to see if we have a secure connection and are
  998. // adding an implicit server that just has an IP. If so we will remember
  999. // the current hostname we are connected to.
  1000. if saveTLSName && hostIsIP(u) {
  1001. tlsName = curl.Hostname()
  1002. }
  1003. }
  1004. s := &srv{url: u, isImplicit: implicit, tlsName: tlsName}
  1005. nc.srvPool = append(nc.srvPool, s)
  1006. nc.urls[u.Host] = struct{}{}
  1007. return nil
  1008. }
  1009. // shufflePool swaps randomly elements in the server pool
  1010. func (nc *Conn) shufflePool() {
  1011. if len(nc.srvPool) <= 1 {
  1012. return
  1013. }
  1014. source := rand.NewSource(time.Now().UnixNano())
  1015. r := rand.New(source)
  1016. for i := range nc.srvPool {
  1017. j := r.Intn(i + 1)
  1018. nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
  1019. }
  1020. }
  1021. func (nc *Conn) newBuffer() *bufio.Writer {
  1022. var w io.Writer = nc.conn
  1023. if nc.Opts.FlusherTimeout > 0 {
  1024. w = &timeoutWriter{conn: nc.conn, timeout: nc.Opts.FlusherTimeout}
  1025. }
  1026. return bufio.NewWriterSize(w, defaultBufSize)
  1027. }
  1028. // createConn will connect to the server and wrap the appropriate
  1029. // bufio structures. It will do the right thing when an existing
  1030. // connection is in place.
  1031. func (nc *Conn) createConn() (err error) {
  1032. if nc.Opts.Timeout < 0 {
  1033. return ErrBadTimeout
  1034. }
  1035. if _, cur := nc.currentServer(); cur == nil {
  1036. return ErrNoServers
  1037. } else {
  1038. cur.lastAttempt = time.Now()
  1039. }
  1040. // We will auto-expand host names if they resolve to multiple IPs
  1041. hosts := map[string]struct{}{}
  1042. u := nc.current.url
  1043. if net.ParseIP(u.Hostname()) == nil {
  1044. addrs, _ := net.LookupHost(u.Hostname())
  1045. for _, addr := range addrs {
  1046. hosts[net.JoinHostPort(addr, u.Port())] = struct{}{}
  1047. }
  1048. }
  1049. // Fall back to what we were given.
  1050. if len(hosts) == 0 {
  1051. hosts[u.Host] = struct{}{}
  1052. }
  1053. // CustomDialer takes precedence. If not set, use Opts.Dialer which
  1054. // is set to a default *net.Dialer (in Connect()) if not explicitly
  1055. // set by the user.
  1056. dialer := nc.Opts.CustomDialer
  1057. if dialer == nil {
  1058. // We will copy and shorten the timeout if we have multiple hosts to try.
  1059. copyDialer := *nc.Opts.Dialer
  1060. copyDialer.Timeout = copyDialer.Timeout / time.Duration(len(hosts))
  1061. dialer = &copyDialer
  1062. }
  1063. for host := range hosts {
  1064. nc.conn, err = dialer.Dial("tcp", host)
  1065. if err == nil {
  1066. break
  1067. }
  1068. }
  1069. if err != nil {
  1070. return err
  1071. }
  1072. // No clue why, but this stalls and kills performance on Mac (Mavericks).
  1073. // https://code.google.com/p/go/issues/detail?id=6930
  1074. //if ip, ok := nc.conn.(*net.TCPConn); ok {
  1075. // ip.SetReadBuffer(defaultBufSize)
  1076. //}
  1077. if nc.pending != nil && nc.bw != nil {
  1078. // Move to pending buffer.
  1079. nc.bw.Flush()
  1080. }
  1081. nc.bw = nc.newBuffer()
  1082. return nil
  1083. }
  1084. // makeTLSConn will wrap an existing Conn using TLS
  1085. func (nc *Conn) makeTLSConn() error {
  1086. // Allow the user to configure their own tls.Config structure.
  1087. var tlsCopy *tls.Config
  1088. if nc.Opts.TLSConfig != nil {
  1089. tlsCopy = util.CloneTLSConfig(nc.Opts.TLSConfig)
  1090. } else {
  1091. tlsCopy = &tls.Config{}
  1092. }
  1093. // If its blank we will override it with the current host
  1094. if tlsCopy.ServerName == _EMPTY_ {
  1095. if nc.current.tlsName != _EMPTY_ {
  1096. tlsCopy.ServerName = nc.current.tlsName
  1097. } else {
  1098. h, _, _ := net.SplitHostPort(nc.current.url.Host)
  1099. tlsCopy.ServerName = h
  1100. }
  1101. }
  1102. nc.conn = tls.Client(nc.conn, tlsCopy)
  1103. conn := nc.conn.(*tls.Conn)
  1104. if err := conn.Handshake(); err != nil {
  1105. return err
  1106. }
  1107. nc.bw = nc.newBuffer()
  1108. return nil
  1109. }
  1110. // waitForExits will wait for all socket watcher Go routines to
  1111. // be shutdown before proceeding.
  1112. func (nc *Conn) waitForExits() {
  1113. // Kick old flusher forcefully.
  1114. select {
  1115. case nc.fch <- struct{}{}:
  1116. default:
  1117. }
  1118. // Wait for any previous go routines.
  1119. nc.wg.Wait()
  1120. }
  1121. // Report the connected server's Url
  1122. func (nc *Conn) ConnectedUrl() string {
  1123. if nc == nil {
  1124. return _EMPTY_
  1125. }
  1126. nc.mu.Lock()
  1127. defer nc.mu.Unlock()
  1128. if nc.status != CONNECTED {
  1129. return _EMPTY_
  1130. }
  1131. return nc.current.url.String()
  1132. }
  1133. // ConnectedAddr returns the connected server's IP
  1134. func (nc *Conn) ConnectedAddr() string {
  1135. if nc == nil {
  1136. return _EMPTY_
  1137. }
  1138. nc.mu.Lock()
  1139. defer nc.mu.Unlock()
  1140. if nc.status != CONNECTED {
  1141. return _EMPTY_
  1142. }
  1143. return nc.conn.RemoteAddr().String()
  1144. }
  1145. // Report the connected server's Id
  1146. func (nc *Conn) ConnectedServerId() string {
  1147. if nc == nil {
  1148. return _EMPTY_
  1149. }
  1150. nc.mu.Lock()
  1151. defer nc.mu.Unlock()
  1152. if nc.status != CONNECTED {
  1153. return _EMPTY_
  1154. }
  1155. return nc.info.Id
  1156. }
  1157. // Low level setup for structs, etc
  1158. func (nc *Conn) setup() {
  1159. nc.subs = make(map[int64]*Subscription)
  1160. nc.pongs = make([]chan struct{}, 0, 8)
  1161. nc.fch = make(chan struct{}, flushChanSize)
  1162. // Setup scratch outbound buffer for PUB
  1163. pub := nc.scratch[:len(_PUB_P_)]
  1164. copy(pub, _PUB_P_)
  1165. }
  1166. // Process a connected connection and initialize properly.
  1167. func (nc *Conn) processConnectInit() error {
  1168. // Set our deadline for the whole connect process
  1169. nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
  1170. defer nc.conn.SetDeadline(time.Time{})
  1171. // Set our status to connecting.
  1172. nc.status = CONNECTING
  1173. // Process the INFO protocol received from the server
  1174. err := nc.processExpectedInfo()
  1175. if err != nil {
  1176. return err
  1177. }
  1178. // Send the CONNECT protocol along with the initial PING protocol.
  1179. // Wait for the PONG response (or any error that we get from the server).
  1180. err = nc.sendConnect()
  1181. if err != nil {
  1182. return err
  1183. }
  1184. // Reset the number of PING sent out
  1185. nc.pout = 0
  1186. // Start or reset Timer
  1187. if nc.Opts.PingInterval > 0 {
  1188. if nc.ptmr == nil {
  1189. nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
  1190. } else {
  1191. nc.ptmr.Reset(nc.Opts.PingInterval)
  1192. }
  1193. }
  1194. // Start the readLoop and flusher go routines, we will wait on both on a reconnect event.
  1195. nc.wg.Add(2)
  1196. go nc.readLoop()
  1197. go nc.flusher()
  1198. return nil
  1199. }
  // connect performs the initial connection to the nats-server.
//
// With nc.mu held and nc.initc set, it walks the server pool in order. For
// each entry it dials (createConn), resets per-connection state (setup) and
// runs the handshake (processConnectInit), stopping at the first server
// that succeeds, which is marked didConnect with its reconnect count reset.
//
// Error reporting: a handshake failure is remembered (and the connection
// closed via nc.close(DISCONNECTED, false)), but a later success clears it.
// A dial error containing "connection refused" also clears the remembered
// error. If nothing is remembered and the status is not CONNECTED,
// ErrNoServers is returned.
//
// NOTE(review): dial errors other than "connection refused" are never
// stored in returnedErr, so e.g. ErrBadTimeout from createConn (negative
// Opts.Timeout) surfaces as ErrNoServers. Confirm this is intended before
// relying on the exact error value.
func (nc *Conn) connect() error {
	var returnedErr error
	// Create actual socket connection
	// For first connect we walk all servers in the pool and try
	// to connect immediately.
	nc.mu.Lock()
	nc.initc = true
	// The pool may change inside the loop iteration due to INFO protocol.
	for i := 0; i < len(nc.srvPool); i++ {
		nc.current = nc.srvPool[i]
		if err := nc.createConn(); err == nil {
			// This was moved out of processConnectInit() because
			// that function is now invoked from doReconnect() too.
			nc.setup()
			err = nc.processConnectInit()
			if err == nil {
				nc.srvPool[i].didConnect = true
				nc.srvPool[i].reconnects = 0
				returnedErr = nil
				break
			} else {
				returnedErr = err
				// Release the lock around nc.close() (presumably it takes
				// nc.mu itself) and re-acquire it before continuing.
				nc.mu.Unlock()
				nc.close(DISCONNECTED, false)
				nc.mu.Lock()
				nc.current = nil
			}
		} else {
			// Cancel out default connection refused, will trigger the
			// No servers error conditional
			if strings.Contains(err.Error(), "connection refused") {
				returnedErr = nil
			}
		}
	}
	nc.initc = false
	// The lock is released only after the final status check below.
	defer nc.mu.Unlock()
	if returnedErr == nil && nc.status != CONNECTED {
		returnedErr = ErrNoServers
	}
	return returnedErr
}
  1243. // This will check to see if the connection should be
  1244. // secure. This can be dictated from either end and should
  1245. // only be called after the INIT protocol has been received.
  1246. func (nc *Conn) checkForSecure() error {
  1247. // Check to see if we need to engage TLS
  1248. o := nc.Opts
  1249. // Check for mismatch in setups
  1250. if o.Secure && !nc.info.TLSRequired {
  1251. return ErrSecureConnWanted
  1252. } else if nc.info.TLSRequired && !o.Secure {
  1253. // Switch to Secure since server needs TLS.
  1254. o.Secure = true
  1255. }
  1256. // Need to rewrap with bufio
  1257. if o.Secure {
  1258. if err := nc.makeTLSConn(); err != nil {
  1259. return err
  1260. }
  1261. }
  1262. return nil
  1263. }
  1264. // processExpectedInfo will look for the expected first INFO message
  1265. // sent when a connection is established. The lock should be held entering.
  1266. func (nc *Conn) processExpectedInfo() error {
  1267. c := &control{}
  1268. // Read the protocol
  1269. err := nc.readOp(c)
  1270. if err != nil {
  1271. return err
  1272. }
  1273. // The nats protocol should send INFO first always.
  1274. if c.op != _INFO_OP_ {
  1275. return ErrNoInfoReceived
  1276. }
  1277. // Parse the protocol
  1278. if err := nc.processInfo(c.args); err != nil {
  1279. return err
  1280. }
  1281. if nc.Opts.Nkey != "" && nc.info.Nonce == "" {
  1282. return ErrNkeysNotSupported
  1283. }
  1284. return nc.checkForSecure()
  1285. }
  1286. // Sends a protocol control message by queuing into the bufio writer
  1287. // and kicking the flush Go routine. These writes are protected.
  1288. func (nc *Conn) sendProto(proto string) {
  1289. nc.mu.Lock()
  1290. nc.bw.WriteString(proto)
  1291. nc.kickFlusher()
  1292. nc.mu.Unlock()
  1293. }
  1294. // Generate a connect protocol message, issuing user/password if
  1295. // applicable. The lock is assumed to be held upon entering.
  1296. func (nc *Conn) connectProto() (string, error) {
  1297. o := nc.Opts
  1298. var nkey, sig, user, pass, token, ujwt string
  1299. u := nc.current.url.User
  1300. if u != nil {
  1301. // if no password, assume username is authToken
  1302. if _, ok := u.Password(); !ok {
  1303. token = u.Username()
  1304. } else {
  1305. user = u.Username()
  1306. pass, _ = u.Password()
  1307. }
  1308. } else {
  1309. // Take from options (possibly all empty strings)
  1310. user = o.User
  1311. pass = o.Password
  1312. token = o.Token
  1313. nkey = o.Nkey
  1314. }
  1315. // Look for user jwt.
  1316. if o.UserJWT != nil {
  1317. if jwt, err := o.UserJWT(); err != nil {
  1318. return _EMPTY_, err
  1319. } else {
  1320. ujwt = jwt
  1321. }
  1322. if nkey != _EMPTY_ {
  1323. return _EMPTY_, ErrNkeyAndUser
  1324. }
  1325. }
  1326. if ujwt != _EMPTY_ || nkey != _EMPTY_ {
  1327. if o.SignatureCB == nil {
  1328. if ujwt == _EMPTY_ {
  1329. return _EMPTY_, ErrNkeyButNoSigCB
  1330. }
  1331. return _EMPTY_, ErrUserButNoSigCB
  1332. }
  1333. sigraw, err := o.SignatureCB([]byte(nc.info.Nonce))
  1334. if err != nil {
  1335. return _EMPTY_, err
  1336. }
  1337. sig = base64.RawURLEncoding.EncodeToString(sigraw)
  1338. }
  1339. if nc.Opts.TokenHandler != nil {
  1340. if token != _EMPTY_ {
  1341. return _EMPTY_, ErrTokenAlreadySet
  1342. }
  1343. token = nc.Opts.TokenHandler()
  1344. }
  1345. cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
  1346. o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho}
  1347. b, err := json.Marshal(cinfo)
  1348. if err != nil {
  1349. return _EMPTY_, ErrJsonParse
  1350. }
  1351. // Check if NoEcho is set and we have a server that supports it.
  1352. if o.NoEcho && nc.info.Proto < 1 {
  1353. return _EMPTY_, ErrNoEchoNotSupported
  1354. }
  1355. return fmt.Sprintf(conProto, b), nil
  1356. }
  1357. // normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
  1358. func normalizeErr(line string) string {
  1359. s := strings.TrimSpace(strings.TrimPrefix(line, _ERR_OP_))
  1360. s = strings.TrimLeft(strings.TrimRight(s, "'"), "'")
  1361. return s
  1362. }
  1363. // Send a connect protocol message to the server, issue user/password if
  1364. // applicable. Will wait for a flush to return from the server for error
  1365. // processing.
  1366. func (nc *Conn) sendConnect() error {
  1367. // Construct the CONNECT protocol string
  1368. cProto, err := nc.connectProto()
  1369. if err != nil {
  1370. return err
  1371. }
  1372. // Write the protocol into the buffer
  1373. _, err = nc.bw.WriteString(cProto)
  1374. if err != nil {
  1375. return err
  1376. }
  1377. // Add to the buffer the PING protocol
  1378. _, err = nc.bw.WriteString(pingProto)
  1379. if err != nil {
  1380. return err
  1381. }
  1382. // Flush the buffer
  1383. err = nc.bw.Flush()
  1384. if err != nil {
  1385. return err
  1386. }
  1387. // We don't want to read more than we need here, otherwise
  1388. // we would need to transfer the excess read data to the readLoop.
  1389. // Since in normal situations we just are looking for a PONG\r\n,
  1390. // reading byte-by-byte here is ok.
  1391. proto, err := nc.readProto()
  1392. if err != nil {
  1393. return err
  1394. }
  1395. // If opts.Verbose is set, handle +OK
  1396. if nc.Opts.Verbose && proto == okProto {
  1397. // Read the rest now...
  1398. proto, err = nc.readProto()
  1399. if err != nil {
  1400. return err
  1401. }
  1402. }
  1403. // We expect a PONG
  1404. if proto != pongProto {
  1405. // But it could be something else, like -ERR
  1406. // Since we no longer use ReadLine(), trim the trailing "\r\n"
  1407. proto = strings.TrimRight(proto, "\r\n")
  1408. // If it's a server error...
  1409. if strings.HasPrefix(proto, _ERR_OP_) {
  1410. // Remove -ERR, trim spaces and quotes, and convert to lower case.
  1411. proto = normalizeErr(proto)
  1412. return errors.New("nats: " + proto)
  1413. }
  1414. // Notify that we got an unexpected protocol.
  1415. return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, proto)
  1416. }
  1417. // This is where we are truly connected.
  1418. nc.status = CONNECTED
  1419. return nil
  1420. }
  1421. // reads a protocol one byte at a time.
  1422. func (nc *Conn) readProto() (string, error) {
  1423. var (
  1424. _buf = [10]byte{}
  1425. buf = _buf[:0]
  1426. b = [1]byte{}
  1427. protoEnd = byte('\n')
  1428. )
  1429. for {
  1430. if _, err := nc.conn.Read(b[:1]); err != nil {
  1431. // Do not report EOF error
  1432. if err == io.EOF {
  1433. return string(buf), nil
  1434. }
  1435. return "", err
  1436. }
  1437. buf = append(buf, b[0])
  1438. if b[0] == protoEnd {
  1439. return string(buf), nil
  1440. }
  1441. }
  1442. }
  // control is a single protocol control line split into its operation name
// (op) and the remainder of the line (args).
type control struct {
	op   string
	args string
}
  1447. // Read a control line and process the intended op.
  1448. func (nc *Conn) readOp(c *control) error {
  1449. br := bufio.NewReaderSize(nc.conn, defaultBufSize)
  1450. line, err := br.ReadString('\n')
  1451. if err != nil {
  1452. return err
  1453. }
  1454. parseControl(line, c)
  1455. return nil
  1456. }
  1457. // Parse a control line from the server.
  1458. func parseControl(line string, c *control) {
  1459. toks := strings.SplitN(line, _SPC_, 2)
  1460. if len(toks) == 1 {
  1461. c.op = strings.TrimSpace(toks[0])
  1462. c.args = _EMPTY_
  1463. } else if len(toks) == 2 {
  1464. c.op, c.args = strings.TrimSpace(toks[0]), strings.TrimSpace(toks[1])
  1465. } else {
  1466. c.op = _EMPTY_
  1467. }
  1468. }
  1469. // flushReconnectPending will push the pending items that were
  1470. // gathered while we were in a RECONNECTING state to the socket.
  1471. func (nc *Conn) flushReconnectPendingItems() {
  1472. if nc.pending == nil {
  1473. return
  1474. }
  1475. if nc.pending.Len() > 0 {
  1476. nc.bw.Write(nc.pending.Bytes())
  1477. }
  1478. }
  1479. // Stops the ping timer if set.
  1480. // Connection lock is held on entry.
  1481. func (nc *Conn) stopPingTimer() {
  1482. if nc.ptmr != nil {
  1483. nc.ptmr.Stop()
  1484. }
  1485. }
  // doReconnect tries to reconnect using the option parameters.
// This function assumes we are allowed to reconnect; processOpErr starts it
// with `go` after switching the connection to RECONNECTING.
//
// Flow: wait for the previous readLoop/flusher to exit, clear pending flush
// calls and nc.err, and queue DisconnectedCB. Then loop over the pool:
// selectNextServer; sleep (without the lock) so attempts to the same server
// are at least Opts.ReconnectWait apart; stop if closed meanwhile; bump
// cur.reconnects and dial. On a successful dial, nc.Reconnects is bumped
// (even if the handshake then fails), the handshake is run, subscriptions
// are resent and the pending buffer is flushed. On full success the status
// becomes CONNECTED, nc.pending is dropped, ReconnectedCB is queued and
// nc.Flush() is called. If the loop ends without success (no servers left,
// selectNextServer error, or the connection was closed), nc.err defaults to
// ErrNoServers and nc.Close() is called.
func (nc *Conn) doReconnect() {
	// We want to make sure we have the other watchers shutdown properly
	// here before we proceed past this point.
	nc.waitForExits()
	// FIXME(dlc) - We have an issue here if we have
	// outstanding flush points (pongs) and they were not
	// sent out, but are still in the pipe.
	// Hold the lock manually and release where needed below,
	// can't do defer here.
	nc.mu.Lock()
	// Clear any queued pongs, e.g. pending flush calls.
	nc.clearPendingFlushCalls()
	// Clear any errors.
	nc.err = nil
	// Perform appropriate callback if needed for a disconnect.
	if nc.Opts.DisconnectedCB != nil {
		nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
	}
	// This is used to wait on go routines exit if we start them in the loop
	// but an error occurs after that.
	waitForGoRoutines := false
	for len(nc.srvPool) > 0 {
		cur, err := nc.selectNextServer()
		if err != nil {
			nc.err = err
			break
		}
		sleepTime := int64(0)
		// Sleep appropriate amount of time before the
		// connection attempt if connecting to same server
		// we just got disconnected from..
		// (cur.lastAttempt is set by createConn on each dial attempt.)
		if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
			sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
		}
		// On Windows, createConn() will take more than a second when no
		// server is running at that address. So it could be that the
		// time elapsed between reconnect attempts is always > than
		// the set option. Release the lock to give a chance to a parallel
		// nc.Close() to break the loop.
		nc.mu.Unlock()
		if sleepTime <= 0 {
			runtime.Gosched()
		} else {
			time.Sleep(time.Duration(sleepTime))
		}
		// If the readLoop, etc.. go routines were started, wait for them to complete.
		if waitForGoRoutines {
			nc.waitForExits()
			waitForGoRoutines = false
		}
		nc.mu.Lock()
		// Check if we have been closed first.
		if nc.isClosed() {
			break
		}
		// Mark that we tried a reconnect
		cur.reconnects++
		// Try to create a new connection
		err = nc.createConn()
		// Not yet connected, retry...
		// Continue to hold the lock
		if err != nil {
			nc.err = nil
			continue
		}
		// We are reconnected
		nc.Reconnects++
		// Process connect logic
		if nc.err = nc.processConnectInit(); nc.err != nil {
			nc.status = RECONNECTING
			// Reset the buffered writer to the pending buffer
			// (was set to a buffered writer on nc.conn in createConn)
			// NOTE(review): nc.conn, freshly dialed by createConn, is not
			// closed here; the next createConn overwrites it. Possible
			// socket leak on repeated handshake failures — confirm.
			nc.bw.Reset(nc.pending)
			continue
		}
		// Clear out server stats for the server we connected to..
		cur.didConnect = true
		cur.reconnects = 0
		// Send existing subscription state
		nc.resendSubscriptions()
		// Now send off and clear pending buffer
		nc.flushReconnectPendingItems()
		// Flush the buffer
		nc.err = nc.bw.Flush()
		if nc.err != nil {
			nc.status = RECONNECTING
			// Reset the buffered writer to the pending buffer (bytes.Buffer).
			nc.bw.Reset(nc.pending)
			// Stop the ping timer (if set)
			nc.stopPingTimer()
			// Since processConnectInit() returned without error, the
			// go routines were started, so wait for them to return
			// on the next iteration (after releasing the lock).
			waitForGoRoutines = true
			continue
		}
		// Done with the pending buffer
		nc.pending = nil
		// This is where we are truly connected.
		nc.status = CONNECTED
		// Queue up the reconnect callback.
		if nc.Opts.ReconnectedCB != nil {
			nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
		}
		// Release lock here, we will return below.
		nc.mu.Unlock()
		// Make sure to flush everything
		nc.Flush()
		return
	}
	// Call into close.. We have no servers left..
	// (Also reached when the connection was closed while we slept.)
	if nc.err == nil {
		nc.err = ErrNoServers
	}
	nc.mu.Unlock()
	nc.Close()
}
  1605. // processOpErr handles errors from reading or parsing the protocol.
  1606. // The lock should not be held entering this function.
  1607. func (nc *Conn) processOpErr(err error) {
  1608. nc.mu.Lock()
  1609. if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
  1610. nc.mu.Unlock()
  1611. return
  1612. }
  1613. if nc.Opts.AllowReconnect && nc.status == CONNECTED {
  1614. // Set our new status
  1615. nc.status = RECONNECTING
  1616. // Stop ping timer if set
  1617. nc.stopPingTimer()
  1618. if nc.conn != nil {
  1619. nc.bw.Flush()
  1620. nc.conn.Close()
  1621. nc.conn = nil
  1622. }
  1623. // Create pending buffer before reconnecting.
  1624. nc.pending = new(bytes.Buffer)
  1625. nc.bw.Reset(nc.pending)
  1626. go nc.doReconnect()
  1627. nc.mu.Unlock()
  1628. return
  1629. }
  1630. nc.status = DISCONNECTED
  1631. nc.err = err
  1632. nc.mu.Unlock()
  1633. nc.Close()
  1634. }
  1635. // dispatch is responsible for calling any async callbacks
  1636. func (ac *asyncCallbacksHandler) asyncCBDispatcher() {
  1637. for {
  1638. ac.mu.Lock()
  1639. // Protect for spurious wakeups. We should get out of the
  1640. // wait only if there is an element to pop from the list.
  1641. for ac.head == nil {
  1642. ac.cond.Wait()
  1643. }
  1644. cur := ac.head
  1645. ac.head = cur.next
  1646. if cur == ac.tail {
  1647. ac.tail = nil
  1648. }
  1649. ac.mu.Unlock()
  1650. // This signals that the dispatcher has been closed and all
  1651. // previous callbacks have been dispatched.
  1652. if cur.f == nil {
  1653. return
  1654. }
  1655. // Invoke callback outside of handler's lock
  1656. cur.f()
  1657. }
  1658. }
  1659. // Add the given function to the tail of the list and
  1660. // signals the dispatcher.
  1661. func (ac *asyncCallbacksHandler) push(f func()) {
  1662. ac.pushOrClose(f, false)
  1663. }
  1664. // Signals that we are closing...
  1665. func (ac *asyncCallbacksHandler) close() {
  1666. ac.pushOrClose(nil, true)
  1667. }
  1668. // Add the given function to the tail of the list and
  1669. // signals the dispatcher.
  1670. func (ac *asyncCallbacksHandler) pushOrClose(f func(), close bool) {
  1671. ac.mu.Lock()
  1672. defer ac.mu.Unlock()
  1673. // Make sure that library is not calling push with nil function,
  1674. // since this is used to notify the dispatcher that it should stop.
  1675. if !close && f == nil {
  1676. panic("pushing a nil callback")
  1677. }
  1678. cb := &asyncCB{f: f}
  1679. if ac.tail != nil {
  1680. ac.tail.next = cb
  1681. } else {
  1682. ac.head = cb
  1683. }
  1684. ac.tail = cb
  1685. if close {
  1686. ac.cond.Broadcast()
  1687. } else {
  1688. ac.cond.Signal()
  1689. }
  1690. }
  1691. // readLoop() will sit on the socket reading and processing the
  1692. // protocol from the server. It will dispatch appropriately based
  1693. // on the op type.
  1694. func (nc *Conn) readLoop() {
  1695. // Release the wait group on exit
  1696. defer nc.wg.Done()
  1697. // Create a parseState if needed.
  1698. nc.mu.Lock()
  1699. if nc.ps == nil {
  1700. nc.ps = &parseState{}
  1701. }
  1702. nc.mu.Unlock()
  1703. // Stack based buffer.
  1704. b := make([]byte, defaultBufSize)
  1705. for {
  1706. // FIXME(dlc): RWLock here?
  1707. nc.mu.Lock()
  1708. sb := nc.isClosed() || nc.isReconnecting()
  1709. if sb {
  1710. nc.ps = &parseState{}
  1711. }
  1712. conn := nc.conn
  1713. nc.mu.Unlock()
  1714. if sb || conn == nil {
  1715. break
  1716. }
  1717. n, err := conn.Read(b)
  1718. if err != nil {
  1719. nc.processOpErr(err)
  1720. break
  1721. }
  1722. if err := nc.parse(b[:n]); err != nil {
  1723. nc.processOpErr(err)
  1724. break
  1725. }
  1726. }
  1727. // Clear the parseState here..
  1728. nc.mu.Lock()
  1729. nc.ps = nil
  1730. nc.mu.Unlock()
  1731. }
  // waitForMsgs waits on the conditional shared with readLoop and processMsg.
// It is used to deliver messages to asynchronous subscribers.
//
// It repeatedly pops the head of s's pending list (pHead/pTail) and calls
// s.mcb for it outside of s.mu. Barrier entries (m.barrier != nil) are not
// delivered: each one decrements its barrier's refs and runs barrier.f when
// the count reaches zero. Pending msg/byte accounting for a delivered
// message is updated at the top of the next iteration, i.e. after the
// callback has returned. The loop ends when the subscription is closed or,
// when s.max is non-zero, once delivered reaches it (the sub is then
// removed under nc.mu). Barriers still queued at exit are then released.
func (nc *Conn) waitForMsgs(s *Subscription) {
	var closed bool
	var delivered, max uint64
	// Used to account for adjustments to sub.pBytes when we wrap back around.
	msgLen := -1
	for {
		s.mu.Lock()
		// Do accounting for last msg delivered here so we only lock once
		// and drain state trips after callback has returned.
		if msgLen >= 0 {
			s.pMsgs--
			s.pBytes -= msgLen
			msgLen = -1
		}
		// NOTE(review): this is a single wait, not a loop. If it returns
		// while pHead is still nil and the sub is not closed, m is nil
		// below but delivered is still incremented. Confirm pCond is only
		// signalled with a queued message or on close.
		if s.pHead == nil && !s.closed {
			s.pCond.Wait()
		}
		// Pop the msg off the list
		m := s.pHead
		if m != nil {
			s.pHead = m.next
			if s.pHead == nil {
				s.pTail = nil
			}
			if m.barrier != nil {
				// Barrier marker: release our reference and run the
				// barrier function when we hold the last one.
				s.mu.Unlock()
				if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
					m.barrier.f()
				}
				continue
			}
			msgLen = len(m.Data)
		}
		mcb := s.mcb
		max = s.max
		closed = s.closed
		if !s.closed {
			s.delivered++
			delivered = s.delivered
		}
		s.mu.Unlock()
		if closed {
			break
		}
		// Deliver the message.
		if m != nil && (max == 0 || delivered <= max) {
			mcb(m)
		}
		// If we have hit the max for delivered msgs, remove sub.
		if max > 0 && delivered >= max {
			nc.mu.Lock()
			nc.removeSub(s)
			nc.mu.Unlock()
			break
		}
	}
	// Check for barrier messages
	// Drain whatever is left, running any barriers so their waiters are
	// not left hanging; ordinary messages are simply dropped.
	s.mu.Lock()
	for m := s.pHead; m != nil; m = s.pHead {
		if m.barrier != nil {
			s.mu.Unlock()
			if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
				m.barrier.f()
			}
			s.mu.Lock()
		}
		s.pHead = m.next
	}
	s.mu.Unlock()
}
// processMsg is called by parse and will place the msg on the
// appropriate channel/pending queue for processing. If the channel is full,
// or the pending queue is over the pending limits, the connection is
// considered a slow consumer.
//
// data is the payload. The subject, reply and subscription id come from the
// parser state in nc.ps.ma. The payload is copied, so the resulting Msg does
// not alias data. Messages for an unknown sid are dropped.
//
// On a slow-consumer condition the message is dropped and sub.dropped is
// incremented. nc.err is set to ErrSlowConsumer and AsyncErrorCB (if set) is
// scheduled only on the transition into the slow-consumer state (sub.sc
// false -> true).
func (nc *Conn) processMsg(data []byte) {
	// Don't lock the connection to avoid server cutting us off if the
	// flusher is holding the connection lock, trying to send to the server
	// that is itself trying to send data to us.
	nc.subsMu.RLock()

	// Stats
	// NOTE(review): InMsgs/InBytes are mutated under a read lock only; this
	// presumably relies on parse running on a single Go routine — confirm.
	nc.InMsgs++
	nc.InBytes += uint64(len(data))

	sub := nc.subs[nc.ps.ma.sid]
	if sub == nil {
		nc.subsMu.RUnlock()
		return
	}

	// Copy them into string
	subj := string(nc.ps.ma.subject)
	reply := string(nc.ps.ma.reply)

	// Doing message create outside of the sub's lock to reduce contention.
	// It's possible that we end-up not using the message, but that's ok.

	// FIXME(dlc): Need to copy, should/can do COW?
	msgPayload := make([]byte, len(data))
	copy(msgPayload, data)

	// FIXME(dlc): Should we recycle these containers?
	m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}

	sub.mu.Lock()

	// Subscription internal stats (applicable only for non ChanSubscription's)
	if sub.typ != ChanSubscription {
		sub.pMsgs++
		if sub.pMsgs > sub.pMsgsMax {
			sub.pMsgsMax = sub.pMsgs
		}
		sub.pBytes += len(m.Data)
		if sub.pBytes > sub.pBytesMax {
			sub.pBytesMax = sub.pBytes
		}

		// Check for a Slow Consumer
		if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
			(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
			goto slowConsumer
		}
	}

	// We have two modes of delivery. One is the channel, used by channel
	// subscribers and syncSubscribers, the other is a linked list for async.
	if sub.mch != nil {
		// Non-blocking send: a full channel means a slow consumer.
		select {
		case sub.mch <- m:
		default:
			goto slowConsumer
		}
	} else {
		// Push onto the async pList
		if sub.pHead == nil {
			sub.pHead = m
			sub.pTail = m
			// Only an empty list can have waitForMsgs parked on pCond.
			sub.pCond.Signal()
		} else {
			sub.pTail.next = m
			sub.pTail = m
		}
	}

	// Clear SlowConsumer status.
	sub.sc = false

	sub.mu.Unlock()
	nc.subsMu.RUnlock()
	return

slowConsumer:
	sub.dropped++
	// Report only on the transition into the slow-consumer state.
	sc := !sub.sc
	sub.sc = true
	// Undo stats from above
	if sub.typ != ChanSubscription {
		sub.pMsgs--
		sub.pBytes -= len(m.Data)
	}
	sub.mu.Unlock()
	nc.subsMu.RUnlock()
	if sc {
		// Now we need connection's lock and we may end-up in the situation
		// that we were trying to avoid, except that in this case, the client
		// is already experiencing client-side slow consumer situation.
		nc.mu.Lock()
		nc.err = ErrSlowConsumer
		if nc.Opts.AsyncErrorCB != nil {
			nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) })
		}
		nc.mu.Unlock()
	}
}
  1895. // processPermissionsViolation is called when the server signals a subject
  1896. // permissions violation on either publish or subscribe.
  1897. func (nc *Conn) processPermissionsViolation(err string) {
  1898. nc.mu.Lock()
  1899. // create error here so we can pass it as a closure to the async cb dispatcher.
  1900. e := errors.New("nats: " + err)
  1901. nc.err = e
  1902. if nc.Opts.AsyncErrorCB != nil {
  1903. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
  1904. }
  1905. nc.mu.Unlock()
  1906. }
  1907. // processAuthorizationViolation is called when the server signals a user
  1908. // authorization violation.
  1909. func (nc *Conn) processAuthorizationViolation(err string) {
  1910. nc.mu.Lock()
  1911. nc.err = ErrAuthorization
  1912. if nc.Opts.AsyncErrorCB != nil {
  1913. nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, ErrAuthorization) })
  1914. }
  1915. nc.mu.Unlock()
  1916. }
  1917. // flusher is a separate Go routine that will process flush requests for the write
  1918. // bufio. This allows coalescing of writes to the underlying socket.
  1919. func (nc *Conn) flusher() {
  1920. // Release the wait group
  1921. defer nc.wg.Done()
  1922. // snapshot the bw and conn since they can change from underneath of us.
  1923. nc.mu.Lock()
  1924. bw := nc.bw
  1925. conn := nc.conn
  1926. fch := nc.fch
  1927. nc.mu.Unlock()
  1928. if conn == nil || bw == nil {
  1929. return
  1930. }
  1931. for {
  1932. if _, ok := <-fch; !ok {
  1933. return
  1934. }
  1935. nc.mu.Lock()
  1936. // Check to see if we should bail out.
  1937. if !nc.isConnected() || nc.isConnecting() || bw != nc.bw || conn != nc.conn {
  1938. nc.mu.Unlock()
  1939. return
  1940. }
  1941. if bw.Buffered() > 0 {
  1942. if err := bw.Flush(); err != nil {
  1943. if nc.err == nil {
  1944. nc.err = err
  1945. }
  1946. }
  1947. }
  1948. nc.mu.Unlock()
  1949. }
  1950. }
  1951. // processPing will send an immediate pong protocol response to the
  1952. // server. The server uses this mechanism to detect dead clients.
  1953. func (nc *Conn) processPing() {
  1954. nc.sendProto(pongProto)
  1955. }
  1956. // processPong is used to process responses to the client's ping
  1957. // messages. We use pings for the flush mechanism as well.
  1958. func (nc *Conn) processPong() {
  1959. var ch chan struct{}
  1960. nc.mu.Lock()
  1961. if len(nc.pongs) > 0 {
  1962. ch = nc.pongs[0]
  1963. nc.pongs = nc.pongs[1:]
  1964. }
  1965. nc.pout = 0
  1966. nc.mu.Unlock()
  1967. if ch != nil {
  1968. ch <- struct{}{}
  1969. }
  1970. }
  1971. // processOK is a placeholder for processing OK messages.
  1972. func (nc *Conn) processOK() {
  1973. // do nothing
  1974. }
  1975. // processInfo is used to parse the info messages sent
  1976. // from the server.
  1977. // This function may update the server pool.
  1978. func (nc *Conn) processInfo(info string) error {
  1979. if info == _EMPTY_ {
  1980. return nil
  1981. }
  1982. ncInfo := serverInfo{}
  1983. if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
  1984. return err
  1985. }
  1986. // Copy content into connection's info structure.
  1987. nc.info = ncInfo
  1988. // The array could be empty/not present on initial connect,
  1989. // if advertise is disabled on that server, or servers that
  1990. // did not include themselves in the async INFO protocol.
  1991. // If empty, do not remove the implicit servers from the pool.
  1992. if len(ncInfo.ConnectURLs) == 0 {
  1993. return nil
  1994. }
  1995. // Note about pool randomization: when the pool was first created,
  1996. // it was randomized (if allowed). We keep the order the same (removing
  1997. // implicit servers that are no longer sent to us). New URLs are sent
  1998. // to us in no specific order so don't need extra randomization.
  1999. hasNew := false
  2000. // This is what we got from the server we are connected to.
  2001. urls := nc.info.ConnectURLs
  2002. // Transform that to a map for easy lookups
  2003. tmp := make(map[string]struct{}, len(urls))
  2004. for _, curl := range urls {
  2005. tmp[curl] = struct{}{}
  2006. }
  2007. // Walk the pool and removed the implicit servers that are no longer in the
  2008. // given array/map
  2009. sp := nc.srvPool
  2010. for i := 0; i < len(sp); i++ {
  2011. srv := sp[i]
  2012. curl := srv.url.Host
  2013. // Check if this URL is in the INFO protocol
  2014. _, inInfo := tmp[curl]
  2015. // Remove from the temp map so that at the end we are left with only
  2016. // new (or restarted) servers that need to be added to the pool.
  2017. delete(tmp, curl)
  2018. // Keep servers that were set through Options, but also the one that
  2019. // we are currently connected to (even if it is a discovered server).
  2020. if !srv.isImplicit || srv.url == nc.current.url {
  2021. continue
  2022. }
  2023. if !inInfo {
  2024. // Remove from server pool. Keep current order.
  2025. copy(sp[i:], sp[i+1:])
  2026. nc.srvPool = sp[:len(sp)-1]
  2027. sp = nc.srvPool
  2028. i--
  2029. }
  2030. }
  2031. // Figure out if we should save off the current non-IP hostname if we encounter a bare IP.
  2032. var saveTLS bool
  2033. if nc.current != nil && nc.Opts.Secure && !hostIsIP(nc.current.url) {
  2034. saveTLS = true
  2035. }
  2036. // If there are any left in the tmp map, these are new (or restarted) servers
  2037. // and need to be added to the pool.
  2038. for curl := range tmp {
  2039. // Before adding, check if this is a new (as in never seen) URL.
  2040. // This is used to figure out if we invoke the DiscoveredServersCB
  2041. if _, present := nc.urls[curl]; !present {
  2042. hasNew = true
  2043. }
  2044. nc.addURLToPool(fmt.Sprintf("%s://%s", nc.connScheme(), curl), true, saveTLS)
  2045. }
  2046. if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
  2047. nc.ach.push(func() { nc.Opts.DiscoveredServersCB(nc) })
  2048. }
  2049. return nil
  2050. }
  2051. // processAsyncInfo does the same than processInfo, but is called
  2052. // from the parser. Calls processInfo under connection's lock
  2053. // protection.
  2054. func (nc *Conn) processAsyncInfo(info []byte) {
  2055. nc.mu.Lock()
  2056. // Ignore errors, we will simply not update the server pool...
  2057. nc.processInfo(string(info))
  2058. nc.mu.Unlock()
  2059. }
  2060. // LastError reports the last error encountered via the connection.
  2061. // It can be used reliably within ClosedCB in order to find out reason
  2062. // why connection was closed for example.
  2063. func (nc *Conn) LastError() error {
  2064. if nc == nil {
  2065. return ErrInvalidConnection
  2066. }
  2067. nc.mu.Lock()
  2068. err := nc.err
  2069. nc.mu.Unlock()
  2070. return err
  2071. }
  2072. // processErr processes any error messages from the server and
  2073. // sets the connection's lastError.
  2074. func (nc *Conn) processErr(ie string) {
  2075. // Trim, remove quotes
  2076. ne := normalizeErr(ie)
  2077. // convert to lower case.
  2078. e := strings.ToLower(ne)
  2079. // FIXME(dlc) - process Slow Consumer signals special.
  2080. if e == STALE_CONNECTION {
  2081. nc.processOpErr(ErrStaleConnection)
  2082. } else if strings.HasPrefix(e, PERMISSIONS_ERR) {
  2083. nc.processPermissionsViolation(ne)
  2084. } else if strings.HasPrefix(e, AUTHORIZATION_ERR) {
  2085. nc.processAuthorizationViolation(ne)
  2086. } else {
  2087. nc.mu.Lock()
  2088. nc.err = errors.New("nats: " + ne)
  2089. nc.mu.Unlock()
  2090. nc.Close()
  2091. }
  2092. }
  2093. // kickFlusher will send a bool on a channel to kick the
  2094. // flush Go routine to flush data to the server.
  2095. func (nc *Conn) kickFlusher() {
  2096. if nc.bw != nil {
  2097. select {
  2098. case nc.fch <- struct{}{}:
  2099. default:
  2100. }
  2101. }
  2102. }
  2103. // Publish publishes the data argument to the given subject. The data
  2104. // argument is left untouched and needs to be correctly interpreted on
  2105. // the receiver.
  2106. func (nc *Conn) Publish(subj string, data []byte) error {
  2107. return nc.publish(subj, _EMPTY_, data)
  2108. }
  2109. // PublishMsg publishes the Msg structure, which includes the
  2110. // Subject, an optional Reply and an optional Data field.
  2111. func (nc *Conn) PublishMsg(m *Msg) error {
  2112. if m == nil {
  2113. return ErrInvalidMsg
  2114. }
  2115. return nc.publish(m.Subject, m.Reply, m.Data)
  2116. }
  2117. // PublishRequest will perform a Publish() excpecting a response on the
  2118. // reply subject. Use Request() for automatically waiting for a response
  2119. // inline.
  2120. func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
  2121. return nc.publish(subj, reply, data)
  2122. }
  2123. // Used for handrolled itoa
  2124. const digits = "0123456789"
// publish is the internal function to publish messages to a nats-server.
// Sends a protocol data message by queuing into the bufio writer
// and kicking the flush go routine. These writes should be protected.
//
// The frame is built as <header>subj [reply ]<len(data)>\r\n<data>\r\n. The
// header starts with the first len(_PUB_P_) bytes of nc.scratch
// (NOTE(review): presumably pre-filled with the PUB verb elsewhere; confirm).
//
// Errors returned:
//   - ErrInvalidConnection for a nil connection;
//   - ErrBadSubject for an empty subject;
//   - ErrConnectionClosed or ErrConnectionDraining when publishing is not
//     allowed;
//   - ErrMaxPayload when data exceeds nc.info.MaxPayload;
//   - ErrReconnectBufExceeded when reconnecting and nc.pending is at
//     Opts.ReconnectBufSize;
//   - otherwise, any error from the buffered writer.
func (nc *Conn) publish(subj, reply string, data []byte) error {
	if nc == nil {
		return ErrInvalidConnection
	}
	if subj == "" {
		return ErrBadSubject
	}
	nc.mu.Lock()

	if nc.isClosed() {
		nc.mu.Unlock()
		return ErrConnectionClosed
	}

	if nc.isDrainingPubs() {
		nc.mu.Unlock()
		return ErrConnectionDraining
	}

	// Proactively reject payloads over the threshold set by server.
	msgSize := int64(len(data))
	if msgSize > nc.info.MaxPayload {
		nc.mu.Unlock()
		return ErrMaxPayload
	}

	// Check if we are reconnecting, and if so check if
	// we have exceeded our reconnect outbound buffer limits.
	if nc.isReconnecting() {
		// Flush to underlying buffer.
		// NOTE(review): while reconnecting nc.bw presumably writes into
		// nc.pending, so this makes pending.Len() current; the Flush error
		// is ignored here.
		nc.bw.Flush()
		// Check if we are over
		if nc.pending.Len() >= nc.Opts.ReconnectBufSize {
			nc.mu.Unlock()
			return ErrReconnectBufExceeded
		}
	}

	msgh := nc.scratch[:len(_PUB_P_)]
	msgh = append(msgh, subj...)
	msgh = append(msgh, ' ')
	if reply != "" {
		msgh = append(msgh, reply...)
		msgh = append(msgh, ' ')
	}

	// We could be smarter here, but simple loop is ok,
	// just avoid strconv in fast path
	// FIXME(dlc) - Find a better way here.
	// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)

	// Decimal digits of len(data) are written right-to-left into b; b[i:]
	// holds the result. NOTE(review): b has room for 12 digits only, so this
	// relies on payload sizes staying below 10^12 bytes.
	var b [12]byte
	var i = len(b)
	if len(data) > 0 {
		for l := len(data); l > 0; l /= 10 {
			i -= 1
			b[i] = digits[l%10]
		}
	} else {
		i -= 1
		b[i] = digits[0]
	}

	msgh = append(msgh, b[i:]...)
	msgh = append(msgh, _CRLF_...)

	// Header, payload and trailing CRLF; stop at the first write error.
	_, err := nc.bw.Write(msgh)
	if err == nil {
		_, err = nc.bw.Write(data)
	}
	if err == nil {
		_, err = nc.bw.WriteString(_CRLF_)
	}
	if err != nil {
		nc.mu.Unlock()
		return err
	}

	nc.OutMsgs++
	nc.OutBytes += uint64(len(data))

	// Only kick the flusher when no signal is already queued on fch.
	if len(nc.fch) == 0 {
		nc.kickFlusher()
	}
	nc.mu.Unlock()
	return nil
}
  2204. // respHandler is the global response handler. It will look up
  2205. // the appropriate channel based on the last token and place
  2206. // the message on the channel if possible.
  2207. func (nc *Conn) respHandler(m *Msg) {
  2208. rt := respToken(m.Subject)
  2209. nc.mu.Lock()
  2210. // Just return if closed.
  2211. if nc.isClosed() {
  2212. nc.mu.Unlock()
  2213. return
  2214. }
  2215. // Grab mch
  2216. mch := nc.respMap[rt]
  2217. // Delete the key regardless, one response only.
  2218. // FIXME(dlc) - should we track responses past 1
  2219. // just statistics wise?
  2220. delete(nc.respMap, rt)
  2221. nc.mu.Unlock()
  2222. // Don't block, let Request timeout instead, mch is
  2223. // buffered and we should delete the key before a
  2224. // second response is processed.
  2225. select {
  2226. case mch <- m:
  2227. default:
  2228. return
  2229. }
  2230. }
  2231. // Create the response subscription we will use for all
  2232. // new style responses. This will be on an _INBOX with an
  2233. // additional terminal token. The subscription will be on
  2234. // a wildcard. Caller is responsible for ensuring this is
  2235. // only called once.
  2236. func (nc *Conn) createRespMux(respSub string) error {
  2237. s, err := nc.Subscribe(respSub, nc.respHandler)
  2238. if err != nil {
  2239. return err
  2240. }
  2241. nc.mu.Lock()
  2242. nc.respMux = s
  2243. nc.mu.Unlock()
  2244. return nil
  2245. }
  2246. // Request will send a request payload and deliver the response message,
  2247. // or an error, including a timeout if no message was received properly.
  2248. func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
  2249. if nc == nil {
  2250. return nil, ErrInvalidConnection
  2251. }
  2252. nc.mu.Lock()
  2253. // If user wants the old style.
  2254. if nc.Opts.UseOldRequestStyle {
  2255. nc.mu.Unlock()
  2256. return nc.oldRequest(subj, data, timeout)
  2257. }
  2258. // Do setup for the new style.
  2259. if nc.respMap == nil {
  2260. nc.initNewResp()
  2261. }
  2262. // Create literal Inbox and map to a chan msg.
  2263. mch := make(chan *Msg, RequestChanLen)
  2264. respInbox := nc.newRespInbox()
  2265. token := respToken(respInbox)
  2266. nc.respMap[token] = mch
  2267. createSub := nc.respMux == nil
  2268. ginbox := nc.respSub
  2269. nc.mu.Unlock()
  2270. if createSub {
  2271. // Make sure scoped subscription is setup only once.
  2272. var err error
  2273. nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
  2274. if err != nil {
  2275. return nil, err
  2276. }
  2277. }
  2278. if err := nc.PublishRequest(subj, respInbox, data); err != nil {
  2279. return nil, err
  2280. }
  2281. t := globalTimerPool.Get(timeout)
  2282. defer globalTimerPool.Put(t)
  2283. var ok bool
  2284. var msg *Msg
  2285. select {
  2286. case msg, ok = <-mch:
  2287. if !ok {
  2288. return nil, ErrConnectionClosed
  2289. }
  2290. case <-t.C:
  2291. nc.mu.Lock()
  2292. delete(nc.respMap, token)
  2293. nc.mu.Unlock()
  2294. return nil, ErrTimeout
  2295. }
  2296. return msg, nil
  2297. }
  2298. // oldRequest will create an Inbox and perform a Request() call
  2299. // with the Inbox reply and return the first reply received.
  2300. // This is optimized for the case of multiple responses.
  2301. func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) {
  2302. inbox := NewInbox()
  2303. ch := make(chan *Msg, RequestChanLen)
  2304. s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
  2305. if err != nil {
  2306. return nil, err
  2307. }
  2308. s.AutoUnsubscribe(1)
  2309. defer s.Unsubscribe()
  2310. err = nc.PublishRequest(subj, inbox, data)
  2311. if err != nil {
  2312. return nil, err
  2313. }
  2314. return s.NextMsg(timeout)
  2315. }
  2316. // InboxPrefix is the prefix for all inbox subjects.
  2317. const (
  2318. InboxPrefix = "_INBOX."
  2319. inboxPrefixLen = len(InboxPrefix)
  2320. respInboxPrefixLen = inboxPrefixLen + nuidSize + 1
  2321. replySuffixLen = 8 // Gives us 62^8
  2322. rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
  2323. base = 62
  2324. )
  2325. // NewInbox will return an inbox string which can be used for directed replies from
  2326. // subscribers. These are guaranteed to be unique, but can be shared and subscribed
  2327. // to by others.
  2328. func NewInbox() string {
  2329. var b [inboxPrefixLen + nuidSize]byte
  2330. pres := b[:inboxPrefixLen]
  2331. copy(pres, InboxPrefix)
  2332. ns := b[inboxPrefixLen:]
  2333. copy(ns, nuid.Next())
  2334. return string(b[:])
  2335. }
  2336. // Function to init new response structures.
  2337. func (nc *Conn) initNewResp() {
  2338. // _INBOX wildcard
  2339. nc.respSub = fmt.Sprintf("%s.*", NewInbox())
  2340. nc.respMap = make(map[string]chan *Msg)
  2341. nc.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
  2342. }
  2343. // newRespInbox creates a new literal response subject
  2344. // that will trigger the mux subscription handler.
  2345. // Lock should be held.
  2346. func (nc *Conn) newRespInbox() string {
  2347. if nc.respMap == nil {
  2348. nc.initNewResp()
  2349. }
  2350. var b [respInboxPrefixLen + replySuffixLen]byte
  2351. pres := b[:respInboxPrefixLen]
  2352. copy(pres, nc.respSub)
  2353. rn := nc.respRand.Int63()
  2354. for i, l := respInboxPrefixLen, rn; i < len(b); i++ {
  2355. b[i] = rdigits[l%base]
  2356. l /= base
  2357. }
  2358. return string(b[:])
  2359. }
  2360. // NewRespInbox is the new format used for _INBOX.
  2361. func (nc *Conn) NewRespInbox() string {
  2362. nc.mu.Lock()
  2363. s := nc.newRespInbox()
  2364. nc.mu.Unlock()
  2365. return s
  2366. }
  2367. // respToken will return the last token of a literal response inbox
  2368. // which we use for the message channel lookup.
  2369. func respToken(respInbox string) string {
  2370. return respInbox[respInboxPrefixLen:]
  2371. }
  2372. // Subscribe will express interest in the given subject. The subject
  2373. // can have wildcards (partial:*, full:>). Messages will be delivered
  2374. // to the associated MsgHandler.
  2375. func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
  2376. return nc.subscribe(subj, _EMPTY_, cb, nil)
  2377. }
  2378. // ChanSubscribe will express interest in the given subject and place
  2379. // all messages received on the channel.
  2380. // You should not close the channel until sub.Unsubscribe() has been called.
  2381. func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
  2382. return nc.subscribe(subj, _EMPTY_, nil, ch)
  2383. }
  2384. // ChanQueueSubscribe will express interest in the given subject.
  2385. // All subscribers with the same queue name will form the queue group
  2386. // and only one member of the group will be selected to receive any given message,
  2387. // which will be placed on the channel.
  2388. // You should not close the channel until sub.Unsubscribe() has been called.
  2389. // Note: This is the same than QueueSubscribeSyncWithChan.
  2390. func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
  2391. return nc.subscribe(subj, group, nil, ch)
  2392. }
  2393. // SubscribeSync will express interest on the given subject. Messages will
  2394. // be received synchronously using Subscription.NextMsg().
  2395. func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
  2396. if nc == nil {
  2397. return nil, ErrInvalidConnection
  2398. }
  2399. mch := make(chan *Msg, nc.Opts.SubChanLen)
  2400. s, e := nc.subscribe(subj, _EMPTY_, nil, mch)
  2401. if s != nil {
  2402. s.typ = SyncSubscription
  2403. }
  2404. return s, e
  2405. }
  2406. // QueueSubscribe creates an asynchronous queue subscriber on the given subject.
  2407. // All subscribers with the same queue name will form the queue group and
  2408. // only one member of the group will be selected to receive any given
  2409. // message asynchronously.
  2410. func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
  2411. return nc.subscribe(subj, queue, cb, nil)
  2412. }
  2413. // QueueSubscribeSync creates a synchronous queue subscriber on the given
  2414. // subject. All subscribers with the same queue name will form the queue
  2415. // group and only one member of the group will be selected to receive any
  2416. // given message synchronously using Subscription.NextMsg().
  2417. func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
  2418. mch := make(chan *Msg, nc.Opts.SubChanLen)
  2419. s, e := nc.subscribe(subj, queue, nil, mch)
  2420. if s != nil {
  2421. s.typ = SyncSubscription
  2422. }
  2423. return s, e
  2424. }
  2425. // QueueSubscribeSyncWithChan will express interest in the given subject.
  2426. // All subscribers with the same queue name will form the queue group
  2427. // and only one member of the group will be selected to receive any given message,
  2428. // which will be placed on the channel.
  2429. // You should not close the channel until sub.Unsubscribe() has been called.
  2430. // Note: This is the same than ChanQueueSubscribe.
  2431. func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
  2432. return nc.subscribe(subj, queue, nil, ch)
  2433. }
  2434. // subscribe is the internal subscribe function that indicates interest in a subject.
  2435. func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
  2436. if nc == nil {
  2437. return nil, ErrInvalidConnection
  2438. }
  2439. nc.mu.Lock()
  2440. // ok here, but defer is generally expensive
  2441. defer nc.mu.Unlock()
  2442. // Check for some error conditions.
  2443. if nc.isClosed() {
  2444. return nil, ErrConnectionClosed
  2445. }
  2446. if nc.isDraining() {
  2447. return nil, ErrConnectionDraining
  2448. }
  2449. if cb == nil && ch == nil {
  2450. return nil, ErrBadSubscription
  2451. }
  2452. sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc}
  2453. // Set pending limits.
  2454. sub.pMsgsLimit = DefaultSubPendingMsgsLimit
  2455. sub.pBytesLimit = DefaultSubPendingBytesLimit
  2456. // If we have an async callback, start up a sub specific
  2457. // Go routine to deliver the messages.
  2458. if cb != nil {
  2459. sub.typ = AsyncSubscription
  2460. sub.pCond = sync.NewCond(&sub.mu)
  2461. go nc.waitForMsgs(sub)
  2462. } else {
  2463. sub.typ = ChanSubscription
  2464. sub.mch = ch
  2465. }
  2466. nc.subsMu.Lock()
  2467. nc.ssid++
  2468. sub.sid = nc.ssid
  2469. nc.subs[sub.sid] = sub
  2470. nc.subsMu.Unlock()
  2471. // We will send these for all subs when we reconnect
  2472. // so that we can suppress here if reconnecting.
  2473. if !nc.isReconnecting() {
  2474. fmt.Fprintf(nc.bw, subProto, subj, queue, sub.sid)
  2475. // Kick flusher if needed.
  2476. if len(nc.fch) == 0 {
  2477. nc.kickFlusher()
  2478. }
  2479. }
  2480. return sub, nil
  2481. }
  2482. // NumSubscriptions returns active number of subscriptions.
  2483. func (nc *Conn) NumSubscriptions() int {
  2484. nc.mu.Lock()
  2485. defer nc.mu.Unlock()
  2486. return len(nc.subs)
  2487. }
  2488. // Lock for nc should be held here upon entry
  2489. func (nc *Conn) removeSub(s *Subscription) {
  2490. nc.subsMu.Lock()
  2491. delete(nc.subs, s.sid)
  2492. nc.subsMu.Unlock()
  2493. s.mu.Lock()
  2494. defer s.mu.Unlock()
  2495. // Release callers on NextMsg for SyncSubscription only
  2496. if s.mch != nil && s.typ == SyncSubscription {
  2497. close(s.mch)
  2498. }
  2499. s.mch = nil
  2500. // Mark as invalid
  2501. s.conn = nil
  2502. s.closed = true
  2503. if s.pCond != nil {
  2504. s.pCond.Broadcast()
  2505. }
  2506. }
  2507. // SubscriptionType is the type of the Subscription.
  2508. type SubscriptionType int
  2509. // The different types of subscription types.
  2510. const (
  2511. AsyncSubscription = SubscriptionType(iota)
  2512. SyncSubscription
  2513. ChanSubscription
  2514. NilSubscription
  2515. )
  2516. // Type returns the type of Subscription.
  2517. func (s *Subscription) Type() SubscriptionType {
  2518. if s == nil {
  2519. return NilSubscription
  2520. }
  2521. s.mu.Lock()
  2522. defer s.mu.Unlock()
  2523. return s.typ
  2524. }
  2525. // IsValid returns a boolean indicating whether the subscription
  2526. // is still active. This will return false if the subscription has
  2527. // already been closed.
  2528. func (s *Subscription) IsValid() bool {
  2529. if s == nil {
  2530. return false
  2531. }
  2532. s.mu.Lock()
  2533. defer s.mu.Unlock()
  2534. return s.conn != nil
  2535. }
  2536. // Drain will remove interest but continue callbacks until all messages
  2537. // have been processed.
  2538. func (s *Subscription) Drain() error {
  2539. if s == nil {
  2540. return ErrBadSubscription
  2541. }
  2542. s.mu.Lock()
  2543. conn := s.conn
  2544. s.mu.Unlock()
  2545. if conn == nil {
  2546. return ErrBadSubscription
  2547. }
  2548. return conn.unsubscribe(s, 0, true)
  2549. }
  2550. // Unsubscribe will remove interest in the given subject.
  2551. func (s *Subscription) Unsubscribe() error {
  2552. if s == nil {
  2553. return ErrBadSubscription
  2554. }
  2555. s.mu.Lock()
  2556. conn := s.conn
  2557. s.mu.Unlock()
  2558. if conn == nil {
  2559. return ErrBadSubscription
  2560. }
  2561. if conn.IsDraining() {
  2562. return ErrConnectionDraining
  2563. }
  2564. return conn.unsubscribe(s, 0, false)
  2565. }
  2566. // checkDrained will watch for a subscription to be fully drained
  2567. // and then remove it.
  2568. func (nc *Conn) checkDrained(sub *Subscription) {
  2569. if nc == nil || sub == nil {
  2570. return
  2571. }
  2572. // This allows us to know that whatever we have in the client pending
  2573. // is correct and the server will not send additional information.
  2574. nc.Flush()
  2575. // Once we are here we just wait for Pending to reach 0 or
  2576. // any other state to exit this go routine.
  2577. for {
  2578. // check connection is still valid.
  2579. if nc.IsClosed() {
  2580. return
  2581. }
  2582. // Check subscription state
  2583. sub.mu.Lock()
  2584. conn := sub.conn
  2585. closed := sub.closed
  2586. pMsgs := sub.pMsgs
  2587. sub.mu.Unlock()
  2588. if conn == nil || closed || pMsgs == 0 {
  2589. nc.mu.Lock()
  2590. nc.removeSub(sub)
  2591. nc.mu.Unlock()
  2592. return
  2593. }
  2594. time.Sleep(100 * time.Millisecond)
  2595. }
  2596. }
  2597. // AutoUnsubscribe will issue an automatic Unsubscribe that is
  2598. // processed by the server when max messages have been received.
  2599. // This can be useful when sending a request to an unknown number
  2600. // of subscribers.
  2601. func (s *Subscription) AutoUnsubscribe(max int) error {
  2602. if s == nil {
  2603. return ErrBadSubscription
  2604. }
  2605. s.mu.Lock()
  2606. conn := s.conn
  2607. s.mu.Unlock()
  2608. if conn == nil {
  2609. return ErrBadSubscription
  2610. }
  2611. return conn.unsubscribe(s, max, false)
  2612. }
  2613. // unsubscribe performs the low level unsubscribe to the server.
  2614. // Use Subscription.Unsubscribe()
  2615. func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
  2616. nc.mu.Lock()
  2617. // ok here, but defer is expensive
  2618. defer nc.mu.Unlock()
  2619. defer nc.kickFlusher()
  2620. if nc.isClosed() {
  2621. return ErrConnectionClosed
  2622. }
  2623. nc.subsMu.RLock()
  2624. s := nc.subs[sub.sid]
  2625. nc.subsMu.RUnlock()
  2626. // Already unsubscribed
  2627. if s == nil {
  2628. return nil
  2629. }
  2630. maxStr := _EMPTY_
  2631. if max > 0 {
  2632. s.max = uint64(max)
  2633. maxStr = strconv.Itoa(max)
  2634. } else if !drainMode {
  2635. nc.removeSub(s)
  2636. }
  2637. if drainMode {
  2638. go nc.checkDrained(sub)
  2639. }
  2640. // We will send these for all subs when we reconnect
  2641. // so that we can suppress here.
  2642. if !nc.isReconnecting() {
  2643. fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
  2644. }
  2645. return nil
  2646. }
  2647. // NextMsg will return the next message available to a synchronous subscriber
  2648. // or block until one is available. A timeout can be used to return when no
  2649. // message has been delivered.
  2650. func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
  2651. if s == nil {
  2652. return nil, ErrBadSubscription
  2653. }
  2654. s.mu.Lock()
  2655. err := s.validateNextMsgState()
  2656. if err != nil {
  2657. s.mu.Unlock()
  2658. return nil, err
  2659. }
  2660. // snapshot
  2661. mch := s.mch
  2662. s.mu.Unlock()
  2663. var ok bool
  2664. var msg *Msg
  2665. // If something is available right away, let's optimize that case.
  2666. select {
  2667. case msg, ok = <-mch:
  2668. if !ok {
  2669. return nil, ErrConnectionClosed
  2670. }
  2671. if err := s.processNextMsgDelivered(msg); err != nil {
  2672. return nil, err
  2673. } else {
  2674. return msg, nil
  2675. }
  2676. default:
  2677. }
  2678. // If we are here a message was not immediately available, so lets loop
  2679. // with a timeout.
  2680. t := globalTimerPool.Get(timeout)
  2681. defer globalTimerPool.Put(t)
  2682. select {
  2683. case msg, ok = <-mch:
  2684. if !ok {
  2685. return nil, ErrConnectionClosed
  2686. }
  2687. if err := s.processNextMsgDelivered(msg); err != nil {
  2688. return nil, err
  2689. }
  2690. case <-t.C:
  2691. return nil, ErrTimeout
  2692. }
  2693. return msg, nil
  2694. }
  2695. // validateNextMsgState checks whether the subscription is in a valid
  2696. // state to call NextMsg and be delivered another message synchronously.
  2697. // This should be called while holding the lock.
  2698. func (s *Subscription) validateNextMsgState() error {
  2699. if s.connClosed {
  2700. return ErrConnectionClosed
  2701. }
  2702. if s.mch == nil {
  2703. if s.max > 0 && s.delivered >= s.max {
  2704. return ErrMaxMessages
  2705. } else if s.closed {
  2706. return ErrBadSubscription
  2707. }
  2708. }
  2709. if s.mcb != nil {
  2710. return ErrSyncSubRequired
  2711. }
  2712. if s.sc {
  2713. s.sc = false
  2714. return ErrSlowConsumer
  2715. }
  2716. return nil
  2717. }
  2718. // processNextMsgDelivered takes a message and applies the needed
  2719. // accounting to the stats from the subscription, returning an
  2720. // error in case we have the maximum number of messages have been
  2721. // delivered already. It should not be called while holding the lock.
  2722. func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
  2723. s.mu.Lock()
  2724. nc := s.conn
  2725. max := s.max
  2726. // Update some stats.
  2727. s.delivered++
  2728. delivered := s.delivered
  2729. if s.typ == SyncSubscription {
  2730. s.pMsgs--
  2731. s.pBytes -= len(msg.Data)
  2732. }
  2733. s.mu.Unlock()
  2734. if max > 0 {
  2735. if delivered > max {
  2736. return ErrMaxMessages
  2737. }
  2738. // Remove subscription if we have reached max.
  2739. if delivered == max {
  2740. nc.mu.Lock()
  2741. nc.removeSub(s)
  2742. nc.mu.Unlock()
  2743. }
  2744. }
  2745. return nil
  2746. }
  2747. // Queued returns the number of queued messages in the client for this subscription.
  2748. // DEPRECATED: Use Pending()
  2749. func (s *Subscription) QueuedMsgs() (int, error) {
  2750. m, _, err := s.Pending()
  2751. return int(m), err
  2752. }
  2753. // Pending returns the number of queued messages and queued bytes in the client for this subscription.
  2754. func (s *Subscription) Pending() (int, int, error) {
  2755. if s == nil {
  2756. return -1, -1, ErrBadSubscription
  2757. }
  2758. s.mu.Lock()
  2759. defer s.mu.Unlock()
  2760. if s.conn == nil {
  2761. return -1, -1, ErrBadSubscription
  2762. }
  2763. if s.typ == ChanSubscription {
  2764. return -1, -1, ErrTypeSubscription
  2765. }
  2766. return s.pMsgs, s.pBytes, nil
  2767. }
  2768. // MaxPending returns the maximum number of queued messages and queued bytes seen so far.
  2769. func (s *Subscription) MaxPending() (int, int, error) {
  2770. if s == nil {
  2771. return -1, -1, ErrBadSubscription
  2772. }
  2773. s.mu.Lock()
  2774. defer s.mu.Unlock()
  2775. if s.conn == nil {
  2776. return -1, -1, ErrBadSubscription
  2777. }
  2778. if s.typ == ChanSubscription {
  2779. return -1, -1, ErrTypeSubscription
  2780. }
  2781. return s.pMsgsMax, s.pBytesMax, nil
  2782. }
  2783. // ClearMaxPending resets the maximums seen so far.
  2784. func (s *Subscription) ClearMaxPending() error {
  2785. if s == nil {
  2786. return ErrBadSubscription
  2787. }
  2788. s.mu.Lock()
  2789. defer s.mu.Unlock()
  2790. if s.conn == nil {
  2791. return ErrBadSubscription
  2792. }
  2793. if s.typ == ChanSubscription {
  2794. return ErrTypeSubscription
  2795. }
  2796. s.pMsgsMax, s.pBytesMax = 0, 0
  2797. return nil
  2798. }
  2799. // Pending Limits
  2800. const (
  2801. DefaultSubPendingMsgsLimit = 65536
  2802. DefaultSubPendingBytesLimit = 65536 * 1024
  2803. )
  2804. // PendingLimits returns the current limits for this subscription.
  2805. // If no error is returned, a negative value indicates that the
  2806. // given metric is not limited.
  2807. func (s *Subscription) PendingLimits() (int, int, error) {
  2808. if s == nil {
  2809. return -1, -1, ErrBadSubscription
  2810. }
  2811. s.mu.Lock()
  2812. defer s.mu.Unlock()
  2813. if s.conn == nil {
  2814. return -1, -1, ErrBadSubscription
  2815. }
  2816. if s.typ == ChanSubscription {
  2817. return -1, -1, ErrTypeSubscription
  2818. }
  2819. return s.pMsgsLimit, s.pBytesLimit, nil
  2820. }
  2821. // SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
  2822. // Zero is not allowed. Any negative value means that the given metric is not limited.
  2823. func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
  2824. if s == nil {
  2825. return ErrBadSubscription
  2826. }
  2827. s.mu.Lock()
  2828. defer s.mu.Unlock()
  2829. if s.conn == nil {
  2830. return ErrBadSubscription
  2831. }
  2832. if s.typ == ChanSubscription {
  2833. return ErrTypeSubscription
  2834. }
  2835. if msgLimit == 0 || bytesLimit == 0 {
  2836. return ErrInvalidArg
  2837. }
  2838. s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
  2839. return nil
  2840. }
  2841. // Delivered returns the number of delivered messages for this subscription.
  2842. func (s *Subscription) Delivered() (int64, error) {
  2843. if s == nil {
  2844. return -1, ErrBadSubscription
  2845. }
  2846. s.mu.Lock()
  2847. defer s.mu.Unlock()
  2848. if s.conn == nil {
  2849. return -1, ErrBadSubscription
  2850. }
  2851. return int64(s.delivered), nil
  2852. }
  2853. // Dropped returns the number of known dropped messages for this subscription.
  2854. // This will correspond to messages dropped by violations of PendingLimits. If
  2855. // the server declares the connection a SlowConsumer, this number may not be
  2856. // valid.
  2857. func (s *Subscription) Dropped() (int, error) {
  2858. if s == nil {
  2859. return -1, ErrBadSubscription
  2860. }
  2861. s.mu.Lock()
  2862. defer s.mu.Unlock()
  2863. if s.conn == nil {
  2864. return -1, ErrBadSubscription
  2865. }
  2866. return s.dropped, nil
  2867. }
  2868. // FIXME: This is a hack
  2869. // removeFlushEntry is needed when we need to discard queued up responses
  2870. // for our pings as part of a flush call. This happens when we have a flush
  2871. // call outstanding and we call close.
  2872. func (nc *Conn) removeFlushEntry(ch chan struct{}) bool {
  2873. nc.mu.Lock()
  2874. defer nc.mu.Unlock()
  2875. if nc.pongs == nil {
  2876. return false
  2877. }
  2878. for i, c := range nc.pongs {
  2879. if c == ch {
  2880. nc.pongs[i] = nil
  2881. return true
  2882. }
  2883. }
  2884. return false
  2885. }
  2886. // The lock must be held entering this function.
  2887. func (nc *Conn) sendPing(ch chan struct{}) {
  2888. nc.pongs = append(nc.pongs, ch)
  2889. nc.bw.WriteString(pingProto)
  2890. // Flush in place.
  2891. nc.bw.Flush()
  2892. }
  2893. // This will fire periodically and send a client origin
  2894. // ping to the server. Will also check that we have received
  2895. // responses from the server.
  2896. func (nc *Conn) processPingTimer() {
  2897. nc.mu.Lock()
  2898. if nc.status != CONNECTED {
  2899. nc.mu.Unlock()
  2900. return
  2901. }
  2902. // Check for violation
  2903. nc.pout++
  2904. if nc.pout > nc.Opts.MaxPingsOut {
  2905. nc.mu.Unlock()
  2906. nc.processOpErr(ErrStaleConnection)
  2907. return
  2908. }
  2909. nc.sendPing(nil)
  2910. nc.ptmr.Reset(nc.Opts.PingInterval)
  2911. nc.mu.Unlock()
  2912. }
  2913. // FlushTimeout allows a Flush operation to have an associated timeout.
  2914. func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
  2915. if nc == nil {
  2916. return ErrInvalidConnection
  2917. }
  2918. if timeout <= 0 {
  2919. return ErrBadTimeout
  2920. }
  2921. nc.mu.Lock()
  2922. if nc.isClosed() {
  2923. nc.mu.Unlock()
  2924. return ErrConnectionClosed
  2925. }
  2926. t := globalTimerPool.Get(timeout)
  2927. defer globalTimerPool.Put(t)
  2928. // Create a buffered channel to prevent chan send to block
  2929. // in processPong() if this code here times out just when
  2930. // PONG was received.
  2931. ch := make(chan struct{}, 1)
  2932. nc.sendPing(ch)
  2933. nc.mu.Unlock()
  2934. select {
  2935. case _, ok := <-ch:
  2936. if !ok {
  2937. err = ErrConnectionClosed
  2938. } else {
  2939. close(ch)
  2940. }
  2941. case <-t.C:
  2942. err = ErrTimeout
  2943. }
  2944. if err != nil {
  2945. nc.removeFlushEntry(ch)
  2946. }
  2947. return
  2948. }
  2949. // Flush will perform a round trip to the server and return when it
  2950. // receives the internal reply.
  2951. func (nc *Conn) Flush() error {
  2952. return nc.FlushTimeout(60 * time.Second)
  2953. }
  2954. // Buffered will return the number of bytes buffered to be sent to the server.
  2955. // FIXME(dlc) take into account disconnected state.
  2956. func (nc *Conn) Buffered() (int, error) {
  2957. nc.mu.Lock()
  2958. defer nc.mu.Unlock()
  2959. if nc.isClosed() || nc.bw == nil {
  2960. return -1, ErrConnectionClosed
  2961. }
  2962. return nc.bw.Buffered(), nil
  2963. }
  2964. // resendSubscriptions will send our subscription state back to the
  2965. // server. Used in reconnects
  2966. func (nc *Conn) resendSubscriptions() {
  2967. // Since we are going to send protocols to the server, we don't want to
  2968. // be holding the subsMu lock (which is used in processMsg). So copy
  2969. // the subscriptions in a temporary array.
  2970. nc.subsMu.RLock()
  2971. subs := make([]*Subscription, 0, len(nc.subs))
  2972. for _, s := range nc.subs {
  2973. subs = append(subs, s)
  2974. }
  2975. nc.subsMu.RUnlock()
  2976. for _, s := range subs {
  2977. adjustedMax := uint64(0)
  2978. s.mu.Lock()
  2979. if s.max > 0 {
  2980. if s.delivered < s.max {
  2981. adjustedMax = s.max - s.delivered
  2982. }
  2983. // adjustedMax could be 0 here if the number of delivered msgs
  2984. // reached the max, if so unsubscribe.
  2985. if adjustedMax == 0 {
  2986. s.mu.Unlock()
  2987. fmt.Fprintf(nc.bw, unsubProto, s.sid, _EMPTY_)
  2988. continue
  2989. }
  2990. }
  2991. s.mu.Unlock()
  2992. fmt.Fprintf(nc.bw, subProto, s.Subject, s.Queue, s.sid)
  2993. if adjustedMax > 0 {
  2994. maxStr := strconv.Itoa(int(adjustedMax))
  2995. fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
  2996. }
  2997. }
  2998. }
  2999. // This will clear any pending flush calls and release pending calls.
  3000. // Lock is assumed to be held by the caller.
  3001. func (nc *Conn) clearPendingFlushCalls() {
  3002. // Clear any queued pongs, e.g. pending flush calls.
  3003. for _, ch := range nc.pongs {
  3004. if ch != nil {
  3005. close(ch)
  3006. }
  3007. }
  3008. nc.pongs = nil
  3009. }
  3010. // This will clear any pending Request calls.
  3011. // Lock is assumed to be held by the caller.
  3012. func (nc *Conn) clearPendingRequestCalls() {
  3013. if nc.respMap == nil {
  3014. return
  3015. }
  3016. for key, ch := range nc.respMap {
  3017. if ch != nil {
  3018. close(ch)
  3019. delete(nc.respMap, key)
  3020. }
  3021. }
  3022. }
  3023. // Low level close call that will do correct cleanup and set
  3024. // desired status. Also controls whether user defined callbacks
  3025. // will be triggered. The lock should not be held entering this
  3026. // function. This function will handle the locking manually.
  3027. func (nc *Conn) close(status Status, doCBs bool) {
  3028. nc.mu.Lock()
  3029. if nc.isClosed() {
  3030. nc.status = status
  3031. nc.mu.Unlock()
  3032. return
  3033. }
  3034. nc.status = CLOSED
  3035. // Kick the Go routines so they fall out.
  3036. nc.kickFlusher()
  3037. nc.mu.Unlock()
  3038. nc.mu.Lock()
  3039. // Clear any queued pongs, e.g. pending flush calls.
  3040. nc.clearPendingFlushCalls()
  3041. // Clear any queued and blocking Requests.
  3042. nc.clearPendingRequestCalls()
  3043. // Stop ping timer if set.
  3044. nc.stopPingTimer()
  3045. nc.ptmr = nil
  3046. // Go ahead and make sure we have flushed the outbound
  3047. if nc.conn != nil {
  3048. nc.bw.Flush()
  3049. defer nc.conn.Close()
  3050. }
  3051. // Close sync subscriber channels and release any
  3052. // pending NextMsg() calls.
  3053. nc.subsMu.Lock()
  3054. for _, s := range nc.subs {
  3055. s.mu.Lock()
  3056. // Release callers on NextMsg for SyncSubscription only
  3057. if s.mch != nil && s.typ == SyncSubscription {
  3058. close(s.mch)
  3059. }
  3060. s.mch = nil
  3061. // Mark as invalid, for signaling to deliverMsgs
  3062. s.closed = true
  3063. // Mark connection closed in subscription
  3064. s.connClosed = true
  3065. // If we have an async subscription, signals it to exit
  3066. if s.typ == AsyncSubscription && s.pCond != nil {
  3067. s.pCond.Signal()
  3068. }
  3069. s.mu.Unlock()
  3070. }
  3071. nc.subs = nil
  3072. nc.subsMu.Unlock()
  3073. nc.status = status
  3074. // Perform appropriate callback if needed for a disconnect.
  3075. if doCBs {
  3076. if nc.Opts.DisconnectedCB != nil && nc.conn != nil {
  3077. nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
  3078. }
  3079. if nc.Opts.ClosedCB != nil {
  3080. nc.ach.push(func() { nc.Opts.ClosedCB(nc) })
  3081. }
  3082. nc.ach.close()
  3083. }
  3084. nc.mu.Unlock()
  3085. }
  3086. // Close will close the connection to the server. This call will release
  3087. // all blocking calls, such as Flush() and NextMsg()
  3088. func (nc *Conn) Close() {
  3089. nc.close(CLOSED, true)
  3090. }
  3091. // IsClosed tests if a Conn has been closed.
  3092. func (nc *Conn) IsClosed() bool {
  3093. nc.mu.Lock()
  3094. defer nc.mu.Unlock()
  3095. return nc.isClosed()
  3096. }
  3097. // IsReconnecting tests if a Conn is reconnecting.
  3098. func (nc *Conn) IsReconnecting() bool {
  3099. nc.mu.Lock()
  3100. defer nc.mu.Unlock()
  3101. return nc.isReconnecting()
  3102. }
  3103. // IsConnected tests if a Conn is connected.
  3104. func (nc *Conn) IsConnected() bool {
  3105. nc.mu.Lock()
  3106. defer nc.mu.Unlock()
  3107. return nc.isConnected()
  3108. }
  3109. // drainConnection will run in a separate Go routine and will
  3110. // flush all publishes and drain all active subscriptions.
  3111. func (nc *Conn) drainConnection() {
  3112. // Snapshot subs list.
  3113. nc.mu.Lock()
  3114. subs := make([]*Subscription, 0, len(nc.subs))
  3115. for _, s := range nc.subs {
  3116. subs = append(subs, s)
  3117. }
  3118. errCB := nc.Opts.AsyncErrorCB
  3119. drainWait := nc.Opts.DrainTimeout
  3120. nc.mu.Unlock()
  3121. // for pushing errors with context.
  3122. pushErr := func(err error) {
  3123. nc.mu.Lock()
  3124. nc.err = err
  3125. if errCB != nil {
  3126. nc.ach.push(func() { errCB(nc, nil, err) })
  3127. }
  3128. nc.mu.Unlock()
  3129. }
  3130. // Do subs first
  3131. for _, s := range subs {
  3132. if err := s.Drain(); err != nil {
  3133. // We will notify about these but continue.
  3134. pushErr(err)
  3135. }
  3136. }
  3137. // Wait for the subscriptions to drop to zero.
  3138. timeout := time.Now().Add(drainWait)
  3139. for time.Now().Before(timeout) {
  3140. if nc.NumSubscriptions() == 0 {
  3141. break
  3142. }
  3143. time.Sleep(10 * time.Millisecond)
  3144. }
  3145. // Check if we timed out.
  3146. if nc.NumSubscriptions() != 0 {
  3147. pushErr(ErrDrainTimeout)
  3148. }
  3149. // Flip State
  3150. nc.mu.Lock()
  3151. nc.status = DRAINING_PUBS
  3152. nc.mu.Unlock()
  3153. // Do publish drain via Flush() call.
  3154. err := nc.Flush()
  3155. if err != nil {
  3156. pushErr(err)
  3157. nc.Close()
  3158. return
  3159. }
  3160. // Move to closed state.
  3161. nc.Close()
  3162. }
  3163. // Drain will put a connection into a drain state. All subscriptions will
  3164. // immediately be put into a drain state. Upon completion, the publishers
  3165. // will be drained and can not publish any additional messages. Upon draining
  3166. // of the publishers, the connection will be closed. Use the ClosedCB()
  3167. // option to know when the connection has moved from draining to closed.
  3168. func (nc *Conn) Drain() error {
  3169. nc.mu.Lock()
  3170. defer nc.mu.Unlock()
  3171. if nc.isClosed() {
  3172. return ErrConnectionClosed
  3173. }
  3174. if nc.isConnecting() || nc.isReconnecting() {
  3175. return ErrConnectionReconnecting
  3176. }
  3177. if nc.isDraining() {
  3178. return nil
  3179. }
  3180. nc.status = DRAINING_SUBS
  3181. go nc.drainConnection()
  3182. return nil
  3183. }
  3184. // IsDraining tests if a Conn is in the draining state.
  3185. func (nc *Conn) IsDraining() bool {
  3186. nc.mu.Lock()
  3187. defer nc.mu.Unlock()
  3188. return nc.isDraining()
  3189. }
  3190. // caller must lock
  3191. func (nc *Conn) getServers(implicitOnly bool) []string {
  3192. poolSize := len(nc.srvPool)
  3193. var servers = make([]string, 0)
  3194. for i := 0; i < poolSize; i++ {
  3195. if implicitOnly && !nc.srvPool[i].isImplicit {
  3196. continue
  3197. }
  3198. url := nc.srvPool[i].url
  3199. servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
  3200. }
  3201. return servers
  3202. }
  3203. // Servers returns the list of known server urls, including additional
  3204. // servers discovered after a connection has been established. If
  3205. // authentication is enabled, use UserInfo or Token when connecting with
  3206. // these urls.
  3207. func (nc *Conn) Servers() []string {
  3208. nc.mu.Lock()
  3209. defer nc.mu.Unlock()
  3210. return nc.getServers(false)
  3211. }
  3212. // DiscoveredServers returns only the server urls that have been discovered
  3213. // after a connection has been established. If authentication is enabled,
  3214. // use UserInfo or Token when connecting with these urls.
  3215. func (nc *Conn) DiscoveredServers() []string {
  3216. nc.mu.Lock()
  3217. defer nc.mu.Unlock()
  3218. return nc.getServers(true)
  3219. }
  3220. // Status returns the current state of the connection.
  3221. func (nc *Conn) Status() Status {
  3222. nc.mu.Lock()
  3223. defer nc.mu.Unlock()
  3224. return nc.status
  3225. }
  3226. // Test if Conn has been closed Lock is assumed held.
  3227. func (nc *Conn) isClosed() bool {
  3228. return nc.status == CLOSED
  3229. }
  3230. // Test if Conn is in the process of connecting
  3231. func (nc *Conn) isConnecting() bool {
  3232. return nc.status == CONNECTING
  3233. }
  3234. // Test if Conn is being reconnected.
  3235. func (nc *Conn) isReconnecting() bool {
  3236. return nc.status == RECONNECTING
  3237. }
  3238. // Test if Conn is connected or connecting.
  3239. func (nc *Conn) isConnected() bool {
  3240. return nc.status == CONNECTED || nc.isDraining()
  3241. }
  3242. // Test if Conn is in the draining state.
  3243. func (nc *Conn) isDraining() bool {
  3244. return nc.status == DRAINING_SUBS || nc.status == DRAINING_PUBS
  3245. }
  3246. // Test if Conn is in the draining state for pubs.
  3247. func (nc *Conn) isDrainingPubs() bool {
  3248. return nc.status == DRAINING_PUBS
  3249. }
  3250. // Stats will return a race safe copy of the Statistics section for the connection.
  3251. func (nc *Conn) Stats() Statistics {
  3252. // Stats are updated either under connection's mu or subsMu mutexes.
  3253. // Lock both to safely get them.
  3254. nc.mu.Lock()
  3255. nc.subsMu.RLock()
  3256. stats := Statistics{
  3257. InMsgs: nc.InMsgs,
  3258. InBytes: nc.InBytes,
  3259. OutMsgs: nc.OutMsgs,
  3260. OutBytes: nc.OutBytes,
  3261. Reconnects: nc.Reconnects,
  3262. }
  3263. nc.subsMu.RUnlock()
  3264. nc.mu.Unlock()
  3265. return stats
  3266. }
  3267. // MaxPayload returns the size limit that a message payload can have.
  3268. // This is set by the server configuration and delivered to the client
  3269. // upon connect.
  3270. func (nc *Conn) MaxPayload() int64 {
  3271. nc.mu.Lock()
  3272. defer nc.mu.Unlock()
  3273. return nc.info.MaxPayload
  3274. }
  3275. // AuthRequired will return if the connected server requires authorization.
  3276. func (nc *Conn) AuthRequired() bool {
  3277. nc.mu.Lock()
  3278. defer nc.mu.Unlock()
  3279. return nc.info.AuthRequired
  3280. }
  3281. // TLSRequired will return if the connected server requires TLS connections.
  3282. func (nc *Conn) TLSRequired() bool {
  3283. nc.mu.Lock()
  3284. defer nc.mu.Unlock()
  3285. return nc.info.TLSRequired
  3286. }
  3287. // Barrier schedules the given function `f` to all registered asynchronous
  3288. // subscriptions.
  3289. // Only the last subscription to see this barrier will invoke the function.
  3290. // If no subscription is registered at the time of this call, `f()` is invoked
  3291. // right away.
  3292. // ErrConnectionClosed is returned if the connection is closed prior to
  3293. // the call.
  3294. func (nc *Conn) Barrier(f func()) error {
  3295. nc.mu.Lock()
  3296. if nc.isClosed() {
  3297. nc.mu.Unlock()
  3298. return ErrConnectionClosed
  3299. }
  3300. nc.subsMu.Lock()
  3301. // Need to figure out how many non chan subscriptions there are
  3302. numSubs := 0
  3303. for _, sub := range nc.subs {
  3304. if sub.typ == AsyncSubscription {
  3305. numSubs++
  3306. }
  3307. }
  3308. if numSubs == 0 {
  3309. nc.subsMu.Unlock()
  3310. nc.mu.Unlock()
  3311. f()
  3312. return nil
  3313. }
  3314. barrier := &barrierInfo{refs: int64(numSubs), f: f}
  3315. for _, sub := range nc.subs {
  3316. sub.mu.Lock()
  3317. if sub.mch == nil {
  3318. msg := &Msg{barrier: barrier}
  3319. // Push onto the async pList
  3320. if sub.pTail != nil {
  3321. sub.pTail.next = msg
  3322. } else {
  3323. sub.pHead = msg
  3324. sub.pCond.Signal()
  3325. }
  3326. sub.pTail = msg
  3327. }
  3328. sub.mu.Unlock()
  3329. }
  3330. nc.subsMu.Unlock()
  3331. nc.mu.Unlock()
  3332. return nil
  3333. }
  3334. // GetClientID returns the client ID assigned by the server to which
  3335. // the client is currently connected to. Note that the value may change if
  3336. // the client reconnects.
  3337. // This function returns ErrNoClientIDReturned if the server is of a
  3338. // version prior to 1.2.0.
  3339. func (nc *Conn) GetClientID() (uint64, error) {
  3340. nc.mu.Lock()
  3341. defer nc.mu.Unlock()
  3342. if nc.isClosed() {
  3343. return 0, ErrConnectionClosed
  3344. }
  3345. if nc.info.CID == 0 {
  3346. return 0, ErrClientIDNotSupported
  3347. }
  3348. return nc.info.CID, nil
  3349. }
  3350. // NkeyOptionFromSeed will load an nkey pair from a seed file.
  3351. // It will return the NKey Option and will handle
  3352. // signing of nonce challenges from the server. It will take
  3353. // care to not hold keys in memory and to wipe memory.
  3354. func NkeyOptionFromSeed(seedFile string) (Option, error) {
  3355. kp, err := nkeyPairFromSeedFile(seedFile)
  3356. if err != nil {
  3357. return nil, err
  3358. }
  3359. // Wipe our key on exit.
  3360. defer kp.Wipe()
  3361. pub, err := kp.PublicKey()
  3362. if err != nil {
  3363. return nil, err
  3364. }
  3365. if !nkeys.IsValidPublicUserKey(pub) {
  3366. return nil, fmt.Errorf("nats: Not a valid nkey user seed")
  3367. }
  3368. sigCB := func(nonce []byte) ([]byte, error) {
  3369. return sigHandler(nonce, seedFile)
  3370. }
  3371. return Nkey(string(pub), sigCB), nil
  3372. }
  3373. // This is a regex to match decorated jwts in keys/seeds.
  3374. // .e.g.
  3375. // -----BEGIN NATS USER JWT-----
  3376. // eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5...
  3377. // ------END NATS USER JWT------
  3378. //
  3379. // ************************* IMPORTANT *************************
  3380. // NKEY Seed printed below can be used sign and prove identity.
  3381. // NKEYs are sensitive and should be treated as secrets.
  3382. //
  3383. // -----BEGIN USER NKEY SEED-----
  3384. // SUAIO3FHUX5PNV2LQIIP7TZ3N4L7TX3W53MQGEIVYFIGA635OZCKEYHFLM
  3385. // ------END USER NKEY SEED------
  3386. var nscDecoratedRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}[^\n]*[-]{3,}\n)(.+)(?:\n\s*[-]{3,}[^\n]*[-]{3,}\n))`)
  3387. func userFromFile(userFile string) (string, error) {
  3388. contents, err := ioutil.ReadFile(userFile)
  3389. if err != nil {
  3390. return _EMPTY_, fmt.Errorf("nats: %v", err)
  3391. }
  3392. defer wipeSlice(contents)
  3393. items := nscDecoratedRe.FindAllSubmatch(contents, -1)
  3394. if len(items) == 0 {
  3395. return string(contents), nil
  3396. }
  3397. // First result should be the user JWT.
  3398. // We copy here so that if the file contained a seed file too we wipe appropriately.
  3399. raw := items[0][1]
  3400. tmp := make([]byte, len(raw))
  3401. copy(tmp, raw)
  3402. return string(tmp), nil
  3403. }
  3404. func nkeyPairFromSeedFile(seedFile string) (nkeys.KeyPair, error) {
  3405. var seed []byte
  3406. contents, err := ioutil.ReadFile(seedFile)
  3407. if err != nil {
  3408. return nil, fmt.Errorf("nats: %v", err)
  3409. }
  3410. defer wipeSlice(contents)
  3411. items := nscDecoratedRe.FindAllSubmatch(contents, -1)
  3412. if len(items) > 1 {
  3413. seed = items[1][1]
  3414. } else {
  3415. lines := bytes.Split(contents, []byte("\n"))
  3416. for _, line := range lines {
  3417. if bytes.HasPrefix(bytes.TrimSpace(line), []byte("SU")) {
  3418. seed = line
  3419. break
  3420. }
  3421. }
  3422. }
  3423. if seed == nil {
  3424. return nil, fmt.Errorf("nats: No nkey user seed found in %q", seedFile)
  3425. }
  3426. kp, err := nkeys.FromSeed(seed)
  3427. if err != nil {
  3428. return nil, err
  3429. }
  3430. return kp, nil
  3431. }
  3432. // Sign authentication challenges from the server.
  3433. // Do not keep private seed in memory.
  3434. func sigHandler(nonce []byte, seedFile string) ([]byte, error) {
  3435. kp, err := nkeyPairFromSeedFile(seedFile)
  3436. if err != nil {
  3437. return nil, err
  3438. }
  3439. // Wipe our key on exit.
  3440. defer kp.Wipe()
  3441. sig, _ := kp.Sign(nonce)
  3442. return sig, nil
  3443. }
  3444. // Just wipe slice with 'x', for clearing contents of nkey seed file.
  3445. func wipeSlice(buf []byte) {
  3446. for i := range buf {
  3447. buf[i] = 'x'
  3448. }
  3449. }
  3450. type timeoutWriter struct {
  3451. timeout time.Duration
  3452. conn net.Conn
  3453. err error
  3454. }
  3455. // Write implements the io.Writer interface.
  3456. func (tw *timeoutWriter) Write(p []byte) (int, error) {
  3457. if tw.err != nil {
  3458. return 0, tw.err
  3459. }
  3460. var n int
  3461. tw.conn.SetWriteDeadline(time.Now().Add(tw.timeout))
  3462. n, tw.err = tw.conn.Write(p)
  3463. tw.conn.SetWriteDeadline(time.Time{})
  3464. return n, tw.err
  3465. }