nats.go 79 KB


  1. // Copyright 2012-2018 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. // A Go client for the NATS messaging system (https://nats.io).
  14. package nats
  15. import (
  16. "bufio"
  17. "bytes"
  18. "crypto/tls"
  19. "crypto/x509"
  20. "encoding/json"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "io/ioutil"
  25. "math/rand"
  26. "net"
  27. "net/url"
  28. "runtime"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "sync/atomic"
  33. "time"
  34. "github.com/nats-io/go-nats/util"
  35. "github.com/nats-io/nuid"
  36. )
  37. // Default Constants
  38. const (
  39. Version = "1.5.0"
  40. DefaultURL = "nats://localhost:4222"
  41. DefaultPort = 4222
  42. DefaultMaxReconnect = 60
  43. DefaultReconnectWait = 2 * time.Second
  44. DefaultTimeout = 2 * time.Second
  45. DefaultPingInterval = 2 * time.Minute
  46. DefaultMaxPingOut = 2
  47. DefaultMaxChanLen = 8192 // 8k
  48. DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
  49. RequestChanLen = 8
  50. LangString = "go"
  51. )
  52. // STALE_CONNECTION is for detection and proper handling of stale connections.
  53. const STALE_CONNECTION = "stale connection"
  54. // PERMISSIONS_ERR is for when nats server subject authorization has failed.
  55. const PERMISSIONS_ERR = "permissions violation"
  56. // AUTHORIZATION_ERR is for when nats server user authorization has failed.
  57. const AUTHORIZATION_ERR = "authorization violation"
  58. // Errors
  59. var (
  60. ErrConnectionClosed = errors.New("nats: connection closed")
  61. ErrSecureConnRequired = errors.New("nats: secure connection required")
  62. ErrSecureConnWanted = errors.New("nats: secure connection not available")
  63. ErrBadSubscription = errors.New("nats: invalid subscription")
  64. ErrTypeSubscription = errors.New("nats: invalid subscription type")
  65. ErrBadSubject = errors.New("nats: invalid subject")
  66. ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
  67. ErrTimeout = errors.New("nats: timeout")
  68. ErrBadTimeout = errors.New("nats: timeout invalid")
  69. ErrAuthorization = errors.New("nats: authorization violation")
  70. ErrNoServers = errors.New("nats: no servers available for connection")
  71. ErrJsonParse = errors.New("nats: connect message, json parse error")
  72. ErrChanArg = errors.New("nats: argument needs to be a channel type")
  73. ErrMaxPayload = errors.New("nats: maximum payload exceeded")
  74. ErrMaxMessages = errors.New("nats: maximum messages delivered")
  75. ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
  76. ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
  77. ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
  78. ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
  79. ErrInvalidConnection = errors.New("nats: invalid connection")
  80. ErrInvalidMsg = errors.New("nats: invalid message or message nil")
  81. ErrInvalidArg = errors.New("nats: invalid argument")
  82. ErrInvalidContext = errors.New("nats: invalid context")
  83. ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
  84. )
  85. // GetDefaultOptions returns default configuration options for the client.
  86. func GetDefaultOptions() Options {
  87. return Options{
  88. AllowReconnect: true,
  89. MaxReconnect: DefaultMaxReconnect,
  90. ReconnectWait: DefaultReconnectWait,
  91. Timeout: DefaultTimeout,
  92. PingInterval: DefaultPingInterval,
  93. MaxPingsOut: DefaultMaxPingOut,
  94. SubChanLen: DefaultMaxChanLen,
  95. ReconnectBufSize: DefaultReconnectBufSize,
  96. }
  97. }
  98. // DEPRECATED: Use GetDefaultOptions() instead.
  99. // DefaultOptions is not safe for use by multiple clients.
  100. // For details see #308.
  101. var DefaultOptions = GetDefaultOptions()
  102. // Status represents the state of the connection.
  103. type Status int
  104. const (
  105. DISCONNECTED = Status(iota)
  106. CONNECTED
  107. CLOSED
  108. RECONNECTING
  109. CONNECTING
  110. )
  111. // ConnHandler is used for asynchronous events such as
  112. // disconnected and closed connections.
  113. type ConnHandler func(*Conn)
  114. // ErrHandler is used to process asynchronous errors encountered
  115. // while processing inbound messages.
  116. type ErrHandler func(*Conn, *Subscription, error)
  117. // asyncCB is used to preserve order for async callbacks.
  118. type asyncCB func()
  119. // Option is a function on the options for a connection.
  120. type Option func(*Options) error
  121. // CustomDialer can be used to specify any dialer, not necessarily
  122. // a *net.Dialer.
  123. type CustomDialer interface {
  124. Dial(network, address string) (net.Conn, error)
  125. }
  126. // Options can be used to create a customized connection.
  127. type Options struct {
  128. // Url represents a single NATS server url to which the client
  129. // will be connecting. If the Servers option is also set, it
  130. // then becomes the first server in the Servers array.
  131. Url string
  132. // Servers is a configured set of servers which this client
  133. // will use when attempting to connect.
  134. Servers []string
  135. // NoRandomize configures whether we will randomize the
  136. // server pool.
  137. NoRandomize bool
  138. // Name is an optional name label which will be sent to the server
  139. // on CONNECT to identify the client.
  140. Name string
  141. // Verbose signals the server to send an OK ack for commands
  142. // successfully processed by the server.
  143. Verbose bool
  144. // Pedantic signals the server whether it should be doing further
  145. // validation of subjects.
  146. Pedantic bool
  147. // Secure enables TLS secure connections that skip server
  148. // verification by default. NOT RECOMMENDED.
  149. Secure bool
  150. // TLSConfig is a custom TLS configuration to use for secure
  151. // transports.
  152. TLSConfig *tls.Config
  153. // AllowReconnect enables reconnection logic to be used when we
  154. // encounter a disconnect from the current server.
  155. AllowReconnect bool
  156. // MaxReconnect sets the number of reconnect attempts that will be
  157. // tried before giving up. If negative, then it will never give up
  158. // trying to reconnect.
  159. MaxReconnect int
  160. // ReconnectWait sets the time to backoff after attempting a reconnect
  161. // to a server that we were already connected to previously.
  162. ReconnectWait time.Duration
  163. // Timeout sets the timeout for a Dial operation on a connection.
  164. Timeout time.Duration
  165. // FlusherTimeout is the maximum time to wait for the flusher loop
  166. // to be able to finish writing to the underlying connection.
  167. FlusherTimeout time.Duration
  168. // PingInterval is the period at which the client will be sending ping
  169. // commands to the server, disabled if 0 or negative.
  170. PingInterval time.Duration
  171. // MaxPingsOut is the maximum number of pending ping commands that can
  172. // be awaiting a response before raising an ErrStaleConnection error.
  173. MaxPingsOut int
  174. // ClosedCB sets the closed handler that is called when a client will
  175. // no longer be connected.
  176. ClosedCB ConnHandler
  177. // DisconnectedCB sets the disconnected handler that is called
  178. // whenever the connection is disconnected.
  179. DisconnectedCB ConnHandler
  180. // ReconnectedCB sets the reconnected handler called whenever
  181. // the connection is successfully reconnected.
  182. ReconnectedCB ConnHandler
  183. // DiscoveredServersCB sets the callback that is invoked whenever a new
  184. // server has joined the cluster.
  185. DiscoveredServersCB ConnHandler
  186. // AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
  187. AsyncErrorCB ErrHandler
  188. // ReconnectBufSize is the size of the backing bufio during reconnect.
  189. // Once this has been exhausted publish operations will return an error.
  190. ReconnectBufSize int
  191. // SubChanLen is the size of the buffered channel used between the socket
  192. // Go routine and the message delivery for SyncSubscriptions.
  193. // NOTE: This does not affect AsyncSubscriptions which are
  194. // dictated by PendingLimits()
  195. SubChanLen int
  196. // User sets the username to be used when connecting to the server.
  197. User string
  198. // Password sets the password to be used when connecting to a server.
  199. Password string
  200. // Token sets the token to be used when connecting to a server.
  201. Token string
  202. // Dialer allows a custom net.Dialer when forming connections.
  203. // DEPRECATED: should use CustomDialer instead.
  204. Dialer *net.Dialer
  205. // CustomDialer allows to specify a custom dialer (not necessarily
  206. // a *net.Dialer).
  207. CustomDialer CustomDialer
  208. // UseOldRequestStyle forces the old method of Requests that utilize
  209. // a new Inbox and a new Subscription for each request.
  210. UseOldRequestStyle bool
  211. }
  212. const (
  213. // Scratch storage for assembling protocol headers
  214. scratchSize = 512
  215. // The size of the bufio reader/writer on top of the socket.
  216. defaultBufSize = 32768
  217. // The buffered size of the flush "kick" channel
  218. flushChanSize = 1024
  219. // Default server pool size
  220. srvPoolSize = 4
  221. // Channel size for the async callback handler.
  222. asyncCBChanSize = 32
  223. // NUID size
  224. nuidSize = 22
  225. )
  226. // A Conn represents a bare connection to a nats-server.
  227. // It can send and receive []byte payloads.
  228. type Conn struct {
  229. // Keep all members for which we use atomic at the beginning of the
  230. // struct and make sure they are all 64bits (or use padding if necessary).
  231. // atomic.* functions crash on 32bit machines if operand is not aligned
  232. // at 64bit. See https://github.com/golang/go/issues/599
  233. Statistics
  234. mu sync.Mutex
  235. Opts Options
  236. wg *sync.WaitGroup
  237. url *url.URL
  238. conn net.Conn
  239. srvPool []*srv
  240. urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
  241. bw *bufio.Writer
  242. pending *bytes.Buffer
  243. fch chan struct{}
  244. info serverInfo
  245. ssid int64
  246. subsMu sync.RWMutex
  247. subs map[int64]*Subscription
  248. ach chan asyncCB
  249. pongs []chan struct{}
  250. scratch [scratchSize]byte
  251. status Status
  252. initc bool // true if the connection is performing the initial connect
  253. err error
  254. ps *parseState
  255. ptmr *time.Timer
  256. pout int
  257. // New style response handler
  258. respSub string // The wildcard subject
  259. respMux *Subscription // A single response subscription
  260. respMap map[string]chan *Msg // Request map for the response msg channels
  261. respSetup sync.Once // Ensures response subscription occurs once
  262. }
  263. // A Subscription represents interest in a given subject.
  264. type Subscription struct {
  265. mu sync.Mutex
  266. sid int64
  267. // Subject that represents this subscription. This can be different
  268. // than the received subject inside a Msg if this is a wildcard.
  269. Subject string
  270. // Optional queue group name. If present, all subscriptions with the
  271. // same name will form a distributed queue, and each message will
  272. // only be processed by one member of the group.
  273. Queue string
  274. delivered uint64
  275. max uint64
  276. conn *Conn
  277. mcb MsgHandler
  278. mch chan *Msg
  279. closed bool
  280. sc bool
  281. connClosed bool
  282. // Type of Subscription
  283. typ SubscriptionType
  284. // Async linked list
  285. pHead *Msg
  286. pTail *Msg
  287. pCond *sync.Cond
  288. // Pending stats, async subscriptions, high-speed etc.
  289. pMsgs int
  290. pBytes int
  291. pMsgsMax int
  292. pBytesMax int
  293. pMsgsLimit int
  294. pBytesLimit int
  295. dropped int
  296. }
  297. // Msg is a structure used by Subscribers and PublishMsg().
  298. type Msg struct {
  299. Subject string
  300. Reply string
  301. Data []byte
  302. Sub *Subscription
  303. next *Msg
  304. barrier *barrierInfo
  305. }
  306. type barrierInfo struct {
  307. refs int64
  308. f func()
  309. }
  310. // Tracks various stats received and sent on this connection,
  311. // including counts for messages and bytes.
  312. type Statistics struct {
  313. InMsgs uint64
  314. OutMsgs uint64
  315. InBytes uint64
  316. OutBytes uint64
  317. Reconnects uint64
  318. }
  319. // Tracks individual backend servers.
  320. type srv struct {
  321. url *url.URL
  322. didConnect bool
  323. reconnects int
  324. lastAttempt time.Time
  325. isImplicit bool
  326. }
  327. type serverInfo struct {
  328. Id string `json:"server_id"`
  329. Host string `json:"host"`
  330. Port uint `json:"port"`
  331. Version string `json:"version"`
  332. AuthRequired bool `json:"auth_required"`
  333. TLSRequired bool `json:"tls_required"`
  334. MaxPayload int64 `json:"max_payload"`
  335. ConnectURLs []string `json:"connect_urls,omitempty"`
  336. }
  337. const (
  338. // clientProtoZero is the original client protocol from 2009.
  339. // http://nats.io/documentation/internals/nats-protocol/
  340. /* clientProtoZero */ _ = iota
  341. // clientProtoInfo signals a client can receive more then the original INFO block.
  342. // This can be used to update clients on other cluster members, etc.
  343. clientProtoInfo
  344. )
  345. type connectInfo struct {
  346. Verbose bool `json:"verbose"`
  347. Pedantic bool `json:"pedantic"`
  348. User string `json:"user,omitempty"`
  349. Pass string `json:"pass,omitempty"`
  350. Token string `json:"auth_token,omitempty"`
  351. TLS bool `json:"tls_required"`
  352. Name string `json:"name"`
  353. Lang string `json:"lang"`
  354. Version string `json:"version"`
  355. Protocol int `json:"protocol"`
  356. }
  357. // MsgHandler is a callback function that processes messages delivered to
  358. // asynchronous subscribers.
  359. type MsgHandler func(msg *Msg)
  360. // Connect will attempt to connect to the NATS system.
  361. // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
  362. // Comma separated arrays are also supported, e.g. urlA, urlB.
  363. // Options start with the defaults but can be overridden.
  364. func Connect(url string, options ...Option) (*Conn, error) {
  365. opts := GetDefaultOptions()
  366. opts.Servers = processUrlString(url)
  367. for _, opt := range options {
  368. if err := opt(&opts); err != nil {
  369. return nil, err
  370. }
  371. }
  372. return opts.Connect()
  373. }
  374. // Options that can be passed to Connect.
  375. // Name is an Option to set the client name.
  376. func Name(name string) Option {
  377. return func(o *Options) error {
  378. o.Name = name
  379. return nil
  380. }
  381. }
  382. // Secure is an Option to enable TLS secure connections that skip server verification by default.
  383. // Pass a TLS Configuration for proper TLS.
  384. func Secure(tls ...*tls.Config) Option {
  385. return func(o *Options) error {
  386. o.Secure = true
  387. // Use of variadic just simplifies testing scenarios. We only take the first one.
  388. // fixme(DLC) - Could panic if more than one. Could also do TLS option.
  389. if len(tls) > 1 {
  390. return ErrMultipleTLSConfigs
  391. }
  392. if len(tls) == 1 {
  393. o.TLSConfig = tls[0]
  394. }
  395. return nil
  396. }
  397. }
  398. // RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is
  399. // not already set this will set it as well.
  400. func RootCAs(file ...string) Option {
  401. return func(o *Options) error {
  402. pool := x509.NewCertPool()
  403. for _, f := range file {
  404. rootPEM, err := ioutil.ReadFile(f)
  405. if err != nil || rootPEM == nil {
  406. return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
  407. }
  408. ok := pool.AppendCertsFromPEM(rootPEM)
  409. if !ok {
  410. return fmt.Errorf("nats: failed to parse root certificate from %q", f)
  411. }
  412. }
  413. if o.TLSConfig == nil {
  414. o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  415. }
  416. o.TLSConfig.RootCAs = pool
  417. o.Secure = true
  418. return nil
  419. }
  420. }
  421. // ClientCert is a helper option to provide the client certificate from a file. If Secure is
  422. // not already set this will set it as well
  423. func ClientCert(certFile, keyFile string) Option {
  424. return func(o *Options) error {
  425. cert, err := tls.LoadX509KeyPair(certFile, keyFile)
  426. if err != nil {
  427. return fmt.Errorf("nats: error loading client certificate: %v", err)
  428. }
  429. cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
  430. if err != nil {
  431. return fmt.Errorf("nats: error parsing client certificate: %v", err)
  432. }
  433. if o.TLSConfig == nil {
  434. o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  435. }
  436. o.TLSConfig.Certificates = []tls.Certificate{cert}
  437. o.Secure = true
  438. return nil
  439. }
  440. }
  441. // NoReconnect is an Option to turn off reconnect behavior.
  442. func NoReconnect() Option {
  443. return func(o *Options) error {
  444. o.AllowReconnect = false
  445. return nil
  446. }
  447. }
  448. // DontRandomize is an Option to turn off randomizing the server pool.
  449. func DontRandomize() Option {
  450. return func(o *Options) error {
  451. o.NoRandomize = true
  452. return nil
  453. }
  454. }
  455. // ReconnectWait is an Option to set the wait time between reconnect attempts.
  456. func ReconnectWait(t time.Duration) Option {
  457. return func(o *Options) error {
  458. o.ReconnectWait = t
  459. return nil
  460. }
  461. }
  462. // MaxReconnects is an Option to set the maximum number of reconnect attempts.
  463. func MaxReconnects(max int) Option {
  464. return func(o *Options) error {
  465. o.MaxReconnect = max
  466. return nil
  467. }
  468. }
  469. // PingInterval is an Option to set the period for client ping commands
  470. func PingInterval(t time.Duration) Option {
  471. return func(o *Options) error {
  472. o.PingInterval = t
  473. return nil
  474. }
  475. }
  476. // ReconnectBufSize sets the buffer size of messages kept while busy reconnecting
  477. func ReconnectBufSize(size int) Option {
  478. return func(o *Options) error {
  479. o.ReconnectBufSize = size
  480. return nil
  481. }
  482. }
  483. // Timeout is an Option to set the timeout for Dial on a connection.
  484. func Timeout(t time.Duration) Option {
  485. return func(o *Options) error {
  486. o.Timeout = t
  487. return nil
  488. }
  489. }
  490. // DisconnectHandler is an Option to set the disconnected handler.
  491. func DisconnectHandler(cb ConnHandler) Option {
  492. return func(o *Options) error {
  493. o.DisconnectedCB = cb
  494. return nil
  495. }
  496. }
  497. // ReconnectHandler is an Option to set the reconnected handler.
  498. func ReconnectHandler(cb ConnHandler) Option {
  499. return func(o *Options) error {
  500. o.ReconnectedCB = cb
  501. return nil
  502. }
  503. }
  504. // ClosedHandler is an Option to set the closed handler.
  505. func ClosedHandler(cb ConnHandler) Option {
  506. return func(o *Options) error {
  507. o.ClosedCB = cb
  508. return nil
  509. }
  510. }
  511. // DiscoveredServersHandler is an Option to set the new servers handler.
  512. func DiscoveredServersHandler(cb ConnHandler) Option {
  513. return func(o *Options) error {
  514. o.DiscoveredServersCB = cb
  515. return nil
  516. }
  517. }
  518. // ErrorHandler is an Option to set the async error handler.
  519. func ErrorHandler(cb ErrHandler) Option {
  520. return func(o *Options) error {
  521. o.AsyncErrorCB = cb
  522. return nil
  523. }
  524. }
  525. // UserInfo is an Option to set the username and password to
  526. // use when not included directly in the URLs.
  527. func UserInfo(user, password string) Option {
  528. return func(o *Options) error {
  529. o.User = user
  530. o.Password = password
  531. return nil
  532. }
  533. }
  534. // Token is an Option to set the token to use when not included
  535. // directly in the URLs.
  536. func Token(token string) Option {
  537. return func(o *Options) error {
  538. o.Token = token
  539. return nil
  540. }
  541. }
  542. // Dialer is an Option to set the dialer which will be used when
  543. // attempting to establish a connection.
  544. // DEPRECATED: Should use CustomDialer instead.
  545. func Dialer(dialer *net.Dialer) Option {
  546. return func(o *Options) error {
  547. o.Dialer = dialer
  548. return nil
  549. }
  550. }
  551. // SetCustomDialer is an Option to set a custom dialer which will be
  552. // used when attempting to establish a connection. If both Dialer
  553. // and CustomDialer are specified, CustomDialer takes precedence.
  554. func SetCustomDialer(dialer CustomDialer) Option {
  555. return func(o *Options) error {
  556. o.CustomDialer = dialer
  557. return nil
  558. }
  559. }
  560. // UseOldRequestStyle is an Option to force usage of the old Request style.
  561. func UseOldRequestStyle() Option {
  562. return func(o *Options) error {
  563. o.UseOldRequestStyle = true
  564. return nil
  565. }
  566. }
  567. // Handler processing
  568. // SetDisconnectHandler will set the disconnect event handler.
  569. func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
  570. if nc == nil {
  571. return
  572. }
  573. nc.mu.Lock()
  574. defer nc.mu.Unlock()
  575. nc.Opts.DisconnectedCB = dcb
  576. }
  577. // SetReconnectHandler will set the reconnect event handler.
  578. func (nc *Conn) SetReconnectHandler(rcb ConnHandler) {
  579. if nc == nil {
  580. return
  581. }
  582. nc.mu.Lock()
  583. defer nc.mu.Unlock()
  584. nc.Opts.ReconnectedCB = rcb
  585. }
  586. // SetDiscoveredServersHandler will set the discovered servers handler.
  587. func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
  588. if nc == nil {
  589. return
  590. }
  591. nc.mu.Lock()
  592. defer nc.mu.Unlock()
  593. nc.Opts.DiscoveredServersCB = dscb
  594. }
  595. // SetClosedHandler will set the reconnect event handler.
  596. func (nc *Conn) SetClosedHandler(cb ConnHandler) {
  597. if nc == nil {
  598. return
  599. }
  600. nc.mu.Lock()
  601. defer nc.mu.Unlock()
  602. nc.Opts.ClosedCB = cb
  603. }
  604. // SetErrorHandler will set the async error handler.
  605. func (nc *Conn) SetErrorHandler(cb ErrHandler) {
  606. if nc == nil {
  607. return
  608. }
  609. nc.mu.Lock()
  610. defer nc.mu.Unlock()
  611. nc.Opts.AsyncErrorCB = cb
  612. }
  613. // Process the url string argument to Connect. Return an array of
  614. // urls, even if only one.
  615. func processUrlString(url string) []string {
  616. urls := strings.Split(url, ",")
  617. for i, s := range urls {
  618. urls[i] = strings.TrimSpace(s)
  619. }
  620. return urls
  621. }
  622. // Connect will attempt to connect to a NATS server with multiple options.
  623. func (o Options) Connect() (*Conn, error) {
  624. nc := &Conn{Opts: o}
  625. // Some default options processing.
  626. if nc.Opts.MaxPingsOut == 0 {
  627. nc.Opts.MaxPingsOut = DefaultMaxPingOut
  628. }
  629. // Allow old default for channel length to work correctly.
  630. if nc.Opts.SubChanLen == 0 {
  631. nc.Opts.SubChanLen = DefaultMaxChanLen
  632. }
  633. // Default ReconnectBufSize
  634. if nc.Opts.ReconnectBufSize == 0 {
  635. nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
  636. }
  637. // Ensure that Timeout is not 0
  638. if nc.Opts.Timeout == 0 {
  639. nc.Opts.Timeout = DefaultTimeout
  640. }
  641. // Allow custom Dialer for connecting using DialTimeout by default
  642. if nc.Opts.Dialer == nil {
  643. nc.Opts.Dialer = &net.Dialer{
  644. Timeout: nc.Opts.Timeout,
  645. }
  646. }
  647. if err := nc.setupServerPool(); err != nil {
  648. return nil, err
  649. }
  650. // Create the async callback channel.
  651. nc.ach = make(chan asyncCB, asyncCBChanSize)
  652. if err := nc.connect(); err != nil {
  653. return nil, err
  654. }
  655. // Spin up the async cb dispatcher on success
  656. go nc.asyncDispatch()
  657. return nc, nil
  658. }
  659. const (
  660. _CRLF_ = "\r\n"
  661. _EMPTY_ = ""
  662. _SPC_ = " "
  663. _PUB_P_ = "PUB "
  664. )
  665. const (
  666. _OK_OP_ = "+OK"
  667. _ERR_OP_ = "-ERR"
  668. _PONG_OP_ = "PONG"
  669. _INFO_OP_ = "INFO"
  670. )
  671. const (
  672. conProto = "CONNECT %s" + _CRLF_
  673. pingProto = "PING" + _CRLF_
  674. pongProto = "PONG" + _CRLF_
  675. subProto = "SUB %s %s %d" + _CRLF_
  676. unsubProto = "UNSUB %d %s" + _CRLF_
  677. okProto = _OK_OP_ + _CRLF_
  678. )
  679. // Return the currently selected server
  680. func (nc *Conn) currentServer() (int, *srv) {
  681. for i, s := range nc.srvPool {
  682. if s == nil {
  683. continue
  684. }
  685. if s.url == nc.url {
  686. return i, s
  687. }
  688. }
  689. return -1, nil
  690. }
  691. // Pop the current server and put onto the end of the list. Select head of list as long
  692. // as number of reconnect attempts under MaxReconnect.
  693. func (nc *Conn) selectNextServer() (*srv, error) {
  694. i, s := nc.currentServer()
  695. if i < 0 {
  696. return nil, ErrNoServers
  697. }
  698. sp := nc.srvPool
  699. num := len(sp)
  700. copy(sp[i:num-1], sp[i+1:num])
  701. maxReconnect := nc.Opts.MaxReconnect
  702. if maxReconnect < 0 || s.reconnects < maxReconnect {
  703. nc.srvPool[num-1] = s
  704. } else {
  705. nc.srvPool = sp[0 : num-1]
  706. }
  707. if len(nc.srvPool) <= 0 {
  708. nc.url = nil
  709. return nil, ErrNoServers
  710. }
  711. nc.url = nc.srvPool[0].url
  712. return nc.srvPool[0], nil
  713. }
  714. // Will assign the correct server to the nc.Url
  715. func (nc *Conn) pickServer() error {
  716. nc.url = nil
  717. if len(nc.srvPool) <= 0 {
  718. return ErrNoServers
  719. }
  720. for _, s := range nc.srvPool {
  721. if s != nil {
  722. nc.url = s.url
  723. return nil
  724. }
  725. }
  726. return ErrNoServers
  727. }
  728. const tlsScheme = "tls"
  729. // Create the server pool using the options given.
  730. // We will place a Url option first, followed by any
  731. // Server Options. We will randomize the server pool unless
  732. // the NoRandomize flag is set.
  733. func (nc *Conn) setupServerPool() error {
  734. nc.srvPool = make([]*srv, 0, srvPoolSize)
  735. nc.urls = make(map[string]struct{}, srvPoolSize)
  736. // Create srv objects from each url string in nc.Opts.Servers
  737. // and add them to the pool
  738. for _, urlString := range nc.Opts.Servers {
  739. if err := nc.addURLToPool(urlString, false); err != nil {
  740. return err
  741. }
  742. }
  743. // Randomize if allowed to
  744. if !nc.Opts.NoRandomize {
  745. nc.shufflePool()
  746. }
  747. // Normally, if this one is set, Options.Servers should not be,
  748. // but we always allowed that, so continue to do so.
  749. if nc.Opts.Url != _EMPTY_ {
  750. // Add to the end of the array
  751. if err := nc.addURLToPool(nc.Opts.Url, false); err != nil {
  752. return err
  753. }
  754. // Then swap it with first to guarantee that Options.Url is tried first.
  755. last := len(nc.srvPool) - 1
  756. if last > 0 {
  757. nc.srvPool[0], nc.srvPool[last] = nc.srvPool[last], nc.srvPool[0]
  758. }
  759. } else if len(nc.srvPool) <= 0 {
  760. // Place default URL if pool is empty.
  761. if err := nc.addURLToPool(DefaultURL, false); err != nil {
  762. return err
  763. }
  764. }
  765. // Check for Scheme hint to move to TLS mode.
  766. for _, srv := range nc.srvPool {
  767. if srv.url.Scheme == tlsScheme {
  768. // FIXME(dlc), this is for all in the pool, should be case by case.
  769. nc.Opts.Secure = true
  770. if nc.Opts.TLSConfig == nil {
  771. nc.Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
  772. }
  773. }
  774. }
  775. return nc.pickServer()
  776. }
  777. // addURLToPool adds an entry to the server pool
  778. func (nc *Conn) addURLToPool(sURL string, implicit bool) error {
  779. u, err := url.Parse(sURL)
  780. if err != nil {
  781. return err
  782. }
  783. s := &srv{url: u, isImplicit: implicit}
  784. nc.srvPool = append(nc.srvPool, s)
  785. nc.urls[u.Host] = struct{}{}
  786. return nil
  787. }
  788. // shufflePool swaps randomly elements in the server pool
  789. func (nc *Conn) shufflePool() {
  790. if len(nc.srvPool) <= 1 {
  791. return
  792. }
  793. source := rand.NewSource(time.Now().UnixNano())
  794. r := rand.New(source)
  795. for i := range nc.srvPool {
  796. j := r.Intn(i + 1)
  797. nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
  798. }
  799. }
  800. // createConn will connect to the server and wrap the appropriate
  801. // bufio structures. It will do the right thing when an existing
  802. // connection is in place.
  803. func (nc *Conn) createConn() (err error) {
  804. if nc.Opts.Timeout < 0 {
  805. return ErrBadTimeout
  806. }
  807. if _, cur := nc.currentServer(); cur == nil {
  808. return ErrNoServers
  809. } else {
  810. cur.lastAttempt = time.Now()
  811. }
  812. // CustomDialer takes precedence. If not set, use Opts.Dialer which
  813. // is set to a default *net.Dialer (in Connect()) if not explicitly
  814. // set by the user.
  815. dialer := nc.Opts.CustomDialer
  816. if dialer == nil {
  817. dialer = nc.Opts.Dialer
  818. }
  819. nc.conn, err = dialer.Dial("tcp", nc.url.Host)
  820. if err != nil {
  821. return err
  822. }
  823. // No clue why, but this stalls and kills performance on Mac (Mavericks).
  824. // https://code.google.com/p/go/issues/detail?id=6930
  825. //if ip, ok := nc.conn.(*net.TCPConn); ok {
  826. // ip.SetReadBuffer(defaultBufSize)
  827. //}
  828. if nc.pending != nil && nc.bw != nil {
  829. // Move to pending buffer.
  830. nc.bw.Flush()
  831. }
  832. nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize)
  833. return nil
  834. }
  835. // makeTLSConn will wrap an existing Conn using TLS
  836. func (nc *Conn) makeTLSConn() {
  837. // Allow the user to configure their own tls.Config structure, otherwise
  838. // default to InsecureSkipVerify.
  839. // TODO(dlc) - We should make the more secure version the default.
  840. if nc.Opts.TLSConfig != nil {
  841. tlsCopy := util.CloneTLSConfig(nc.Opts.TLSConfig)
  842. // If its blank we will override it with the current host
  843. if tlsCopy.ServerName == _EMPTY_ {
  844. h, _, _ := net.SplitHostPort(nc.url.Host)
  845. tlsCopy.ServerName = h
  846. }
  847. nc.conn = tls.Client(nc.conn, tlsCopy)
  848. } else {
  849. nc.conn = tls.Client(nc.conn, &tls.Config{InsecureSkipVerify: true})
  850. }
  851. conn := nc.conn.(*tls.Conn)
  852. conn.Handshake()
  853. nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize)
  854. }
  855. // waitForExits will wait for all socket watcher Go routines to
  856. // be shutdown before proceeding.
  857. func (nc *Conn) waitForExits(wg *sync.WaitGroup) {
  858. // Kick old flusher forcefully.
  859. select {
  860. case nc.fch <- struct{}{}:
  861. default:
  862. }
  863. // Wait for any previous go routines.
  864. if wg != nil {
  865. wg.Wait()
  866. }
  867. }
  868. // spinUpGoRoutines will launch the Go routines responsible for
  869. // reading and writing to the socket. This will be launched via a
  870. // go routine itself to release any locks that may be held.
  871. // We also use a WaitGroup to make sure we only start them on a
  872. // reconnect when the previous ones have exited.
  873. func (nc *Conn) spinUpGoRoutines() {
  874. // Make sure everything has exited.
  875. nc.waitForExits(nc.wg)
  876. // Create a new waitGroup instance for this run.
  877. nc.wg = &sync.WaitGroup{}
  878. // We will wait on both.
  879. nc.wg.Add(2)
  880. // Spin up the readLoop and the socket flusher.
  881. go nc.readLoop(nc.wg)
  882. go nc.flusher(nc.wg)
  883. nc.mu.Lock()
  884. if nc.Opts.PingInterval > 0 {
  885. if nc.ptmr == nil {
  886. nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
  887. } else {
  888. nc.ptmr.Reset(nc.Opts.PingInterval)
  889. }
  890. }
  891. nc.mu.Unlock()
  892. }
  893. // Report the connected server's Url
  894. func (nc *Conn) ConnectedUrl() string {
  895. if nc == nil {
  896. return _EMPTY_
  897. }
  898. nc.mu.Lock()
  899. defer nc.mu.Unlock()
  900. if nc.status != CONNECTED {
  901. return _EMPTY_
  902. }
  903. return nc.url.String()
  904. }
  905. // Report the connected server's Id
  906. func (nc *Conn) ConnectedServerId() string {
  907. if nc == nil {
  908. return _EMPTY_
  909. }
  910. nc.mu.Lock()
  911. defer nc.mu.Unlock()
  912. if nc.status != CONNECTED {
  913. return _EMPTY_
  914. }
  915. return nc.info.Id
  916. }
  917. // Low level setup for structs, etc
  918. func (nc *Conn) setup() {
  919. nc.subs = make(map[int64]*Subscription)
  920. nc.pongs = make([]chan struct{}, 0, 8)
  921. nc.fch = make(chan struct{}, flushChanSize)
  922. // Setup scratch outbound buffer for PUB
  923. pub := nc.scratch[:len(_PUB_P_)]
  924. copy(pub, _PUB_P_)
  925. }
  926. // Process a connected connection and initialize properly.
  927. func (nc *Conn) processConnectInit() error {
  928. // Set out deadline for the whole connect process
  929. nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
  930. defer nc.conn.SetDeadline(time.Time{})
  931. // Set our status to connecting.
  932. nc.status = CONNECTING
  933. // Process the INFO protocol received from the server
  934. err := nc.processExpectedInfo()
  935. if err != nil {
  936. return err
  937. }
  938. // Send the CONNECT protocol along with the initial PING protocol.
  939. // Wait for the PONG response (or any error that we get from the server).
  940. err = nc.sendConnect()
  941. if err != nil {
  942. return err
  943. }
  944. // Reset the number of PING sent out
  945. nc.pout = 0
  946. go nc.spinUpGoRoutines()
  947. return nil
  948. }
  949. // Main connect function. Will connect to the nats-server
  950. func (nc *Conn) connect() error {
  951. var returnedErr error
  952. // Create actual socket connection
  953. // For first connect we walk all servers in the pool and try
  954. // to connect immediately.
  955. nc.mu.Lock()
  956. nc.initc = true
  957. // The pool may change inside the loop iteration due to INFO protocol.
  958. for i := 0; i < len(nc.srvPool); i++ {
  959. nc.url = nc.srvPool[i].url
  960. if err := nc.createConn(); err == nil {
  961. // This was moved out of processConnectInit() because
  962. // that function is now invoked from doReconnect() too.
  963. nc.setup()
  964. err = nc.processConnectInit()
  965. if err == nil {
  966. nc.srvPool[i].didConnect = true
  967. nc.srvPool[i].reconnects = 0
  968. returnedErr = nil
  969. break
  970. } else {
  971. returnedErr = err
  972. nc.mu.Unlock()
  973. nc.close(DISCONNECTED, false)
  974. nc.mu.Lock()
  975. nc.url = nil
  976. }
  977. } else {
  978. // Cancel out default connection refused, will trigger the
  979. // No servers error conditional
  980. if strings.Contains(err.Error(), "connection refused") {
  981. returnedErr = nil
  982. }
  983. }
  984. }
  985. nc.initc = false
  986. defer nc.mu.Unlock()
  987. if returnedErr == nil && nc.status != CONNECTED {
  988. returnedErr = ErrNoServers
  989. }
  990. return returnedErr
  991. }
  992. // This will check to see if the connection should be
  993. // secure. This can be dictated from either end and should
  994. // only be called after the INIT protocol has been received.
  995. func (nc *Conn) checkForSecure() error {
  996. // Check to see if we need to engage TLS
  997. o := nc.Opts
  998. // Check for mismatch in setups
  999. if o.Secure && !nc.info.TLSRequired {
  1000. return ErrSecureConnWanted
  1001. } else if nc.info.TLSRequired && !o.Secure {
  1002. return ErrSecureConnRequired
  1003. }
  1004. // Need to rewrap with bufio
  1005. if o.Secure {
  1006. nc.makeTLSConn()
  1007. }
  1008. return nil
  1009. }
  1010. // processExpectedInfo will look for the expected first INFO message
  1011. // sent when a connection is established. The lock should be held entering.
  1012. func (nc *Conn) processExpectedInfo() error {
  1013. c := &control{}
  1014. // Read the protocol
  1015. err := nc.readOp(c)
  1016. if err != nil {
  1017. return err
  1018. }
  1019. // The nats protocol should send INFO first always.
  1020. if c.op != _INFO_OP_ {
  1021. return ErrNoInfoReceived
  1022. }
  1023. // Parse the protocol
  1024. if err := nc.processInfo(c.args); err != nil {
  1025. return err
  1026. }
  1027. return nc.checkForSecure()
  1028. }
  1029. // Sends a protocol control message by queuing into the bufio writer
  1030. // and kicking the flush Go routine. These writes are protected.
  1031. func (nc *Conn) sendProto(proto string) {
  1032. nc.mu.Lock()
  1033. nc.bw.WriteString(proto)
  1034. nc.kickFlusher()
  1035. nc.mu.Unlock()
  1036. }
  1037. // Generate a connect protocol message, issuing user/password if
  1038. // applicable. The lock is assumed to be held upon entering.
  1039. func (nc *Conn) connectProto() (string, error) {
  1040. o := nc.Opts
  1041. var user, pass, token string
  1042. u := nc.url.User
  1043. if u != nil {
  1044. // if no password, assume username is authToken
  1045. if _, ok := u.Password(); !ok {
  1046. token = u.Username()
  1047. } else {
  1048. user = u.Username()
  1049. pass, _ = u.Password()
  1050. }
  1051. } else {
  1052. // Take from options (pssibly all empty strings)
  1053. user = nc.Opts.User
  1054. pass = nc.Opts.Password
  1055. token = nc.Opts.Token
  1056. }
  1057. cinfo := connectInfo{o.Verbose, o.Pedantic,
  1058. user, pass, token,
  1059. o.Secure, o.Name, LangString, Version, clientProtoInfo}
  1060. b, err := json.Marshal(cinfo)
  1061. if err != nil {
  1062. return _EMPTY_, ErrJsonParse
  1063. }
  1064. return fmt.Sprintf(conProto, b), nil
  1065. }
  1066. // normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
  1067. func normalizeErr(line string) string {
  1068. s := strings.ToLower(strings.TrimSpace(strings.TrimPrefix(line, _ERR_OP_)))
  1069. s = strings.TrimLeft(strings.TrimRight(s, "'"), "'")
  1070. return s
  1071. }
  1072. // Send a connect protocol message to the server, issue user/password if
  1073. // applicable. Will wait for a flush to return from the server for error
  1074. // processing.
  1075. func (nc *Conn) sendConnect() error {
  1076. // Construct the CONNECT protocol string
  1077. cProto, err := nc.connectProto()
  1078. if err != nil {
  1079. return err
  1080. }
  1081. // Write the protocol into the buffer
  1082. _, err = nc.bw.WriteString(cProto)
  1083. if err != nil {
  1084. return err
  1085. }
  1086. // Add to the buffer the PING protocol
  1087. _, err = nc.bw.WriteString(pingProto)
  1088. if err != nil {
  1089. return err
  1090. }
  1091. // Flush the buffer
  1092. err = nc.bw.Flush()
  1093. if err != nil {
  1094. return err
  1095. }
  1096. // We don't want to read more than we need here, otherwise
  1097. // we would need to transfer the excess read data to the readLoop.
  1098. // Since in normal situations we just are looking for a PONG\r\n,
  1099. // reading byte-by-byte here is ok.
  1100. proto, err := nc.readProto()
  1101. if err != nil {
  1102. return err
  1103. }
  1104. // If opts.Verbose is set, handle +OK
  1105. if nc.Opts.Verbose && proto == okProto {
  1106. // Read the rest now...
  1107. proto, err = nc.readProto()
  1108. if err != nil {
  1109. return err
  1110. }
  1111. }
  1112. // We expect a PONG
  1113. if proto != pongProto {
  1114. // But it could be something else, like -ERR
  1115. // Since we no longer use ReadLine(), trim the trailing "\r\n"
  1116. proto = strings.TrimRight(proto, "\r\n")
  1117. // If it's a server error...
  1118. if strings.HasPrefix(proto, _ERR_OP_) {
  1119. // Remove -ERR, trim spaces and quotes, and convert to lower case.
  1120. proto = normalizeErr(proto)
  1121. return errors.New("nats: " + proto)
  1122. }
  1123. // Notify that we got an unexpected protocol.
  1124. return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, proto)
  1125. }
  1126. // This is where we are truly connected.
  1127. nc.status = CONNECTED
  1128. return nil
  1129. }
  1130. // reads a protocol one byte at a time.
  1131. func (nc *Conn) readProto() (string, error) {
  1132. var (
  1133. _buf = [10]byte{}
  1134. buf = _buf[:0]
  1135. b = [1]byte{}
  1136. protoEnd = byte('\n')
  1137. )
  1138. for {
  1139. if _, err := nc.conn.Read(b[:1]); err != nil {
  1140. // Do not report EOF error
  1141. if err == io.EOF {
  1142. return string(buf), nil
  1143. }
  1144. return "", err
  1145. }
  1146. buf = append(buf, b[0])
  1147. if b[0] == protoEnd {
  1148. return string(buf), nil
  1149. }
  1150. }
  1151. }
  // control is a parsed control protocol line.
type control struct {
	// op is the operation name, args whatever follows it on the line.
	op   string
	args string
}
  1156. // Read a control line and process the intended op.
  1157. func (nc *Conn) readOp(c *control) error {
  1158. br := bufio.NewReaderSize(nc.conn, defaultBufSize)
  1159. line, err := br.ReadString('\n')
  1160. if err != nil {
  1161. return err
  1162. }
  1163. parseControl(line, c)
  1164. return nil
  1165. }
  1166. // Parse a control line from the server.
  1167. func parseControl(line string, c *control) {
  1168. toks := strings.SplitN(line, _SPC_, 2)
  1169. if len(toks) == 1 {
  1170. c.op = strings.TrimSpace(toks[0])
  1171. c.args = _EMPTY_
  1172. } else if len(toks) == 2 {
  1173. c.op, c.args = strings.TrimSpace(toks[0]), strings.TrimSpace(toks[1])
  1174. } else {
  1175. c.op = _EMPTY_
  1176. }
  1177. }
  1178. // flushReconnectPending will push the pending items that were
  1179. // gathered while we were in a RECONNECTING state to the socket.
  1180. func (nc *Conn) flushReconnectPendingItems() {
  1181. if nc.pending == nil {
  1182. return
  1183. }
  1184. if nc.pending.Len() > 0 {
  1185. nc.bw.Write(nc.pending.Bytes())
  1186. }
  1187. }
  1188. // Try to reconnect using the option parameters.
  1189. // This function assumes we are allowed to reconnect.
  1190. func (nc *Conn) doReconnect() {
  1191. // We want to make sure we have the other watchers shutdown properly
  1192. // here before we proceed past this point.
  1193. nc.mu.Lock()
  1194. wg := nc.wg
  1195. nc.mu.Unlock()
  1196. nc.waitForExits(wg)
  1197. // FIXME(dlc) - We have an issue here if we have
  1198. // outstanding flush points (pongs) and they were not
  1199. // sent out, but are still in the pipe.
  1200. // Hold the lock manually and release where needed below,
  1201. // can't do defer here.
  1202. nc.mu.Lock()
  1203. // Clear any queued pongs, e.g. pending flush calls.
  1204. nc.clearPendingFlushCalls()
  1205. // Clear any errors.
  1206. nc.err = nil
  1207. // Perform appropriate callback if needed for a disconnect.
  1208. if nc.Opts.DisconnectedCB != nil {
  1209. nc.ach <- func() { nc.Opts.DisconnectedCB(nc) }
  1210. }
  1211. for len(nc.srvPool) > 0 {
  1212. cur, err := nc.selectNextServer()
  1213. if err != nil {
  1214. nc.err = err
  1215. break
  1216. }
  1217. sleepTime := int64(0)
  1218. // Sleep appropriate amount of time before the
  1219. // connection attempt if connecting to same server
  1220. // we just got disconnected from..
  1221. if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
  1222. sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
  1223. }
  1224. // On Windows, createConn() will take more than a second when no
  1225. // server is running at that address. So it could be that the
  1226. // time elapsed between reconnect attempts is always > than
  1227. // the set option. Release the lock to give a chance to a parallel
  1228. // nc.Close() to break the loop.
  1229. nc.mu.Unlock()
  1230. if sleepTime <= 0 {
  1231. runtime.Gosched()
  1232. } else {
  1233. time.Sleep(time.Duration(sleepTime))
  1234. }
  1235. nc.mu.Lock()
  1236. // Check if we have been closed first.
  1237. if nc.isClosed() {
  1238. break
  1239. }
  1240. // Mark that we tried a reconnect
  1241. cur.reconnects++
  1242. // Try to create a new connection
  1243. err = nc.createConn()
  1244. // Not yet connected, retry...
  1245. // Continue to hold the lock
  1246. if err != nil {
  1247. nc.err = nil
  1248. continue
  1249. }
  1250. // We are reconnected
  1251. nc.Reconnects++
  1252. // Process connect logic
  1253. if nc.err = nc.processConnectInit(); nc.err != nil {
  1254. nc.status = RECONNECTING
  1255. continue
  1256. }
  1257. // Clear out server stats for the server we connected to..
  1258. cur.didConnect = true
  1259. cur.reconnects = 0
  1260. // Send existing subscription state
  1261. nc.resendSubscriptions()
  1262. // Now send off and clear pending buffer
  1263. nc.flushReconnectPendingItems()
  1264. // Flush the buffer
  1265. nc.err = nc.bw.Flush()
  1266. if nc.err != nil {
  1267. nc.status = RECONNECTING
  1268. continue
  1269. }
  1270. // Done with the pending buffer
  1271. nc.pending = nil
  1272. // This is where we are truly connected.
  1273. nc.status = CONNECTED
  1274. // Queue up the reconnect callback.
  1275. if nc.Opts.ReconnectedCB != nil {
  1276. nc.ach <- func() { nc.Opts.ReconnectedCB(nc) }
  1277. }
  1278. // Release lock here, we will return below.
  1279. nc.mu.Unlock()
  1280. // Make sure to flush everything
  1281. nc.Flush()
  1282. return
  1283. }
  1284. // Call into close.. We have no servers left..
  1285. if nc.err == nil {
  1286. nc.err = ErrNoServers
  1287. }
  1288. nc.mu.Unlock()
  1289. nc.Close()
  1290. }
  1291. // processOpErr handles errors from reading or parsing the protocol.
  1292. // The lock should not be held entering this function.
  1293. func (nc *Conn) processOpErr(err error) {
  1294. nc.mu.Lock()
  1295. if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
  1296. nc.mu.Unlock()
  1297. return
  1298. }
  1299. if nc.Opts.AllowReconnect && nc.status == CONNECTED {
  1300. // Set our new status
  1301. nc.status = RECONNECTING
  1302. if nc.ptmr != nil {
  1303. nc.ptmr.Stop()
  1304. }
  1305. if nc.conn != nil {
  1306. nc.bw.Flush()
  1307. nc.conn.Close()
  1308. nc.conn = nil
  1309. }
  1310. // Reset pending buffers before reconnecting.
  1311. if nc.pending == nil {
  1312. nc.pending = new(bytes.Buffer)
  1313. }
  1314. nc.pending.Reset()
  1315. nc.bw.Reset(nc.pending)
  1316. go nc.doReconnect()
  1317. nc.mu.Unlock()
  1318. return
  1319. }
  1320. nc.status = DISCONNECTED
  1321. nc.err = err
  1322. nc.mu.Unlock()
  1323. nc.Close()
  1324. }
  1325. // Marker to close the channel to kick out the Go routine.
  1326. func (nc *Conn) closeAsyncFunc() asyncCB {
  1327. return func() {
  1328. nc.mu.Lock()
  1329. if nc.ach != nil {
  1330. close(nc.ach)
  1331. nc.ach = nil
  1332. }
  1333. nc.mu.Unlock()
  1334. }
  1335. }
  1336. // asyncDispatch is responsible for calling any async callbacks
  1337. func (nc *Conn) asyncDispatch() {
  1338. // snapshot since they can change from underneath of us.
  1339. nc.mu.Lock()
  1340. ach := nc.ach
  1341. nc.mu.Unlock()
  1342. // Loop on the channel and process async callbacks.
  1343. for {
  1344. if f, ok := <-ach; !ok {
  1345. return
  1346. } else {
  1347. f()
  1348. }
  1349. }
  1350. }
  1351. // readLoop() will sit on the socket reading and processing the
  1352. // protocol from the server. It will dispatch appropriately based
  1353. // on the op type.
  1354. func (nc *Conn) readLoop(wg *sync.WaitGroup) {
  1355. // Release the wait group on exit
  1356. defer wg.Done()
  1357. // Create a parseState if needed.
  1358. nc.mu.Lock()
  1359. if nc.ps == nil {
  1360. nc.ps = &parseState{}
  1361. }
  1362. nc.mu.Unlock()
  1363. // Stack based buffer.
  1364. b := make([]byte, defaultBufSize)
  1365. for {
  1366. // FIXME(dlc): RWLock here?
  1367. nc.mu.Lock()
  1368. sb := nc.isClosed() || nc.isReconnecting()
  1369. if sb {
  1370. nc.ps = &parseState{}
  1371. }
  1372. conn := nc.conn
  1373. nc.mu.Unlock()
  1374. if sb || conn == nil {
  1375. break
  1376. }
  1377. n, err := conn.Read(b)
  1378. if err != nil {
  1379. nc.processOpErr(err)
  1380. break
  1381. }
  1382. if err := nc.parse(b[:n]); err != nil {
  1383. nc.processOpErr(err)
  1384. break
  1385. }
  1386. }
  1387. // Clear the parseState here..
  1388. nc.mu.Lock()
  1389. nc.ps = nil
  1390. nc.mu.Unlock()
  1391. }
  1392. // waitForMsgs waits on the conditional shared with readLoop and processMsg.
  1393. // It is used to deliver messages to asynchronous subscribers.
  1394. func (nc *Conn) waitForMsgs(s *Subscription) {
  1395. var closed bool
  1396. var delivered, max uint64
  1397. for {
  1398. s.mu.Lock()
  1399. if s.pHead == nil && !s.closed {
  1400. s.pCond.Wait()
  1401. }
  1402. // Pop the msg off the list
  1403. m := s.pHead
  1404. if m != nil {
  1405. s.pHead = m.next
  1406. if s.pHead == nil {
  1407. s.pTail = nil
  1408. }
  1409. if m.barrier != nil {
  1410. s.mu.Unlock()
  1411. if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
  1412. m.barrier.f()
  1413. }
  1414. continue
  1415. }
  1416. s.pMsgs--
  1417. s.pBytes -= len(m.Data)
  1418. }
  1419. mcb := s.mcb
  1420. max = s.max
  1421. closed = s.closed
  1422. if !s.closed {
  1423. s.delivered++
  1424. delivered = s.delivered
  1425. }
  1426. s.mu.Unlock()
  1427. if closed {
  1428. break
  1429. }
  1430. // Deliver the message.
  1431. if m != nil && (max == 0 || delivered <= max) {
  1432. mcb(m)
  1433. }
  1434. // If we have hit the max for delivered msgs, remove sub.
  1435. if max > 0 && delivered >= max {
  1436. nc.mu.Lock()
  1437. nc.removeSub(s)
  1438. nc.mu.Unlock()
  1439. break
  1440. }
  1441. }
  1442. // Check for barrier messages
  1443. s.mu.Lock()
  1444. for m := s.pHead; m != nil; m = s.pHead {
  1445. if m.barrier != nil {
  1446. s.mu.Unlock()
  1447. if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
  1448. m.barrier.f()
  1449. }
  1450. s.mu.Lock()
  1451. }
  1452. s.pHead = m.next
  1453. }
  1454. s.mu.Unlock()
  1455. }
  1456. // processMsg is called by parse and will place the msg on the
  1457. // appropriate channel/pending queue for processing. If the channel is full,
  1458. // or the pending queue is over the pending limits, the connection is
  1459. // considered a slow consumer.
  1460. func (nc *Conn) processMsg(data []byte) {
  1461. // Don't lock the connection to avoid server cutting us off if the
  1462. // flusher is holding the connection lock, trying to send to the server
  1463. // that is itself trying to send data to us.
  1464. nc.subsMu.RLock()
  1465. // Stats
  1466. nc.InMsgs++
  1467. nc.InBytes += uint64(len(data))
  1468. sub := nc.subs[nc.ps.ma.sid]
  1469. if sub == nil {
  1470. nc.subsMu.RUnlock()
  1471. return
  1472. }
  1473. // Copy them into string
  1474. subj := string(nc.ps.ma.subject)
  1475. reply := string(nc.ps.ma.reply)
  1476. // Doing message create outside of the sub's lock to reduce contention.
  1477. // It's possible that we end-up not using the message, but that's ok.
  1478. // FIXME(dlc): Need to copy, should/can do COW?
  1479. msgPayload := make([]byte, len(data))
  1480. copy(msgPayload, data)
  1481. // FIXME(dlc): Should we recycle these containers?
  1482. m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
  1483. sub.mu.Lock()
  1484. // Subscription internal stats (applicable only for non ChanSubscription's)
  1485. if sub.typ != ChanSubscription {
  1486. sub.pMsgs++
  1487. if sub.pMsgs > sub.pMsgsMax {
  1488. sub.pMsgsMax = sub.pMsgs
  1489. }
  1490. sub.pBytes += len(m.Data)
  1491. if sub.pBytes > sub.pBytesMax {
  1492. sub.pBytesMax = sub.pBytes
  1493. }
  1494. // Check for a Slow Consumer
  1495. if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
  1496. (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
  1497. goto slowConsumer
  1498. }
  1499. }
  1500. // We have two modes of delivery. One is the channel, used by channel
  1501. // subscribers and syncSubscribers, the other is a linked list for async.
  1502. if sub.mch != nil {
  1503. select {
  1504. case sub.mch <- m:
  1505. default:
  1506. goto slowConsumer
  1507. }
  1508. } else {
  1509. // Push onto the async pList
  1510. if sub.pHead == nil {
  1511. sub.pHead = m
  1512. sub.pTail = m
  1513. sub.pCond.Signal()
  1514. } else {
  1515. sub.pTail.next = m
  1516. sub.pTail = m
  1517. }
  1518. }
  1519. // Clear SlowConsumer status.
  1520. sub.sc = false
  1521. sub.mu.Unlock()
  1522. nc.subsMu.RUnlock()
  1523. return
  1524. slowConsumer:
  1525. sub.dropped++
  1526. sc := !sub.sc
  1527. sub.sc = true
  1528. // Undo stats from above
  1529. if sub.typ != ChanSubscription {
  1530. sub.pMsgs--
  1531. sub.pBytes -= len(m.Data)
  1532. }
  1533. sub.mu.Unlock()
  1534. nc.subsMu.RUnlock()
  1535. if sc {
  1536. // Now we need connection's lock and we may end-up in the situation
  1537. // that we were trying to avoid, except that in this case, the client
  1538. // is already experiencing client-side slow consumer situation.
  1539. nc.mu.Lock()
  1540. nc.err = ErrSlowConsumer
  1541. if nc.Opts.AsyncErrorCB != nil {
  1542. nc.ach <- func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) }
  1543. }
  1544. nc.mu.Unlock()
  1545. }
  1546. }
  1547. // processPermissionsViolation is called when the server signals a subject
  1548. // permissions violation on either publish or subscribe.
  1549. func (nc *Conn) processPermissionsViolation(err string) {
  1550. nc.mu.Lock()
  1551. // create error here so we can pass it as a closure to the async cb dispatcher.
  1552. e := errors.New("nats: " + err)
  1553. nc.err = e
  1554. if nc.Opts.AsyncErrorCB != nil {
  1555. nc.ach <- func() { nc.Opts.AsyncErrorCB(nc, nil, e) }
  1556. }
  1557. nc.mu.Unlock()
  1558. }
  1559. // processAuthorizationViolation is called when the server signals a user
  1560. // authorization violation.
  1561. func (nc *Conn) processAuthorizationViolation(err string) {
  1562. nc.mu.Lock()
  1563. nc.err = ErrAuthorization
  1564. if nc.Opts.AsyncErrorCB != nil {
  1565. nc.ach <- func() { nc.Opts.AsyncErrorCB(nc, nil, ErrAuthorization) }
  1566. }
  1567. nc.mu.Unlock()
  1568. }
  1569. // flusher is a separate Go routine that will process flush requests for the write
  1570. // bufio. This allows coalescing of writes to the underlying socket.
  1571. func (nc *Conn) flusher(wg *sync.WaitGroup) {
  1572. // Release the wait group
  1573. defer wg.Done()
  1574. // snapshot the bw and conn since they can change from underneath of us.
  1575. nc.mu.Lock()
  1576. bw := nc.bw
  1577. conn := nc.conn
  1578. fch := nc.fch
  1579. flusherTimeout := nc.Opts.FlusherTimeout
  1580. nc.mu.Unlock()
  1581. if conn == nil || bw == nil {
  1582. return
  1583. }
  1584. for {
  1585. if _, ok := <-fch; !ok {
  1586. return
  1587. }
  1588. nc.mu.Lock()
  1589. // Check to see if we should bail out.
  1590. if !nc.isConnected() || nc.isConnecting() || bw != nc.bw || conn != nc.conn {
  1591. nc.mu.Unlock()
  1592. return
  1593. }
  1594. if bw.Buffered() > 0 {
  1595. // Allow customizing how long we should wait for a flush to be done
  1596. // to prevent unhealthy connections blocking the client for too long.
  1597. if flusherTimeout > 0 {
  1598. conn.SetWriteDeadline(time.Now().Add(flusherTimeout))
  1599. }
  1600. if err := bw.Flush(); err != nil {
  1601. if nc.err == nil {
  1602. nc.err = err
  1603. }
  1604. }
  1605. conn.SetWriteDeadline(time.Time{})
  1606. }
  1607. nc.mu.Unlock()
  1608. }
  1609. }
  1610. // processPing will send an immediate pong protocol response to the
  1611. // server. The server uses this mechanism to detect dead clients.
  1612. func (nc *Conn) processPing() {
  1613. nc.sendProto(pongProto)
  1614. }
  1615. // processPong is used to process responses to the client's ping
  1616. // messages. We use pings for the flush mechanism as well.
  1617. func (nc *Conn) processPong() {
  1618. var ch chan struct{}
  1619. nc.mu.Lock()
  1620. if len(nc.pongs) > 0 {
  1621. ch = nc.pongs[0]
  1622. nc.pongs = nc.pongs[1:]
  1623. }
  1624. nc.pout = 0
  1625. nc.mu.Unlock()
  1626. if ch != nil {
  1627. ch <- struct{}{}
  1628. }
  1629. }
  1630. // processOK is a placeholder for processing OK messages.
  1631. func (nc *Conn) processOK() {
  1632. // do nothing
  1633. }
  1634. // processInfo is used to parse the info messages sent
  1635. // from the server.
  1636. // This function may update the server pool.
  1637. func (nc *Conn) processInfo(info string) error {
  1638. if info == _EMPTY_ {
  1639. return nil
  1640. }
  1641. ncInfo := serverInfo{}
  1642. if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
  1643. return err
  1644. }
  1645. // Copy content into connection's info structure.
  1646. nc.info = ncInfo
  1647. // The array could be empty/not present on initial connect,
  1648. // if advertise is disabled on that server, or servers that
  1649. // did not include themselves in the async INFO protocol.
  1650. // If empty, do not remove the implicit servers from the pool.
  1651. if len(ncInfo.ConnectURLs) == 0 {
  1652. return nil
  1653. }
  1654. // Note about pool randomization: when the pool was first created,
  1655. // it was randomized (if allowed). We keep the order the same (removing
  1656. // implicit servers that are no longer sent to us). New URLs are sent
  1657. // to us in no specific order so don't need extra randomization.
  1658. hasNew := false
  1659. // This is what we got from the server we are connected to.
  1660. urls := nc.info.ConnectURLs
  1661. // Transform that to a map for easy lookups
  1662. tmp := make(map[string]struct{}, len(urls))
  1663. for _, curl := range urls {
  1664. tmp[curl] = struct{}{}
  1665. }
  1666. // Walk the pool and removed the implicit servers that are no longer in the
  1667. // given array/map
  1668. sp := nc.srvPool
  1669. for i := 0; i < len(sp); i++ {
  1670. srv := sp[i]
  1671. curl := srv.url.Host
  1672. // Check if this URL is in the INFO protocol
  1673. _, inInfo := tmp[curl]
  1674. // Remove from the temp map so that at the end we are left with only
  1675. // new (or restarted) servers that need to be added to the pool.
  1676. delete(tmp, curl)
  1677. // Keep servers that were set through Options, but also the one that
  1678. // we are currently connected to (even if it is a discovered server).
  1679. if !srv.isImplicit || srv.url == nc.url {
  1680. continue
  1681. }
  1682. if !inInfo {
  1683. // Remove from server pool. Keep current order.
  1684. copy(sp[i:], sp[i+1:])
  1685. nc.srvPool = sp[:len(sp)-1]
  1686. sp = nc.srvPool
  1687. i--
  1688. }
  1689. }
  1690. // If there are any left in the tmp map, these are new (or restarted) servers
  1691. // and need to be added to the pool.
  1692. for curl := range tmp {
  1693. // Before adding, check if this is a new (as in never seen) URL.
  1694. // This is used to figure out if we invoke the DiscoveredServersCB
  1695. if _, present := nc.urls[curl]; !present {
  1696. hasNew = true
  1697. }
  1698. nc.addURLToPool(fmt.Sprintf("nats://%s", curl), true)
  1699. }
  1700. if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
  1701. nc.ach <- func() { nc.Opts.DiscoveredServersCB(nc) }
  1702. }
  1703. return nil
  1704. }
  1705. // processAsyncInfo does the same than processInfo, but is called
  1706. // from the parser. Calls processInfo under connection's lock
  1707. // protection.
  1708. func (nc *Conn) processAsyncInfo(info []byte) {
  1709. nc.mu.Lock()
  1710. // Ignore errors, we will simply not update the server pool...
  1711. nc.processInfo(string(info))
  1712. nc.mu.Unlock()
  1713. }
  1714. // LastError reports the last error encountered via the connection.
  1715. // It can be used reliably within ClosedCB in order to find out reason
  1716. // why connection was closed for example.
  1717. func (nc *Conn) LastError() error {
  1718. if nc == nil {
  1719. return ErrInvalidConnection
  1720. }
  1721. nc.mu.Lock()
  1722. err := nc.err
  1723. nc.mu.Unlock()
  1724. return err
  1725. }
  1726. // processErr processes any error messages from the server and
  1727. // sets the connection's lastError.
  1728. func (nc *Conn) processErr(e string) {
  1729. // Trim, remove quotes, convert to lower case.
  1730. e = normalizeErr(e)
  1731. // FIXME(dlc) - process Slow Consumer signals special.
  1732. if e == STALE_CONNECTION {
  1733. nc.processOpErr(ErrStaleConnection)
  1734. } else if strings.HasPrefix(e, PERMISSIONS_ERR) {
  1735. nc.processPermissionsViolation(e)
  1736. } else if strings.HasPrefix(e, AUTHORIZATION_ERR) {
  1737. nc.processAuthorizationViolation(e)
  1738. } else {
  1739. nc.mu.Lock()
  1740. nc.err = errors.New("nats: " + e)
  1741. nc.mu.Unlock()
  1742. nc.Close()
  1743. }
  1744. }
  1745. // kickFlusher will send a bool on a channel to kick the
  1746. // flush Go routine to flush data to the server.
  1747. func (nc *Conn) kickFlusher() {
  1748. if nc.bw != nil {
  1749. select {
  1750. case nc.fch <- struct{}{}:
  1751. default:
  1752. }
  1753. }
  1754. }
  1755. // Publish publishes the data argument to the given subject. The data
  1756. // argument is left untouched and needs to be correctly interpreted on
  1757. // the receiver.
  1758. func (nc *Conn) Publish(subj string, data []byte) error {
  1759. return nc.publish(subj, _EMPTY_, data)
  1760. }
  1761. // PublishMsg publishes the Msg structure, which includes the
  1762. // Subject, an optional Reply and an optional Data field.
  1763. func (nc *Conn) PublishMsg(m *Msg) error {
  1764. if m == nil {
  1765. return ErrInvalidMsg
  1766. }
  1767. return nc.publish(m.Subject, m.Reply, m.Data)
  1768. }
  1769. // PublishRequest will perform a Publish() excpecting a response on the
  1770. // reply subject. Use Request() for automatically waiting for a response
  1771. // inline.
  1772. func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
  1773. return nc.publish(subj, reply, data)
  1774. }
  // digits maps a value 0-9 to its ASCII character. It is the lookup
// table for the hand-rolled itoa.
const digits = "0123456789"
// publish is the internal function to publish messages to a nats-server.
// Sends a protocol data message by queuing into the bufio writer
// and kicking the flush go routine. These writes should be protected.
//
// All work after the argument checks runs while holding nc.mu; every
// return path releases it explicitly (no defer is used here).
//
// Returns ErrInvalidConnection for a nil receiver, ErrBadSubject for an
// empty subject, ErrMaxPayload when len(data) exceeds nc.info.MaxPayload,
// ErrConnectionClosed when the connection is closed,
// ErrReconnectBufExceeded when reconnecting and nc.pending has reached
// nc.Opts.ReconnectBufSize, or the first error reported by the buffered
// writer.
func (nc *Conn) publish(subj, reply string, data []byte) error {
	if nc == nil {
		return ErrInvalidConnection
	}
	if subj == "" {
		return ErrBadSubject
	}
	nc.mu.Lock()

	// Proactively reject payloads over the threshold set by server.
	// Note this check runs before the closed check below.
	msgSize := int64(len(data))
	if msgSize > nc.info.MaxPayload {
		nc.mu.Unlock()
		return ErrMaxPayload
	}

	if nc.isClosed() {
		nc.mu.Unlock()
		return ErrConnectionClosed
	}

	// Check if we are reconnecting, and if so check if
	// we have exceeded our reconnect outbound buffer limits.
	if nc.isReconnecting() {
		// Flush to underlying buffer.
		// The Flush error is not inspected here.
		nc.bw.Flush()
		// Check if we are over
		if nc.pending.Len() >= nc.Opts.ReconnectBufSize {
			nc.mu.Unlock()
			return ErrReconnectBufExceeded
		}
	}

	// Build the protocol header in nc.scratch, starting right after its
	// first len(_PUB_P_) bytes.
	// NOTE(review): presumably nc.scratch is pre-filled with _PUB_P_ — confirm.
	msgh := nc.scratch[:len(_PUB_P_)]
	msgh = append(msgh, subj...)
	msgh = append(msgh, ' ')
	if reply != "" {
		msgh = append(msgh, reply...)
		msgh = append(msgh, ' ')
	}

	// We could be smarter here, but simple loop is ok,
	// just avoid strconv in fast path
	// FIXME(dlc) - Find a better way here.
	// msgh = strconv.AppendInt(msgh, int64(len(data)), 10)

	// Hand-rolled itoa of len(data): digits are written right-to-left
	// into b, and b[i:] holds the decimal text afterwards.
	var b [12]byte
	var i = len(b)
	if len(data) > 0 {
		for l := len(data); l > 0; l /= 10 {
			i -= 1
			b[i] = digits[l%10]
		}
	} else {
		// Zero-length payload: emit a single '0'.
		i -= 1
		b[i] = digits[0]
	}

	msgh = append(msgh, b[i:]...)
	msgh = append(msgh, _CRLF_...)

	// Write header, payload and trailing CRLF; stop at the first error.
	_, err := nc.bw.Write(msgh)
	if err == nil {
		_, err = nc.bw.Write(data)
	}
	if err == nil {
		_, err = nc.bw.WriteString(_CRLF_)
	}
	if err != nil {
		nc.mu.Unlock()
		return err
	}

	// Outbound statistics count the payload bytes only.
	nc.OutMsgs++
	nc.OutBytes += uint64(len(data))

	// Only kick the flusher when no signal is already queued on fch.
	if len(nc.fch) == 0 {
		nc.kickFlusher()
	}
	nc.mu.Unlock()
	return nil
}
  1852. // respHandler is the global response handler. It will look up
  1853. // the appropriate channel based on the last token and place
  1854. // the message on the channel if possible.
  1855. func (nc *Conn) respHandler(m *Msg) {
  1856. rt := respToken(m.Subject)
  1857. nc.mu.Lock()
  1858. // Just return if closed.
  1859. if nc.isClosed() {
  1860. nc.mu.Unlock()
  1861. return
  1862. }
  1863. // Grab mch
  1864. mch := nc.respMap[rt]
  1865. // Delete the key regardless, one response only.
  1866. // FIXME(dlc) - should we track responses past 1
  1867. // just statistics wise?
  1868. delete(nc.respMap, rt)
  1869. nc.mu.Unlock()
  1870. // Don't block, let Request timeout instead, mch is
  1871. // buffered and we should delete the key before a
  1872. // second response is processed.
  1873. select {
  1874. case mch <- m:
  1875. default:
  1876. return
  1877. }
  1878. }
  1879. // Create the response subscription we will use for all
  1880. // new style responses. This will be on an _INBOX with an
  1881. // additional terminal token. The subscription will be on
  1882. // a wildcard. Caller is responsible for ensuring this is
  1883. // only called once.
  1884. func (nc *Conn) createRespMux(respSub string) error {
  1885. s, err := nc.Subscribe(respSub, nc.respHandler)
  1886. if err != nil {
  1887. return err
  1888. }
  1889. nc.mu.Lock()
  1890. nc.respMux = s
  1891. nc.mu.Unlock()
  1892. return nil
  1893. }
  1894. // Request will send a request payload and deliver the response message,
  1895. // or an error, including a timeout if no message was received properly.
  1896. func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
  1897. if nc == nil {
  1898. return nil, ErrInvalidConnection
  1899. }
  1900. nc.mu.Lock()
  1901. // If user wants the old style.
  1902. if nc.Opts.UseOldRequestStyle {
  1903. nc.mu.Unlock()
  1904. return nc.oldRequest(subj, data, timeout)
  1905. }
  1906. // Do setup for the new style.
  1907. if nc.respMap == nil {
  1908. // _INBOX wildcard
  1909. nc.respSub = fmt.Sprintf("%s.*", NewInbox())
  1910. nc.respMap = make(map[string]chan *Msg)
  1911. }
  1912. // Create literal Inbox and map to a chan msg.
  1913. mch := make(chan *Msg, RequestChanLen)
  1914. respInbox := nc.newRespInbox()
  1915. token := respToken(respInbox)
  1916. nc.respMap[token] = mch
  1917. createSub := nc.respMux == nil
  1918. ginbox := nc.respSub
  1919. nc.mu.Unlock()
  1920. if createSub {
  1921. // Make sure scoped subscription is setup only once.
  1922. var err error
  1923. nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
  1924. if err != nil {
  1925. return nil, err
  1926. }
  1927. }
  1928. if err := nc.PublishRequest(subj, respInbox, data); err != nil {
  1929. return nil, err
  1930. }
  1931. t := globalTimerPool.Get(timeout)
  1932. defer globalTimerPool.Put(t)
  1933. var ok bool
  1934. var msg *Msg
  1935. select {
  1936. case msg, ok = <-mch:
  1937. if !ok {
  1938. return nil, ErrConnectionClosed
  1939. }
  1940. case <-t.C:
  1941. nc.mu.Lock()
  1942. delete(nc.respMap, token)
  1943. nc.mu.Unlock()
  1944. return nil, ErrTimeout
  1945. }
  1946. return msg, nil
  1947. }
  1948. // oldRequest will create an Inbox and perform a Request() call
  1949. // with the Inbox reply and return the first reply received.
  1950. // This is optimized for the case of multiple responses.
  1951. func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) {
  1952. inbox := NewInbox()
  1953. ch := make(chan *Msg, RequestChanLen)
  1954. s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
  1955. if err != nil {
  1956. return nil, err
  1957. }
  1958. s.AutoUnsubscribe(1)
  1959. defer s.Unsubscribe()
  1960. err = nc.PublishRequest(subj, inbox, data)
  1961. if err != nil {
  1962. return nil, err
  1963. }
  1964. return s.NextMsg(timeout)
  1965. }
// InboxPrefix is the prefix for all inbox subjects.
const InboxPrefix = "_INBOX."

// inboxPrefixLen is the byte length of InboxPrefix.
const inboxPrefixLen = len(InboxPrefix)

// respInboxPrefixLen is the length of the inbox prefix, plus one nuid,
// plus one more byte. newRespInbox copies that many bytes of nc.respSub
// as the prefix, and respToken returns everything after that offset.
const respInboxPrefixLen = inboxPrefixLen + nuidSize + 1
  1970. // NewInbox will return an inbox string which can be used for directed replies from
  1971. // subscribers. These are guaranteed to be unique, but can be shared and subscribed
  1972. // to by others.
  1973. func NewInbox() string {
  1974. var b [inboxPrefixLen + nuidSize]byte
  1975. pres := b[:inboxPrefixLen]
  1976. copy(pres, InboxPrefix)
  1977. ns := b[inboxPrefixLen:]
  1978. copy(ns, nuid.Next())
  1979. return string(b[:])
  1980. }
  1981. // Creates a new literal response subject that will trigger
  1982. // the global subscription handler.
  1983. func (nc *Conn) newRespInbox() string {
  1984. var b [inboxPrefixLen + (2 * nuidSize) + 1]byte
  1985. pres := b[:respInboxPrefixLen]
  1986. copy(pres, nc.respSub)
  1987. ns := b[respInboxPrefixLen:]
  1988. copy(ns, nuid.Next())
  1989. return string(b[:])
  1990. }
  1991. // respToken will return the last token of a literal response inbox
  1992. // which we use for the message channel lookup.
  1993. func respToken(respInbox string) string {
  1994. return respInbox[respInboxPrefixLen:]
  1995. }
  1996. // Subscribe will express interest in the given subject. The subject
  1997. // can have wildcards (partial:*, full:>). Messages will be delivered
  1998. // to the associated MsgHandler. If no MsgHandler is given, the
  1999. // subscription is a synchronous subscription and can be polled via
  2000. // Subscription.NextMsg().
  2001. func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
  2002. return nc.subscribe(subj, _EMPTY_, cb, nil)
  2003. }
  2004. // ChanSubscribe will place all messages received on the channel.
  2005. // You should not close the channel until sub.Unsubscribe() has been called.
  2006. func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
  2007. return nc.subscribe(subj, _EMPTY_, nil, ch)
  2008. }
  2009. // ChanQueueSubscribe will place all messages received on the channel.
  2010. // You should not close the channel until sub.Unsubscribe() has been called.
  2011. func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
  2012. return nc.subscribe(subj, group, nil, ch)
  2013. }
  2014. // SubscribeSync is syntactic sugar for Subscribe(subject, nil).
  2015. func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
  2016. if nc == nil {
  2017. return nil, ErrInvalidConnection
  2018. }
  2019. mch := make(chan *Msg, nc.Opts.SubChanLen)
  2020. s, e := nc.subscribe(subj, _EMPTY_, nil, mch)
  2021. if s != nil {
  2022. s.typ = SyncSubscription
  2023. }
  2024. return s, e
  2025. }
  2026. // QueueSubscribe creates an asynchronous queue subscriber on the given subject.
  2027. // All subscribers with the same queue name will form the queue group and
  2028. // only one member of the group will be selected to receive any given
  2029. // message asynchronously.
  2030. func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
  2031. return nc.subscribe(subj, queue, cb, nil)
  2032. }
  2033. // QueueSubscribeSync creates a synchronous queue subscriber on the given
  2034. // subject. All subscribers with the same queue name will form the queue
  2035. // group and only one member of the group will be selected to receive any
  2036. // given message synchronously.
  2037. func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
  2038. mch := make(chan *Msg, nc.Opts.SubChanLen)
  2039. s, e := nc.subscribe(subj, queue, nil, mch)
  2040. if s != nil {
  2041. s.typ = SyncSubscription
  2042. }
  2043. return s, e
  2044. }
  2045. // QueueSubscribeSyncWithChan is syntactic sugar for ChanQueueSubscribe(subject, group, ch).
  2046. func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
  2047. return nc.subscribe(subj, queue, nil, ch)
  2048. }
  2049. // subscribe is the internal subscribe function that indicates interest in a subject.
  2050. func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
  2051. if nc == nil {
  2052. return nil, ErrInvalidConnection
  2053. }
  2054. nc.mu.Lock()
  2055. // ok here, but defer is generally expensive
  2056. defer nc.mu.Unlock()
  2057. defer nc.kickFlusher()
  2058. // Check for some error conditions.
  2059. if nc.isClosed() {
  2060. return nil, ErrConnectionClosed
  2061. }
  2062. if cb == nil && ch == nil {
  2063. return nil, ErrBadSubscription
  2064. }
  2065. sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc}
  2066. // Set pending limits.
  2067. sub.pMsgsLimit = DefaultSubPendingMsgsLimit
  2068. sub.pBytesLimit = DefaultSubPendingBytesLimit
  2069. // If we have an async callback, start up a sub specific
  2070. // Go routine to deliver the messages.
  2071. if cb != nil {
  2072. sub.typ = AsyncSubscription
  2073. sub.pCond = sync.NewCond(&sub.mu)
  2074. go nc.waitForMsgs(sub)
  2075. } else {
  2076. sub.typ = ChanSubscription
  2077. sub.mch = ch
  2078. }
  2079. nc.subsMu.Lock()
  2080. nc.ssid++
  2081. sub.sid = nc.ssid
  2082. nc.subs[sub.sid] = sub
  2083. nc.subsMu.Unlock()
  2084. // We will send these for all subs when we reconnect
  2085. // so that we can suppress here.
  2086. if !nc.isReconnecting() {
  2087. fmt.Fprintf(nc.bw, subProto, subj, queue, sub.sid)
  2088. }
  2089. return sub, nil
  2090. }
  2091. // Lock for nc should be held here upon entry
  2092. func (nc *Conn) removeSub(s *Subscription) {
  2093. nc.subsMu.Lock()
  2094. delete(nc.subs, s.sid)
  2095. nc.subsMu.Unlock()
  2096. s.mu.Lock()
  2097. defer s.mu.Unlock()
  2098. // Release callers on NextMsg for SyncSubscription only
  2099. if s.mch != nil && s.typ == SyncSubscription {
  2100. close(s.mch)
  2101. }
  2102. s.mch = nil
  2103. // Mark as invalid
  2104. s.conn = nil
  2105. s.closed = true
  2106. if s.pCond != nil {
  2107. s.pCond.Broadcast()
  2108. }
  2109. }
  2110. // SubscriptionType is the type of the Subscription.
  2111. type SubscriptionType int
  2112. // The different types of subscription types.
  2113. const (
  2114. AsyncSubscription = SubscriptionType(iota)
  2115. SyncSubscription
  2116. ChanSubscription
  2117. NilSubscription
  2118. )
  2119. // Type returns the type of Subscription.
  2120. func (s *Subscription) Type() SubscriptionType {
  2121. if s == nil {
  2122. return NilSubscription
  2123. }
  2124. s.mu.Lock()
  2125. defer s.mu.Unlock()
  2126. return s.typ
  2127. }
  2128. // IsValid returns a boolean indicating whether the subscription
  2129. // is still active. This will return false if the subscription has
  2130. // already been closed.
  2131. func (s *Subscription) IsValid() bool {
  2132. if s == nil {
  2133. return false
  2134. }
  2135. s.mu.Lock()
  2136. defer s.mu.Unlock()
  2137. return s.conn != nil
  2138. }
  2139. // Unsubscribe will remove interest in the given subject.
  2140. func (s *Subscription) Unsubscribe() error {
  2141. if s == nil {
  2142. return ErrBadSubscription
  2143. }
  2144. s.mu.Lock()
  2145. conn := s.conn
  2146. s.mu.Unlock()
  2147. if conn == nil {
  2148. return ErrBadSubscription
  2149. }
  2150. return conn.unsubscribe(s, 0)
  2151. }
  2152. // AutoUnsubscribe will issue an automatic Unsubscribe that is
  2153. // processed by the server when max messages have been received.
  2154. // This can be useful when sending a request to an unknown number
  2155. // of subscribers. Request() uses this functionality.
  2156. func (s *Subscription) AutoUnsubscribe(max int) error {
  2157. if s == nil {
  2158. return ErrBadSubscription
  2159. }
  2160. s.mu.Lock()
  2161. conn := s.conn
  2162. s.mu.Unlock()
  2163. if conn == nil {
  2164. return ErrBadSubscription
  2165. }
  2166. return conn.unsubscribe(s, max)
  2167. }
  2168. // unsubscribe performs the low level unsubscribe to the server.
  2169. // Use Subscription.Unsubscribe()
  2170. func (nc *Conn) unsubscribe(sub *Subscription, max int) error {
  2171. nc.mu.Lock()
  2172. // ok here, but defer is expensive
  2173. defer nc.mu.Unlock()
  2174. defer nc.kickFlusher()
  2175. if nc.isClosed() {
  2176. return ErrConnectionClosed
  2177. }
  2178. nc.subsMu.RLock()
  2179. s := nc.subs[sub.sid]
  2180. nc.subsMu.RUnlock()
  2181. // Already unsubscribed
  2182. if s == nil {
  2183. return nil
  2184. }
  2185. maxStr := _EMPTY_
  2186. if max > 0 {
  2187. s.max = uint64(max)
  2188. maxStr = strconv.Itoa(max)
  2189. } else {
  2190. nc.removeSub(s)
  2191. }
  2192. // We will send these for all subs when we reconnect
  2193. // so that we can suppress here.
  2194. if !nc.isReconnecting() {
  2195. fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
  2196. }
  2197. return nil
  2198. }
  2199. // NextMsg will return the next message available to a synchronous subscriber
  2200. // or block until one is available. A timeout can be used to return when no
  2201. // message has been delivered.
  2202. func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
  2203. if s == nil {
  2204. return nil, ErrBadSubscription
  2205. }
  2206. s.mu.Lock()
  2207. err := s.validateNextMsgState()
  2208. if err != nil {
  2209. s.mu.Unlock()
  2210. return nil, err
  2211. }
  2212. // snapshot
  2213. mch := s.mch
  2214. s.mu.Unlock()
  2215. var ok bool
  2216. var msg *Msg
  2217. t := globalTimerPool.Get(timeout)
  2218. defer globalTimerPool.Put(t)
  2219. select {
  2220. case msg, ok = <-mch:
  2221. if !ok {
  2222. return nil, ErrConnectionClosed
  2223. }
  2224. err := s.processNextMsgDelivered(msg)
  2225. if err != nil {
  2226. return nil, err
  2227. }
  2228. case <-t.C:
  2229. return nil, ErrTimeout
  2230. }
  2231. return msg, nil
  2232. }
  2233. // validateNextMsgState checks whether the subscription is in a valid
  2234. // state to call NextMsg and be delivered another message synchronously.
  2235. // This should be called while holding the lock.
  2236. func (s *Subscription) validateNextMsgState() error {
  2237. if s.connClosed {
  2238. return ErrConnectionClosed
  2239. }
  2240. if s.mch == nil {
  2241. if s.max > 0 && s.delivered >= s.max {
  2242. return ErrMaxMessages
  2243. } else if s.closed {
  2244. return ErrBadSubscription
  2245. }
  2246. }
  2247. if s.mcb != nil {
  2248. return ErrSyncSubRequired
  2249. }
  2250. if s.sc {
  2251. s.sc = false
  2252. return ErrSlowConsumer
  2253. }
  2254. return nil
  2255. }
  2256. // processNextMsgDelivered takes a message and applies the needed
  2257. // accounting to the stats from the subscription, returning an
  2258. // error in case we have the maximum number of messages have been
  2259. // delivered already. It should not be called while holding the lock.
  2260. func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
  2261. s.mu.Lock()
  2262. nc := s.conn
  2263. max := s.max
  2264. // Update some stats.
  2265. s.delivered++
  2266. delivered := s.delivered
  2267. if s.typ == SyncSubscription {
  2268. s.pMsgs--
  2269. s.pBytes -= len(msg.Data)
  2270. }
  2271. s.mu.Unlock()
  2272. if max > 0 {
  2273. if delivered > max {
  2274. return ErrMaxMessages
  2275. }
  2276. // Remove subscription if we have reached max.
  2277. if delivered == max {
  2278. nc.mu.Lock()
  2279. nc.removeSub(s)
  2280. nc.mu.Unlock()
  2281. }
  2282. }
  2283. return nil
  2284. }
  2285. // Queued returns the number of queued messages in the client for this subscription.
  2286. // DEPRECATED: Use Pending()
  2287. func (s *Subscription) QueuedMsgs() (int, error) {
  2288. m, _, err := s.Pending()
  2289. return int(m), err
  2290. }
  2291. // Pending returns the number of queued messages and queued bytes in the client for this subscription.
  2292. func (s *Subscription) Pending() (int, int, error) {
  2293. if s == nil {
  2294. return -1, -1, ErrBadSubscription
  2295. }
  2296. s.mu.Lock()
  2297. defer s.mu.Unlock()
  2298. if s.conn == nil {
  2299. return -1, -1, ErrBadSubscription
  2300. }
  2301. if s.typ == ChanSubscription {
  2302. return -1, -1, ErrTypeSubscription
  2303. }
  2304. return s.pMsgs, s.pBytes, nil
  2305. }
  2306. // MaxPending returns the maximum number of queued messages and queued bytes seen so far.
  2307. func (s *Subscription) MaxPending() (int, int, error) {
  2308. if s == nil {
  2309. return -1, -1, ErrBadSubscription
  2310. }
  2311. s.mu.Lock()
  2312. defer s.mu.Unlock()
  2313. if s.conn == nil {
  2314. return -1, -1, ErrBadSubscription
  2315. }
  2316. if s.typ == ChanSubscription {
  2317. return -1, -1, ErrTypeSubscription
  2318. }
  2319. return s.pMsgsMax, s.pBytesMax, nil
  2320. }
  2321. // ClearMaxPending resets the maximums seen so far.
  2322. func (s *Subscription) ClearMaxPending() error {
  2323. if s == nil {
  2324. return ErrBadSubscription
  2325. }
  2326. s.mu.Lock()
  2327. defer s.mu.Unlock()
  2328. if s.conn == nil {
  2329. return ErrBadSubscription
  2330. }
  2331. if s.typ == ChanSubscription {
  2332. return ErrTypeSubscription
  2333. }
  2334. s.pMsgsMax, s.pBytesMax = 0, 0
  2335. return nil
  2336. }
// Pending Limits
//
// These are the defaults that subscribe assigns to every new
// subscription's pMsgsLimit and pBytesLimit.
const (
	// DefaultSubPendingMsgsLimit is the default pending message limit.
	DefaultSubPendingMsgsLimit = 65536
	// DefaultSubPendingBytesLimit is the default pending byte limit
	// (65536 * 1024 bytes, i.e. 64 MiB).
	DefaultSubPendingBytesLimit = 65536 * 1024
)
  2342. // PendingLimits returns the current limits for this subscription.
  2343. // If no error is returned, a negative value indicates that the
  2344. // given metric is not limited.
  2345. func (s *Subscription) PendingLimits() (int, int, error) {
  2346. if s == nil {
  2347. return -1, -1, ErrBadSubscription
  2348. }
  2349. s.mu.Lock()
  2350. defer s.mu.Unlock()
  2351. if s.conn == nil {
  2352. return -1, -1, ErrBadSubscription
  2353. }
  2354. if s.typ == ChanSubscription {
  2355. return -1, -1, ErrTypeSubscription
  2356. }
  2357. return s.pMsgsLimit, s.pBytesLimit, nil
  2358. }
  2359. // SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
  2360. // Zero is not allowed. Any negative value means that the given metric is not limited.
  2361. func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
  2362. if s == nil {
  2363. return ErrBadSubscription
  2364. }
  2365. s.mu.Lock()
  2366. defer s.mu.Unlock()
  2367. if s.conn == nil {
  2368. return ErrBadSubscription
  2369. }
  2370. if s.typ == ChanSubscription {
  2371. return ErrTypeSubscription
  2372. }
  2373. if msgLimit == 0 || bytesLimit == 0 {
  2374. return ErrInvalidArg
  2375. }
  2376. s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
  2377. return nil
  2378. }
  2379. // Delivered returns the number of delivered messages for this subscription.
  2380. func (s *Subscription) Delivered() (int64, error) {
  2381. if s == nil {
  2382. return -1, ErrBadSubscription
  2383. }
  2384. s.mu.Lock()
  2385. defer s.mu.Unlock()
  2386. if s.conn == nil {
  2387. return -1, ErrBadSubscription
  2388. }
  2389. return int64(s.delivered), nil
  2390. }
  2391. // Dropped returns the number of known dropped messages for this subscription.
  2392. // This will correspond to messages dropped by violations of PendingLimits. If
  2393. // the server declares the connection a SlowConsumer, this number may not be
  2394. // valid.
  2395. func (s *Subscription) Dropped() (int, error) {
  2396. if s == nil {
  2397. return -1, ErrBadSubscription
  2398. }
  2399. s.mu.Lock()
  2400. defer s.mu.Unlock()
  2401. if s.conn == nil {
  2402. return -1, ErrBadSubscription
  2403. }
  2404. return s.dropped, nil
  2405. }
  2406. // FIXME: This is a hack
  2407. // removeFlushEntry is needed when we need to discard queued up responses
  2408. // for our pings as part of a flush call. This happens when we have a flush
  2409. // call outstanding and we call close.
  2410. func (nc *Conn) removeFlushEntry(ch chan struct{}) bool {
  2411. nc.mu.Lock()
  2412. defer nc.mu.Unlock()
  2413. if nc.pongs == nil {
  2414. return false
  2415. }
  2416. for i, c := range nc.pongs {
  2417. if c == ch {
  2418. nc.pongs[i] = nil
  2419. return true
  2420. }
  2421. }
  2422. return false
  2423. }
// sendPing registers ch in nc.pongs, writes the ping protocol to the
// buffered writer and flushes it immediately. ch may be nil (the ping
// timer passes nil). Errors from the write and the flush are not
// inspected here.
// The lock must be held entering this function.
func (nc *Conn) sendPing(ch chan struct{}) {
	// Register the channel before the ping is written out.
	nc.pongs = append(nc.pongs, ch)
	nc.bw.WriteString(pingProto)
	// Flush in place.
	nc.bw.Flush()
}
  2431. // This will fire periodically and send a client origin
  2432. // ping to the server. Will also check that we have received
  2433. // responses from the server.
  2434. func (nc *Conn) processPingTimer() {
  2435. nc.mu.Lock()
  2436. if nc.status != CONNECTED {
  2437. nc.mu.Unlock()
  2438. return
  2439. }
  2440. // Check for violation
  2441. nc.pout++
  2442. if nc.pout > nc.Opts.MaxPingsOut {
  2443. nc.mu.Unlock()
  2444. nc.processOpErr(ErrStaleConnection)
  2445. return
  2446. }
  2447. nc.sendPing(nil)
  2448. nc.ptmr.Reset(nc.Opts.PingInterval)
  2449. nc.mu.Unlock()
  2450. }
  2451. // FlushTimeout allows a Flush operation to have an associated timeout.
  2452. func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
  2453. if nc == nil {
  2454. return ErrInvalidConnection
  2455. }
  2456. if timeout <= 0 {
  2457. return ErrBadTimeout
  2458. }
  2459. nc.mu.Lock()
  2460. if nc.isClosed() {
  2461. nc.mu.Unlock()
  2462. return ErrConnectionClosed
  2463. }
  2464. t := globalTimerPool.Get(timeout)
  2465. defer globalTimerPool.Put(t)
  2466. // Create a buffered channel to prevent chan send to block
  2467. // in processPong() if this code here times out just when
  2468. // PONG was received.
  2469. ch := make(chan struct{}, 1)
  2470. nc.sendPing(ch)
  2471. nc.mu.Unlock()
  2472. select {
  2473. case _, ok := <-ch:
  2474. if !ok {
  2475. err = ErrConnectionClosed
  2476. } else {
  2477. close(ch)
  2478. }
  2479. case <-t.C:
  2480. err = ErrTimeout
  2481. }
  2482. if err != nil {
  2483. nc.removeFlushEntry(ch)
  2484. }
  2485. return
  2486. }
  2487. // Flush will perform a round trip to the server and return when it
  2488. // receives the internal reply.
  2489. func (nc *Conn) Flush() error {
  2490. return nc.FlushTimeout(60 * time.Second)
  2491. }
  2492. // Buffered will return the number of bytes buffered to be sent to the server.
  2493. // FIXME(dlc) take into account disconnected state.
  2494. func (nc *Conn) Buffered() (int, error) {
  2495. nc.mu.Lock()
  2496. defer nc.mu.Unlock()
  2497. if nc.isClosed() || nc.bw == nil {
  2498. return -1, ErrConnectionClosed
  2499. }
  2500. return nc.bw.Buffered(), nil
  2501. }
  2502. // resendSubscriptions will send our subscription state back to the
  2503. // server. Used in reconnects
  2504. func (nc *Conn) resendSubscriptions() {
  2505. // Since we are going to send protocols to the server, we don't want to
  2506. // be holding the subsMu lock (which is used in processMsg). So copy
  2507. // the subscriptions in a temporary array.
  2508. nc.subsMu.RLock()
  2509. subs := make([]*Subscription, 0, len(nc.subs))
  2510. for _, s := range nc.subs {
  2511. subs = append(subs, s)
  2512. }
  2513. nc.subsMu.RUnlock()
  2514. for _, s := range subs {
  2515. adjustedMax := uint64(0)
  2516. s.mu.Lock()
  2517. if s.max > 0 {
  2518. if s.delivered < s.max {
  2519. adjustedMax = s.max - s.delivered
  2520. }
  2521. // adjustedMax could be 0 here if the number of delivered msgs
  2522. // reached the max, if so unsubscribe.
  2523. if adjustedMax == 0 {
  2524. s.mu.Unlock()
  2525. fmt.Fprintf(nc.bw, unsubProto, s.sid, _EMPTY_)
  2526. continue
  2527. }
  2528. }
  2529. s.mu.Unlock()
  2530. fmt.Fprintf(nc.bw, subProto, s.Subject, s.Queue, s.sid)
  2531. if adjustedMax > 0 {
  2532. maxStr := strconv.Itoa(int(adjustedMax))
  2533. fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
  2534. }
  2535. }
  2536. }
  2537. // This will clear any pending flush calls and release pending calls.
  2538. // Lock is assumed to be held by the caller.
  2539. func (nc *Conn) clearPendingFlushCalls() {
  2540. // Clear any queued pongs, e.g. pending flush calls.
  2541. for _, ch := range nc.pongs {
  2542. if ch != nil {
  2543. close(ch)
  2544. }
  2545. }
  2546. nc.pongs = nil
  2547. }
  2548. // This will clear any pending Request calls.
  2549. // Lock is assumed to be held by the caller.
  2550. func (nc *Conn) clearPendingRequestCalls() {
  2551. if nc.respMap == nil {
  2552. return
  2553. }
  2554. for key, ch := range nc.respMap {
  2555. if ch != nil {
  2556. close(ch)
  2557. delete(nc.respMap, key)
  2558. }
  2559. }
  2560. }
// Low level close call that will do correct cleanup and set
// desired status. Also controls whether user defined callbacks
// will be triggered. The lock should not be held entering this
// function. This function will handle the locking manually.
//
// status is the status stored on the connection once cleanup is done.
// doCBs selects whether the disconnected/closed callbacks are queued
// on nc.ach.
func (nc *Conn) close(status Status, doCBs bool) {
	nc.mu.Lock()
	if nc.isClosed() {
		// Already closed: only record the requested status.
		nc.status = status
		nc.mu.Unlock()
		return
	}
	// Mark closed first so the isClosed() checks made by other callers
	// see it while cleanup proceeds.
	nc.status = CLOSED

	// Kick the Go routines so they fall out.
	nc.kickFlusher()
	// The lock is released and re-acquired here.
	nc.mu.Unlock()

	nc.mu.Lock()

	// Clear any queued pongs, e.g. pending flush calls.
	nc.clearPendingFlushCalls()

	// Clear any queued and blocking Requests.
	nc.clearPendingRequestCalls()

	if nc.ptmr != nil {
		nc.ptmr.Stop()
	}

	// Go ahead and make sure we have flushed the outbound
	if nc.conn != nil {
		nc.bw.Flush()
		// Deferred: the underlying connection is closed when this
		// function returns, after the final unlock below.
		defer nc.conn.Close()
	}

	// Close sync subscriber channels and release any
	// pending NextMsg() calls.
	nc.subsMu.Lock()
	for _, s := range nc.subs {
		s.mu.Lock()

		// Release callers on NextMsg for SyncSubscription only
		if s.mch != nil && s.typ == SyncSubscription {
			close(s.mch)
		}
		s.mch = nil
		// Mark as invalid, for signaling to deliverMsgs
		s.closed = true
		// Mark connection closed in subscription
		s.connClosed = true
		// If we have an async subscription, signals it to exit
		if s.typ == AsyncSubscription && s.pCond != nil {
			s.pCond.Signal()
		}

		s.mu.Unlock()
	}
	nc.subs = nil
	nc.subsMu.Unlock()

	// Perform appropriate callback if needed for a disconnect.
	// NOTE(review): these sends on nc.ach happen while nc.mu is held;
	// whether they can block depends on how ach is created and drained,
	// which is not visible here — confirm.
	if doCBs {
		if nc.Opts.DisconnectedCB != nil && nc.conn != nil {
			nc.ach <- func() { nc.Opts.DisconnectedCB(nc) }
		}
		if nc.Opts.ClosedCB != nil {
			nc.ach <- func() { nc.Opts.ClosedCB(nc) }
		}
		nc.ach <- nc.closeAsyncFunc()
	}
	// Finally record the status requested by the caller.
	nc.status = status
	nc.mu.Unlock()
}
  2624. // Close will close the connection to the server. This call will release
  2625. // all blocking calls, such as Flush() and NextMsg()
  2626. func (nc *Conn) Close() {
  2627. nc.close(CLOSED, true)
  2628. }
  2629. // IsClosed tests if a Conn has been closed.
  2630. func (nc *Conn) IsClosed() bool {
  2631. nc.mu.Lock()
  2632. defer nc.mu.Unlock()
  2633. return nc.isClosed()
  2634. }
  2635. // IsReconnecting tests if a Conn is reconnecting.
  2636. func (nc *Conn) IsReconnecting() bool {
  2637. nc.mu.Lock()
  2638. defer nc.mu.Unlock()
  2639. return nc.isReconnecting()
  2640. }
  2641. // IsConnected tests if a Conn is connected.
  2642. func (nc *Conn) IsConnected() bool {
  2643. nc.mu.Lock()
  2644. defer nc.mu.Unlock()
  2645. return nc.isConnected()
  2646. }
  2647. // caller must lock
  2648. func (nc *Conn) getServers(implicitOnly bool) []string {
  2649. poolSize := len(nc.srvPool)
  2650. var servers = make([]string, 0)
  2651. for i := 0; i < poolSize; i++ {
  2652. if implicitOnly && !nc.srvPool[i].isImplicit {
  2653. continue
  2654. }
  2655. url := nc.srvPool[i].url
  2656. servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
  2657. }
  2658. return servers
  2659. }
  2660. // Servers returns the list of known server urls, including additional
  2661. // servers discovered after a connection has been established. If
  2662. // authentication is enabled, use UserInfo or Token when connecting with
  2663. // these urls.
  2664. func (nc *Conn) Servers() []string {
  2665. nc.mu.Lock()
  2666. defer nc.mu.Unlock()
  2667. return nc.getServers(false)
  2668. }
  2669. // DiscoveredServers returns only the server urls that have been discovered
  2670. // after a connection has been established. If authentication is enabled,
  2671. // use UserInfo or Token when connecting with these urls.
  2672. func (nc *Conn) DiscoveredServers() []string {
  2673. nc.mu.Lock()
  2674. defer nc.mu.Unlock()
  2675. return nc.getServers(true)
  2676. }
  2677. // Status returns the current state of the connection.
  2678. func (nc *Conn) Status() Status {
  2679. nc.mu.Lock()
  2680. defer nc.mu.Unlock()
  2681. return nc.status
  2682. }
// isClosed tests if Conn has been closed, i.e. its status is CLOSED.
// Lock is assumed held.
func (nc *Conn) isClosed() bool {
	return nc.status == CLOSED
}
// isConnecting tests if Conn is in the process of connecting, i.e. its
// status is CONNECTING. It reads nc.status without taking the lock.
func (nc *Conn) isConnecting() bool {
	return nc.status == CONNECTING
}
// isReconnecting tests if Conn is being reconnected, i.e. its status is
// RECONNECTING. It reads nc.status without taking the lock.
func (nc *Conn) isReconnecting() bool {
	return nc.status == RECONNECTING
}
// isConnected tests if Conn is connected, i.e. its status is CONNECTED.
// A connection that is still CONNECTING does not count as connected.
// It reads nc.status without taking the lock.
func (nc *Conn) isConnected() bool {
	return nc.status == CONNECTED
}
  2699. // Stats will return a race safe copy of the Statistics section for the connection.
  2700. func (nc *Conn) Stats() Statistics {
  2701. // Stats are updated either under connection's mu or subsMu mutexes.
  2702. // Lock both to safely get them.
  2703. nc.mu.Lock()
  2704. nc.subsMu.RLock()
  2705. stats := Statistics{
  2706. InMsgs: nc.InMsgs,
  2707. InBytes: nc.InBytes,
  2708. OutMsgs: nc.OutMsgs,
  2709. OutBytes: nc.OutBytes,
  2710. Reconnects: nc.Reconnects,
  2711. }
  2712. nc.subsMu.RUnlock()
  2713. nc.mu.Unlock()
  2714. return stats
  2715. }
  2716. // MaxPayload returns the size limit that a message payload can have.
  2717. // This is set by the server configuration and delivered to the client
  2718. // upon connect.
  2719. func (nc *Conn) MaxPayload() int64 {
  2720. nc.mu.Lock()
  2721. defer nc.mu.Unlock()
  2722. return nc.info.MaxPayload
  2723. }
  2724. // AuthRequired will return if the connected server requires authorization.
  2725. func (nc *Conn) AuthRequired() bool {
  2726. nc.mu.Lock()
  2727. defer nc.mu.Unlock()
  2728. return nc.info.AuthRequired
  2729. }
  2730. // TLSRequired will return if the connected server requires TLS connections.
  2731. func (nc *Conn) TLSRequired() bool {
  2732. nc.mu.Lock()
  2733. defer nc.mu.Unlock()
  2734. return nc.info.TLSRequired
  2735. }
// Barrier schedules the given function `f` to all registered asynchronous
// subscriptions.
// Only the last subscription to see this barrier will invoke the function.
// If no asynchronous subscription is registered at the time of this call,
// `f()` is invoked right away, on the caller's goroutine, after the
// connection locks have been released.
// ErrConnectionClosed is returned if the connection is closed prior to
// the call; otherwise nil is returned.
func (nc *Conn) Barrier(f func()) error {
	// Locks are taken in the order nc.mu, nc.subsMu, then each sub.mu.
	nc.mu.Lock()
	if nc.isClosed() {
		nc.mu.Unlock()
		return ErrConnectionClosed
	}
	nc.subsMu.Lock()
	// Need to figure out how many non chan subscriptions there are
	numSubs := 0
	for _, sub := range nc.subs {
		if sub.typ == AsyncSubscription {
			numSubs++
		}
	}
	if numSubs == 0 {
		// Nothing to queue behind: drop both locks before running the
		// callback so that f is not executed while they are held.
		nc.subsMu.Unlock()
		nc.mu.Unlock()
		f()
		return nil
	}
	// One shared barrierInfo for all subscriptions; refs starts at the
	// number of asynchronous subscriptions counted above.
	barrier := &barrierInfo{refs: int64(numSubs), f: f}
	for _, sub := range nc.subs {
		sub.mu.Lock()
		// NOTE(review): the count above selects by sub.typ == AsyncSubscription
		// while this enqueue selects by sub.mch == nil. These presumably pick
		// the same subscriptions — confirm, since a mismatch would leave refs
		// out of step with the number of barrier messages queued.
		if sub.mch == nil {
			// The barrier travels as a Msg that carries only the barrier field.
			msg := &Msg{barrier: barrier}
			// Push onto the async pList
			if sub.pTail != nil {
				sub.pTail.next = msg
			} else {
				// List was empty: the new message becomes the head and the
				// subscription's condition variable is signaled (presumably
				// to wake the goroutine waiting for messages — confirm).
				sub.pHead = msg
				sub.pCond.Signal()
			}
			sub.pTail = msg
		}
		sub.mu.Unlock()
	}
	nc.subsMu.Unlock()
	nc.mu.Unlock()
	return nil
}