clientconn.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "errors"
  21. "fmt"
  22. "math"
  23. "net"
  24. "reflect"
  25. "strings"
  26. "sync"
  27. "sync/atomic"
  28. "time"
  29. "golang.org/x/net/context"
  30. "golang.org/x/net/trace"
  31. "google.golang.org/grpc/balancer"
  32. _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
  33. "google.golang.org/grpc/codes"
  34. "google.golang.org/grpc/connectivity"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal/backoff"
  38. "google.golang.org/grpc/internal/channelz"
  39. "google.golang.org/grpc/internal/transport"
  40. "google.golang.org/grpc/keepalive"
  41. "google.golang.org/grpc/resolver"
  42. _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
  43. _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
  44. "google.golang.org/grpc/status"
  45. )
  46. const (
  47. // minimum time to give a connection to complete
  48. minConnectTimeout = 20 * time.Second
  49. // must match grpclbName in grpclb/grpclb.go
  50. grpclbName = "grpclb"
  51. )
  52. var (
  53. // ErrClientConnClosing indicates that the operation is illegal because
  54. // the ClientConn is closing.
  55. //
  56. // Deprecated: this error should not be relied upon by users; use the status
  57. // code of Canceled instead.
  58. ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
  59. // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
  60. errConnDrain = errors.New("grpc: the connection is drained")
  61. // errConnClosing indicates that the connection is closing.
  62. errConnClosing = errors.New("grpc: the connection is closing")
  63. // errConnUnavailable indicates that the connection is unavailable.
  64. errConnUnavailable = errors.New("grpc: the connection is unavailable")
  65. // errBalancerClosed indicates that the balancer is closed.
  66. errBalancerClosed = errors.New("grpc: balancer is closed")
  67. // We use an accessor so that minConnectTimeout can be
  68. // atomically read and updated while testing.
  69. getMinConnectTimeout = func() time.Duration {
  70. return minConnectTimeout
  71. }
  72. )
  73. // The following errors are returned from Dial and DialContext
  74. var (
  75. // errNoTransportSecurity indicates that there is no transport security
  76. // being set for ClientConn. Users should either set one or explicitly
  77. // call WithInsecure DialOption to disable security.
  78. errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
  79. // errTransportCredentialsMissing indicates that users want to transmit security
  80. // information (e.g., oauth2 token) which requires secure connection on an insecure
  81. // connection.
  82. errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
  83. // errCredentialsConflict indicates that grpc.WithTransportCredentials()
  84. // and grpc.WithInsecure() are both called for a connection.
  85. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
  86. // errNetworkIO indicates that the connection is down due to some network I/O error.
  87. errNetworkIO = errors.New("grpc: failed with network I/O error")
  88. )
  89. const (
  90. defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
  91. defaultClientMaxSendMessageSize = math.MaxInt32
  92. // http2IOBufSize specifies the buffer size for sending frames.
  93. defaultWriteBufSize = 32 * 1024
  94. defaultReadBufSize = 32 * 1024
  95. )
  96. // RegisterChannelz turns on channelz service.
  97. // This is an EXPERIMENTAL API.
  98. func RegisterChannelz() {
  99. channelz.TurnOn()
  100. }
  101. // Dial creates a client connection to the given target.
  102. func Dial(target string, opts ...DialOption) (*ClientConn, error) {
  103. return DialContext(context.Background(), target, opts...)
  104. }
  105. // DialContext creates a client connection to the given target. By default, it's
  106. // a non-blocking dial (the function won't wait for connections to be
  107. // established, and connecting happens in the background). To make it a blocking
  108. // dial, use WithBlock() dial option.
  109. //
  110. // In the non-blocking case, the ctx does not act against the connection. It
  111. // only controls the setup steps.
  112. //
  113. // In the blocking case, ctx can be used to cancel or expire the pending
  114. // connection. Once this function returns, the cancellation and expiration of
  115. // ctx will be noop. Users should call ClientConn.Close to terminate all the
  116. // pending operations after this function returns.
  117. //
  118. // The target name syntax is defined in
  119. // https://github.com/grpc/grpc/blob/master/doc/naming.md.
  120. // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
  121. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  122. cc := &ClientConn{
  123. target: target,
  124. csMgr: &connectivityStateManager{},
  125. conns: make(map[*addrConn]struct{}),
  126. dopts: defaultDialOptions(),
  127. blockingpicker: newPickerWrapper(),
  128. }
  129. cc.retryThrottler.Store((*retryThrottler)(nil))
  130. cc.ctx, cc.cancel = context.WithCancel(context.Background())
  131. for _, opt := range opts {
  132. opt(&cc.dopts)
  133. }
  134. if channelz.IsOn() {
  135. if cc.dopts.channelzParentID != 0 {
  136. cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target)
  137. } else {
  138. cc.channelzID = channelz.RegisterChannel(cc, 0, target)
  139. }
  140. }
  141. if !cc.dopts.insecure {
  142. if cc.dopts.copts.TransportCredentials == nil {
  143. return nil, errNoTransportSecurity
  144. }
  145. } else {
  146. if cc.dopts.copts.TransportCredentials != nil {
  147. return nil, errCredentialsConflict
  148. }
  149. for _, cd := range cc.dopts.copts.PerRPCCredentials {
  150. if cd.RequireTransportSecurity() {
  151. return nil, errTransportCredentialsMissing
  152. }
  153. }
  154. }
  155. cc.mkp = cc.dopts.copts.KeepaliveParams
  156. if cc.dopts.copts.Dialer == nil {
  157. cc.dopts.copts.Dialer = newProxyDialer(
  158. func(ctx context.Context, addr string) (net.Conn, error) {
  159. network, addr := parseDialTarget(addr)
  160. return dialContext(ctx, network, addr)
  161. },
  162. )
  163. }
  164. if cc.dopts.copts.UserAgent != "" {
  165. cc.dopts.copts.UserAgent += " " + grpcUA
  166. } else {
  167. cc.dopts.copts.UserAgent = grpcUA
  168. }
  169. if cc.dopts.timeout > 0 {
  170. var cancel context.CancelFunc
  171. ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
  172. defer cancel()
  173. }
  174. defer func() {
  175. select {
  176. case <-ctx.Done():
  177. conn, err = nil, ctx.Err()
  178. default:
  179. }
  180. if err != nil {
  181. cc.Close()
  182. }
  183. }()
  184. scSet := false
  185. if cc.dopts.scChan != nil {
  186. // Try to get an initial service config.
  187. select {
  188. case sc, ok := <-cc.dopts.scChan:
  189. if ok {
  190. cc.sc = sc
  191. scSet = true
  192. }
  193. default:
  194. }
  195. }
  196. if cc.dopts.bs == nil {
  197. cc.dopts.bs = backoff.Exponential{
  198. MaxDelay: DefaultBackoffConfig.MaxDelay,
  199. }
  200. }
  201. if cc.dopts.resolverBuilder == nil {
  202. // Only try to parse target when resolver builder is not already set.
  203. cc.parsedTarget = parseTarget(cc.target)
  204. grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
  205. cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
  206. if cc.dopts.resolverBuilder == nil {
  207. // If resolver builder is still nil, the parse target's scheme is
  208. // not registered. Fallback to default resolver and set Endpoint to
  209. // the original unparsed target.
  210. grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
  211. cc.parsedTarget = resolver.Target{
  212. Scheme: resolver.GetDefaultScheme(),
  213. Endpoint: target,
  214. }
  215. cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
  216. }
  217. } else {
  218. cc.parsedTarget = resolver.Target{Endpoint: target}
  219. }
  220. creds := cc.dopts.copts.TransportCredentials
  221. if creds != nil && creds.Info().ServerName != "" {
  222. cc.authority = creds.Info().ServerName
  223. } else if cc.dopts.insecure && cc.dopts.authority != "" {
  224. cc.authority = cc.dopts.authority
  225. } else {
  226. // Use endpoint from "scheme://authority/endpoint" as the default
  227. // authority for ClientConn.
  228. cc.authority = cc.parsedTarget.Endpoint
  229. }
  230. if cc.dopts.scChan != nil && !scSet {
  231. // Blocking wait for the initial service config.
  232. select {
  233. case sc, ok := <-cc.dopts.scChan:
  234. if ok {
  235. cc.sc = sc
  236. }
  237. case <-ctx.Done():
  238. return nil, ctx.Err()
  239. }
  240. }
  241. if cc.dopts.scChan != nil {
  242. go cc.scWatcher()
  243. }
  244. var credsClone credentials.TransportCredentials
  245. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  246. credsClone = creds.Clone()
  247. }
  248. cc.balancerBuildOpts = balancer.BuildOptions{
  249. DialCreds: credsClone,
  250. Dialer: cc.dopts.copts.Dialer,
  251. ChannelzParentID: cc.channelzID,
  252. }
  253. // Build the resolver.
  254. cc.resolverWrapper, err = newCCResolverWrapper(cc)
  255. if err != nil {
  256. return nil, fmt.Errorf("failed to build resolver: %v", err)
  257. }
  258. // Start the resolver wrapper goroutine after resolverWrapper is created.
  259. //
  260. // If the goroutine is started before resolverWrapper is ready, the
  261. // following may happen: The goroutine sends updates to cc. cc forwards
  262. // those to balancer. Balancer creates new addrConn. addrConn fails to
  263. // connect, and calls resolveNow(). resolveNow() tries to use the non-ready
  264. // resolverWrapper.
  265. cc.resolverWrapper.start()
  266. // A blocking dial blocks until the clientConn is ready.
  267. if cc.dopts.block {
  268. for {
  269. s := cc.GetState()
  270. if s == connectivity.Ready {
  271. break
  272. }
  273. if !cc.WaitForStateChange(ctx, s) {
  274. // ctx got timeout or canceled.
  275. return nil, ctx.Err()
  276. }
  277. }
  278. }
  279. return cc, nil
  280. }
  281. // connectivityStateManager keeps the connectivity.State of ClientConn.
  282. // This struct will eventually be exported so the balancers can access it.
  283. type connectivityStateManager struct {
  284. mu sync.Mutex
  285. state connectivity.State
  286. notifyChan chan struct{}
  287. }
  288. // updateState updates the connectivity.State of ClientConn.
  289. // If there's a change it notifies goroutines waiting on state change to
  290. // happen.
  291. func (csm *connectivityStateManager) updateState(state connectivity.State) {
  292. csm.mu.Lock()
  293. defer csm.mu.Unlock()
  294. if csm.state == connectivity.Shutdown {
  295. return
  296. }
  297. if csm.state == state {
  298. return
  299. }
  300. csm.state = state
  301. if csm.notifyChan != nil {
  302. // There are other goroutines waiting on this channel.
  303. close(csm.notifyChan)
  304. csm.notifyChan = nil
  305. }
  306. }
  307. func (csm *connectivityStateManager) getState() connectivity.State {
  308. csm.mu.Lock()
  309. defer csm.mu.Unlock()
  310. return csm.state
  311. }
  312. func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
  313. csm.mu.Lock()
  314. defer csm.mu.Unlock()
  315. if csm.notifyChan == nil {
  316. csm.notifyChan = make(chan struct{})
  317. }
  318. return csm.notifyChan
  319. }
  320. // ClientConn represents a client connection to an RPC server.
  321. type ClientConn struct {
  322. ctx context.Context
  323. cancel context.CancelFunc
  324. target string
  325. parsedTarget resolver.Target
  326. authority string
  327. dopts dialOptions
  328. csMgr *connectivityStateManager
  329. balancerBuildOpts balancer.BuildOptions
  330. resolverWrapper *ccResolverWrapper
  331. blockingpicker *pickerWrapper
  332. mu sync.RWMutex
  333. sc ServiceConfig
  334. scRaw string
  335. conns map[*addrConn]struct{}
  336. // Keepalive parameter can be updated if a GoAway is received.
  337. mkp keepalive.ClientParameters
  338. curBalancerName string
  339. preBalancerName string // previous balancer name.
  340. curAddresses []resolver.Address
  341. balancerWrapper *ccBalancerWrapper
  342. retryThrottler atomic.Value
  343. channelzID int64 // channelz unique identification number
  344. czmu sync.RWMutex
  345. callsStarted int64
  346. callsSucceeded int64
  347. callsFailed int64
  348. lastCallStartedTime time.Time
  349. }
  350. // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
  351. // ctx expires. A true value is returned in former case and false in latter.
  352. // This is an EXPERIMENTAL API.
  353. func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
  354. ch := cc.csMgr.getNotifyChan()
  355. if cc.csMgr.getState() != sourceState {
  356. return true
  357. }
  358. select {
  359. case <-ctx.Done():
  360. return false
  361. case <-ch:
  362. return true
  363. }
  364. }
  365. // GetState returns the connectivity.State of ClientConn.
  366. // This is an EXPERIMENTAL API.
  367. func (cc *ClientConn) GetState() connectivity.State {
  368. return cc.csMgr.getState()
  369. }
  370. func (cc *ClientConn) scWatcher() {
  371. for {
  372. select {
  373. case sc, ok := <-cc.dopts.scChan:
  374. if !ok {
  375. return
  376. }
  377. cc.mu.Lock()
  378. // TODO: load balance policy runtime change is ignored.
  379. // We may revist this decision in the future.
  380. cc.sc = sc
  381. cc.scRaw = ""
  382. cc.mu.Unlock()
  383. case <-cc.ctx.Done():
  384. return
  385. }
  386. }
  387. }
  388. func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
  389. cc.mu.Lock()
  390. defer cc.mu.Unlock()
  391. if cc.conns == nil {
  392. // cc was closed.
  393. return
  394. }
  395. if reflect.DeepEqual(cc.curAddresses, addrs) {
  396. return
  397. }
  398. cc.curAddresses = addrs
  399. if cc.dopts.balancerBuilder == nil {
  400. // Only look at balancer types and switch balancer if balancer dial
  401. // option is not set.
  402. var isGRPCLB bool
  403. for _, a := range addrs {
  404. if a.Type == resolver.GRPCLB {
  405. isGRPCLB = true
  406. break
  407. }
  408. }
  409. var newBalancerName string
  410. if isGRPCLB {
  411. newBalancerName = grpclbName
  412. } else {
  413. // Address list doesn't contain grpclb address. Try to pick a
  414. // non-grpclb balancer.
  415. newBalancerName = cc.curBalancerName
  416. // If current balancer is grpclb, switch to the previous one.
  417. if newBalancerName == grpclbName {
  418. newBalancerName = cc.preBalancerName
  419. }
  420. // The following could be true in two cases:
  421. // - the first time handling resolved addresses
  422. // (curBalancerName="")
  423. // - the first time handling non-grpclb addresses
  424. // (curBalancerName="grpclb", preBalancerName="")
  425. if newBalancerName == "" {
  426. newBalancerName = PickFirstBalancerName
  427. }
  428. }
  429. cc.switchBalancer(newBalancerName)
  430. } else if cc.balancerWrapper == nil {
  431. // Balancer dial option was set, and this is the first time handling
  432. // resolved addresses. Build a balancer with dopts.balancerBuilder.
  433. cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
  434. }
  435. cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
  436. }
  437. // switchBalancer starts the switching from current balancer to the balancer
  438. // with the given name.
  439. //
  440. // It will NOT send the current address list to the new balancer. If needed,
  441. // caller of this function should send address list to the new balancer after
  442. // this function returns.
  443. //
  444. // Caller must hold cc.mu.
  445. func (cc *ClientConn) switchBalancer(name string) {
  446. if cc.conns == nil {
  447. return
  448. }
  449. if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
  450. return
  451. }
  452. grpclog.Infof("ClientConn switching balancer to %q", name)
  453. if cc.dopts.balancerBuilder != nil {
  454. grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
  455. return
  456. }
  457. // TODO(bar switching) change this to two steps: drain and close.
  458. // Keep track of sc in wrapper.
  459. if cc.balancerWrapper != nil {
  460. cc.balancerWrapper.close()
  461. }
  462. builder := balancer.Get(name)
  463. if builder == nil {
  464. grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
  465. builder = newPickfirstBuilder()
  466. }
  467. cc.preBalancerName = cc.curBalancerName
  468. cc.curBalancerName = builder.Name()
  469. cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
  470. }
  471. func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  472. cc.mu.Lock()
  473. if cc.conns == nil {
  474. cc.mu.Unlock()
  475. return
  476. }
  477. // TODO(bar switching) send updates to all balancer wrappers when balancer
  478. // gracefully switching is supported.
  479. cc.balancerWrapper.handleSubConnStateChange(sc, s)
  480. cc.mu.Unlock()
  481. }
  482. // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
  483. //
  484. // Caller needs to make sure len(addrs) > 0.
  485. func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
  486. ac := &addrConn{
  487. cc: cc,
  488. addrs: addrs,
  489. dopts: cc.dopts,
  490. }
  491. ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
  492. // Track ac in cc. This needs to be done before any getTransport(...) is called.
  493. cc.mu.Lock()
  494. if cc.conns == nil {
  495. cc.mu.Unlock()
  496. return nil, ErrClientConnClosing
  497. }
  498. if channelz.IsOn() {
  499. ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
  500. }
  501. cc.conns[ac] = struct{}{}
  502. cc.mu.Unlock()
  503. return ac, nil
  504. }
  505. // removeAddrConn removes the addrConn in the subConn from clientConn.
  506. // It also tears down the ac with the given error.
  507. func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
  508. cc.mu.Lock()
  509. if cc.conns == nil {
  510. cc.mu.Unlock()
  511. return
  512. }
  513. delete(cc.conns, ac)
  514. cc.mu.Unlock()
  515. ac.tearDown(err)
  516. }
  517. // ChannelzMetric returns ChannelInternalMetric of current ClientConn.
  518. // This is an EXPERIMENTAL API.
  519. func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric {
  520. state := cc.GetState()
  521. cc.czmu.RLock()
  522. defer cc.czmu.RUnlock()
  523. return &channelz.ChannelInternalMetric{
  524. State: state,
  525. Target: cc.target,
  526. CallsStarted: cc.callsStarted,
  527. CallsSucceeded: cc.callsSucceeded,
  528. CallsFailed: cc.callsFailed,
  529. LastCallStartedTimestamp: cc.lastCallStartedTime,
  530. }
  531. }
  532. func (cc *ClientConn) incrCallsStarted() {
  533. cc.czmu.Lock()
  534. cc.callsStarted++
  535. // TODO(yuxuanli): will make this a time.Time pointer improve performance?
  536. cc.lastCallStartedTime = time.Now()
  537. cc.czmu.Unlock()
  538. }
  539. func (cc *ClientConn) incrCallsSucceeded() {
  540. cc.czmu.Lock()
  541. cc.callsSucceeded++
  542. cc.czmu.Unlock()
  543. }
  544. func (cc *ClientConn) incrCallsFailed() {
  545. cc.czmu.Lock()
  546. cc.callsFailed++
  547. cc.czmu.Unlock()
  548. }
  549. // connect starts to creating transport and also starts the transport monitor
  550. // goroutine for this ac.
  551. // It does nothing if the ac is not IDLE.
  552. // TODO(bar) Move this to the addrConn section.
  553. // This was part of resetAddrConn, keep it here to make the diff look clean.
  554. func (ac *addrConn) connect() error {
  555. ac.mu.Lock()
  556. if ac.state == connectivity.Shutdown {
  557. ac.mu.Unlock()
  558. return errConnClosing
  559. }
  560. if ac.state != connectivity.Idle {
  561. ac.mu.Unlock()
  562. return nil
  563. }
  564. ac.state = connectivity.Connecting
  565. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  566. ac.mu.Unlock()
  567. // Start a goroutine connecting to the server asynchronously.
  568. go func() {
  569. if err := ac.resetTransport(); err != nil {
  570. grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
  571. if err != errConnClosing {
  572. // Keep this ac in cc.conns, to get the reason it's torn down.
  573. ac.tearDown(err)
  574. }
  575. return
  576. }
  577. ac.transportMonitor()
  578. }()
  579. return nil
  580. }
  581. // tryUpdateAddrs tries to update ac.addrs with the new addresses list.
  582. //
  583. // It checks whether current connected address of ac is in the new addrs list.
  584. // - If true, it updates ac.addrs and returns true. The ac will keep using
  585. // the existing connection.
  586. // - If false, it does nothing and returns false.
  587. func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
  588. ac.mu.Lock()
  589. defer ac.mu.Unlock()
  590. grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
  591. if ac.state == connectivity.Shutdown {
  592. ac.addrs = addrs
  593. return true
  594. }
  595. var curAddrFound bool
  596. for _, a := range addrs {
  597. if reflect.DeepEqual(ac.curAddr, a) {
  598. curAddrFound = true
  599. break
  600. }
  601. }
  602. grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
  603. if curAddrFound {
  604. ac.addrs = addrs
  605. ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
  606. }
  607. return curAddrFound
  608. }
  609. // GetMethodConfig gets the method config of the input method.
  610. // If there's an exact match for input method (i.e. /service/method), we return
  611. // the corresponding MethodConfig.
  612. // If there isn't an exact match for the input method, we look for the default config
  613. // under the service (i.e /service/). If there is a default MethodConfig for
  614. // the service, we return it.
  615. // Otherwise, we return an empty MethodConfig.
  616. func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
  617. // TODO: Avoid the locking here.
  618. cc.mu.RLock()
  619. defer cc.mu.RUnlock()
  620. m, ok := cc.sc.Methods[method]
  621. if !ok {
  622. i := strings.LastIndex(method, "/")
  623. m = cc.sc.Methods[method[:i+1]]
  624. }
  625. return m
  626. }
  627. func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
  628. t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
  629. FullMethodName: method,
  630. })
  631. if err != nil {
  632. return nil, nil, toRPCErr(err)
  633. }
  634. return t, done, nil
  635. }
  636. // handleServiceConfig parses the service config string in JSON format to Go native
  637. // struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
  638. func (cc *ClientConn) handleServiceConfig(js string) error {
  639. if cc.dopts.disableServiceConfig {
  640. return nil
  641. }
  642. sc, err := parseServiceConfig(js)
  643. if err != nil {
  644. return err
  645. }
  646. cc.mu.Lock()
  647. cc.scRaw = js
  648. cc.sc = sc
  649. if sc.retryThrottling != nil {
  650. newThrottler := &retryThrottler{
  651. tokens: sc.retryThrottling.MaxTokens,
  652. max: sc.retryThrottling.MaxTokens,
  653. thresh: sc.retryThrottling.MaxTokens / 2,
  654. ratio: sc.retryThrottling.TokenRatio,
  655. }
  656. cc.retryThrottler.Store(newThrottler)
  657. } else {
  658. cc.retryThrottler.Store((*retryThrottler)(nil))
  659. }
  660. if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
  661. if cc.curBalancerName == grpclbName {
  662. // If current balancer is grpclb, there's at least one grpclb
  663. // balancer address in the resolved list. Don't switch the balancer,
  664. // but change the previous balancer name, so if a new resolved
  665. // address list doesn't contain grpclb address, balancer will be
  666. // switched to *sc.LB.
  667. cc.preBalancerName = *sc.LB
  668. } else {
  669. cc.switchBalancer(*sc.LB)
  670. cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
  671. }
  672. }
  673. cc.mu.Unlock()
  674. return nil
  675. }
  676. func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
  677. cc.mu.RLock()
  678. r := cc.resolverWrapper
  679. cc.mu.RUnlock()
  680. if r == nil {
  681. return
  682. }
  683. go r.resolveNow(o)
  684. }
  685. // Close tears down the ClientConn and all underlying connections.
  686. func (cc *ClientConn) Close() error {
  687. defer cc.cancel()
  688. cc.mu.Lock()
  689. if cc.conns == nil {
  690. cc.mu.Unlock()
  691. return ErrClientConnClosing
  692. }
  693. conns := cc.conns
  694. cc.conns = nil
  695. cc.csMgr.updateState(connectivity.Shutdown)
  696. rWrapper := cc.resolverWrapper
  697. cc.resolverWrapper = nil
  698. bWrapper := cc.balancerWrapper
  699. cc.balancerWrapper = nil
  700. cc.mu.Unlock()
  701. cc.blockingpicker.close()
  702. if rWrapper != nil {
  703. rWrapper.close()
  704. }
  705. if bWrapper != nil {
  706. bWrapper.close()
  707. }
  708. for ac := range conns {
  709. ac.tearDown(ErrClientConnClosing)
  710. }
  711. if channelz.IsOn() {
  712. channelz.RemoveEntry(cc.channelzID)
  713. }
  714. return nil
  715. }
  716. // addrConn is a network connection to a given address.
  717. type addrConn struct {
  718. ctx context.Context
  719. cancel context.CancelFunc
  720. cc *ClientConn
  721. addrs []resolver.Address
  722. dopts dialOptions
  723. events trace.EventLog
  724. acbw balancer.SubConn
  725. mu sync.Mutex
  726. curAddr resolver.Address
  727. reconnectIdx int // The index in addrs list to start reconnecting from.
  728. state connectivity.State
  729. // ready is closed and becomes nil when a new transport is up or failed
  730. // due to timeout.
  731. ready chan struct{}
  732. transport transport.ClientTransport
  733. // The reason this addrConn is torn down.
  734. tearDownErr error
  735. connectRetryNum int
  736. // backoffDeadline is the time until which resetTransport needs to
  737. // wait before increasing connectRetryNum count.
  738. backoffDeadline time.Time
  739. // connectDeadline is the time by which all connection
  740. // negotiations must complete.
  741. connectDeadline time.Time
  742. channelzID int64 // channelz unique identification number
  743. czmu sync.RWMutex
  744. callsStarted int64
  745. callsSucceeded int64
  746. callsFailed int64
  747. lastCallStartedTime time.Time
  748. }
  749. // adjustParams updates parameters used to create transports upon
  750. // receiving a GoAway.
  751. func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
  752. switch r {
  753. case transport.GoAwayTooManyPings:
  754. v := 2 * ac.dopts.copts.KeepaliveParams.Time
  755. ac.cc.mu.Lock()
  756. if v > ac.cc.mkp.Time {
  757. ac.cc.mkp.Time = v
  758. }
  759. ac.cc.mu.Unlock()
  760. }
  761. }
  762. // printf records an event in ac's event log, unless ac has been closed.
  763. // REQUIRES ac.mu is held.
  764. func (ac *addrConn) printf(format string, a ...interface{}) {
  765. if ac.events != nil {
  766. ac.events.Printf(format, a...)
  767. }
  768. }
  769. // errorf records an error in ac's event log, unless ac has been closed.
  770. // REQUIRES ac.mu is held.
  771. func (ac *addrConn) errorf(format string, a ...interface{}) {
  772. if ac.events != nil {
  773. ac.events.Errorf(format, a...)
  774. }
  775. }
  776. // resetTransport recreates a transport to the address for ac. The old
  777. // transport will close itself on error or when the clientconn is closed.
  778. // The created transport must receive initial settings frame from the server.
  779. // In case that doesn't happen, transportMonitor will kill the newly created
  780. // transport after connectDeadline has expired.
  781. // In case there was an error on the transport before the settings frame was
  782. // received, resetTransport resumes connecting to backends after the one that
  783. // was previously connected to. In case end of the list is reached, resetTransport
  784. // backs off until the original deadline.
  785. // If the DialOption WithWaitForHandshake was set, resetTrasport returns
  786. // successfully only after server settings are received.
  787. //
  788. // TODO(bar) make sure all state transitions are valid.
  789. func (ac *addrConn) resetTransport() error {
  790. ac.mu.Lock()
  791. if ac.state == connectivity.Shutdown {
  792. ac.mu.Unlock()
  793. return errConnClosing
  794. }
  795. if ac.ready != nil {
  796. close(ac.ready)
  797. ac.ready = nil
  798. }
  799. ac.transport = nil
  800. ridx := ac.reconnectIdx
  801. ac.mu.Unlock()
  802. ac.cc.mu.RLock()
  803. ac.dopts.copts.KeepaliveParams = ac.cc.mkp
  804. ac.cc.mu.RUnlock()
  805. var backoffDeadline, connectDeadline time.Time
  806. for connectRetryNum := 0; ; connectRetryNum++ {
  807. ac.mu.Lock()
  808. if ac.backoffDeadline.IsZero() {
  809. // This means either a successful HTTP2 connection was established
  810. // or this is the first time this addrConn is trying to establish a
  811. // connection.
  812. backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
  813. // This will be the duration that dial gets to finish.
  814. dialDuration := getMinConnectTimeout()
  815. if backoffFor > dialDuration {
  816. // Give dial more time as we keep failing to connect.
  817. dialDuration = backoffFor
  818. }
  819. start := time.Now()
  820. backoffDeadline = start.Add(backoffFor)
  821. connectDeadline = start.Add(dialDuration)
  822. ridx = 0 // Start connecting from the beginning.
  823. } else {
  824. // Continue trying to connect with the same deadlines.
  825. connectRetryNum = ac.connectRetryNum
  826. backoffDeadline = ac.backoffDeadline
  827. connectDeadline = ac.connectDeadline
  828. ac.backoffDeadline = time.Time{}
  829. ac.connectDeadline = time.Time{}
  830. ac.connectRetryNum = 0
  831. }
  832. if ac.state == connectivity.Shutdown {
  833. ac.mu.Unlock()
  834. return errConnClosing
  835. }
  836. ac.printf("connecting")
  837. if ac.state != connectivity.Connecting {
  838. ac.state = connectivity.Connecting
  839. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  840. }
  841. // copy ac.addrs in case of race
  842. addrsIter := make([]resolver.Address, len(ac.addrs))
  843. copy(addrsIter, ac.addrs)
  844. copts := ac.dopts.copts
  845. ac.mu.Unlock()
  846. connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
  847. if err != nil {
  848. return err
  849. }
  850. if connected {
  851. return nil
  852. }
  853. }
  854. }
  855. // createTransport creates a connection to one of the backends in addrs.
  856. // It returns true if a connection was established.
  857. func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
  858. for i := ridx; i < len(addrs); i++ {
  859. addr := addrs[i]
  860. target := transport.TargetInfo{
  861. Addr: addr.Addr,
  862. Metadata: addr.Metadata,
  863. Authority: ac.cc.authority,
  864. }
  865. done := make(chan struct{})
  866. onPrefaceReceipt := func() {
  867. ac.mu.Lock()
  868. close(done)
  869. if !ac.backoffDeadline.IsZero() {
  870. // If we haven't already started reconnecting to
  871. // other backends.
  872. // Note, this can happen when writer notices an error
  873. // and triggers resetTransport while at the same time
  874. // reader receives the preface and invokes this closure.
  875. ac.backoffDeadline = time.Time{}
  876. ac.connectDeadline = time.Time{}
  877. ac.connectRetryNum = 0
  878. }
  879. ac.mu.Unlock()
  880. }
  881. // Do not cancel in the success path because of
  882. // this issue in Go1.6: https://github.com/golang/go/issues/15078.
  883. connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
  884. if channelz.IsOn() {
  885. copts.ChannelzParentID = ac.channelzID
  886. }
  887. newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
  888. if err != nil {
  889. cancel()
  890. ac.cc.blockingpicker.updateConnectionError(err)
  891. ac.mu.Lock()
  892. if ac.state == connectivity.Shutdown {
  893. // ac.tearDown(...) has been invoked.
  894. ac.mu.Unlock()
  895. return false, errConnClosing
  896. }
  897. ac.mu.Unlock()
  898. grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
  899. continue
  900. }
  901. if ac.dopts.waitForHandshake {
  902. select {
  903. case <-done:
  904. case <-connectCtx.Done():
  905. // Didn't receive server preface, must kill this new transport now.
  906. grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
  907. newTr.Close()
  908. continue
  909. case <-ac.ctx.Done():
  910. }
  911. }
  912. ac.mu.Lock()
  913. if ac.state == connectivity.Shutdown {
  914. ac.mu.Unlock()
  915. // ac.tearDonn(...) has been invoked.
  916. newTr.Close()
  917. return false, errConnClosing
  918. }
  919. ac.printf("ready")
  920. ac.state = connectivity.Ready
  921. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  922. ac.transport = newTr
  923. ac.curAddr = addr
  924. if ac.ready != nil {
  925. close(ac.ready)
  926. ac.ready = nil
  927. }
  928. select {
  929. case <-done:
  930. // If the server has responded back with preface already,
  931. // don't set the reconnect parameters.
  932. default:
  933. ac.connectRetryNum = connectRetryNum
  934. ac.backoffDeadline = backoffDeadline
  935. ac.connectDeadline = connectDeadline
  936. ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
  937. }
  938. ac.mu.Unlock()
  939. return true, nil
  940. }
  941. ac.mu.Lock()
  942. if ac.state == connectivity.Shutdown {
  943. ac.mu.Unlock()
  944. return false, errConnClosing
  945. }
  946. ac.state = connectivity.TransientFailure
  947. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  948. ac.cc.resolveNow(resolver.ResolveNowOption{})
  949. if ac.ready != nil {
  950. close(ac.ready)
  951. ac.ready = nil
  952. }
  953. ac.mu.Unlock()
  954. timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
  955. select {
  956. case <-timer.C:
  957. case <-ac.ctx.Done():
  958. timer.Stop()
  959. return false, ac.ctx.Err()
  960. }
  961. return false, nil
  962. }
  963. // Run in a goroutine to track the error in transport and create the
  964. // new transport if an error happens. It returns when the channel is closing.
  965. func (ac *addrConn) transportMonitor() {
  966. for {
  967. var timer *time.Timer
  968. var cdeadline <-chan time.Time
  969. ac.mu.Lock()
  970. t := ac.transport
  971. if !ac.connectDeadline.IsZero() {
  972. timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
  973. cdeadline = timer.C
  974. }
  975. ac.mu.Unlock()
  976. // Block until we receive a goaway or an error occurs.
  977. select {
  978. case <-t.GoAway():
  979. done := t.Error()
  980. cleanup := t.Close
  981. // Since this transport will be orphaned (won't have a transportMonitor)
  982. // we need to launch a goroutine to keep track of clientConn.Close()
  983. // happening since it might not be noticed by any other goroutine for a while.
  984. go func() {
  985. <-done
  986. cleanup()
  987. }()
  988. case <-t.Error():
  989. // In case this is triggered because clientConn.Close()
  990. // was called, we want to immeditately close the transport
  991. // since no other goroutine might notice it for a while.
  992. t.Close()
  993. case <-cdeadline:
  994. ac.mu.Lock()
  995. // This implies that client received server preface.
  996. if ac.backoffDeadline.IsZero() {
  997. ac.mu.Unlock()
  998. continue
  999. }
  1000. ac.mu.Unlock()
  1001. timer = nil
  1002. // No server preface received until deadline.
  1003. // Kill the connection.
  1004. grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
  1005. t.Close()
  1006. }
  1007. if timer != nil {
  1008. timer.Stop()
  1009. }
  1010. // If a GoAway happened, regardless of error, adjust our keepalive
  1011. // parameters as appropriate.
  1012. select {
  1013. case <-t.GoAway():
  1014. ac.adjustParams(t.GetGoAwayReason())
  1015. default:
  1016. }
  1017. ac.mu.Lock()
  1018. if ac.state == connectivity.Shutdown {
  1019. ac.mu.Unlock()
  1020. return
  1021. }
  1022. // Set connectivity state to TransientFailure before calling
  1023. // resetTransport. Transition READY->CONNECTING is not valid.
  1024. ac.state = connectivity.TransientFailure
  1025. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  1026. ac.cc.resolveNow(resolver.ResolveNowOption{})
  1027. ac.curAddr = resolver.Address{}
  1028. ac.mu.Unlock()
  1029. if err := ac.resetTransport(); err != nil {
  1030. ac.mu.Lock()
  1031. ac.printf("transport exiting: %v", err)
  1032. ac.mu.Unlock()
  1033. grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
  1034. if err != errConnClosing {
  1035. // Keep this ac in cc.conns, to get the reason it's torn down.
  1036. ac.tearDown(err)
  1037. }
  1038. return
  1039. }
  1040. }
  1041. }
  1042. // getReadyTransport returns the transport if ac's state is READY.
  1043. // Otherwise it returns nil, false.
  1044. // If ac's state is IDLE, it will trigger ac to connect.
  1045. func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
  1046. ac.mu.Lock()
  1047. if ac.state == connectivity.Ready {
  1048. t := ac.transport
  1049. ac.mu.Unlock()
  1050. return t, true
  1051. }
  1052. var idle bool
  1053. if ac.state == connectivity.Idle {
  1054. idle = true
  1055. }
  1056. ac.mu.Unlock()
  1057. // Trigger idle ac to connect.
  1058. if idle {
  1059. ac.connect()
  1060. }
  1061. return nil, false
  1062. }
  1063. // tearDown starts to tear down the addrConn.
  1064. // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
  1065. // some edge cases (e.g., the caller opens and closes many addrConn's in a
  1066. // tight loop.
  1067. // tearDown doesn't remove ac from ac.cc.conns.
  1068. func (ac *addrConn) tearDown(err error) {
  1069. ac.cancel()
  1070. ac.mu.Lock()
  1071. defer ac.mu.Unlock()
  1072. if ac.state == connectivity.Shutdown {
  1073. return
  1074. }
  1075. ac.curAddr = resolver.Address{}
  1076. if err == errConnDrain && ac.transport != nil {
  1077. // GracefulClose(...) may be executed multiple times when
  1078. // i) receiving multiple GoAway frames from the server; or
  1079. // ii) there are concurrent name resolver/Balancer triggered
  1080. // address removal and GoAway.
  1081. ac.transport.GracefulClose()
  1082. }
  1083. ac.state = connectivity.Shutdown
  1084. ac.tearDownErr = err
  1085. ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
  1086. if ac.events != nil {
  1087. ac.events.Finish()
  1088. ac.events = nil
  1089. }
  1090. if ac.ready != nil {
  1091. close(ac.ready)
  1092. ac.ready = nil
  1093. }
  1094. if channelz.IsOn() {
  1095. channelz.RemoveEntry(ac.channelzID)
  1096. }
  1097. }
  1098. func (ac *addrConn) getState() connectivity.State {
  1099. ac.mu.Lock()
  1100. defer ac.mu.Unlock()
  1101. return ac.state
  1102. }
  1103. func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
  1104. ac.mu.Lock()
  1105. addr := ac.curAddr.Addr
  1106. ac.mu.Unlock()
  1107. state := ac.getState()
  1108. ac.czmu.RLock()
  1109. defer ac.czmu.RUnlock()
  1110. return &channelz.ChannelInternalMetric{
  1111. State: state,
  1112. Target: addr,
  1113. CallsStarted: ac.callsStarted,
  1114. CallsSucceeded: ac.callsSucceeded,
  1115. CallsFailed: ac.callsFailed,
  1116. LastCallStartedTimestamp: ac.lastCallStartedTime,
  1117. }
  1118. }
  1119. func (ac *addrConn) incrCallsStarted() {
  1120. ac.czmu.Lock()
  1121. ac.callsStarted++
  1122. ac.lastCallStartedTime = time.Now()
  1123. ac.czmu.Unlock()
  1124. }
  1125. func (ac *addrConn) incrCallsSucceeded() {
  1126. ac.czmu.Lock()
  1127. ac.callsSucceeded++
  1128. ac.czmu.Unlock()
  1129. }
  1130. func (ac *addrConn) incrCallsFailed() {
  1131. ac.czmu.Lock()
  1132. ac.callsFailed++
  1133. ac.czmu.Unlock()
  1134. }
  1135. type retryThrottler struct {
  1136. max float64
  1137. thresh float64
  1138. ratio float64
  1139. mu sync.Mutex
  1140. tokens float64 // TODO(dfawley): replace with atomic and remove lock.
  1141. }
  1142. // throttle subtracts a retry token from the pool and returns whether a retry
  1143. // should be throttled (disallowed) based upon the retry throttling policy in
  1144. // the service config.
  1145. func (rt *retryThrottler) throttle() bool {
  1146. if rt == nil {
  1147. return false
  1148. }
  1149. rt.mu.Lock()
  1150. defer rt.mu.Unlock()
  1151. rt.tokens--
  1152. if rt.tokens < 0 {
  1153. rt.tokens = 0
  1154. }
  1155. return rt.tokens <= rt.thresh
  1156. }
  1157. func (rt *retryThrottler) successfulRPC() {
  1158. if rt == nil {
  1159. return
  1160. }
  1161. rt.mu.Lock()
  1162. defer rt.mu.Unlock()
  1163. rt.tokens += rt.ratio
  1164. if rt.tokens > rt.max {
  1165. rt.tokens = rt.max
  1166. }
  1167. }
  1168. // ErrClientConnTimeout indicates that the ClientConn cannot establish the
  1169. // underlying connections within the specified timeout.
  1170. //
  1171. // Deprecated: This error is never returned by grpc and should not be
  1172. // referenced by users.
  1173. var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")