http2_client.go 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "io"
  21. "math"
  22. "net"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "sync/atomic"
  27. "time"
  28. "golang.org/x/net/context"
  29. "golang.org/x/net/http2"
  30. "golang.org/x/net/http2/hpack"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/credentials"
  33. "google.golang.org/grpc/internal/channelz"
  34. "google.golang.org/grpc/keepalive"
  35. "google.golang.org/grpc/metadata"
  36. "google.golang.org/grpc/peer"
  37. "google.golang.org/grpc/stats"
  38. "google.golang.org/grpc/status"
  39. )
  40. // http2Client implements the ClientTransport interface with HTTP2.
  41. type http2Client struct {
  42. ctx context.Context
  43. cancel context.CancelFunc
  44. ctxDone <-chan struct{} // Cache the ctx.Done() chan.
  45. userAgent string
  46. md interface{}
  47. conn net.Conn // underlying communication channel
  48. loopy *loopyWriter
  49. remoteAddr net.Addr
  50. localAddr net.Addr
  51. authInfo credentials.AuthInfo // auth info about the connection
  52. readerDone chan struct{} // sync point to enable testing.
  53. writerDone chan struct{} // sync point to enable testing.
  54. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  55. // that the server sent GoAway on this transport.
  56. goAway chan struct{}
  57. // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
  58. awakenKeepalive chan struct{}
  59. framer *framer
  60. // controlBuf delivers all the control related tasks (e.g., window
  61. // updates, reset streams, and various settings) to the controller.
  62. controlBuf *controlBuffer
  63. fc *trInFlow
  64. // The scheme used: https if TLS is on, http otherwise.
  65. scheme string
  66. isSecure bool
  67. creds []credentials.PerRPCCredentials
  68. // Boolean to keep track of reading activity on transport.
  69. // 1 is true and 0 is false.
  70. activity uint32 // Accessed atomically.
  71. kp keepalive.ClientParameters
  72. keepaliveEnabled bool
  73. statsHandler stats.Handler
  74. initialWindowSize int32
  75. // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
  76. maxSendHeaderListSize *uint32
  77. bdpEst *bdpEstimator
  78. // onSuccess is a callback that client transport calls upon
  79. // receiving server preface to signal that a succefull HTTP2
  80. // connection was established.
  81. onSuccess func()
  82. maxConcurrentStreams uint32
  83. streamQuota int64
  84. streamsQuotaAvailable chan struct{}
  85. waitingStreams uint32
  86. nextID uint32
  87. mu sync.Mutex // guard the following variables
  88. state transportState
  89. activeStreams map[uint32]*Stream
  90. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  91. prevGoAwayID uint32
  92. // goAwayReason records the http2.ErrCode and debug data received with the
  93. // GoAway frame.
  94. goAwayReason GoAwayReason
  95. // Fields below are for channelz metric collection.
  96. channelzID int64 // channelz unique identification number
  97. czmu sync.RWMutex
  98. kpCount int64
  99. // The number of streams that have started, including already finished ones.
  100. streamsStarted int64
  101. // The number of streams that have ended successfully by receiving EoS bit set
  102. // frame from server.
  103. streamsSucceeded int64
  104. streamsFailed int64
  105. lastStreamCreated time.Time
  106. msgSent int64
  107. msgRecv int64
  108. lastMsgSent time.Time
  109. lastMsgRecv time.Time
  110. }
  111. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  112. if fn != nil {
  113. return fn(ctx, addr)
  114. }
  115. return dialContext(ctx, "tcp", addr)
  116. }
  117. func isTemporary(err error) bool {
  118. switch err := err.(type) {
  119. case interface {
  120. Temporary() bool
  121. }:
  122. return err.Temporary()
  123. case interface {
  124. Timeout() bool
  125. }:
  126. // Timeouts may be resolved upon retry, and are thus treated as
  127. // temporary.
  128. return err.Timeout()
  129. }
  130. return true
  131. }
  132. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  133. // and starts to receive messages on it. Non-nil error returns if construction
  134. // fails.
  135. func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ ClientTransport, err error) {
  136. scheme := "http"
  137. ctx, cancel := context.WithCancel(ctx)
  138. defer func() {
  139. if err != nil {
  140. cancel()
  141. }
  142. }()
  143. conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
  144. if err != nil {
  145. if opts.FailOnNonTempDialError {
  146. return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  147. }
  148. return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
  149. }
  150. // Any further errors will close the underlying connection
  151. defer func(conn net.Conn) {
  152. if err != nil {
  153. conn.Close()
  154. }
  155. }(conn)
  156. var (
  157. isSecure bool
  158. authInfo credentials.AuthInfo
  159. )
  160. if creds := opts.TransportCredentials; creds != nil {
  161. scheme = "https"
  162. conn, authInfo, err = creds.ClientHandshake(connectCtx, addr.Authority, conn)
  163. if err != nil {
  164. return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  165. }
  166. isSecure = true
  167. }
  168. kp := opts.KeepaliveParams
  169. // Validate keepalive parameters.
  170. if kp.Time == 0 {
  171. kp.Time = defaultClientKeepaliveTime
  172. }
  173. if kp.Timeout == 0 {
  174. kp.Timeout = defaultClientKeepaliveTimeout
  175. }
  176. dynamicWindow := true
  177. icwz := int32(initialWindowSize)
  178. if opts.InitialConnWindowSize >= defaultWindowSize {
  179. icwz = opts.InitialConnWindowSize
  180. dynamicWindow = false
  181. }
  182. writeBufSize := opts.WriteBufferSize
  183. readBufSize := opts.ReadBufferSize
  184. maxHeaderListSize := defaultClientMaxHeaderListSize
  185. if opts.MaxHeaderListSize != nil {
  186. maxHeaderListSize = *opts.MaxHeaderListSize
  187. }
  188. t := &http2Client{
  189. ctx: ctx,
  190. ctxDone: ctx.Done(), // Cache Done chan.
  191. cancel: cancel,
  192. userAgent: opts.UserAgent,
  193. md: addr.Metadata,
  194. conn: conn,
  195. remoteAddr: conn.RemoteAddr(),
  196. localAddr: conn.LocalAddr(),
  197. authInfo: authInfo,
  198. readerDone: make(chan struct{}),
  199. writerDone: make(chan struct{}),
  200. goAway: make(chan struct{}),
  201. awakenKeepalive: make(chan struct{}, 1),
  202. framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
  203. fc: &trInFlow{limit: uint32(icwz)},
  204. scheme: scheme,
  205. activeStreams: make(map[uint32]*Stream),
  206. isSecure: isSecure,
  207. creds: opts.PerRPCCredentials,
  208. kp: kp,
  209. statsHandler: opts.StatsHandler,
  210. initialWindowSize: initialWindowSize,
  211. onSuccess: onSuccess,
  212. nextID: 1,
  213. maxConcurrentStreams: defaultMaxStreamsClient,
  214. streamQuota: defaultMaxStreamsClient,
  215. streamsQuotaAvailable: make(chan struct{}, 1),
  216. }
  217. t.controlBuf = newControlBuffer(t.ctxDone)
  218. if opts.InitialWindowSize >= defaultWindowSize {
  219. t.initialWindowSize = opts.InitialWindowSize
  220. dynamicWindow = false
  221. }
  222. if dynamicWindow {
  223. t.bdpEst = &bdpEstimator{
  224. bdp: initialWindowSize,
  225. updateFlowControl: t.updateFlowControl,
  226. }
  227. }
  228. // Make sure awakenKeepalive can't be written upon.
  229. // keepalive routine will make it writable, if need be.
  230. t.awakenKeepalive <- struct{}{}
  231. if t.statsHandler != nil {
  232. t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
  233. RemoteAddr: t.remoteAddr,
  234. LocalAddr: t.localAddr,
  235. })
  236. connBegin := &stats.ConnBegin{
  237. Client: true,
  238. }
  239. t.statsHandler.HandleConn(t.ctx, connBegin)
  240. }
  241. if channelz.IsOn() {
  242. t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "")
  243. }
  244. if t.kp.Time != infinity {
  245. t.keepaliveEnabled = true
  246. go t.keepalive()
  247. }
  248. // Start the reader goroutine for incoming message. Each transport has
  249. // a dedicated goroutine which reads HTTP2 frame from network. Then it
  250. // dispatches the frame to the corresponding stream entity.
  251. go t.reader()
  252. // Send connection preface to server.
  253. n, err := t.conn.Write(clientPreface)
  254. if err != nil {
  255. t.Close()
  256. return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  257. }
  258. if n != len(clientPreface) {
  259. t.Close()
  260. return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  261. }
  262. var ss []http2.Setting
  263. if t.initialWindowSize != defaultWindowSize {
  264. ss = append(ss, http2.Setting{
  265. ID: http2.SettingInitialWindowSize,
  266. Val: uint32(t.initialWindowSize),
  267. })
  268. }
  269. if opts.MaxHeaderListSize != nil {
  270. ss = append(ss, http2.Setting{
  271. ID: http2.SettingMaxHeaderListSize,
  272. Val: *opts.MaxHeaderListSize,
  273. })
  274. }
  275. err = t.framer.fr.WriteSettings(ss...)
  276. if err != nil {
  277. t.Close()
  278. return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  279. }
  280. // Adjust the connection flow control window if needed.
  281. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  282. if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  283. t.Close()
  284. return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  285. }
  286. }
  287. t.framer.writer.Flush()
  288. go func() {
  289. t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
  290. err := t.loopy.run()
  291. if err != nil {
  292. errorf("transport: loopyWriter.run returning. Err: %v", err)
  293. }
  294. // If it's a connection error, let reader goroutine handle it
  295. // since there might be data in the buffers.
  296. if _, ok := err.(net.Error); !ok {
  297. t.conn.Close()
  298. }
  299. close(t.writerDone)
  300. }()
  301. return t, nil
  302. }
  303. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  304. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  305. s := &Stream{
  306. done: make(chan struct{}),
  307. method: callHdr.Method,
  308. sendCompress: callHdr.SendCompress,
  309. buf: newRecvBuffer(),
  310. headerChan: make(chan struct{}),
  311. contentSubtype: callHdr.ContentSubtype,
  312. }
  313. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  314. s.requestRead = func(n int) {
  315. t.adjustWindow(s, uint32(n))
  316. }
  317. // The client side stream context should have exactly the same life cycle with the user provided context.
  318. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  319. // So we use the original context here instead of creating a copy.
  320. s.ctx = ctx
  321. s.trReader = &transportReader{
  322. reader: &recvBufferReader{
  323. ctx: s.ctx,
  324. ctxDone: s.ctx.Done(),
  325. recv: s.buf,
  326. },
  327. windowHandler: func(n int) {
  328. t.updateWindow(s, uint32(n))
  329. },
  330. }
  331. return s
  332. }
  333. func (t *http2Client) getPeer() *peer.Peer {
  334. pr := &peer.Peer{
  335. Addr: t.remoteAddr,
  336. }
  337. // Attach Auth info if there is any.
  338. if t.authInfo != nil {
  339. pr.AuthInfo = t.authInfo
  340. }
  341. return pr
  342. }
  343. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  344. aud := t.createAudience(callHdr)
  345. authData, err := t.getTrAuthData(ctx, aud)
  346. if err != nil {
  347. return nil, err
  348. }
  349. callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
  350. if err != nil {
  351. return nil, err
  352. }
  353. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  354. // first and create a slice of that exact size.
  355. // Make the slice of certain predictable size to reduce allocations made by append.
  356. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  357. hfLen += len(authData) + len(callAuthData)
  358. headerFields := make([]hpack.HeaderField, 0, hfLen)
  359. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  360. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  361. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  362. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  363. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  364. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  365. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  366. if callHdr.PreviousAttempts > 0 {
  367. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  368. }
  369. if callHdr.SendCompress != "" {
  370. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  371. }
  372. if dl, ok := ctx.Deadline(); ok {
  373. // Send out timeout regardless its value. The server can detect timeout context by itself.
  374. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  375. timeout := dl.Sub(time.Now())
  376. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  377. }
  378. for k, v := range authData {
  379. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  380. }
  381. for k, v := range callAuthData {
  382. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  383. }
  384. if b := stats.OutgoingTags(ctx); b != nil {
  385. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  386. }
  387. if b := stats.OutgoingTrace(ctx); b != nil {
  388. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  389. }
  390. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  391. var k string
  392. for _, vv := range added {
  393. for i, v := range vv {
  394. if i%2 == 0 {
  395. k = v
  396. continue
  397. }
  398. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  399. if isReservedHeader(k) {
  400. continue
  401. }
  402. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  403. }
  404. }
  405. for k, vv := range md {
  406. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  407. if isReservedHeader(k) {
  408. continue
  409. }
  410. for _, v := range vv {
  411. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  412. }
  413. }
  414. }
  415. if md, ok := t.md.(*metadata.MD); ok {
  416. for k, vv := range *md {
  417. if isReservedHeader(k) {
  418. continue
  419. }
  420. for _, v := range vv {
  421. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  422. }
  423. }
  424. }
  425. return headerFields, nil
  426. }
  427. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  428. // Create an audience string only if needed.
  429. if len(t.creds) == 0 && callHdr.Creds == nil {
  430. return ""
  431. }
  432. // Construct URI required to get auth request metadata.
  433. // Omit port if it is the default one.
  434. host := strings.TrimSuffix(callHdr.Host, ":443")
  435. pos := strings.LastIndex(callHdr.Method, "/")
  436. if pos == -1 {
  437. pos = len(callHdr.Method)
  438. }
  439. return "https://" + host + callHdr.Method[:pos]
  440. }
  441. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  442. authData := map[string]string{}
  443. for _, c := range t.creds {
  444. data, err := c.GetRequestMetadata(ctx, audience)
  445. if err != nil {
  446. if _, ok := status.FromError(err); ok {
  447. return nil, err
  448. }
  449. return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err)
  450. }
  451. for k, v := range data {
  452. // Capital header names are illegal in HTTP/2.
  453. k = strings.ToLower(k)
  454. authData[k] = v
  455. }
  456. }
  457. return authData, nil
  458. }
  459. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  460. callAuthData := map[string]string{}
  461. // Check if credentials.PerRPCCredentials were provided via call options.
  462. // Note: if these credentials are provided both via dial options and call
  463. // options, then both sets of credentials will be applied.
  464. if callCreds := callHdr.Creds; callCreds != nil {
  465. if !t.isSecure && callCreds.RequireTransportSecurity() {
  466. return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  467. }
  468. data, err := callCreds.GetRequestMetadata(ctx, audience)
  469. if err != nil {
  470. return nil, streamErrorf(codes.Internal, "transport: %v", err)
  471. }
  472. for k, v := range data {
  473. // Capital header names are illegal in HTTP/2
  474. k = strings.ToLower(k)
  475. callAuthData[k] = v
  476. }
  477. }
  478. return callAuthData, nil
  479. }
  480. // NewStream creates a stream and registers it into the transport as "active"
  481. // streams.
  482. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  483. ctx = peer.NewContext(ctx, t.getPeer())
  484. headerFields, err := t.createHeaderFields(ctx, callHdr)
  485. if err != nil {
  486. return nil, err
  487. }
  488. s := t.newStream(ctx, callHdr)
  489. cleanup := func(err error) {
  490. if s.swapState(streamDone) == streamDone {
  491. // If it was already done, return.
  492. return
  493. }
  494. // The stream was unprocessed by the server.
  495. atomic.StoreUint32(&s.unprocessed, 1)
  496. s.write(recvMsg{err: err})
  497. close(s.done)
  498. // If headerChan isn't closed, then close it.
  499. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  500. close(s.headerChan)
  501. }
  502. }
  503. hdr := &headerFrame{
  504. hf: headerFields,
  505. endStream: false,
  506. initStream: func(id uint32) (bool, error) {
  507. t.mu.Lock()
  508. if state := t.state; state != reachable {
  509. t.mu.Unlock()
  510. // Do a quick cleanup.
  511. err := error(errStreamDrain)
  512. if state == closing {
  513. err = ErrConnClosing
  514. }
  515. cleanup(err)
  516. return false, err
  517. }
  518. t.activeStreams[id] = s
  519. if channelz.IsOn() {
  520. t.czmu.Lock()
  521. t.streamsStarted++
  522. t.lastStreamCreated = time.Now()
  523. t.czmu.Unlock()
  524. }
  525. var sendPing bool
  526. // If the number of active streams change from 0 to 1, then check if keepalive
  527. // has gone dormant. If so, wake it up.
  528. if len(t.activeStreams) == 1 && t.keepaliveEnabled {
  529. select {
  530. case t.awakenKeepalive <- struct{}{}:
  531. sendPing = true
  532. // Fill the awakenKeepalive channel again as this channel must be
  533. // kept non-writable except at the point that the keepalive()
  534. // goroutine is waiting either to be awaken or shutdown.
  535. t.awakenKeepalive <- struct{}{}
  536. default:
  537. }
  538. }
  539. t.mu.Unlock()
  540. return sendPing, nil
  541. },
  542. onOrphaned: cleanup,
  543. wq: s.wq,
  544. }
  545. firstTry := true
  546. var ch chan struct{}
  547. checkForStreamQuota := func(it interface{}) bool {
  548. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  549. if firstTry {
  550. t.waitingStreams++
  551. }
  552. ch = t.streamsQuotaAvailable
  553. return false
  554. }
  555. if !firstTry {
  556. t.waitingStreams--
  557. }
  558. t.streamQuota--
  559. h := it.(*headerFrame)
  560. h.streamID = t.nextID
  561. t.nextID += 2
  562. s.id = h.streamID
  563. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  564. if t.streamQuota > 0 && t.waitingStreams > 0 {
  565. select {
  566. case t.streamsQuotaAvailable <- struct{}{}:
  567. default:
  568. }
  569. }
  570. return true
  571. }
  572. var hdrListSizeErr error
  573. checkForHeaderListSize := func(it interface{}) bool {
  574. if t.maxSendHeaderListSize == nil {
  575. return true
  576. }
  577. hdrFrame := it.(*headerFrame)
  578. var sz int64
  579. for _, f := range hdrFrame.hf {
  580. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  581. hdrListSizeErr = streamErrorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  582. return false
  583. }
  584. }
  585. return true
  586. }
  587. for {
  588. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  589. if !checkForStreamQuota(it) {
  590. return false
  591. }
  592. if !checkForHeaderListSize(it) {
  593. return false
  594. }
  595. return true
  596. }, hdr)
  597. if err != nil {
  598. return nil, err
  599. }
  600. if success {
  601. break
  602. }
  603. if hdrListSizeErr != nil {
  604. return nil, hdrListSizeErr
  605. }
  606. firstTry = false
  607. select {
  608. case <-ch:
  609. case <-s.ctx.Done():
  610. return nil, ContextErr(s.ctx.Err())
  611. case <-t.goAway:
  612. return nil, errStreamDrain
  613. case <-t.ctx.Done():
  614. return nil, ErrConnClosing
  615. }
  616. }
  617. if t.statsHandler != nil {
  618. outHeader := &stats.OutHeader{
  619. Client: true,
  620. FullMethod: callHdr.Method,
  621. RemoteAddr: t.remoteAddr,
  622. LocalAddr: t.localAddr,
  623. Compression: callHdr.SendCompress,
  624. }
  625. t.statsHandler.HandleRPC(s.ctx, outHeader)
  626. }
  627. return s, nil
  628. }
  629. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  630. // This must not be executed in reader's goroutine.
  631. func (t *http2Client) CloseStream(s *Stream, err error) {
  632. var (
  633. rst bool
  634. rstCode http2.ErrCode
  635. )
  636. if err != nil {
  637. rst = true
  638. rstCode = http2.ErrCodeCancel
  639. }
  640. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  641. }
  642. func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  643. // Set stream status to done.
  644. if s.swapState(streamDone) == streamDone {
  645. // If it was already done, return.
  646. return
  647. }
  648. // status and trailers can be updated here without any synchronization because the stream goroutine will
  649. // only read it after it sees an io.EOF error from read or write and we'll write those errors
  650. // only after updating this.
  651. s.status = st
  652. if len(mdata) > 0 {
  653. s.trailer = mdata
  654. }
  655. if err != nil {
  656. // This will unblock reads eventually.
  657. s.write(recvMsg{err: err})
  658. }
  659. // This will unblock write.
  660. close(s.done)
  661. // If headerChan isn't closed, then close it.
  662. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  663. s.noHeaders = true
  664. close(s.headerChan)
  665. }
  666. cleanup := &cleanupStream{
  667. streamID: s.id,
  668. onWrite: func() {
  669. t.mu.Lock()
  670. if t.activeStreams != nil {
  671. delete(t.activeStreams, s.id)
  672. }
  673. t.mu.Unlock()
  674. if channelz.IsOn() {
  675. t.czmu.Lock()
  676. if eosReceived {
  677. t.streamsSucceeded++
  678. } else {
  679. t.streamsFailed++
  680. }
  681. t.czmu.Unlock()
  682. }
  683. },
  684. rst: rst,
  685. rstCode: rstCode,
  686. }
  687. addBackStreamQuota := func(interface{}) bool {
  688. t.streamQuota++
  689. if t.streamQuota > 0 && t.waitingStreams > 0 {
  690. select {
  691. case t.streamsQuotaAvailable <- struct{}{}:
  692. default:
  693. }
  694. }
  695. return true
  696. }
  697. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  698. }
  699. // Close kicks off the shutdown process of the transport. This should be called
  700. // only once on a transport. Once it is called, the transport should not be
  701. // accessed any more.
  702. func (t *http2Client) Close() error {
  703. t.mu.Lock()
  704. // Make sure we only Close once.
  705. if t.state == closing {
  706. t.mu.Unlock()
  707. return nil
  708. }
  709. t.state = closing
  710. streams := t.activeStreams
  711. t.activeStreams = nil
  712. t.mu.Unlock()
  713. t.controlBuf.finish()
  714. t.cancel()
  715. err := t.conn.Close()
  716. if channelz.IsOn() {
  717. channelz.RemoveEntry(t.channelzID)
  718. }
  719. // Notify all active streams.
  720. for _, s := range streams {
  721. t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
  722. }
  723. if t.statsHandler != nil {
  724. connEnd := &stats.ConnEnd{
  725. Client: true,
  726. }
  727. t.statsHandler.HandleConn(t.ctx, connEnd)
  728. }
  729. return err
  730. }
  731. // GracefulClose sets the state to draining, which prevents new streams from
  732. // being created and causes the transport to be closed when the last active
  733. // stream is closed. If there are no active streams, the transport is closed
  734. // immediately. This does nothing if the transport is already draining or
  735. // closing.
  736. func (t *http2Client) GracefulClose() error {
  737. t.mu.Lock()
  738. // Make sure we move to draining only from active.
  739. if t.state == draining || t.state == closing {
  740. t.mu.Unlock()
  741. return nil
  742. }
  743. t.state = draining
  744. active := len(t.activeStreams)
  745. t.mu.Unlock()
  746. if active == 0 {
  747. return t.Close()
  748. }
  749. t.controlBuf.put(&incomingGoAway{})
  750. return nil
  751. }
  752. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  753. // should proceed only if Write returns nil.
  754. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  755. if opts.Last {
  756. // If it's the last message, update stream state.
  757. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  758. return errStreamDone
  759. }
  760. } else if s.getState() != streamActive {
  761. return errStreamDone
  762. }
  763. df := &dataFrame{
  764. streamID: s.id,
  765. endStream: opts.Last,
  766. }
  767. if hdr != nil || data != nil { // If it's not an empty data frame.
  768. // Add some data to grpc message header so that we can equally
  769. // distribute bytes across frames.
  770. emptyLen := http2MaxFrameLen - len(hdr)
  771. if emptyLen > len(data) {
  772. emptyLen = len(data)
  773. }
  774. hdr = append(hdr, data[:emptyLen]...)
  775. data = data[emptyLen:]
  776. df.h, df.d = hdr, data
  777. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  778. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  779. return err
  780. }
  781. }
  782. return t.controlBuf.put(df)
  783. }
  784. func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
  785. t.mu.Lock()
  786. defer t.mu.Unlock()
  787. s, ok := t.activeStreams[f.Header().StreamID]
  788. return s, ok
  789. }
  790. // adjustWindow sends out extra window update over the initial window size
  791. // of stream if the application is requesting data larger in size than
  792. // the window.
  793. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  794. if w := s.fc.maybeAdjust(n); w > 0 {
  795. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  796. }
  797. }
  798. // updateWindow adjusts the inbound quota for the stream.
  799. // Window updates will be sent out when the cumulative quota
  800. // exceeds the corresponding threshold.
  801. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  802. if w := s.fc.onRead(n); w > 0 {
  803. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  804. }
  805. }
  806. // updateFlowControl updates the incoming flow control windows
  807. // for the transport and the stream based on the current bdp
  808. // estimation.
  809. func (t *http2Client) updateFlowControl(n uint32) {
  810. t.mu.Lock()
  811. for _, s := range t.activeStreams {
  812. s.fc.newLimit(n)
  813. }
  814. t.mu.Unlock()
  815. updateIWS := func(interface{}) bool {
  816. t.initialWindowSize = int32(n)
  817. return true
  818. }
  819. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  820. t.controlBuf.put(&outgoingSettings{
  821. ss: []http2.Setting{
  822. {
  823. ID: http2.SettingInitialWindowSize,
  824. Val: n,
  825. },
  826. },
  827. })
  828. }
  829. func (t *http2Client) handleData(f *http2.DataFrame) {
  830. size := f.Header().Length
  831. var sendBDPPing bool
  832. if t.bdpEst != nil {
  833. sendBDPPing = t.bdpEst.add(size)
  834. }
  835. // Decouple connection's flow control from application's read.
  836. // An update on connection's flow control should not depend on
  837. // whether user application has read the data or not. Such a
  838. // restriction is already imposed on the stream's flow control,
  839. // and therefore the sender will be blocked anyways.
  840. // Decoupling the connection flow control will prevent other
  841. // active(fast) streams from starving in presence of slow or
  842. // inactive streams.
  843. //
  844. if w := t.fc.onData(size); w > 0 {
  845. t.controlBuf.put(&outgoingWindowUpdate{
  846. streamID: 0,
  847. increment: w,
  848. })
  849. }
  850. if sendBDPPing {
  851. // Avoid excessive ping detection (e.g. in an L7 proxy)
  852. // by sending a window update prior to the BDP ping.
  853. if w := t.fc.reset(); w > 0 {
  854. t.controlBuf.put(&outgoingWindowUpdate{
  855. streamID: 0,
  856. increment: w,
  857. })
  858. }
  859. t.controlBuf.put(bdpPing)
  860. }
  861. // Select the right stream to dispatch.
  862. s, ok := t.getStream(f)
  863. if !ok {
  864. return
  865. }
  866. if size > 0 {
  867. if err := s.fc.onData(size); err != nil {
  868. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  869. return
  870. }
  871. if f.Header().Flags.Has(http2.FlagDataPadded) {
  872. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  873. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  874. }
  875. }
  876. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  877. // guarantee f.Data() is consumed before the arrival of next frame.
  878. // Can this copy be eliminated?
  879. if len(f.Data()) > 0 {
  880. data := make([]byte, len(f.Data()))
  881. copy(data, f.Data())
  882. s.write(recvMsg{data: data})
  883. }
  884. }
  885. // The server has closed the stream without sending trailers. Record that
  886. // the read direction is closed, and set the status appropriately.
  887. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  888. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  889. }
  890. }
  891. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  892. s, ok := t.getStream(f)
  893. if !ok {
  894. return
  895. }
  896. if f.ErrCode == http2.ErrCodeRefusedStream {
  897. // The stream was unprocessed by the server.
  898. atomic.StoreUint32(&s.unprocessed, 1)
  899. }
  900. statusCode, ok := http2ErrConvTab[f.ErrCode]
  901. if !ok {
  902. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  903. statusCode = codes.Unknown
  904. }
  905. if statusCode == codes.Canceled {
  906. // Our deadline was already exceeded, and that was likely the cause of
  907. // this cancelation. Alter the status code accordingly.
  908. if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
  909. statusCode = codes.DeadlineExceeded
  910. }
  911. }
  912. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  913. }
  914. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  915. if f.IsAck() {
  916. return
  917. }
  918. var maxStreams *uint32
  919. var ss []http2.Setting
  920. var updateFuncs []func()
  921. f.ForeachSetting(func(s http2.Setting) error {
  922. switch s.ID {
  923. case http2.SettingMaxConcurrentStreams:
  924. maxStreams = new(uint32)
  925. *maxStreams = s.Val
  926. case http2.SettingMaxHeaderListSize:
  927. updateFuncs = append(updateFuncs, func() {
  928. t.maxSendHeaderListSize = new(uint32)
  929. *t.maxSendHeaderListSize = s.Val
  930. })
  931. default:
  932. ss = append(ss, s)
  933. }
  934. return nil
  935. })
  936. if isFirst && maxStreams == nil {
  937. maxStreams = new(uint32)
  938. *maxStreams = math.MaxUint32
  939. }
  940. sf := &incomingSettings{
  941. ss: ss,
  942. }
  943. if maxStreams != nil {
  944. updateStreamQuota := func() {
  945. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  946. t.maxConcurrentStreams = *maxStreams
  947. t.streamQuota += delta
  948. if delta > 0 && t.waitingStreams > 0 {
  949. close(t.streamsQuotaAvailable) // wake all of them up.
  950. t.streamsQuotaAvailable = make(chan struct{}, 1)
  951. }
  952. }
  953. updateFuncs = append(updateFuncs, updateStreamQuota)
  954. }
  955. t.controlBuf.executeAndPut(func(interface{}) bool {
  956. for _, f := range updateFuncs {
  957. f()
  958. }
  959. return true
  960. }, sf)
  961. }
  962. func (t *http2Client) handlePing(f *http2.PingFrame) {
  963. if f.IsAck() {
  964. // Maybe it's a BDP ping.
  965. if t.bdpEst != nil {
  966. t.bdpEst.calculate(f.Data)
  967. }
  968. return
  969. }
  970. pingAck := &ping{ack: true}
  971. copy(pingAck.data[:], f.Data[:])
  972. t.controlBuf.put(pingAck)
  973. }
// handleGoAway processes a GOAWAY frame from the server. On the first GoAway
// it records the reason, closes t.goAway, moves the transport to draining and
// queues an incomingGoAway item. For every GoAway it closes the active
// streams whose IDs fall in (LastStreamID, previous GoAway ID] as unprocessed,
// and closes the transport when no active streams remain. Frames are ignored
// once the transport is closing.
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
		infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
	}
	id := f.LastStreamID
	// A non-zero, even last-stream ID is rejected by closing the transport.
	// NOTE(review): presumably because client-initiated HTTP/2 stream IDs are
	// always odd — confirm against the HTTP/2 spec.
	if id > 0 && id%2 != 1 {
		// Close acquires t.mu itself, so release it first.
		t.mu.Unlock()
		t.Close()
		return
	}
	// A client can receive multiple GoAways from the server (see
	// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
	// sent after an RTT delay with the ID of the last stream the server will
	// process.
	//
	// Therefore, when we get the first GoAway we don't necessarily close any
	// streams. While in case of second GoAway we close all streams created after
	// the GoAwayId. This way streams that were in-flight while the GoAway from
	// server was being sent don't get killed.
	select {
	case <-t.goAway: // t.goAway has been closed (i.e., multiple GoAways).
		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
		if id > t.prevGoAwayID {
			t.mu.Unlock()
			t.Close()
			return
		}
	default:
		// First GoAway on this transport: record the reason, signal
		// listeners of t.goAway, and start draining.
		t.setGoAwayReason(f)
		close(t.goAway)
		t.state = draining
		t.controlBuf.put(&incomingGoAway{})
	}
	// All streams with IDs greater than the GoAwayId
	// and smaller than or equal to the previous GoAway ID should be killed.
	upperLimit := t.prevGoAwayID
	if upperLimit == 0 { // This is the first GoAway Frame.
		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
	}
	for streamID, stream := range t.activeStreams {
		if streamID > id && streamID <= upperLimit {
			// The stream was unprocessed by the server.
			atomic.StoreUint32(&stream.unprocessed, 1)
			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
		}
	}
	t.prevGoAwayID = id
	// Sample the stream count under the lock; Close must be called without it.
	active := len(t.activeStreams)
	t.mu.Unlock()
	if active == 0 {
		t.Close()
	}
}
  1033. // setGoAwayReason sets the value of t.goAwayReason based
  1034. // on the GoAway frame received.
  1035. // It expects a lock on transport's mutext to be held by
  1036. // the caller.
  1037. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1038. t.goAwayReason = GoAwayNoReason
  1039. switch f.ErrCode {
  1040. case http2.ErrCodeEnhanceYourCalm:
  1041. if string(f.DebugData()) == "too_many_pings" {
  1042. t.goAwayReason = GoAwayTooManyPings
  1043. }
  1044. }
  1045. }
  1046. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1047. t.mu.Lock()
  1048. defer t.mu.Unlock()
  1049. return t.goAwayReason
  1050. }
  1051. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1052. t.controlBuf.put(&incomingWindowUpdate{
  1053. streamID: f.Header().StreamID,
  1054. increment: f.Increment,
  1055. })
  1056. }
  1057. // operateHeaders takes action on the decoded headers.
  1058. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1059. s, ok := t.getStream(frame)
  1060. if !ok {
  1061. return
  1062. }
  1063. atomic.StoreUint32(&s.bytesReceived, 1)
  1064. var state decodeState
  1065. if err := state.decodeHeader(frame); err != nil {
  1066. t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false)
  1067. // Something wrong. Stops reading even when there is remaining.
  1068. return
  1069. }
  1070. endStream := frame.StreamEnded()
  1071. var isHeader bool
  1072. defer func() {
  1073. if t.statsHandler != nil {
  1074. if isHeader {
  1075. inHeader := &stats.InHeader{
  1076. Client: true,
  1077. WireLength: int(frame.Header().Length),
  1078. }
  1079. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1080. } else {
  1081. inTrailer := &stats.InTrailer{
  1082. Client: true,
  1083. WireLength: int(frame.Header().Length),
  1084. }
  1085. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1086. }
  1087. }
  1088. }()
  1089. // If headers haven't been received yet.
  1090. if atomic.SwapUint32(&s.headerDone, 1) == 0 {
  1091. if !endStream {
  1092. // Headers frame is not actually a trailers-only frame.
  1093. isHeader = true
  1094. // These values can be set without any synchronization because
  1095. // stream goroutine will read it only after seeing a closed
  1096. // headerChan which we'll close after setting this.
  1097. s.recvCompress = state.encoding
  1098. if len(state.mdata) > 0 {
  1099. s.header = state.mdata
  1100. }
  1101. } else {
  1102. s.noHeaders = true
  1103. }
  1104. close(s.headerChan)
  1105. }
  1106. if !endStream {
  1107. return
  1108. }
  1109. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, state.status(), state.mdata, true)
  1110. }
  1111. // reader runs as a separate goroutine in charge of reading data from network
  1112. // connection.
  1113. //
  1114. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1115. // optimal.
  1116. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1117. func (t *http2Client) reader() {
  1118. defer close(t.readerDone)
  1119. // Check the validity of server preface.
  1120. frame, err := t.framer.fr.ReadFrame()
  1121. if err != nil {
  1122. t.Close()
  1123. return
  1124. }
  1125. if t.keepaliveEnabled {
  1126. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1127. }
  1128. sf, ok := frame.(*http2.SettingsFrame)
  1129. if !ok {
  1130. t.Close()
  1131. return
  1132. }
  1133. t.onSuccess()
  1134. t.handleSettings(sf, true)
  1135. // loop to keep reading incoming messages on this transport.
  1136. for {
  1137. frame, err := t.framer.fr.ReadFrame()
  1138. if t.keepaliveEnabled {
  1139. atomic.CompareAndSwapUint32(&t.activity, 0, 1)
  1140. }
  1141. if err != nil {
  1142. // Abort an active stream if the http2.Framer returns a
  1143. // http2.StreamError. This can happen only if the server's response
  1144. // is malformed http2.
  1145. if se, ok := err.(http2.StreamError); ok {
  1146. t.mu.Lock()
  1147. s := t.activeStreams[se.StreamID]
  1148. t.mu.Unlock()
  1149. if s != nil {
  1150. // use error detail to provide better err message
  1151. code := http2ErrConvTab[se.Code]
  1152. msg := t.framer.fr.ErrorDetail().Error()
  1153. t.closeStream(s, streamError(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1154. }
  1155. continue
  1156. } else {
  1157. // Transport error.
  1158. t.Close()
  1159. return
  1160. }
  1161. }
  1162. switch frame := frame.(type) {
  1163. case *http2.MetaHeadersFrame:
  1164. t.operateHeaders(frame)
  1165. case *http2.DataFrame:
  1166. t.handleData(frame)
  1167. case *http2.RSTStreamFrame:
  1168. t.handleRSTStream(frame)
  1169. case *http2.SettingsFrame:
  1170. t.handleSettings(frame, false)
  1171. case *http2.PingFrame:
  1172. t.handlePing(frame)
  1173. case *http2.GoAwayFrame:
  1174. t.handleGoAway(frame)
  1175. case *http2.WindowUpdateFrame:
  1176. t.handleWindowUpdate(frame)
  1177. default:
  1178. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1179. }
  1180. }
  1181. }
// keepalive, running in a separate goroutine, makes sure the connection is
// alive by sending pings. Each cycle waits t.kp.Time for activity; if none
// was observed it sends a ping (or goes dormant when there are no active
// streams and pings without streams are not permitted), then waits
// t.kp.Timeout for activity and closes the transport if there still is none.
// It returns when t.ctx is done or after closing the transport.
func (t *http2Client) keepalive() {
	p := &ping{data: [8]byte{}}
	timer := time.NewTimer(t.kp.Time)
	for {
		select {
		case <-timer.C:
			// Activity since the last check (flag was 1): clear the flag and
			// simply wait another t.kp.Time.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				timer.Reset(t.kp.Time)
				continue
			}
			// Check if keepalive should go dormant.
			t.mu.Lock()
			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
				// Make awakenKeepalive writable.
				<-t.awakenKeepalive
				t.mu.Unlock()
				// Dormant: block until something is sent on awakenKeepalive
				// or the transport's context is done.
				select {
				case <-t.awakenKeepalive:
					// If the control gets here a ping has been sent
					// need to reset the timer with keepalive.Timeout.
				case <-t.ctx.Done():
					return
				}
			} else {
				t.mu.Unlock()
				if channelz.IsOn() {
					// kpCount is guarded by czmu.
					t.czmu.Lock()
					t.kpCount++
					t.czmu.Unlock()
				}
				// Send ping.
				t.controlBuf.put(p)
			}
			// By the time control gets here a ping has been sent one way or the other.
			timer.Reset(t.kp.Timeout)
			select {
			case <-timer.C:
				// Activity observed within the timeout: start a new cycle.
				if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
					timer.Reset(t.kp.Time)
					continue
				}
				// No activity within t.kp.Timeout after the ping: close the
				// transport.
				t.Close()
				return
			case <-t.ctx.Done():
				// Stop the timer, draining its channel if it already fired.
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		case <-t.ctx.Done():
			// Stop the timer, draining its channel if it already fired.
			if !timer.Stop() {
				<-timer.C
			}
			return
		}
	}
}
  1240. func (t *http2Client) Error() <-chan struct{} {
  1241. return t.ctx.Done()
  1242. }
  1243. func (t *http2Client) GoAway() <-chan struct{} {
  1244. return t.goAway
  1245. }
  1246. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1247. t.czmu.RLock()
  1248. s := channelz.SocketInternalMetric{
  1249. StreamsStarted: t.streamsStarted,
  1250. StreamsSucceeded: t.streamsSucceeded,
  1251. StreamsFailed: t.streamsFailed,
  1252. MessagesSent: t.msgSent,
  1253. MessagesReceived: t.msgRecv,
  1254. KeepAlivesSent: t.kpCount,
  1255. LastLocalStreamCreatedTimestamp: t.lastStreamCreated,
  1256. LastMessageSentTimestamp: t.lastMsgSent,
  1257. LastMessageReceivedTimestamp: t.lastMsgRecv,
  1258. LocalFlowControlWindow: int64(t.fc.getSize()),
  1259. SocketOptions: channelz.GetSocketOption(t.conn),
  1260. LocalAddr: t.localAddr,
  1261. RemoteAddr: t.remoteAddr,
  1262. // RemoteName :
  1263. }
  1264. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1265. s.Security = au.GetSecurityValue()
  1266. }
  1267. t.czmu.RUnlock()
  1268. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1269. return &s
  1270. }
  1271. func (t *http2Client) IncrMsgSent() {
  1272. t.czmu.Lock()
  1273. t.msgSent++
  1274. t.lastMsgSent = time.Now()
  1275. t.czmu.Unlock()
  1276. }
  1277. func (t *http2Client) IncrMsgRecv() {
  1278. t.czmu.Lock()
  1279. t.msgRecv++
  1280. t.lastMsgRecv = time.Now()
  1281. t.czmu.Unlock()
  1282. }
  1283. func (t *http2Client) getOutFlowWindow() int64 {
  1284. resp := make(chan uint32, 1)
  1285. timer := time.NewTimer(time.Second)
  1286. defer timer.Stop()
  1287. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1288. select {
  1289. case sz := <-resp:
  1290. return int64(sz)
  1291. case <-t.ctxDone:
  1292. return -1
  1293. case <-timer.C:
  1294. return -2
  1295. }
  1296. }