1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172 |
- // Copyright 2012-2018 The NATS Authors
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- // A Go client for the NATS messaging system (https://nats.io).
- package nats
- import (
- "bufio"
- "bytes"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "math/rand"
- "net"
- "net/url"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/nats-io/go-nats/util"
- "github.com/nats-io/nuid"
- )
- // Default Constants
- const (
- Version = "1.5.0"
- DefaultURL = "nats://localhost:4222"
- DefaultPort = 4222
- DefaultMaxReconnect = 60
- DefaultReconnectWait = 2 * time.Second
- DefaultTimeout = 2 * time.Second
- DefaultPingInterval = 2 * time.Minute
- DefaultMaxPingOut = 2
- DefaultMaxChanLen = 8192 // 8k
- DefaultReconnectBufSize = 8 * 1024 * 1024 // 8MB
- RequestChanLen = 8
- LangString = "go"
- )
- // STALE_CONNECTION is for detection and proper handling of stale connections.
- const STALE_CONNECTION = "stale connection"
- // PERMISSIONS_ERR is for when nats server subject authorization has failed.
- const PERMISSIONS_ERR = "permissions violation"
- // AUTHORIZATION_ERR is for when nats server user authorization has failed.
- const AUTHORIZATION_ERR = "authorization violation"
- // Errors
- var (
- ErrConnectionClosed = errors.New("nats: connection closed")
- ErrSecureConnRequired = errors.New("nats: secure connection required")
- ErrSecureConnWanted = errors.New("nats: secure connection not available")
- ErrBadSubscription = errors.New("nats: invalid subscription")
- ErrTypeSubscription = errors.New("nats: invalid subscription type")
- ErrBadSubject = errors.New("nats: invalid subject")
- ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
- ErrTimeout = errors.New("nats: timeout")
- ErrBadTimeout = errors.New("nats: timeout invalid")
- ErrAuthorization = errors.New("nats: authorization violation")
- ErrNoServers = errors.New("nats: no servers available for connection")
- ErrJsonParse = errors.New("nats: connect message, json parse error")
- ErrChanArg = errors.New("nats: argument needs to be a channel type")
- ErrMaxPayload = errors.New("nats: maximum payload exceeded")
- ErrMaxMessages = errors.New("nats: maximum messages delivered")
- ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
- ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
- ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
- ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
- ErrInvalidConnection = errors.New("nats: invalid connection")
- ErrInvalidMsg = errors.New("nats: invalid message or message nil")
- ErrInvalidArg = errors.New("nats: invalid argument")
- ErrInvalidContext = errors.New("nats: invalid context")
- ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
- )
- // GetDefaultOptions returns default configuration options for the client.
- func GetDefaultOptions() Options {
- return Options{
- AllowReconnect: true,
- MaxReconnect: DefaultMaxReconnect,
- ReconnectWait: DefaultReconnectWait,
- Timeout: DefaultTimeout,
- PingInterval: DefaultPingInterval,
- MaxPingsOut: DefaultMaxPingOut,
- SubChanLen: DefaultMaxChanLen,
- ReconnectBufSize: DefaultReconnectBufSize,
- }
- }
- // DEPRECATED: Use GetDefaultOptions() instead.
- // DefaultOptions is not safe for use by multiple clients.
- // For details see #308.
- var DefaultOptions = GetDefaultOptions()
- // Status represents the state of the connection.
- type Status int
- const (
- DISCONNECTED = Status(iota)
- CONNECTED
- CLOSED
- RECONNECTING
- CONNECTING
- )
- // ConnHandler is used for asynchronous events such as
- // disconnected and closed connections.
- type ConnHandler func(*Conn)
- // ErrHandler is used to process asynchronous errors encountered
- // while processing inbound messages.
- type ErrHandler func(*Conn, *Subscription, error)
- // asyncCB is used to preserve order for async callbacks.
- type asyncCB func()
- // Option is a function on the options for a connection.
- type Option func(*Options) error
- // CustomDialer can be used to specify any dialer, not necessarily
- // a *net.Dialer.
- type CustomDialer interface {
- Dial(network, address string) (net.Conn, error)
- }
- // Options can be used to create a customized connection.
- type Options struct {
- // Url represents a single NATS server url to which the client
- // will be connecting. If the Servers option is also set, it
- // then becomes the first server in the Servers array.
- Url string
- // Servers is a configured set of servers which this client
- // will use when attempting to connect.
- Servers []string
- // NoRandomize configures whether we will randomize the
- // server pool.
- NoRandomize bool
- // Name is an optional name label which will be sent to the server
- // on CONNECT to identify the client.
- Name string
- // Verbose signals the server to send an OK ack for commands
- // successfully processed by the server.
- Verbose bool
- // Pedantic signals the server whether it should be doing further
- // validation of subjects.
- Pedantic bool
- // Secure enables TLS secure connections that skip server
- // verification by default. NOT RECOMMENDED.
- Secure bool
- // TLSConfig is a custom TLS configuration to use for secure
- // transports.
- TLSConfig *tls.Config
- // AllowReconnect enables reconnection logic to be used when we
- // encounter a disconnect from the current server.
- AllowReconnect bool
- // MaxReconnect sets the number of reconnect attempts that will be
- // tried before giving up. If negative, then it will never give up
- // trying to reconnect.
- MaxReconnect int
- // ReconnectWait sets the time to backoff after attempting a reconnect
- // to a server that we were already connected to previously.
- ReconnectWait time.Duration
- // Timeout sets the timeout for a Dial operation on a connection.
- Timeout time.Duration
- // FlusherTimeout is the maximum time to wait for the flusher loop
- // to be able to finish writing to the underlying connection.
- FlusherTimeout time.Duration
- // PingInterval is the period at which the client will be sending ping
- // commands to the server, disabled if 0 or negative.
- PingInterval time.Duration
- // MaxPingsOut is the maximum number of pending ping commands that can
- // be awaiting a response before raising an ErrStaleConnection error.
- MaxPingsOut int
- // ClosedCB sets the closed handler that is called when a client will
- // no longer be connected.
- ClosedCB ConnHandler
- // DisconnectedCB sets the disconnected handler that is called
- // whenever the connection is disconnected.
- DisconnectedCB ConnHandler
- // ReconnectedCB sets the reconnected handler called whenever
- // the connection is successfully reconnected.
- ReconnectedCB ConnHandler
- // DiscoveredServersCB sets the callback that is invoked whenever a new
- // server has joined the cluster.
- DiscoveredServersCB ConnHandler
- // AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
- AsyncErrorCB ErrHandler
- // ReconnectBufSize is the size of the backing bufio during reconnect.
- // Once this has been exhausted publish operations will return an error.
- ReconnectBufSize int
- // SubChanLen is the size of the buffered channel used between the socket
- // Go routine and the message delivery for SyncSubscriptions.
- // NOTE: This does not affect AsyncSubscriptions which are
- // dictated by PendingLimits()
- SubChanLen int
- // User sets the username to be used when connecting to the server.
- User string
- // Password sets the password to be used when connecting to a server.
- Password string
- // Token sets the token to be used when connecting to a server.
- Token string
- // Dialer allows a custom net.Dialer when forming connections.
- // DEPRECATED: should use CustomDialer instead.
- Dialer *net.Dialer
- // CustomDialer allows to specify a custom dialer (not necessarily
- // a *net.Dialer).
- CustomDialer CustomDialer
- // UseOldRequestStyle forces the old method of Requests that utilize
- // a new Inbox and a new Subscription for each request.
- UseOldRequestStyle bool
- }
- const (
- // Scratch storage for assembling protocol headers
- scratchSize = 512
- // The size of the bufio reader/writer on top of the socket.
- defaultBufSize = 32768
- // The buffered size of the flush "kick" channel
- flushChanSize = 1024
- // Default server pool size
- srvPoolSize = 4
- // Channel size for the async callback handler.
- asyncCBChanSize = 32
- // NUID size
- nuidSize = 22
- )
- // A Conn represents a bare connection to a nats-server.
- // It can send and receive []byte payloads.
- type Conn struct {
- // Keep all members for which we use atomic at the beginning of the
- // struct and make sure they are all 64bits (or use padding if necessary).
- // atomic.* functions crash on 32bit machines if operand is not aligned
- // at 64bit. See https://github.com/golang/go/issues/599
- Statistics
- mu sync.Mutex
- Opts Options
- wg *sync.WaitGroup
- url *url.URL
- conn net.Conn
- srvPool []*srv
- urls map[string]struct{} // Keep track of all known URLs (used by processInfo)
- bw *bufio.Writer
- pending *bytes.Buffer
- fch chan struct{}
- info serverInfo
- ssid int64
- subsMu sync.RWMutex
- subs map[int64]*Subscription
- ach chan asyncCB
- pongs []chan struct{}
- scratch [scratchSize]byte
- status Status
- initc bool // true if the connection is performing the initial connect
- err error
- ps *parseState
- ptmr *time.Timer
- pout int
- // New style response handler
- respSub string // The wildcard subject
- respMux *Subscription // A single response subscription
- respMap map[string]chan *Msg // Request map for the response msg channels
- respSetup sync.Once // Ensures response subscription occurs once
- }
- // A Subscription represents interest in a given subject.
- type Subscription struct {
- mu sync.Mutex
- sid int64
- // Subject that represents this subscription. This can be different
- // than the received subject inside a Msg if this is a wildcard.
- Subject string
- // Optional queue group name. If present, all subscriptions with the
- // same name will form a distributed queue, and each message will
- // only be processed by one member of the group.
- Queue string
- delivered uint64
- max uint64
- conn *Conn
- mcb MsgHandler
- mch chan *Msg
- closed bool
- sc bool
- connClosed bool
- // Type of Subscription
- typ SubscriptionType
- // Async linked list
- pHead *Msg
- pTail *Msg
- pCond *sync.Cond
- // Pending stats, async subscriptions, high-speed etc.
- pMsgs int
- pBytes int
- pMsgsMax int
- pBytesMax int
- pMsgsLimit int
- pBytesLimit int
- dropped int
- }
- // Msg is a structure used by Subscribers and PublishMsg().
- type Msg struct {
- Subject string
- Reply string
- Data []byte
- Sub *Subscription
- next *Msg
- barrier *barrierInfo
- }
- type barrierInfo struct {
- refs int64
- f func()
- }
- // Tracks various stats received and sent on this connection,
- // including counts for messages and bytes.
- type Statistics struct {
- InMsgs uint64
- OutMsgs uint64
- InBytes uint64
- OutBytes uint64
- Reconnects uint64
- }
- // Tracks individual backend servers.
- type srv struct {
- url *url.URL
- didConnect bool
- reconnects int
- lastAttempt time.Time
- isImplicit bool
- }
- type serverInfo struct {
- Id string `json:"server_id"`
- Host string `json:"host"`
- Port uint `json:"port"`
- Version string `json:"version"`
- AuthRequired bool `json:"auth_required"`
- TLSRequired bool `json:"tls_required"`
- MaxPayload int64 `json:"max_payload"`
- ConnectURLs []string `json:"connect_urls,omitempty"`
- }
- const (
- // clientProtoZero is the original client protocol from 2009.
- // http://nats.io/documentation/internals/nats-protocol/
- /* clientProtoZero */ _ = iota
- // clientProtoInfo signals a client can receive more then the original INFO block.
- // This can be used to update clients on other cluster members, etc.
- clientProtoInfo
- )
- type connectInfo struct {
- Verbose bool `json:"verbose"`
- Pedantic bool `json:"pedantic"`
- User string `json:"user,omitempty"`
- Pass string `json:"pass,omitempty"`
- Token string `json:"auth_token,omitempty"`
- TLS bool `json:"tls_required"`
- Name string `json:"name"`
- Lang string `json:"lang"`
- Version string `json:"version"`
- Protocol int `json:"protocol"`
- }
- // MsgHandler is a callback function that processes messages delivered to
- // asynchronous subscribers.
- type MsgHandler func(msg *Msg)
- // Connect will attempt to connect to the NATS system.
- // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
- // Comma separated arrays are also supported, e.g. urlA, urlB.
- // Options start with the defaults but can be overridden.
- func Connect(url string, options ...Option) (*Conn, error) {
- opts := GetDefaultOptions()
- opts.Servers = processUrlString(url)
- for _, opt := range options {
- if err := opt(&opts); err != nil {
- return nil, err
- }
- }
- return opts.Connect()
- }
- // Options that can be passed to Connect.
- // Name is an Option to set the client name.
- func Name(name string) Option {
- return func(o *Options) error {
- o.Name = name
- return nil
- }
- }
- // Secure is an Option to enable TLS secure connections that skip server verification by default.
- // Pass a TLS Configuration for proper TLS.
- func Secure(tls ...*tls.Config) Option {
- return func(o *Options) error {
- o.Secure = true
- // Use of variadic just simplifies testing scenarios. We only take the first one.
- // fixme(DLC) - Could panic if more than one. Could also do TLS option.
- if len(tls) > 1 {
- return ErrMultipleTLSConfigs
- }
- if len(tls) == 1 {
- o.TLSConfig = tls[0]
- }
- return nil
- }
- }
- // RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is
- // not already set this will set it as well.
- func RootCAs(file ...string) Option {
- return func(o *Options) error {
- pool := x509.NewCertPool()
- for _, f := range file {
- rootPEM, err := ioutil.ReadFile(f)
- if err != nil || rootPEM == nil {
- return fmt.Errorf("nats: error loading or parsing rootCA file: %v", err)
- }
- ok := pool.AppendCertsFromPEM(rootPEM)
- if !ok {
- return fmt.Errorf("nats: failed to parse root certificate from %q", f)
- }
- }
- if o.TLSConfig == nil {
- o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
- o.TLSConfig.RootCAs = pool
- o.Secure = true
- return nil
- }
- }
- // ClientCert is a helper option to provide the client certificate from a file. If Secure is
- // not already set this will set it as well
- func ClientCert(certFile, keyFile string) Option {
- return func(o *Options) error {
- cert, err := tls.LoadX509KeyPair(certFile, keyFile)
- if err != nil {
- return fmt.Errorf("nats: error loading client certificate: %v", err)
- }
- cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
- if err != nil {
- return fmt.Errorf("nats: error parsing client certificate: %v", err)
- }
- if o.TLSConfig == nil {
- o.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
- o.TLSConfig.Certificates = []tls.Certificate{cert}
- o.Secure = true
- return nil
- }
- }
- // NoReconnect is an Option to turn off reconnect behavior.
- func NoReconnect() Option {
- return func(o *Options) error {
- o.AllowReconnect = false
- return nil
- }
- }
- // DontRandomize is an Option to turn off randomizing the server pool.
- func DontRandomize() Option {
- return func(o *Options) error {
- o.NoRandomize = true
- return nil
- }
- }
- // ReconnectWait is an Option to set the wait time between reconnect attempts.
- func ReconnectWait(t time.Duration) Option {
- return func(o *Options) error {
- o.ReconnectWait = t
- return nil
- }
- }
- // MaxReconnects is an Option to set the maximum number of reconnect attempts.
- func MaxReconnects(max int) Option {
- return func(o *Options) error {
- o.MaxReconnect = max
- return nil
- }
- }
- // PingInterval is an Option to set the period for client ping commands
- func PingInterval(t time.Duration) Option {
- return func(o *Options) error {
- o.PingInterval = t
- return nil
- }
- }
- // ReconnectBufSize sets the buffer size of messages kept while busy reconnecting
- func ReconnectBufSize(size int) Option {
- return func(o *Options) error {
- o.ReconnectBufSize = size
- return nil
- }
- }
- // Timeout is an Option to set the timeout for Dial on a connection.
- func Timeout(t time.Duration) Option {
- return func(o *Options) error {
- o.Timeout = t
- return nil
- }
- }
- // DisconnectHandler is an Option to set the disconnected handler.
- func DisconnectHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.DisconnectedCB = cb
- return nil
- }
- }
- // ReconnectHandler is an Option to set the reconnected handler.
- func ReconnectHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.ReconnectedCB = cb
- return nil
- }
- }
- // ClosedHandler is an Option to set the closed handler.
- func ClosedHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.ClosedCB = cb
- return nil
- }
- }
- // DiscoveredServersHandler is an Option to set the new servers handler.
- func DiscoveredServersHandler(cb ConnHandler) Option {
- return func(o *Options) error {
- o.DiscoveredServersCB = cb
- return nil
- }
- }
- // ErrorHandler is an Option to set the async error handler.
- func ErrorHandler(cb ErrHandler) Option {
- return func(o *Options) error {
- o.AsyncErrorCB = cb
- return nil
- }
- }
- // UserInfo is an Option to set the username and password to
- // use when not included directly in the URLs.
- func UserInfo(user, password string) Option {
- return func(o *Options) error {
- o.User = user
- o.Password = password
- return nil
- }
- }
- // Token is an Option to set the token to use when not included
- // directly in the URLs.
- func Token(token string) Option {
- return func(o *Options) error {
- o.Token = token
- return nil
- }
- }
- // Dialer is an Option to set the dialer which will be used when
- // attempting to establish a connection.
- // DEPRECATED: Should use CustomDialer instead.
- func Dialer(dialer *net.Dialer) Option {
- return func(o *Options) error {
- o.Dialer = dialer
- return nil
- }
- }
- // SetCustomDialer is an Option to set a custom dialer which will be
- // used when attempting to establish a connection. If both Dialer
- // and CustomDialer are specified, CustomDialer takes precedence.
- func SetCustomDialer(dialer CustomDialer) Option {
- return func(o *Options) error {
- o.CustomDialer = dialer
- return nil
- }
- }
- // UseOldRequestStyle is an Option to force usage of the old Request style.
- func UseOldRequestStyle() Option {
- return func(o *Options) error {
- o.UseOldRequestStyle = true
- return nil
- }
- }
- // Handler processing
- // SetDisconnectHandler will set the disconnect event handler.
- func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.DisconnectedCB = dcb
- }
- // SetReconnectHandler will set the reconnect event handler.
- func (nc *Conn) SetReconnectHandler(rcb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.ReconnectedCB = rcb
- }
- // SetDiscoveredServersHandler will set the discovered servers handler.
- func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.DiscoveredServersCB = dscb
- }
- // SetClosedHandler will set the reconnect event handler.
- func (nc *Conn) SetClosedHandler(cb ConnHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.ClosedCB = cb
- }
- // SetErrorHandler will set the async error handler.
- func (nc *Conn) SetErrorHandler(cb ErrHandler) {
- if nc == nil {
- return
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- nc.Opts.AsyncErrorCB = cb
- }
- // Process the url string argument to Connect. Return an array of
- // urls, even if only one.
- func processUrlString(url string) []string {
- urls := strings.Split(url, ",")
- for i, s := range urls {
- urls[i] = strings.TrimSpace(s)
- }
- return urls
- }
- // Connect will attempt to connect to a NATS server with multiple options.
- func (o Options) Connect() (*Conn, error) {
- nc := &Conn{Opts: o}
- // Some default options processing.
- if nc.Opts.MaxPingsOut == 0 {
- nc.Opts.MaxPingsOut = DefaultMaxPingOut
- }
- // Allow old default for channel length to work correctly.
- if nc.Opts.SubChanLen == 0 {
- nc.Opts.SubChanLen = DefaultMaxChanLen
- }
- // Default ReconnectBufSize
- if nc.Opts.ReconnectBufSize == 0 {
- nc.Opts.ReconnectBufSize = DefaultReconnectBufSize
- }
- // Ensure that Timeout is not 0
- if nc.Opts.Timeout == 0 {
- nc.Opts.Timeout = DefaultTimeout
- }
- // Allow custom Dialer for connecting using DialTimeout by default
- if nc.Opts.Dialer == nil {
- nc.Opts.Dialer = &net.Dialer{
- Timeout: nc.Opts.Timeout,
- }
- }
- if err := nc.setupServerPool(); err != nil {
- return nil, err
- }
- // Create the async callback channel.
- nc.ach = make(chan asyncCB, asyncCBChanSize)
- if err := nc.connect(); err != nil {
- return nil, err
- }
- // Spin up the async cb dispatcher on success
- go nc.asyncDispatch()
- return nc, nil
- }
- const (
- _CRLF_ = "\r\n"
- _EMPTY_ = ""
- _SPC_ = " "
- _PUB_P_ = "PUB "
- )
- const (
- _OK_OP_ = "+OK"
- _ERR_OP_ = "-ERR"
- _PONG_OP_ = "PONG"
- _INFO_OP_ = "INFO"
- )
- const (
- conProto = "CONNECT %s" + _CRLF_
- pingProto = "PING" + _CRLF_
- pongProto = "PONG" + _CRLF_
- subProto = "SUB %s %s %d" + _CRLF_
- unsubProto = "UNSUB %d %s" + _CRLF_
- okProto = _OK_OP_ + _CRLF_
- )
- // Return the currently selected server
- func (nc *Conn) currentServer() (int, *srv) {
- for i, s := range nc.srvPool {
- if s == nil {
- continue
- }
- if s.url == nc.url {
- return i, s
- }
- }
- return -1, nil
- }
- // Pop the current server and put onto the end of the list. Select head of list as long
- // as number of reconnect attempts under MaxReconnect.
- func (nc *Conn) selectNextServer() (*srv, error) {
- i, s := nc.currentServer()
- if i < 0 {
- return nil, ErrNoServers
- }
- sp := nc.srvPool
- num := len(sp)
- copy(sp[i:num-1], sp[i+1:num])
- maxReconnect := nc.Opts.MaxReconnect
- if maxReconnect < 0 || s.reconnects < maxReconnect {
- nc.srvPool[num-1] = s
- } else {
- nc.srvPool = sp[0 : num-1]
- }
- if len(nc.srvPool) <= 0 {
- nc.url = nil
- return nil, ErrNoServers
- }
- nc.url = nc.srvPool[0].url
- return nc.srvPool[0], nil
- }
- // Will assign the correct server to the nc.Url
- func (nc *Conn) pickServer() error {
- nc.url = nil
- if len(nc.srvPool) <= 0 {
- return ErrNoServers
- }
- for _, s := range nc.srvPool {
- if s != nil {
- nc.url = s.url
- return nil
- }
- }
- return ErrNoServers
- }
- const tlsScheme = "tls"
- // Create the server pool using the options given.
- // We will place a Url option first, followed by any
- // Server Options. We will randomize the server pool unless
- // the NoRandomize flag is set.
- func (nc *Conn) setupServerPool() error {
- nc.srvPool = make([]*srv, 0, srvPoolSize)
- nc.urls = make(map[string]struct{}, srvPoolSize)
- // Create srv objects from each url string in nc.Opts.Servers
- // and add them to the pool
- for _, urlString := range nc.Opts.Servers {
- if err := nc.addURLToPool(urlString, false); err != nil {
- return err
- }
- }
- // Randomize if allowed to
- if !nc.Opts.NoRandomize {
- nc.shufflePool()
- }
- // Normally, if this one is set, Options.Servers should not be,
- // but we always allowed that, so continue to do so.
- if nc.Opts.Url != _EMPTY_ {
- // Add to the end of the array
- if err := nc.addURLToPool(nc.Opts.Url, false); err != nil {
- return err
- }
- // Then swap it with first to guarantee that Options.Url is tried first.
- last := len(nc.srvPool) - 1
- if last > 0 {
- nc.srvPool[0], nc.srvPool[last] = nc.srvPool[last], nc.srvPool[0]
- }
- } else if len(nc.srvPool) <= 0 {
- // Place default URL if pool is empty.
- if err := nc.addURLToPool(DefaultURL, false); err != nil {
- return err
- }
- }
- // Check for Scheme hint to move to TLS mode.
- for _, srv := range nc.srvPool {
- if srv.url.Scheme == tlsScheme {
- // FIXME(dlc), this is for all in the pool, should be case by case.
- nc.Opts.Secure = true
- if nc.Opts.TLSConfig == nil {
- nc.Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
- }
- }
- }
- return nc.pickServer()
- }
- // addURLToPool adds an entry to the server pool
- func (nc *Conn) addURLToPool(sURL string, implicit bool) error {
- u, err := url.Parse(sURL)
- if err != nil {
- return err
- }
- s := &srv{url: u, isImplicit: implicit}
- nc.srvPool = append(nc.srvPool, s)
- nc.urls[u.Host] = struct{}{}
- return nil
- }
- // shufflePool swaps randomly elements in the server pool
- func (nc *Conn) shufflePool() {
- if len(nc.srvPool) <= 1 {
- return
- }
- source := rand.NewSource(time.Now().UnixNano())
- r := rand.New(source)
- for i := range nc.srvPool {
- j := r.Intn(i + 1)
- nc.srvPool[i], nc.srvPool[j] = nc.srvPool[j], nc.srvPool[i]
- }
- }
- // createConn will connect to the server and wrap the appropriate
- // bufio structures. It will do the right thing when an existing
- // connection is in place.
- func (nc *Conn) createConn() (err error) {
- if nc.Opts.Timeout < 0 {
- return ErrBadTimeout
- }
- if _, cur := nc.currentServer(); cur == nil {
- return ErrNoServers
- } else {
- cur.lastAttempt = time.Now()
- }
- // CustomDialer takes precedence. If not set, use Opts.Dialer which
- // is set to a default *net.Dialer (in Connect()) if not explicitly
- // set by the user.
- dialer := nc.Opts.CustomDialer
- if dialer == nil {
- dialer = nc.Opts.Dialer
- }
- nc.conn, err = dialer.Dial("tcp", nc.url.Host)
- if err != nil {
- return err
- }
- // No clue why, but this stalls and kills performance on Mac (Mavericks).
- // https://code.google.com/p/go/issues/detail?id=6930
- //if ip, ok := nc.conn.(*net.TCPConn); ok {
- // ip.SetReadBuffer(defaultBufSize)
- //}
- if nc.pending != nil && nc.bw != nil {
- // Move to pending buffer.
- nc.bw.Flush()
- }
- nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize)
- return nil
- }
- // makeTLSConn will wrap an existing Conn using TLS
- func (nc *Conn) makeTLSConn() {
- // Allow the user to configure their own tls.Config structure, otherwise
- // default to InsecureSkipVerify.
- // TODO(dlc) - We should make the more secure version the default.
- if nc.Opts.TLSConfig != nil {
- tlsCopy := util.CloneTLSConfig(nc.Opts.TLSConfig)
- // If its blank we will override it with the current host
- if tlsCopy.ServerName == _EMPTY_ {
- h, _, _ := net.SplitHostPort(nc.url.Host)
- tlsCopy.ServerName = h
- }
- nc.conn = tls.Client(nc.conn, tlsCopy)
- } else {
- nc.conn = tls.Client(nc.conn, &tls.Config{InsecureSkipVerify: true})
- }
- conn := nc.conn.(*tls.Conn)
- conn.Handshake()
- nc.bw = bufio.NewWriterSize(nc.conn, defaultBufSize)
- }
- // waitForExits will wait for all socket watcher Go routines to
- // be shutdown before proceeding.
- func (nc *Conn) waitForExits(wg *sync.WaitGroup) {
- // Kick old flusher forcefully.
- select {
- case nc.fch <- struct{}{}:
- default:
- }
- // Wait for any previous go routines.
- if wg != nil {
- wg.Wait()
- }
- }
- // spinUpGoRoutines will launch the Go routines responsible for
- // reading and writing to the socket. This will be launched via a
- // go routine itself to release any locks that may be held.
- // We also use a WaitGroup to make sure we only start them on a
- // reconnect when the previous ones have exited.
- func (nc *Conn) spinUpGoRoutines() {
- // Make sure everything has exited.
- nc.waitForExits(nc.wg)
- // Create a new waitGroup instance for this run.
- nc.wg = &sync.WaitGroup{}
- // We will wait on both.
- nc.wg.Add(2)
- // Spin up the readLoop and the socket flusher.
- go nc.readLoop(nc.wg)
- go nc.flusher(nc.wg)
- nc.mu.Lock()
- if nc.Opts.PingInterval > 0 {
- if nc.ptmr == nil {
- nc.ptmr = time.AfterFunc(nc.Opts.PingInterval, nc.processPingTimer)
- } else {
- nc.ptmr.Reset(nc.Opts.PingInterval)
- }
- }
- nc.mu.Unlock()
- }
- // Report the connected server's Url
- func (nc *Conn) ConnectedUrl() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.url.String()
- }
- // Report the connected server's Id
- func (nc *Conn) ConnectedServerId() string {
- if nc == nil {
- return _EMPTY_
- }
- nc.mu.Lock()
- defer nc.mu.Unlock()
- if nc.status != CONNECTED {
- return _EMPTY_
- }
- return nc.info.Id
- }
- // Low level setup for structs, etc
- func (nc *Conn) setup() {
- nc.subs = make(map[int64]*Subscription)
- nc.pongs = make([]chan struct{}, 0, 8)
- nc.fch = make(chan struct{}, flushChanSize)
- // Setup scratch outbound buffer for PUB
- pub := nc.scratch[:len(_PUB_P_)]
- copy(pub, _PUB_P_)
- }
- // Process a connected connection and initialize properly.
- func (nc *Conn) processConnectInit() error {
- // Set out deadline for the whole connect process
- nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
- defer nc.conn.SetDeadline(time.Time{})
- // Set our status to connecting.
- nc.status = CONNECTING
- // Process the INFO protocol received from the server
- err := nc.processExpectedInfo()
- if err != nil {
- return err
- }
- // Send the CONNECT protocol along with the initial PING protocol.
- // Wait for the PONG response (or any error that we get from the server).
- err = nc.sendConnect()
- if err != nil {
- return err
- }
- // Reset the number of PING sent out
- nc.pout = 0
- go nc.spinUpGoRoutines()
- return nil
- }
- // Main connect function. Will connect to the nats-server
- func (nc *Conn) connect() error {
- var returnedErr error
- // Create actual socket connection
- // For first connect we walk all servers in the pool and try
- // to connect immediately.
- nc.mu.Lock()
- nc.initc = true
- // The pool may change inside the loop iteration due to INFO protocol.
- for i := 0; i < len(nc.srvPool); i++ {
- nc.url = nc.srvPool[i].url
- if err := nc.createConn(); err == nil {
- // This was moved out of processConnectInit() because
- // that function is now invoked from doReconnect() too.
- nc.setup()
- err = nc.processConnectInit()
- if err == nil {
- nc.srvPool[i].didConnect = true
- nc.srvPool[i].reconnects = 0
- returnedErr = nil
- break
- } else {
- returnedErr = err
- nc.mu.Unlock()
- nc.close(DISCONNECTED, false)
- nc.mu.Lock()
- nc.url = nil
- }
- } else {
- // Cancel out default connection refused, will trigger the
- // No servers error conditional
- if strings.Contains(err.Error(), "connection refused") {
- returnedErr = nil
- }
- }
- }
- nc.initc = false
- defer nc.mu.Unlock()
- if returnedErr == nil && nc.status != CONNECTED {
- returnedErr = ErrNoServers
- }
- return returnedErr
- }
- // This will check to see if the connection should be
- // secure. This can be dictated from either end and should
- // only be called after the INIT protocol has been received.
- func (nc *Conn) checkForSecure() error {
- // Check to see if we need to engage TLS
- o := nc.Opts
- // Check for mismatch in setups
- if o.Secure && !nc.info.TLSRequired {
- return ErrSecureConnWanted
- } else if nc.info.TLSRequired && !o.Secure {
- return ErrSecureConnRequired
- }
- // Need to rewrap with bufio
- if o.Secure {
- nc.makeTLSConn()
- }
- return nil
- }
- // processExpectedInfo will look for the expected first INFO message
- // sent when a connection is established. The lock should be held entering.
- func (nc *Conn) processExpectedInfo() error {
- c := &control{}
- // Read the protocol
- err := nc.readOp(c)
- if err != nil {
- return err
- }
- // The nats protocol should send INFO first always.
- if c.op != _INFO_OP_ {
- return ErrNoInfoReceived
- }
- // Parse the protocol
- if err := nc.processInfo(c.args); err != nil {
- return err
- }
- return nc.checkForSecure()
- }
- // Sends a protocol control message by queuing into the bufio writer
- // and kicking the flush Go routine. These writes are protected.
- func (nc *Conn) sendProto(proto string) {
- nc.mu.Lock()
- nc.bw.WriteString(proto)
- nc.kickFlusher()
- nc.mu.Unlock()
- }
- // Generate a connect protocol message, issuing user/password if
- // applicable. The lock is assumed to be held upon entering.
- func (nc *Conn) connectProto() (string, error) {
- o := nc.Opts
- var user, pass, token string
- u := nc.url.User
- if u != nil {
- // if no password, assume username is authToken
- if _, ok := u.Password(); !ok {
- token = u.Username()
- } else {
- user = u.Username()
- pass, _ = u.Password()
- }
- } else {
- // Take from options (pssibly all empty strings)
- user = nc.Opts.User
- pass = nc.Opts.Password
- token = nc.Opts.Token
- }
- cinfo := connectInfo{o.Verbose, o.Pedantic,
- user, pass, token,
- o.Secure, o.Name, LangString, Version, clientProtoInfo}
- b, err := json.Marshal(cinfo)
- if err != nil {
- return _EMPTY_, ErrJsonParse
- }
- return fmt.Sprintf(conProto, b), nil
- }
- // normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.
- func normalizeErr(line string) string {
- s := strings.ToLower(strings.TrimSpace(strings.TrimPrefix(line, _ERR_OP_)))
- s = strings.TrimLeft(strings.TrimRight(s, "'"), "'")
- return s
- }
- // Send a connect protocol message to the server, issue user/password if
- // applicable. Will wait for a flush to return from the server for error
- // processing.
- func (nc *Conn) sendConnect() error {
- // Construct the CONNECT protocol string
- cProto, err := nc.connectProto()
- if err != nil {
- return err
- }
- // Write the protocol into the buffer
- _, err = nc.bw.WriteString(cProto)
- if err != nil {
- return err
- }
- // Add to the buffer the PING protocol
- _, err = nc.bw.WriteString(pingProto)
- if err != nil {
- return err
- }
- // Flush the buffer
- err = nc.bw.Flush()
- if err != nil {
- return err
- }
- // We don't want to read more than we need here, otherwise
- // we would need to transfer the excess read data to the readLoop.
- // Since in normal situations we just are looking for a PONG\r\n,
- // reading byte-by-byte here is ok.
- proto, err := nc.readProto()
- if err != nil {
- return err
- }
- // If opts.Verbose is set, handle +OK
- if nc.Opts.Verbose && proto == okProto {
- // Read the rest now...
- proto, err = nc.readProto()
- if err != nil {
- return err
- }
- }
- // We expect a PONG
- if proto != pongProto {
- // But it could be something else, like -ERR
- // Since we no longer use ReadLine(), trim the trailing "\r\n"
- proto = strings.TrimRight(proto, "\r\n")
- // If it's a server error...
- if strings.HasPrefix(proto, _ERR_OP_) {
- // Remove -ERR, trim spaces and quotes, and convert to lower case.
- proto = normalizeErr(proto)
- return errors.New("nats: " + proto)
- }
- // Notify that we got an unexpected protocol.
- return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, proto)
- }
- // This is where we are truly connected.
- nc.status = CONNECTED
- return nil
- }
- // reads a protocol one byte at a time.
- func (nc *Conn) readProto() (string, error) {
- var (
- _buf = [10]byte{}
- buf = _buf[:0]
- b = [1]byte{}
- protoEnd = byte('\n')
- )
- for {
- if _, err := nc.conn.Read(b[:1]); err != nil {
- // Do not report EOF error
- if err == io.EOF {
- return string(buf), nil
- }
- return "", err
- }
- buf = append(buf, b[0])
- if b[0] == protoEnd {
- return string(buf), nil
- }
- }
- }
- // A control protocol line.
- type control struct {
- op, args string
- }
- // Read a control line and process the intended op.
- func (nc *Conn) readOp(c *control) error {
- br := bufio.NewReaderSize(nc.conn, defaultBufSize)
- line, err := br.ReadString('\n')
- if err != nil {
- return err
- }
- parseControl(line, c)
- return nil
- }
- // Parse a control line from the server.
- func parseControl(line string, c *control) {
- toks := strings.SplitN(line, _SPC_, 2)
- if len(toks) == 1 {
- c.op = strings.TrimSpace(toks[0])
- c.args = _EMPTY_
- } else if len(toks) == 2 {
- c.op, c.args = strings.TrimSpace(toks[0]), strings.TrimSpace(toks[1])
- } else {
- c.op = _EMPTY_
- }
- }
- // flushReconnectPending will push the pending items that were
- // gathered while we were in a RECONNECTING state to the socket.
- func (nc *Conn) flushReconnectPendingItems() {
- if nc.pending == nil {
- return
- }
- if nc.pending.Len() > 0 {
- nc.bw.Write(nc.pending.Bytes())
- }
- }
- // Try to reconnect using the option parameters.
- // This function assumes we are allowed to reconnect.
- func (nc *Conn) doReconnect() {
- // We want to make sure we have the other watchers shutdown properly
- // here before we proceed past this point.
- nc.mu.Lock()
- wg := nc.wg
- nc.mu.Unlock()
- nc.waitForExits(wg)
- // FIXME(dlc) - We have an issue here if we have
- // outstanding flush points (pongs) and they were not
- // sent out, but are still in the pipe.
- // Hold the lock manually and release where needed below,
- // can't do defer here.
- nc.mu.Lock()
- // Clear any queued pongs, e.g. pending flush calls.
- nc.clearPendingFlushCalls()
- // Clear any errors.
- nc.err = nil
- // Perform appropriate callback if needed for a disconnect.
- if nc.Opts.DisconnectedCB != nil {
- nc.ach <- func() { nc.Opts.DisconnectedCB(nc) }
- }
- for len(nc.srvPool) > 0 {
- cur, err := nc.selectNextServer()
- if err != nil {
- nc.err = err
- break
- }
- sleepTime := int64(0)
- // Sleep appropriate amount of time before the
- // connection attempt if connecting to same server
- // we just got disconnected from..
- if time.Since(cur.lastAttempt) < nc.Opts.ReconnectWait {
- sleepTime = int64(nc.Opts.ReconnectWait - time.Since(cur.lastAttempt))
- }
- // On Windows, createConn() will take more than a second when no
- // server is running at that address. So it could be that the
- // time elapsed between reconnect attempts is always > than
- // the set option. Release the lock to give a chance to a parallel
- // nc.Close() to break the loop.
- nc.mu.Unlock()
- if sleepTime <= 0 {
- runtime.Gosched()
- } else {
- time.Sleep(time.Duration(sleepTime))
- }
- nc.mu.Lock()
- // Check if we have been closed first.
- if nc.isClosed() {
- break
- }
- // Mark that we tried a reconnect
- cur.reconnects++
- // Try to create a new connection
- err = nc.createConn()
- // Not yet connected, retry...
- // Continue to hold the lock
- if err != nil {
- nc.err = nil
- continue
- }
- // We are reconnected
- nc.Reconnects++
- // Process connect logic
- if nc.err = nc.processConnectInit(); nc.err != nil {
- nc.status = RECONNECTING
- continue
- }
- // Clear out server stats for the server we connected to..
- cur.didConnect = true
- cur.reconnects = 0
- // Send existing subscription state
- nc.resendSubscriptions()
- // Now send off and clear pending buffer
- nc.flushReconnectPendingItems()
- // Flush the buffer
- nc.err = nc.bw.Flush()
- if nc.err != nil {
- nc.status = RECONNECTING
- continue
- }
- // Done with the pending buffer
- nc.pending = nil
- // This is where we are truly connected.
- nc.status = CONNECTED
- // Queue up the reconnect callback.
- if nc.Opts.ReconnectedCB != nil {
- nc.ach <- func() { nc.Opts.ReconnectedCB(nc) }
- }
- // Release lock here, we will return below.
- nc.mu.Unlock()
- // Make sure to flush everything
- nc.Flush()
- return
- }
- // Call into close.. We have no servers left..
- if nc.err == nil {
- nc.err = ErrNoServers
- }
- nc.mu.Unlock()
- nc.Close()
- }
- // processOpErr handles errors from reading or parsing the protocol.
- // The lock should not be held entering this function.
- func (nc *Conn) processOpErr(err error) {
- nc.mu.Lock()
- if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
- nc.mu.Unlock()
- return
- }
- if nc.Opts.AllowReconnect && nc.status == CONNECTED {
- // Set our new status
- nc.status = RECONNECTING
- if nc.ptmr != nil {
- nc.ptmr.Stop()
- }
- if nc.conn != nil {
- nc.bw.Flush()
- nc.conn.Close()
- nc.conn = nil
- }
- // Reset pending buffers before reconnecting.
- if nc.pending == nil {
- nc.pending = new(bytes.Buffer)
- }
- nc.pending.Reset()
- nc.bw.Reset(nc.pending)
- go nc.doReconnect()
- nc.mu.Unlock()
- return
- }
- nc.status = DISCONNECTED
- nc.err = err
- nc.mu.Unlock()
- nc.Close()
- }
- // Marker to close the channel to kick out the Go routine.
- func (nc *Conn) closeAsyncFunc() asyncCB {
- return func() {
- nc.mu.Lock()
- if nc.ach != nil {
- close(nc.ach)
- nc.ach = nil
- }
- nc.mu.Unlock()
- }
- }
- // asyncDispatch is responsible for calling any async callbacks
- func (nc *Conn) asyncDispatch() {
- // snapshot since they can change from underneath of us.
- nc.mu.Lock()
- ach := nc.ach
- nc.mu.Unlock()
- // Loop on the channel and process async callbacks.
- for {
- if f, ok := <-ach; !ok {
- return
- } else {
- f()
- }
- }
- }
- // readLoop() will sit on the socket reading and processing the
- // protocol from the server. It will dispatch appropriately based
- // on the op type.
- func (nc *Conn) readLoop(wg *sync.WaitGroup) {
- // Release the wait group on exit
- defer wg.Done()
- // Create a parseState if needed.
- nc.mu.Lock()
- if nc.ps == nil {
- nc.ps = &parseState{}
- }
- nc.mu.Unlock()
- // Stack based buffer.
- b := make([]byte, defaultBufSize)
- for {
- // FIXME(dlc): RWLock here?
- nc.mu.Lock()
- sb := nc.isClosed() || nc.isReconnecting()
- if sb {
- nc.ps = &parseState{}
- }
- conn := nc.conn
- nc.mu.Unlock()
- if sb || conn == nil {
- break
- }
- n, err := conn.Read(b)
- if err != nil {
- nc.processOpErr(err)
- break
- }
- if err := nc.parse(b[:n]); err != nil {
- nc.processOpErr(err)
- break
- }
- }
- // Clear the parseState here..
- nc.mu.Lock()
- nc.ps = nil
- nc.mu.Unlock()
- }
- // waitForMsgs waits on the conditional shared with readLoop and processMsg.
- // It is used to deliver messages to asynchronous subscribers.
- func (nc *Conn) waitForMsgs(s *Subscription) {
- var closed bool
- var delivered, max uint64
- for {
- s.mu.Lock()
- if s.pHead == nil && !s.closed {
- s.pCond.Wait()
- }
- // Pop the msg off the list
- m := s.pHead
- if m != nil {
- s.pHead = m.next
- if s.pHead == nil {
- s.pTail = nil
- }
- if m.barrier != nil {
- s.mu.Unlock()
- if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
- m.barrier.f()
- }
- continue
- }
- s.pMsgs--
- s.pBytes -= len(m.Data)
- }
- mcb := s.mcb
- max = s.max
- closed = s.closed
- if !s.closed {
- s.delivered++
- delivered = s.delivered
- }
- s.mu.Unlock()
- if closed {
- break
- }
- // Deliver the message.
- if m != nil && (max == 0 || delivered <= max) {
- mcb(m)
- }
- // If we have hit the max for delivered msgs, remove sub.
- if max > 0 && delivered >= max {
- nc.mu.Lock()
- nc.removeSub(s)
- nc.mu.Unlock()
- break
- }
- }
- // Check for barrier messages
- s.mu.Lock()
- for m := s.pHead; m != nil; m = s.pHead {
- if m.barrier != nil {
- s.mu.Unlock()
- if atomic.AddInt64(&m.barrier.refs, -1) == 0 {
- m.barrier.f()
- }
- s.mu.Lock()
- }
- s.pHead = m.next
- }
- s.mu.Unlock()
- }
- // processMsg is called by parse and will place the msg on the
- // appropriate channel/pending queue for processing. If the channel is full,
- // or the pending queue is over the pending limits, the connection is
- // considered a slow consumer.
- func (nc *Conn) processMsg(data []byte) {
- // Don't lock the connection to avoid server cutting us off if the
- // flusher is holding the connection lock, trying to send to the server
- // that is itself trying to send data to us.
- nc.subsMu.RLock()
- // Stats
- nc.InMsgs++
- nc.InBytes += uint64(len(data))
- sub := nc.subs[nc.ps.ma.sid]
- if sub == nil {
- nc.subsMu.RUnlock()
- return
- }
- // Copy them into string
- subj := string(nc.ps.ma.subject)
- reply := string(nc.ps.ma.reply)
- // Doing message create outside of the sub's lock to reduce contention.
- // It's possible that we end-up not using the message, but that's ok.
- // FIXME(dlc): Need to copy, should/can do COW?
- msgPayload := make([]byte, len(data))
- copy(msgPayload, data)
- // FIXME(dlc): Should we recycle these containers?
- m := &Msg{Data: msgPayload, Subject: subj, Reply: reply, Sub: sub}
- sub.mu.Lock()
- // Subscription internal stats (applicable only for non ChanSubscription's)
- if sub.typ != ChanSubscription {
- sub.pMsgs++
- if sub.pMsgs > sub.pMsgsMax {
- sub.pMsgsMax = sub.pMsgs
- }
- sub.pBytes += len(m.Data)
- if sub.pBytes > sub.pBytesMax {
- sub.pBytesMax = sub.pBytes
- }
- // Check for a Slow Consumer
- if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) ||
- (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
- goto slowConsumer
- }
- }
- // We have two modes of delivery. One is the channel, used by channel
- // subscribers and syncSubscribers, the other is a linked list for async.
- if sub.mch != nil {
- select {
- case sub.mch <- m:
- default:
- goto slowConsumer
- }
- } else {
- // Push onto the async pList
- if sub.pHead == nil {
- sub.pHead = m
- sub.pTail = m
- sub.pCond.Signal()
- } else {
- sub.pTail.next = m
- sub.pTail = m
- }
- }
- // Clear SlowConsumer status.
- sub.sc = false
- sub.mu.Unlock()
- nc.subsMu.RUnlock()
- return
- slowConsumer:
- sub.dropped++
- sc := !sub.sc
- sub.sc = true
- // Undo stats from above
- if sub.typ != ChanSubscription {
- sub.pMsgs--
- sub.pBytes -= len(m.Data)
- }
- sub.mu.Unlock()
- nc.subsMu.RUnlock()
- if sc {
- // Now we need connection's lock and we may end-up in the situation
- // that we were trying to avoid, except that in this case, the client
- // is already experiencing client-side slow consumer situation.
- nc.mu.Lock()
- nc.err = ErrSlowConsumer
- if nc.Opts.AsyncErrorCB != nil {
- nc.ach <- func() { nc.Opts.AsyncErrorCB(nc, sub, ErrSlowConsumer) }
- }
- nc.mu.Unlock()
- }
- }
- // processPermissionsViolation is called when the server signals a subject
- // permissions violation on either publish or subscribe.
- func (nc *Conn) processPermissionsViolation(err string) {
- nc.mu.Lock()
- // create error here so we can pass it as a closure to the async cb dispatcher.
- e := errors.New("nats: " + err)
- nc.err = e
- if nc.Opts.AsyncErrorCB != nil {
- nc.ach <- func() { nc.Opts.AsyncErrorCB(nc, nil, e) }
- }
- nc.mu.Unlock()
- }
- // processAuthorizationViolation is called when the server signals a user
- // authorization violation.
- func (nc *Conn) processAuthorizationViolation(err string) {
- nc.mu.Lock()
- nc.err = ErrAuthorization
- if nc.Opts.AsyncErrorCB != nil {
- nc.ach <- func() { nc.Opts.AsyncErrorCB(nc, nil, ErrAuthorization) }
- }
- nc.mu.Unlock()
- }
- // flusher is a separate Go routine that will process flush requests for the write
- // bufio. This allows coalescing of writes to the underlying socket.
- func (nc *Conn) flusher(wg *sync.WaitGroup) {
- // Release the wait group
- defer wg.Done()
- // snapshot the bw and conn since they can change from underneath of us.
- nc.mu.Lock()
- bw := nc.bw
- conn := nc.conn
- fch := nc.fch
- flusherTimeout := nc.Opts.FlusherTimeout
- nc.mu.Unlock()
- if conn == nil || bw == nil {
- return
- }
- for {
- if _, ok := <-fch; !ok {
- return
- }
- nc.mu.Lock()
- // Check to see if we should bail out.
- if !nc.isConnected() || nc.isConnecting() || bw != nc.bw || conn != nc.conn {
- nc.mu.Unlock()
- return
- }
- if bw.Buffered() > 0 {
- // Allow customizing how long we should wait for a flush to be done
- // to prevent unhealthy connections blocking the client for too long.
- if flusherTimeout > 0 {
- conn.SetWriteDeadline(time.Now().Add(flusherTimeout))
- }
- if err := bw.Flush(); err != nil {
- if nc.err == nil {
- nc.err = err
- }
- }
- conn.SetWriteDeadline(time.Time{})
- }
- nc.mu.Unlock()
- }
- }
- // processPing will send an immediate pong protocol response to the
- // server. The server uses this mechanism to detect dead clients.
- func (nc *Conn) processPing() {
- nc.sendProto(pongProto)
- }
- // processPong is used to process responses to the client's ping
- // messages. We use pings for the flush mechanism as well.
- func (nc *Conn) processPong() {
- var ch chan struct{}
- nc.mu.Lock()
- if len(nc.pongs) > 0 {
- ch = nc.pongs[0]
- nc.pongs = nc.pongs[1:]
- }
- nc.pout = 0
- nc.mu.Unlock()
- if ch != nil {
- ch <- struct{}{}
- }
- }
- // processOK is a placeholder for processing OK messages.
- func (nc *Conn) processOK() {
- // do nothing
- }
- // processInfo is used to parse the info messages sent
- // from the server.
- // This function may update the server pool.
- func (nc *Conn) processInfo(info string) error {
- if info == _EMPTY_ {
- return nil
- }
- ncInfo := serverInfo{}
- if err := json.Unmarshal([]byte(info), &ncInfo); err != nil {
- return err
- }
- // Copy content into connection's info structure.
- nc.info = ncInfo
- // The array could be empty/not present on initial connect,
- // if advertise is disabled on that server, or servers that
- // did not include themselves in the async INFO protocol.
- // If empty, do not remove the implicit servers from the pool.
- if len(ncInfo.ConnectURLs) == 0 {
- return nil
- }
- // Note about pool randomization: when the pool was first created,
- // it was randomized (if allowed). We keep the order the same (removing
- // implicit servers that are no longer sent to us). New URLs are sent
- // to us in no specific order so don't need extra randomization.
- hasNew := false
- // This is what we got from the server we are connected to.
- urls := nc.info.ConnectURLs
- // Transform that to a map for easy lookups
- tmp := make(map[string]struct{}, len(urls))
- for _, curl := range urls {
- tmp[curl] = struct{}{}
- }
- // Walk the pool and removed the implicit servers that are no longer in the
- // given array/map
- sp := nc.srvPool
- for i := 0; i < len(sp); i++ {
- srv := sp[i]
- curl := srv.url.Host
- // Check if this URL is in the INFO protocol
- _, inInfo := tmp[curl]
- // Remove from the temp map so that at the end we are left with only
- // new (or restarted) servers that need to be added to the pool.
- delete(tmp, curl)
- // Keep servers that were set through Options, but also the one that
- // we are currently connected to (even if it is a discovered server).
- if !srv.isImplicit || srv.url == nc.url {
- continue
- }
- if !inInfo {
- // Remove from server pool. Keep current order.
- copy(sp[i:], sp[i+1:])
- nc.srvPool = sp[:len(sp)-1]
- sp = nc.srvPool
- i--
- }
- }
- // If there are any left in the tmp map, these are new (or restarted) servers
- // and need to be added to the pool.
- for curl := range tmp {
- // Before adding, check if this is a new (as in never seen) URL.
- // This is used to figure out if we invoke the DiscoveredServersCB
- if _, present := nc.urls[curl]; !present {
- hasNew = true
- }
- nc.addURLToPool(fmt.Sprintf("nats://%s", curl), true)
- }
- if hasNew && !nc.initc && nc.Opts.DiscoveredServersCB != nil {
- nc.ach <- func() { nc.Opts.DiscoveredServersCB(nc) }
- }
- return nil
- }
- // processAsyncInfo does the same than processInfo, but is called
- // from the parser. Calls processInfo under connection's lock
- // protection.
- func (nc *Conn) processAsyncInfo(info []byte) {
- nc.mu.Lock()
- // Ignore errors, we will simply not update the server pool...
- nc.processInfo(string(info))
- nc.mu.Unlock()
- }
- // LastError reports the last error encountered via the connection.
- // It can be used reliably within ClosedCB in order to find out reason
- // why connection was closed for example.
- func (nc *Conn) LastError() error {
- if nc == nil {
- return ErrInvalidConnection
- }
- nc.mu.Lock()
- err := nc.err
- nc.mu.Unlock()
- return err
- }
- // processErr processes any error messages from the server and
- // sets the connection's lastError.
- func (nc *Conn) processErr(e string) {
- // Trim, remove quotes, convert to lower case.
- e = normalizeErr(e)
- // FIXME(dlc) - process Slow Consumer signals special.
- if e == STALE_CONNECTION {
- nc.processOpErr(ErrStaleConnection)
- } else if strings.HasPrefix(e, PERMISSIONS_ERR) {
- nc.processPermissionsViolation(e)
- } else if strings.HasPrefix(e, AUTHORIZATION_ERR) {
- nc.processAuthorizationViolation(e)
- } else {
- nc.mu.Lock()
- nc.err = errors.New("nats: " + e)
- nc.mu.Unlock()
- nc.Close()
- }
- }
- // kickFlusher will send a bool on a channel to kick the
- // flush Go routine to flush data to the server.
- func (nc *Conn) kickFlusher() {
- if nc.bw != nil {
- select {
- case nc.fch <- struct{}{}:
- default:
- }
- }
- }
- // Publish publishes the data argument to the given subject. The data
- // argument is left untouched and needs to be correctly interpreted on
- // the receiver.
- func (nc *Conn) Publish(subj string, data []byte) error {
- return nc.publish(subj, _EMPTY_, data)
- }
- // PublishMsg publishes the Msg structure, which includes the
- // Subject, an optional Reply and an optional Data field.
- func (nc *Conn) PublishMsg(m *Msg) error {
- if m == nil {
- return ErrInvalidMsg
- }
- return nc.publish(m.Subject, m.Reply, m.Data)
- }
- // PublishRequest will perform a Publish() excpecting a response on the
- // reply subject. Use Request() for automatically waiting for a response
- // inline.
- func (nc *Conn) PublishRequest(subj, reply string, data []byte) error {
- return nc.publish(subj, reply, data)
- }
- // Used for handrolled itoa
- const digits = "0123456789"
- // publish is the internal function to publish messages to a nats-server.
- // Sends a protocol data message by queuing into the bufio writer
- // and kicking the flush go routine. These writes should be protected.
- func (nc *Conn) publish(subj, reply string, data []byte) error {
- if nc == nil {
- return ErrInvalidConnection
- }
- if subj == "" {
- return ErrBadSubject
- }
- nc.mu.Lock()
- // Proactively reject payloads over the threshold set by server.
- msgSize := int64(len(data))
- if msgSize > nc.info.MaxPayload {
- nc.mu.Unlock()
- return ErrMaxPayload
- }
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- // Check if we are reconnecting, and if so check if
- // we have exceeded our reconnect outbound buffer limits.
- if nc.isReconnecting() {
- // Flush to underlying buffer.
- nc.bw.Flush()
- // Check if we are over
- if nc.pending.Len() >= nc.Opts.ReconnectBufSize {
- nc.mu.Unlock()
- return ErrReconnectBufExceeded
- }
- }
- msgh := nc.scratch[:len(_PUB_P_)]
- msgh = append(msgh, subj...)
- msgh = append(msgh, ' ')
- if reply != "" {
- msgh = append(msgh, reply...)
- msgh = append(msgh, ' ')
- }
- // We could be smarter here, but simple loop is ok,
- // just avoid strconv in fast path
- // FIXME(dlc) - Find a better way here.
- // msgh = strconv.AppendInt(msgh, int64(len(data)), 10)
- var b [12]byte
- var i = len(b)
- if len(data) > 0 {
- for l := len(data); l > 0; l /= 10 {
- i -= 1
- b[i] = digits[l%10]
- }
- } else {
- i -= 1
- b[i] = digits[0]
- }
- msgh = append(msgh, b[i:]...)
- msgh = append(msgh, _CRLF_...)
- _, err := nc.bw.Write(msgh)
- if err == nil {
- _, err = nc.bw.Write(data)
- }
- if err == nil {
- _, err = nc.bw.WriteString(_CRLF_)
- }
- if err != nil {
- nc.mu.Unlock()
- return err
- }
- nc.OutMsgs++
- nc.OutBytes += uint64(len(data))
- if len(nc.fch) == 0 {
- nc.kickFlusher()
- }
- nc.mu.Unlock()
- return nil
- }
- // respHandler is the global response handler. It will look up
- // the appropriate channel based on the last token and place
- // the message on the channel if possible.
- func (nc *Conn) respHandler(m *Msg) {
- rt := respToken(m.Subject)
- nc.mu.Lock()
- // Just return if closed.
- if nc.isClosed() {
- nc.mu.Unlock()
- return
- }
- // Grab mch
- mch := nc.respMap[rt]
- // Delete the key regardless, one response only.
- // FIXME(dlc) - should we track responses past 1
- // just statistics wise?
- delete(nc.respMap, rt)
- nc.mu.Unlock()
- // Don't block, let Request timeout instead, mch is
- // buffered and we should delete the key before a
- // second response is processed.
- select {
- case mch <- m:
- default:
- return
- }
- }
- // Create the response subscription we will use for all
- // new style responses. This will be on an _INBOX with an
- // additional terminal token. The subscription will be on
- // a wildcard. Caller is responsible for ensuring this is
- // only called once.
- func (nc *Conn) createRespMux(respSub string) error {
- s, err := nc.Subscribe(respSub, nc.respHandler)
- if err != nil {
- return err
- }
- nc.mu.Lock()
- nc.respMux = s
- nc.mu.Unlock()
- return nil
- }
- // Request will send a request payload and deliver the response message,
- // or an error, including a timeout if no message was received properly.
- func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- nc.mu.Lock()
- // If user wants the old style.
- if nc.Opts.UseOldRequestStyle {
- nc.mu.Unlock()
- return nc.oldRequest(subj, data, timeout)
- }
- // Do setup for the new style.
- if nc.respMap == nil {
- // _INBOX wildcard
- nc.respSub = fmt.Sprintf("%s.*", NewInbox())
- nc.respMap = make(map[string]chan *Msg)
- }
- // Create literal Inbox and map to a chan msg.
- mch := make(chan *Msg, RequestChanLen)
- respInbox := nc.newRespInbox()
- token := respToken(respInbox)
- nc.respMap[token] = mch
- createSub := nc.respMux == nil
- ginbox := nc.respSub
- nc.mu.Unlock()
- if createSub {
- // Make sure scoped subscription is setup only once.
- var err error
- nc.respSetup.Do(func() { err = nc.createRespMux(ginbox) })
- if err != nil {
- return nil, err
- }
- }
- if err := nc.PublishRequest(subj, respInbox, data); err != nil {
- return nil, err
- }
- t := globalTimerPool.Get(timeout)
- defer globalTimerPool.Put(t)
- var ok bool
- var msg *Msg
- select {
- case msg, ok = <-mch:
- if !ok {
- return nil, ErrConnectionClosed
- }
- case <-t.C:
- nc.mu.Lock()
- delete(nc.respMap, token)
- nc.mu.Unlock()
- return nil, ErrTimeout
- }
- return msg, nil
- }
- // oldRequest will create an Inbox and perform a Request() call
- // with the Inbox reply and return the first reply received.
- // This is optimized for the case of multiple responses.
- func (nc *Conn) oldRequest(subj string, data []byte, timeout time.Duration) (*Msg, error) {
- inbox := NewInbox()
- ch := make(chan *Msg, RequestChanLen)
- s, err := nc.subscribe(inbox, _EMPTY_, nil, ch)
- if err != nil {
- return nil, err
- }
- s.AutoUnsubscribe(1)
- defer s.Unsubscribe()
- err = nc.PublishRequest(subj, inbox, data)
- if err != nil {
- return nil, err
- }
- return s.NextMsg(timeout)
- }
- // InboxPrefix is the prefix for all inbox subjects.
- const InboxPrefix = "_INBOX."
- const inboxPrefixLen = len(InboxPrefix)
- const respInboxPrefixLen = inboxPrefixLen + nuidSize + 1
- // NewInbox will return an inbox string which can be used for directed replies from
- // subscribers. These are guaranteed to be unique, but can be shared and subscribed
- // to by others.
- func NewInbox() string {
- var b [inboxPrefixLen + nuidSize]byte
- pres := b[:inboxPrefixLen]
- copy(pres, InboxPrefix)
- ns := b[inboxPrefixLen:]
- copy(ns, nuid.Next())
- return string(b[:])
- }
- // Creates a new literal response subject that will trigger
- // the global subscription handler.
- func (nc *Conn) newRespInbox() string {
- var b [inboxPrefixLen + (2 * nuidSize) + 1]byte
- pres := b[:respInboxPrefixLen]
- copy(pres, nc.respSub)
- ns := b[respInboxPrefixLen:]
- copy(ns, nuid.Next())
- return string(b[:])
- }
- // respToken will return the last token of a literal response inbox
- // which we use for the message channel lookup.
- func respToken(respInbox string) string {
- return respInbox[respInboxPrefixLen:]
- }
- // Subscribe will express interest in the given subject. The subject
- // can have wildcards (partial:*, full:>). Messages will be delivered
- // to the associated MsgHandler. If no MsgHandler is given, the
- // subscription is a synchronous subscription and can be polled via
- // Subscription.NextMsg().
- func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
- return nc.subscribe(subj, _EMPTY_, cb, nil)
- }
- // ChanSubscribe will place all messages received on the channel.
- // You should not close the channel until sub.Unsubscribe() has been called.
- func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
- return nc.subscribe(subj, _EMPTY_, nil, ch)
- }
- // ChanQueueSubscribe will place all messages received on the channel.
- // You should not close the channel until sub.Unsubscribe() has been called.
- func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
- return nc.subscribe(subj, group, nil, ch)
- }
- // SubscribeSync is syntactic sugar for Subscribe(subject, nil).
- func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- mch := make(chan *Msg, nc.Opts.SubChanLen)
- s, e := nc.subscribe(subj, _EMPTY_, nil, mch)
- if s != nil {
- s.typ = SyncSubscription
- }
- return s, e
- }
- // QueueSubscribe creates an asynchronous queue subscriber on the given subject.
- // All subscribers with the same queue name will form the queue group and
- // only one member of the group will be selected to receive any given
- // message asynchronously.
- func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
- return nc.subscribe(subj, queue, cb, nil)
- }
- // QueueSubscribeSync creates a synchronous queue subscriber on the given
- // subject. All subscribers with the same queue name will form the queue
- // group and only one member of the group will be selected to receive any
- // given message synchronously.
- func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
- mch := make(chan *Msg, nc.Opts.SubChanLen)
- s, e := nc.subscribe(subj, queue, nil, mch)
- if s != nil {
- s.typ = SyncSubscription
- }
- return s, e
- }
- // QueueSubscribeSyncWithChan is syntactic sugar for ChanQueueSubscribe(subject, group, ch).
- func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
- return nc.subscribe(subj, queue, nil, ch)
- }
- // subscribe is the internal subscribe function that indicates interest in a subject.
- func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg) (*Subscription, error) {
- if nc == nil {
- return nil, ErrInvalidConnection
- }
- nc.mu.Lock()
- // ok here, but defer is generally expensive
- defer nc.mu.Unlock()
- defer nc.kickFlusher()
- // Check for some error conditions.
- if nc.isClosed() {
- return nil, ErrConnectionClosed
- }
- if cb == nil && ch == nil {
- return nil, ErrBadSubscription
- }
- sub := &Subscription{Subject: subj, Queue: queue, mcb: cb, conn: nc}
- // Set pending limits.
- sub.pMsgsLimit = DefaultSubPendingMsgsLimit
- sub.pBytesLimit = DefaultSubPendingBytesLimit
- // If we have an async callback, start up a sub specific
- // Go routine to deliver the messages.
- if cb != nil {
- sub.typ = AsyncSubscription
- sub.pCond = sync.NewCond(&sub.mu)
- go nc.waitForMsgs(sub)
- } else {
- sub.typ = ChanSubscription
- sub.mch = ch
- }
- nc.subsMu.Lock()
- nc.ssid++
- sub.sid = nc.ssid
- nc.subs[sub.sid] = sub
- nc.subsMu.Unlock()
- // We will send these for all subs when we reconnect
- // so that we can suppress here.
- if !nc.isReconnecting() {
- fmt.Fprintf(nc.bw, subProto, subj, queue, sub.sid)
- }
- return sub, nil
- }
- // Lock for nc should be held here upon entry
- func (nc *Conn) removeSub(s *Subscription) {
- nc.subsMu.Lock()
- delete(nc.subs, s.sid)
- nc.subsMu.Unlock()
- s.mu.Lock()
- defer s.mu.Unlock()
- // Release callers on NextMsg for SyncSubscription only
- if s.mch != nil && s.typ == SyncSubscription {
- close(s.mch)
- }
- s.mch = nil
- // Mark as invalid
- s.conn = nil
- s.closed = true
- if s.pCond != nil {
- s.pCond.Broadcast()
- }
- }
- // SubscriptionType is the type of the Subscription.
- type SubscriptionType int
- // The different types of subscription types.
- const (
- AsyncSubscription = SubscriptionType(iota)
- SyncSubscription
- ChanSubscription
- NilSubscription
- )
- // Type returns the type of Subscription.
- func (s *Subscription) Type() SubscriptionType {
- if s == nil {
- return NilSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- return s.typ
- }
- // IsValid returns a boolean indicating whether the subscription
- // is still active. This will return false if the subscription has
- // already been closed.
- func (s *Subscription) IsValid() bool {
- if s == nil {
- return false
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- return s.conn != nil
- }
- // Unsubscribe will remove interest in the given subject.
- func (s *Subscription) Unsubscribe() error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- conn := s.conn
- s.mu.Unlock()
- if conn == nil {
- return ErrBadSubscription
- }
- return conn.unsubscribe(s, 0)
- }
- // AutoUnsubscribe will issue an automatic Unsubscribe that is
- // processed by the server when max messages have been received.
- // This can be useful when sending a request to an unknown number
- // of subscribers. Request() uses this functionality.
- func (s *Subscription) AutoUnsubscribe(max int) error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- conn := s.conn
- s.mu.Unlock()
- if conn == nil {
- return ErrBadSubscription
- }
- return conn.unsubscribe(s, max)
- }
- // unsubscribe performs the low level unsubscribe to the server.
- // Use Subscription.Unsubscribe()
- func (nc *Conn) unsubscribe(sub *Subscription, max int) error {
- nc.mu.Lock()
- // ok here, but defer is expensive
- defer nc.mu.Unlock()
- defer nc.kickFlusher()
- if nc.isClosed() {
- return ErrConnectionClosed
- }
- nc.subsMu.RLock()
- s := nc.subs[sub.sid]
- nc.subsMu.RUnlock()
- // Already unsubscribed
- if s == nil {
- return nil
- }
- maxStr := _EMPTY_
- if max > 0 {
- s.max = uint64(max)
- maxStr = strconv.Itoa(max)
- } else {
- nc.removeSub(s)
- }
- // We will send these for all subs when we reconnect
- // so that we can suppress here.
- if !nc.isReconnecting() {
- fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
- }
- return nil
- }
- // NextMsg will return the next message available to a synchronous subscriber
- // or block until one is available. A timeout can be used to return when no
- // message has been delivered.
- func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
- if s == nil {
- return nil, ErrBadSubscription
- }
- s.mu.Lock()
- err := s.validateNextMsgState()
- if err != nil {
- s.mu.Unlock()
- return nil, err
- }
- // snapshot
- mch := s.mch
- s.mu.Unlock()
- var ok bool
- var msg *Msg
- t := globalTimerPool.Get(timeout)
- defer globalTimerPool.Put(t)
- select {
- case msg, ok = <-mch:
- if !ok {
- return nil, ErrConnectionClosed
- }
- err := s.processNextMsgDelivered(msg)
- if err != nil {
- return nil, err
- }
- case <-t.C:
- return nil, ErrTimeout
- }
- return msg, nil
- }
- // validateNextMsgState checks whether the subscription is in a valid
- // state to call NextMsg and be delivered another message synchronously.
- // This should be called while holding the lock.
- func (s *Subscription) validateNextMsgState() error {
- if s.connClosed {
- return ErrConnectionClosed
- }
- if s.mch == nil {
- if s.max > 0 && s.delivered >= s.max {
- return ErrMaxMessages
- } else if s.closed {
- return ErrBadSubscription
- }
- }
- if s.mcb != nil {
- return ErrSyncSubRequired
- }
- if s.sc {
- s.sc = false
- return ErrSlowConsumer
- }
- return nil
- }
- // processNextMsgDelivered takes a message and applies the needed
- // accounting to the stats from the subscription, returning an
- // error in case we have the maximum number of messages have been
- // delivered already. It should not be called while holding the lock.
- func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
- s.mu.Lock()
- nc := s.conn
- max := s.max
- // Update some stats.
- s.delivered++
- delivered := s.delivered
- if s.typ == SyncSubscription {
- s.pMsgs--
- s.pBytes -= len(msg.Data)
- }
- s.mu.Unlock()
- if max > 0 {
- if delivered > max {
- return ErrMaxMessages
- }
- // Remove subscription if we have reached max.
- if delivered == max {
- nc.mu.Lock()
- nc.removeSub(s)
- nc.mu.Unlock()
- }
- }
- return nil
- }
- // Queued returns the number of queued messages in the client for this subscription.
- // DEPRECATED: Use Pending()
- func (s *Subscription) QueuedMsgs() (int, error) {
- m, _, err := s.Pending()
- return int(m), err
- }
- // Pending returns the number of queued messages and queued bytes in the client for this subscription.
- func (s *Subscription) Pending() (int, int, error) {
- if s == nil {
- return -1, -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return -1, -1, ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return -1, -1, ErrTypeSubscription
- }
- return s.pMsgs, s.pBytes, nil
- }
- // MaxPending returns the maximum number of queued messages and queued bytes seen so far.
- func (s *Subscription) MaxPending() (int, int, error) {
- if s == nil {
- return -1, -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return -1, -1, ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return -1, -1, ErrTypeSubscription
- }
- return s.pMsgsMax, s.pBytesMax, nil
- }
- // ClearMaxPending resets the maximums seen so far.
- func (s *Subscription) ClearMaxPending() error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return ErrTypeSubscription
- }
- s.pMsgsMax, s.pBytesMax = 0, 0
- return nil
- }
- // Pending Limits
- const (
- DefaultSubPendingMsgsLimit = 65536
- DefaultSubPendingBytesLimit = 65536 * 1024
- )
- // PendingLimits returns the current limits for this subscription.
- // If no error is returned, a negative value indicates that the
- // given metric is not limited.
- func (s *Subscription) PendingLimits() (int, int, error) {
- if s == nil {
- return -1, -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return -1, -1, ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return -1, -1, ErrTypeSubscription
- }
- return s.pMsgsLimit, s.pBytesLimit, nil
- }
- // SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
- // Zero is not allowed. Any negative value means that the given metric is not limited.
- func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error {
- if s == nil {
- return ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return ErrBadSubscription
- }
- if s.typ == ChanSubscription {
- return ErrTypeSubscription
- }
- if msgLimit == 0 || bytesLimit == 0 {
- return ErrInvalidArg
- }
- s.pMsgsLimit, s.pBytesLimit = msgLimit, bytesLimit
- return nil
- }
- // Delivered returns the number of delivered messages for this subscription.
- func (s *Subscription) Delivered() (int64, error) {
- if s == nil {
- return -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return -1, ErrBadSubscription
- }
- return int64(s.delivered), nil
- }
- // Dropped returns the number of known dropped messages for this subscription.
- // This will correspond to messages dropped by violations of PendingLimits. If
- // the server declares the connection a SlowConsumer, this number may not be
- // valid.
- func (s *Subscription) Dropped() (int, error) {
- if s == nil {
- return -1, ErrBadSubscription
- }
- s.mu.Lock()
- defer s.mu.Unlock()
- if s.conn == nil {
- return -1, ErrBadSubscription
- }
- return s.dropped, nil
- }
- // FIXME: This is a hack
- // removeFlushEntry is needed when we need to discard queued up responses
- // for our pings as part of a flush call. This happens when we have a flush
- // call outstanding and we call close.
- func (nc *Conn) removeFlushEntry(ch chan struct{}) bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- if nc.pongs == nil {
- return false
- }
- for i, c := range nc.pongs {
- if c == ch {
- nc.pongs[i] = nil
- return true
- }
- }
- return false
- }
- // The lock must be held entering this function.
- func (nc *Conn) sendPing(ch chan struct{}) {
- nc.pongs = append(nc.pongs, ch)
- nc.bw.WriteString(pingProto)
- // Flush in place.
- nc.bw.Flush()
- }
- // This will fire periodically and send a client origin
- // ping to the server. Will also check that we have received
- // responses from the server.
- func (nc *Conn) processPingTimer() {
- nc.mu.Lock()
- if nc.status != CONNECTED {
- nc.mu.Unlock()
- return
- }
- // Check for violation
- nc.pout++
- if nc.pout > nc.Opts.MaxPingsOut {
- nc.mu.Unlock()
- nc.processOpErr(ErrStaleConnection)
- return
- }
- nc.sendPing(nil)
- nc.ptmr.Reset(nc.Opts.PingInterval)
- nc.mu.Unlock()
- }
- // FlushTimeout allows a Flush operation to have an associated timeout.
- func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) {
- if nc == nil {
- return ErrInvalidConnection
- }
- if timeout <= 0 {
- return ErrBadTimeout
- }
- nc.mu.Lock()
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- t := globalTimerPool.Get(timeout)
- defer globalTimerPool.Put(t)
- // Create a buffered channel to prevent chan send to block
- // in processPong() if this code here times out just when
- // PONG was received.
- ch := make(chan struct{}, 1)
- nc.sendPing(ch)
- nc.mu.Unlock()
- select {
- case _, ok := <-ch:
- if !ok {
- err = ErrConnectionClosed
- } else {
- close(ch)
- }
- case <-t.C:
- err = ErrTimeout
- }
- if err != nil {
- nc.removeFlushEntry(ch)
- }
- return
- }
- // Flush will perform a round trip to the server and return when it
- // receives the internal reply.
- func (nc *Conn) Flush() error {
- return nc.FlushTimeout(60 * time.Second)
- }
- // Buffered will return the number of bytes buffered to be sent to the server.
- // FIXME(dlc) take into account disconnected state.
- func (nc *Conn) Buffered() (int, error) {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- if nc.isClosed() || nc.bw == nil {
- return -1, ErrConnectionClosed
- }
- return nc.bw.Buffered(), nil
- }
- // resendSubscriptions will send our subscription state back to the
- // server. Used in reconnects
- func (nc *Conn) resendSubscriptions() {
- // Since we are going to send protocols to the server, we don't want to
- // be holding the subsMu lock (which is used in processMsg). So copy
- // the subscriptions in a temporary array.
- nc.subsMu.RLock()
- subs := make([]*Subscription, 0, len(nc.subs))
- for _, s := range nc.subs {
- subs = append(subs, s)
- }
- nc.subsMu.RUnlock()
- for _, s := range subs {
- adjustedMax := uint64(0)
- s.mu.Lock()
- if s.max > 0 {
- if s.delivered < s.max {
- adjustedMax = s.max - s.delivered
- }
- // adjustedMax could be 0 here if the number of delivered msgs
- // reached the max, if so unsubscribe.
- if adjustedMax == 0 {
- s.mu.Unlock()
- fmt.Fprintf(nc.bw, unsubProto, s.sid, _EMPTY_)
- continue
- }
- }
- s.mu.Unlock()
- fmt.Fprintf(nc.bw, subProto, s.Subject, s.Queue, s.sid)
- if adjustedMax > 0 {
- maxStr := strconv.Itoa(int(adjustedMax))
- fmt.Fprintf(nc.bw, unsubProto, s.sid, maxStr)
- }
- }
- }
- // This will clear any pending flush calls and release pending calls.
- // Lock is assumed to be held by the caller.
- func (nc *Conn) clearPendingFlushCalls() {
- // Clear any queued pongs, e.g. pending flush calls.
- for _, ch := range nc.pongs {
- if ch != nil {
- close(ch)
- }
- }
- nc.pongs = nil
- }
- // This will clear any pending Request calls.
- // Lock is assumed to be held by the caller.
- func (nc *Conn) clearPendingRequestCalls() {
- if nc.respMap == nil {
- return
- }
- for key, ch := range nc.respMap {
- if ch != nil {
- close(ch)
- delete(nc.respMap, key)
- }
- }
- }
- // Low level close call that will do correct cleanup and set
- // desired status. Also controls whether user defined callbacks
- // will be triggered. The lock should not be held entering this
- // function. This function will handle the locking manually.
- func (nc *Conn) close(status Status, doCBs bool) {
- nc.mu.Lock()
- if nc.isClosed() {
- nc.status = status
- nc.mu.Unlock()
- return
- }
- nc.status = CLOSED
- // Kick the Go routines so they fall out.
- nc.kickFlusher()
- nc.mu.Unlock()
- nc.mu.Lock()
- // Clear any queued pongs, e.g. pending flush calls.
- nc.clearPendingFlushCalls()
- // Clear any queued and blocking Requests.
- nc.clearPendingRequestCalls()
- if nc.ptmr != nil {
- nc.ptmr.Stop()
- }
- // Go ahead and make sure we have flushed the outbound
- if nc.conn != nil {
- nc.bw.Flush()
- defer nc.conn.Close()
- }
- // Close sync subscriber channels and release any
- // pending NextMsg() calls.
- nc.subsMu.Lock()
- for _, s := range nc.subs {
- s.mu.Lock()
- // Release callers on NextMsg for SyncSubscription only
- if s.mch != nil && s.typ == SyncSubscription {
- close(s.mch)
- }
- s.mch = nil
- // Mark as invalid, for signaling to deliverMsgs
- s.closed = true
- // Mark connection closed in subscription
- s.connClosed = true
- // If we have an async subscription, signals it to exit
- if s.typ == AsyncSubscription && s.pCond != nil {
- s.pCond.Signal()
- }
- s.mu.Unlock()
- }
- nc.subs = nil
- nc.subsMu.Unlock()
- // Perform appropriate callback if needed for a disconnect.
- if doCBs {
- if nc.Opts.DisconnectedCB != nil && nc.conn != nil {
- nc.ach <- func() { nc.Opts.DisconnectedCB(nc) }
- }
- if nc.Opts.ClosedCB != nil {
- nc.ach <- func() { nc.Opts.ClosedCB(nc) }
- }
- nc.ach <- nc.closeAsyncFunc()
- }
- nc.status = status
- nc.mu.Unlock()
- }
- // Close will close the connection to the server. This call will release
- // all blocking calls, such as Flush() and NextMsg()
- func (nc *Conn) Close() {
- nc.close(CLOSED, true)
- }
- // IsClosed tests if a Conn has been closed.
- func (nc *Conn) IsClosed() bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.isClosed()
- }
- // IsReconnecting tests if a Conn is reconnecting.
- func (nc *Conn) IsReconnecting() bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.isReconnecting()
- }
- // IsConnected tests if a Conn is connected.
- func (nc *Conn) IsConnected() bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.isConnected()
- }
- // caller must lock
- func (nc *Conn) getServers(implicitOnly bool) []string {
- poolSize := len(nc.srvPool)
- var servers = make([]string, 0)
- for i := 0; i < poolSize; i++ {
- if implicitOnly && !nc.srvPool[i].isImplicit {
- continue
- }
- url := nc.srvPool[i].url
- servers = append(servers, fmt.Sprintf("%s://%s", url.Scheme, url.Host))
- }
- return servers
- }
- // Servers returns the list of known server urls, including additional
- // servers discovered after a connection has been established. If
- // authentication is enabled, use UserInfo or Token when connecting with
- // these urls.
- func (nc *Conn) Servers() []string {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.getServers(false)
- }
- // DiscoveredServers returns only the server urls that have been discovered
- // after a connection has been established. If authentication is enabled,
- // use UserInfo or Token when connecting with these urls.
- func (nc *Conn) DiscoveredServers() []string {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.getServers(true)
- }
- // Status returns the current state of the connection.
- func (nc *Conn) Status() Status {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.status
- }
- // Test if Conn has been closed Lock is assumed held.
- func (nc *Conn) isClosed() bool {
- return nc.status == CLOSED
- }
- // Test if Conn is in the process of connecting
- func (nc *Conn) isConnecting() bool {
- return nc.status == CONNECTING
- }
- // Test if Conn is being reconnected.
- func (nc *Conn) isReconnecting() bool {
- return nc.status == RECONNECTING
- }
- // Test if Conn is connected or connecting.
- func (nc *Conn) isConnected() bool {
- return nc.status == CONNECTED
- }
- // Stats will return a race safe copy of the Statistics section for the connection.
- func (nc *Conn) Stats() Statistics {
- // Stats are updated either under connection's mu or subsMu mutexes.
- // Lock both to safely get them.
- nc.mu.Lock()
- nc.subsMu.RLock()
- stats := Statistics{
- InMsgs: nc.InMsgs,
- InBytes: nc.InBytes,
- OutMsgs: nc.OutMsgs,
- OutBytes: nc.OutBytes,
- Reconnects: nc.Reconnects,
- }
- nc.subsMu.RUnlock()
- nc.mu.Unlock()
- return stats
- }
- // MaxPayload returns the size limit that a message payload can have.
- // This is set by the server configuration and delivered to the client
- // upon connect.
- func (nc *Conn) MaxPayload() int64 {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.info.MaxPayload
- }
- // AuthRequired will return if the connected server requires authorization.
- func (nc *Conn) AuthRequired() bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.info.AuthRequired
- }
- // TLSRequired will return if the connected server requires TLS connections.
- func (nc *Conn) TLSRequired() bool {
- nc.mu.Lock()
- defer nc.mu.Unlock()
- return nc.info.TLSRequired
- }
- // Barrier schedules the given function `f` to all registered asynchronous
- // subscriptions.
- // Only the last subscription to see this barrier will invoke the function.
- // If no subscription is registered at the time of this call, `f()` is invoked
- // right away.
- // ErrConnectionClosed is returned if the connection is closed prior to
- // the call.
- func (nc *Conn) Barrier(f func()) error {
- nc.mu.Lock()
- if nc.isClosed() {
- nc.mu.Unlock()
- return ErrConnectionClosed
- }
- nc.subsMu.Lock()
- // Need to figure out how many non chan subscriptions there are
- numSubs := 0
- for _, sub := range nc.subs {
- if sub.typ == AsyncSubscription {
- numSubs++
- }
- }
- if numSubs == 0 {
- nc.subsMu.Unlock()
- nc.mu.Unlock()
- f()
- return nil
- }
- barrier := &barrierInfo{refs: int64(numSubs), f: f}
- for _, sub := range nc.subs {
- sub.mu.Lock()
- if sub.mch == nil {
- msg := &Msg{barrier: barrier}
- // Push onto the async pList
- if sub.pTail != nil {
- sub.pTail.next = msg
- } else {
- sub.pHead = msg
- sub.pCond.Signal()
- }
- sub.pTail = msg
- }
- sub.mu.Unlock()
- }
- nc.subsMu.Unlock()
- nc.mu.Unlock()
- return nil
- }
|