http2_server.go 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "strconv"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "github.com/golang/protobuf/proto"
  31. "golang.org/x/net/context"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/grpclog"
  37. "google.golang.org/grpc/internal/channelz"
  38. "google.golang.org/grpc/internal/grpcrand"
  39. "google.golang.org/grpc/keepalive"
  40. "google.golang.org/grpc/metadata"
  41. "google.golang.org/grpc/peer"
  42. "google.golang.org/grpc/stats"
  43. "google.golang.org/grpc/status"
  44. "google.golang.org/grpc/tap"
  45. )
  46. var (
  47. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  48. // the stream's state.
  49. ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
  50. // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  51. // than the limit set by peer.
  52. ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
  53. )
  54. // http2Server implements the ServerTransport interface with HTTP2.
  55. type http2Server struct {
  56. ctx context.Context
  57. ctxDone <-chan struct{} // Cache the context.Done() chan
  58. cancel context.CancelFunc
  59. conn net.Conn
  60. loopy *loopyWriter
  61. readerDone chan struct{} // sync point to enable testing.
  62. writerDone chan struct{} // sync point to enable testing.
  63. remoteAddr net.Addr
  64. localAddr net.Addr
  65. maxStreamID uint32 // max stream ID ever seen
  66. authInfo credentials.AuthInfo // auth info about the connection
  67. inTapHandle tap.ServerInHandle
  68. framer *framer
  69. // The max number of concurrent streams.
  70. maxStreams uint32
  71. // controlBuf delivers all the control related tasks (e.g., window
  72. // updates, reset streams, and various settings) to the controller.
  73. controlBuf *controlBuffer
  74. fc *trInFlow
  75. stats stats.Handler
  76. // Flag to keep track of reading activity on transport.
  77. // 1 is true and 0 is false.
  78. activity uint32 // Accessed atomically.
  79. // Keepalive and max-age parameters for the server.
  80. kp keepalive.ServerParameters
  81. // Keepalive enforcement policy.
  82. kep keepalive.EnforcementPolicy
  83. // The time instance last ping was received.
  84. lastPingAt time.Time
  85. // Number of times the client has violated keepalive ping policy so far.
  86. pingStrikes uint8
  87. // Flag to signify that number of ping strikes should be reset to 0.
  88. // This is set whenever data or header frames are sent.
  89. // 1 means yes.
  90. resetPingStrikes uint32 // Accessed atomically.
  91. initialWindowSize int32
  92. bdpEst *bdpEstimator
  93. maxSendHeaderListSize *uint32
  94. mu sync.Mutex // guard the following
  95. // drainChan is initialized when drain(...) is called the first time.
  96. // After which the server writes out the first GoAway(with ID 2^31-1) frame.
  97. // Then an independent goroutine will be launched to later send the second GoAway.
  98. // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
  99. // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
  100. // already underway.
  101. drainChan chan struct{}
  102. state transportState
  103. activeStreams map[uint32]*Stream
  104. // idle is the time instant when the connection went idle.
  105. // This is either the beginning of the connection or when the number of
  106. // RPCs go down to 0.
  107. // When the connection is busy, this value is set to 0.
  108. idle time.Time
  109. // Fields below are for channelz metric collection.
  110. channelzID int64 // channelz unique identification number
  111. czmu sync.RWMutex
  112. kpCount int64
  113. // The number of streams that have started, including already finished ones.
  114. streamsStarted int64
  115. // The number of streams that have ended successfully by sending frame with
  116. // EoS bit set.
  117. streamsSucceeded int64
  118. streamsFailed int64
  119. lastStreamCreated time.Time
  120. msgSent int64
  121. msgRecv int64
  122. lastMsgSent time.Time
  123. lastMsgRecv time.Time
  124. }
  125. // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
  126. // returned if something goes wrong.
  127. func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  128. writeBufSize := config.WriteBufferSize
  129. readBufSize := config.ReadBufferSize
  130. maxHeaderListSize := defaultServerMaxHeaderListSize
  131. if config.MaxHeaderListSize != nil {
  132. maxHeaderListSize = *config.MaxHeaderListSize
  133. }
  134. framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
  135. // Send initial settings as connection preface to client.
  136. var isettings []http2.Setting
  137. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  138. // permitted in the HTTP2 spec.
  139. maxStreams := config.MaxStreams
  140. if maxStreams == 0 {
  141. maxStreams = math.MaxUint32
  142. } else {
  143. isettings = append(isettings, http2.Setting{
  144. ID: http2.SettingMaxConcurrentStreams,
  145. Val: maxStreams,
  146. })
  147. }
  148. dynamicWindow := true
  149. iwz := int32(initialWindowSize)
  150. if config.InitialWindowSize >= defaultWindowSize {
  151. iwz = config.InitialWindowSize
  152. dynamicWindow = false
  153. }
  154. icwz := int32(initialWindowSize)
  155. if config.InitialConnWindowSize >= defaultWindowSize {
  156. icwz = config.InitialConnWindowSize
  157. dynamicWindow = false
  158. }
  159. if iwz != defaultWindowSize {
  160. isettings = append(isettings, http2.Setting{
  161. ID: http2.SettingInitialWindowSize,
  162. Val: uint32(iwz)})
  163. }
  164. if config.MaxHeaderListSize != nil {
  165. isettings = append(isettings, http2.Setting{
  166. ID: http2.SettingMaxHeaderListSize,
  167. Val: *config.MaxHeaderListSize,
  168. })
  169. }
  170. if err := framer.fr.WriteSettings(isettings...); err != nil {
  171. return nil, connectionErrorf(false, err, "transport: %v", err)
  172. }
  173. // Adjust the connection flow control window if needed.
  174. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  175. if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  176. return nil, connectionErrorf(false, err, "transport: %v", err)
  177. }
  178. }
  179. kp := config.KeepaliveParams
  180. if kp.MaxConnectionIdle == 0 {
  181. kp.MaxConnectionIdle = defaultMaxConnectionIdle
  182. }
  183. if kp.MaxConnectionAge == 0 {
  184. kp.MaxConnectionAge = defaultMaxConnectionAge
  185. }
  186. // Add a jitter to MaxConnectionAge.
  187. kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  188. if kp.MaxConnectionAgeGrace == 0 {
  189. kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  190. }
  191. if kp.Time == 0 {
  192. kp.Time = defaultServerKeepaliveTime
  193. }
  194. if kp.Timeout == 0 {
  195. kp.Timeout = defaultServerKeepaliveTimeout
  196. }
  197. kep := config.KeepalivePolicy
  198. if kep.MinTime == 0 {
  199. kep.MinTime = defaultKeepalivePolicyMinTime
  200. }
  201. ctx, cancel := context.WithCancel(context.Background())
  202. t := &http2Server{
  203. ctx: ctx,
  204. cancel: cancel,
  205. ctxDone: ctx.Done(),
  206. conn: conn,
  207. remoteAddr: conn.RemoteAddr(),
  208. localAddr: conn.LocalAddr(),
  209. authInfo: config.AuthInfo,
  210. framer: framer,
  211. readerDone: make(chan struct{}),
  212. writerDone: make(chan struct{}),
  213. maxStreams: maxStreams,
  214. inTapHandle: config.InTapHandle,
  215. fc: &trInFlow{limit: uint32(icwz)},
  216. state: reachable,
  217. activeStreams: make(map[uint32]*Stream),
  218. stats: config.StatsHandler,
  219. kp: kp,
  220. idle: time.Now(),
  221. kep: kep,
  222. initialWindowSize: iwz,
  223. }
  224. t.controlBuf = newControlBuffer(t.ctxDone)
  225. if dynamicWindow {
  226. t.bdpEst = &bdpEstimator{
  227. bdp: initialWindowSize,
  228. updateFlowControl: t.updateFlowControl,
  229. }
  230. }
  231. if t.stats != nil {
  232. t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
  233. RemoteAddr: t.remoteAddr,
  234. LocalAddr: t.localAddr,
  235. })
  236. connBegin := &stats.ConnBegin{}
  237. t.stats.HandleConn(t.ctx, connBegin)
  238. }
  239. if channelz.IsOn() {
  240. t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
  241. }
  242. t.framer.writer.Flush()
  243. defer func() {
  244. if err != nil {
  245. t.Close()
  246. }
  247. }()
  248. // Check the validity of client preface.
  249. preface := make([]byte, len(clientPreface))
  250. if _, err := io.ReadFull(t.conn, preface); err != nil {
  251. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  252. }
  253. if !bytes.Equal(preface, clientPreface) {
  254. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  255. }
  256. frame, err := t.framer.fr.ReadFrame()
  257. if err == io.EOF || err == io.ErrUnexpectedEOF {
  258. return nil, err
  259. }
  260. if err != nil {
  261. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  262. }
  263. atomic.StoreUint32(&t.activity, 1)
  264. sf, ok := frame.(*http2.SettingsFrame)
  265. if !ok {
  266. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  267. }
  268. t.handleSettings(sf)
  269. go func() {
  270. t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
  271. t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
  272. if err := t.loopy.run(); err != nil {
  273. errorf("transport: loopyWriter.run returning. Err: %v", err)
  274. }
  275. t.conn.Close()
  276. close(t.writerDone)
  277. }()
  278. go t.keepalive()
  279. return t, nil
  280. }
  281. // operateHeader takes action on the decoded headers.
  282. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
  283. streamID := frame.Header().StreamID
  284. state := decodeState{serverSide: true}
  285. if err := state.decodeHeader(frame); err != nil {
  286. if se, ok := err.(StreamError); ok {
  287. t.controlBuf.put(&cleanupStream{
  288. streamID: streamID,
  289. rst: true,
  290. rstCode: statusCodeConvTab[se.Code],
  291. onWrite: func() {},
  292. })
  293. }
  294. return
  295. }
  296. buf := newRecvBuffer()
  297. s := &Stream{
  298. id: streamID,
  299. st: t,
  300. buf: buf,
  301. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  302. recvCompress: state.encoding,
  303. method: state.method,
  304. contentSubtype: state.contentSubtype,
  305. }
  306. if frame.StreamEnded() {
  307. // s is just created by the caller. No lock needed.
  308. s.state = streamReadDone
  309. }
  310. if state.timeoutSet {
  311. s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
  312. } else {
  313. s.ctx, s.cancel = context.WithCancel(t.ctx)
  314. }
  315. pr := &peer.Peer{
  316. Addr: t.remoteAddr,
  317. }
  318. // Attach Auth info if there is any.
  319. if t.authInfo != nil {
  320. pr.AuthInfo = t.authInfo
  321. }
  322. s.ctx = peer.NewContext(s.ctx, pr)
  323. // Attach the received metadata to the context.
  324. if len(state.mdata) > 0 {
  325. s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
  326. }
  327. if state.statsTags != nil {
  328. s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
  329. }
  330. if state.statsTrace != nil {
  331. s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
  332. }
  333. if t.inTapHandle != nil {
  334. var err error
  335. info := &tap.Info{
  336. FullMethodName: state.method,
  337. }
  338. s.ctx, err = t.inTapHandle(s.ctx, info)
  339. if err != nil {
  340. warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
  341. t.controlBuf.put(&cleanupStream{
  342. streamID: s.id,
  343. rst: true,
  344. rstCode: http2.ErrCodeRefusedStream,
  345. onWrite: func() {},
  346. })
  347. return
  348. }
  349. }
  350. t.mu.Lock()
  351. if t.state != reachable {
  352. t.mu.Unlock()
  353. return
  354. }
  355. if uint32(len(t.activeStreams)) >= t.maxStreams {
  356. t.mu.Unlock()
  357. t.controlBuf.put(&cleanupStream{
  358. streamID: streamID,
  359. rst: true,
  360. rstCode: http2.ErrCodeRefusedStream,
  361. onWrite: func() {},
  362. })
  363. return
  364. }
  365. if streamID%2 != 1 || streamID <= t.maxStreamID {
  366. t.mu.Unlock()
  367. // illegal gRPC stream id.
  368. errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
  369. return true
  370. }
  371. t.maxStreamID = streamID
  372. t.activeStreams[streamID] = s
  373. if len(t.activeStreams) == 1 {
  374. t.idle = time.Time{}
  375. }
  376. t.mu.Unlock()
  377. if channelz.IsOn() {
  378. t.czmu.Lock()
  379. t.streamsStarted++
  380. t.lastStreamCreated = time.Now()
  381. t.czmu.Unlock()
  382. }
  383. s.requestRead = func(n int) {
  384. t.adjustWindow(s, uint32(n))
  385. }
  386. s.ctx = traceCtx(s.ctx, s.method)
  387. if t.stats != nil {
  388. s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  389. inHeader := &stats.InHeader{
  390. FullMethod: s.method,
  391. RemoteAddr: t.remoteAddr,
  392. LocalAddr: t.localAddr,
  393. Compression: s.recvCompress,
  394. WireLength: int(frame.Header().Length),
  395. }
  396. t.stats.HandleRPC(s.ctx, inHeader)
  397. }
  398. s.ctxDone = s.ctx.Done()
  399. s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
  400. s.trReader = &transportReader{
  401. reader: &recvBufferReader{
  402. ctx: s.ctx,
  403. ctxDone: s.ctxDone,
  404. recv: s.buf,
  405. },
  406. windowHandler: func(n int) {
  407. t.updateWindow(s, uint32(n))
  408. },
  409. }
  410. // Register the stream with loopy.
  411. t.controlBuf.put(&registerStream{
  412. streamID: s.id,
  413. wq: s.wq,
  414. })
  415. handle(s)
  416. return
  417. }
  418. // HandleStreams receives incoming streams using the given handler. This is
  419. // typically run in a separate goroutine.
  420. // traceCtx attaches trace to ctx and returns the new context.
  421. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  422. defer close(t.readerDone)
  423. for {
  424. frame, err := t.framer.fr.ReadFrame()
  425. atomic.StoreUint32(&t.activity, 1)
  426. if err != nil {
  427. if se, ok := err.(http2.StreamError); ok {
  428. warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
  429. t.mu.Lock()
  430. s := t.activeStreams[se.StreamID]
  431. t.mu.Unlock()
  432. if s != nil {
  433. t.closeStream(s, true, se.Code, nil, false)
  434. } else {
  435. t.controlBuf.put(&cleanupStream{
  436. streamID: se.StreamID,
  437. rst: true,
  438. rstCode: se.Code,
  439. onWrite: func() {},
  440. })
  441. }
  442. continue
  443. }
  444. if err == io.EOF || err == io.ErrUnexpectedEOF {
  445. t.Close()
  446. return
  447. }
  448. warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
  449. t.Close()
  450. return
  451. }
  452. switch frame := frame.(type) {
  453. case *http2.MetaHeadersFrame:
  454. if t.operateHeaders(frame, handle, traceCtx) {
  455. t.Close()
  456. break
  457. }
  458. case *http2.DataFrame:
  459. t.handleData(frame)
  460. case *http2.RSTStreamFrame:
  461. t.handleRSTStream(frame)
  462. case *http2.SettingsFrame:
  463. t.handleSettings(frame)
  464. case *http2.PingFrame:
  465. t.handlePing(frame)
  466. case *http2.WindowUpdateFrame:
  467. t.handleWindowUpdate(frame)
  468. case *http2.GoAwayFrame:
  469. // TODO: Handle GoAway from the client appropriately.
  470. default:
  471. errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
  472. }
  473. }
  474. }
  475. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  476. t.mu.Lock()
  477. defer t.mu.Unlock()
  478. if t.activeStreams == nil {
  479. // The transport is closing.
  480. return nil, false
  481. }
  482. s, ok := t.activeStreams[f.Header().StreamID]
  483. if !ok {
  484. // The stream is already done.
  485. return nil, false
  486. }
  487. return s, true
  488. }
  489. // adjustWindow sends out extra window update over the initial window size
  490. // of stream if the application is requesting data larger in size than
  491. // the window.
  492. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  493. if w := s.fc.maybeAdjust(n); w > 0 {
  494. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  495. }
  496. }
  497. // updateWindow adjusts the inbound quota for the stream and the transport.
  498. // Window updates will deliver to the controller for sending when
  499. // the cumulative quota exceeds the corresponding threshold.
  500. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  501. if w := s.fc.onRead(n); w > 0 {
  502. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  503. increment: w,
  504. })
  505. }
  506. }
  507. // updateFlowControl updates the incoming flow control windows
  508. // for the transport and the stream based on the current bdp
  509. // estimation.
  510. func (t *http2Server) updateFlowControl(n uint32) {
  511. t.mu.Lock()
  512. for _, s := range t.activeStreams {
  513. s.fc.newLimit(n)
  514. }
  515. t.initialWindowSize = int32(n)
  516. t.mu.Unlock()
  517. t.controlBuf.put(&outgoingWindowUpdate{
  518. streamID: 0,
  519. increment: t.fc.newLimit(n),
  520. })
  521. t.controlBuf.put(&outgoingSettings{
  522. ss: []http2.Setting{
  523. {
  524. ID: http2.SettingInitialWindowSize,
  525. Val: n,
  526. },
  527. },
  528. })
  529. }
  530. func (t *http2Server) handleData(f *http2.DataFrame) {
  531. size := f.Header().Length
  532. var sendBDPPing bool
  533. if t.bdpEst != nil {
  534. sendBDPPing = t.bdpEst.add(size)
  535. }
  536. // Decouple connection's flow control from application's read.
  537. // An update on connection's flow control should not depend on
  538. // whether user application has read the data or not. Such a
  539. // restriction is already imposed on the stream's flow control,
  540. // and therefore the sender will be blocked anyways.
  541. // Decoupling the connection flow control will prevent other
  542. // active(fast) streams from starving in presence of slow or
  543. // inactive streams.
  544. if w := t.fc.onData(size); w > 0 {
  545. t.controlBuf.put(&outgoingWindowUpdate{
  546. streamID: 0,
  547. increment: w,
  548. })
  549. }
  550. if sendBDPPing {
  551. // Avoid excessive ping detection (e.g. in an L7 proxy)
  552. // by sending a window update prior to the BDP ping.
  553. if w := t.fc.reset(); w > 0 {
  554. t.controlBuf.put(&outgoingWindowUpdate{
  555. streamID: 0,
  556. increment: w,
  557. })
  558. }
  559. t.controlBuf.put(bdpPing)
  560. }
  561. // Select the right stream to dispatch.
  562. s, ok := t.getStream(f)
  563. if !ok {
  564. return
  565. }
  566. if size > 0 {
  567. if err := s.fc.onData(size); err != nil {
  568. t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
  569. return
  570. }
  571. if f.Header().Flags.Has(http2.FlagDataPadded) {
  572. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  573. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  574. }
  575. }
  576. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  577. // guarantee f.Data() is consumed before the arrival of next frame.
  578. // Can this copy be eliminated?
  579. if len(f.Data()) > 0 {
  580. data := make([]byte, len(f.Data()))
  581. copy(data, f.Data())
  582. s.write(recvMsg{data: data})
  583. }
  584. }
  585. if f.Header().Flags.Has(http2.FlagDataEndStream) {
  586. // Received the end of stream from the client.
  587. s.compareAndSwapState(streamActive, streamReadDone)
  588. s.write(recvMsg{err: io.EOF})
  589. }
  590. }
  591. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  592. s, ok := t.getStream(f)
  593. if !ok {
  594. return
  595. }
  596. t.closeStream(s, false, 0, nil, false)
  597. }
  598. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  599. if f.IsAck() {
  600. return
  601. }
  602. var ss []http2.Setting
  603. var updateFuncs []func()
  604. f.ForeachSetting(func(s http2.Setting) error {
  605. switch s.ID {
  606. case http2.SettingMaxHeaderListSize:
  607. updateFuncs = append(updateFuncs, func() {
  608. t.maxSendHeaderListSize = new(uint32)
  609. *t.maxSendHeaderListSize = s.Val
  610. })
  611. default:
  612. ss = append(ss, s)
  613. }
  614. return nil
  615. })
  616. t.controlBuf.executeAndPut(func(interface{}) bool {
  617. for _, f := range updateFuncs {
  618. f()
  619. }
  620. return true
  621. }, &incomingSettings{
  622. ss: ss,
  623. })
  624. }
  625. const (
  626. maxPingStrikes = 2
  627. defaultPingTimeout = 2 * time.Hour
  628. )
  629. func (t *http2Server) handlePing(f *http2.PingFrame) {
  630. if f.IsAck() {
  631. if f.Data == goAwayPing.data && t.drainChan != nil {
  632. close(t.drainChan)
  633. return
  634. }
  635. // Maybe it's a BDP ping.
  636. if t.bdpEst != nil {
  637. t.bdpEst.calculate(f.Data)
  638. }
  639. return
  640. }
  641. pingAck := &ping{ack: true}
  642. copy(pingAck.data[:], f.Data[:])
  643. t.controlBuf.put(pingAck)
  644. now := time.Now()
  645. defer func() {
  646. t.lastPingAt = now
  647. }()
  648. // A reset ping strikes means that we don't need to check for policy
  649. // violation for this ping and the pingStrikes counter should be set
  650. // to 0.
  651. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  652. t.pingStrikes = 0
  653. return
  654. }
  655. t.mu.Lock()
  656. ns := len(t.activeStreams)
  657. t.mu.Unlock()
  658. if ns < 1 && !t.kep.PermitWithoutStream {
  659. // Keepalive shouldn't be active thus, this new ping should
  660. // have come after at least defaultPingTimeout.
  661. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  662. t.pingStrikes++
  663. }
  664. } else {
  665. // Check if keepalive policy is respected.
  666. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  667. t.pingStrikes++
  668. }
  669. }
  670. if t.pingStrikes > maxPingStrikes {
  671. // Send goaway and close the connection.
  672. errorf("transport: Got too many pings from the client, closing the connection.")
  673. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
  674. }
  675. }
  676. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  677. t.controlBuf.put(&incomingWindowUpdate{
  678. streamID: f.Header().StreamID,
  679. increment: f.Increment,
  680. })
  681. }
  682. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  683. for k, vv := range md {
  684. if isReservedHeader(k) {
  685. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  686. continue
  687. }
  688. for _, v := range vv {
  689. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  690. }
  691. }
  692. return headerFields
  693. }
  694. func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
  695. if t.maxSendHeaderListSize == nil {
  696. return true
  697. }
  698. hdrFrame := it.(*headerFrame)
  699. var sz int64
  700. for _, f := range hdrFrame.hf {
  701. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  702. errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
  703. return false
  704. }
  705. }
  706. return true
  707. }
  708. // WriteHeader sends the header metedata md back to the client.
  709. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  710. if s.updateHeaderSent() || s.getState() == streamDone {
  711. return ErrIllegalHeaderWrite
  712. }
  713. s.hdrMu.Lock()
  714. if md.Len() > 0 {
  715. if s.header.Len() > 0 {
  716. s.header = metadata.Join(s.header, md)
  717. } else {
  718. s.header = md
  719. }
  720. }
  721. if err := t.writeHeaderLocked(s); err != nil {
  722. s.hdrMu.Unlock()
  723. return err
  724. }
  725. s.hdrMu.Unlock()
  726. return nil
  727. }
  728. func (t *http2Server) writeHeaderLocked(s *Stream) error {
  729. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  730. // first and create a slice of that exact size.
  731. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  732. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  733. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  734. if s.sendCompress != "" {
  735. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  736. }
  737. headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  738. success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
  739. streamID: s.id,
  740. hf: headerFields,
  741. endStream: false,
  742. onWrite: func() {
  743. atomic.StoreUint32(&t.resetPingStrikes, 1)
  744. },
  745. })
  746. if !success {
  747. if err != nil {
  748. return err
  749. }
  750. t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
  751. return ErrHeaderListSizeLimitViolation
  752. }
  753. if t.stats != nil {
  754. // Note: WireLength is not set in outHeader.
  755. // TODO(mmukhi): Revisit this later, if needed.
  756. outHeader := &stats.OutHeader{}
  757. t.stats.HandleRPC(s.Context(), outHeader)
  758. }
  759. return nil
  760. }
  761. // WriteStatus sends stream status to the client and terminates the stream.
  762. // There is no further I/O operations being able to perform on this stream.
  763. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  764. // OK is adopted.
  765. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  766. if s.getState() == streamDone {
  767. return nil
  768. }
  769. s.hdrMu.Lock()
  770. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  771. // first and create a slice of that exact size.
  772. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  773. if !s.updateHeaderSent() { // No headers have been sent.
  774. if len(s.header) > 0 { // Send a separate header frame.
  775. if err := t.writeHeaderLocked(s); err != nil {
  776. s.hdrMu.Unlock()
  777. return err
  778. }
  779. } else { // Send a trailer only response.
  780. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  781. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
  782. }
  783. }
  784. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  785. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  786. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  787. stBytes, err := proto.Marshal(p)
  788. if err != nil {
  789. // TODO: return error instead, when callers are able to handle it.
  790. grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
  791. } else {
  792. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  793. }
  794. }
  795. // Attach the trailer metadata.
  796. headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  797. trailingHeader := &headerFrame{
  798. streamID: s.id,
  799. hf: headerFields,
  800. endStream: true,
  801. onWrite: func() {
  802. atomic.StoreUint32(&t.resetPingStrikes, 1)
  803. },
  804. }
  805. s.hdrMu.Unlock()
  806. success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
  807. if !success {
  808. if err != nil {
  809. return err
  810. }
  811. t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
  812. return ErrHeaderListSizeLimitViolation
  813. }
  814. t.closeStream(s, false, 0, trailingHeader, true)
  815. if t.stats != nil {
  816. t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
  817. }
  818. return nil
  819. }
  820. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  821. // is returns if it fails (e.g., framing error, transport error).
  822. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  823. if !s.isHeaderSent() { // Headers haven't been written yet.
  824. if err := t.WriteHeader(s, nil); err != nil {
  825. // TODO(mmukhi, dfawley): Make sure this is the right code to return.
  826. return streamErrorf(codes.Internal, "transport: %v", err)
  827. }
  828. } else {
  829. // Writing headers checks for this condition.
  830. if s.getState() == streamDone {
  831. // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
  832. s.cancel()
  833. select {
  834. case <-t.ctx.Done():
  835. return ErrConnClosing
  836. default:
  837. }
  838. return ContextErr(s.ctx.Err())
  839. }
  840. }
  841. // Add some data to header frame so that we can equally distribute bytes across frames.
  842. emptyLen := http2MaxFrameLen - len(hdr)
  843. if emptyLen > len(data) {
  844. emptyLen = len(data)
  845. }
  846. hdr = append(hdr, data[:emptyLen]...)
  847. data = data[emptyLen:]
  848. df := &dataFrame{
  849. streamID: s.id,
  850. h: hdr,
  851. d: data,
  852. onEachWrite: func() {
  853. atomic.StoreUint32(&t.resetPingStrikes, 1)
  854. },
  855. }
  856. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  857. select {
  858. case <-t.ctx.Done():
  859. return ErrConnClosing
  860. default:
  861. }
  862. return ContextErr(s.ctx.Err())
  863. }
  864. return t.controlBuf.put(df)
  865. }
  866. // keepalive running in a separate goroutine does the following:
  867. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  868. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  869. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  870. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  871. // after an additional duration of keepalive.Timeout.
  872. func (t *http2Server) keepalive() {
  873. p := &ping{}
  874. var pingSent bool
  875. maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
  876. maxAge := time.NewTimer(t.kp.MaxConnectionAge)
  877. keepalive := time.NewTimer(t.kp.Time)
  878. // NOTE: All exit paths of this function should reset their
  879. // respective timers. A failure to do so will cause the
  880. // following clean-up to deadlock and eventually leak.
  881. defer func() {
  882. if !maxIdle.Stop() {
  883. <-maxIdle.C
  884. }
  885. if !maxAge.Stop() {
  886. <-maxAge.C
  887. }
  888. if !keepalive.Stop() {
  889. <-keepalive.C
  890. }
  891. }()
  892. for {
  893. select {
  894. case <-maxIdle.C:
  895. t.mu.Lock()
  896. idle := t.idle
  897. if idle.IsZero() { // The connection is non-idle.
  898. t.mu.Unlock()
  899. maxIdle.Reset(t.kp.MaxConnectionIdle)
  900. continue
  901. }
  902. val := t.kp.MaxConnectionIdle - time.Since(idle)
  903. t.mu.Unlock()
  904. if val <= 0 {
  905. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  906. // Gracefully close the connection.
  907. t.drain(http2.ErrCodeNo, []byte{})
  908. // Resetting the timer so that the clean-up doesn't deadlock.
  909. maxIdle.Reset(infinity)
  910. return
  911. }
  912. maxIdle.Reset(val)
  913. case <-maxAge.C:
  914. t.drain(http2.ErrCodeNo, []byte{})
  915. maxAge.Reset(t.kp.MaxConnectionAgeGrace)
  916. select {
  917. case <-maxAge.C:
  918. // Close the connection after grace period.
  919. t.Close()
  920. // Resetting the timer so that the clean-up doesn't deadlock.
  921. maxAge.Reset(infinity)
  922. case <-t.ctx.Done():
  923. }
  924. return
  925. case <-keepalive.C:
  926. if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
  927. pingSent = false
  928. keepalive.Reset(t.kp.Time)
  929. continue
  930. }
  931. if pingSent {
  932. t.Close()
  933. // Resetting the timer so that the clean-up doesn't deadlock.
  934. keepalive.Reset(infinity)
  935. return
  936. }
  937. pingSent = true
  938. if channelz.IsOn() {
  939. t.czmu.Lock()
  940. t.kpCount++
  941. t.czmu.Unlock()
  942. }
  943. t.controlBuf.put(p)
  944. keepalive.Reset(t.kp.Timeout)
  945. case <-t.ctx.Done():
  946. return
  947. }
  948. }
  949. }
  950. // Close starts shutting down the http2Server transport.
  951. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  952. // could cause some resource issue. Revisit this later.
  953. func (t *http2Server) Close() error {
  954. t.mu.Lock()
  955. if t.state == closing {
  956. t.mu.Unlock()
  957. return errors.New("transport: Close() was already called")
  958. }
  959. t.state = closing
  960. streams := t.activeStreams
  961. t.activeStreams = nil
  962. t.mu.Unlock()
  963. t.controlBuf.finish()
  964. t.cancel()
  965. err := t.conn.Close()
  966. if channelz.IsOn() {
  967. channelz.RemoveEntry(t.channelzID)
  968. }
  969. // Cancel all active streams.
  970. for _, s := range streams {
  971. s.cancel()
  972. }
  973. if t.stats != nil {
  974. connEnd := &stats.ConnEnd{}
  975. t.stats.HandleConn(t.ctx, connEnd)
  976. }
  977. return err
  978. }
  979. // closeStream clears the footprint of a stream when the stream is not needed
  980. // any more.
  981. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  982. if s.swapState(streamDone) == streamDone {
  983. // If the stream was already done, return.
  984. return
  985. }
  986. // In case stream sending and receiving are invoked in separate
  987. // goroutines (e.g., bi-directional streaming), cancel needs to be
  988. // called to interrupt the potential blocking on other goroutines.
  989. s.cancel()
  990. cleanup := &cleanupStream{
  991. streamID: s.id,
  992. rst: rst,
  993. rstCode: rstCode,
  994. onWrite: func() {
  995. t.mu.Lock()
  996. if t.activeStreams != nil {
  997. delete(t.activeStreams, s.id)
  998. if len(t.activeStreams) == 0 {
  999. t.idle = time.Now()
  1000. }
  1001. }
  1002. t.mu.Unlock()
  1003. if channelz.IsOn() {
  1004. t.czmu.Lock()
  1005. if eosReceived {
  1006. t.streamsSucceeded++
  1007. } else {
  1008. t.streamsFailed++
  1009. }
  1010. t.czmu.Unlock()
  1011. }
  1012. },
  1013. }
  1014. if hdr != nil {
  1015. hdr.cleanup = cleanup
  1016. t.controlBuf.put(hdr)
  1017. } else {
  1018. t.controlBuf.put(cleanup)
  1019. }
  1020. }
  1021. func (t *http2Server) RemoteAddr() net.Addr {
  1022. return t.remoteAddr
  1023. }
  1024. func (t *http2Server) Drain() {
  1025. t.drain(http2.ErrCodeNo, []byte{})
  1026. }
  1027. func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
  1028. t.mu.Lock()
  1029. defer t.mu.Unlock()
  1030. if t.drainChan != nil {
  1031. return
  1032. }
  1033. t.drainChan = make(chan struct{})
  1034. t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
  1035. }
  1036. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  1037. // Handles outgoing GoAway and returns true if loopy needs to put itself
  1038. // in draining mode.
  1039. func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
  1040. t.mu.Lock()
  1041. if t.state == closing { // TODO(mmukhi): This seems unnecessary.
  1042. t.mu.Unlock()
  1043. // The transport is closing.
  1044. return false, ErrConnClosing
  1045. }
  1046. sid := t.maxStreamID
  1047. if !g.headsUp {
  1048. // Stop accepting more streams now.
  1049. t.state = draining
  1050. if len(t.activeStreams) == 0 {
  1051. g.closeConn = true
  1052. }
  1053. t.mu.Unlock()
  1054. if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
  1055. return false, err
  1056. }
  1057. if g.closeConn {
  1058. // Abruptly close the connection following the GoAway (via
  1059. // loopywriter). But flush out what's inside the buffer first.
  1060. t.framer.writer.Flush()
  1061. return false, fmt.Errorf("transport: Connection closing")
  1062. }
  1063. return true, nil
  1064. }
  1065. t.mu.Unlock()
  1066. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1067. // Follow that with a ping and wait for the ack to come back or a timer
  1068. // to expire. During this time accept new streams since they might have
  1069. // originated before the GoAway reaches the client.
  1070. // After getting the ack or timer expiration send out another GoAway this
  1071. // time with an ID of the max stream server intends to process.
  1072. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
  1073. return false, err
  1074. }
  1075. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1076. return false, err
  1077. }
  1078. go func() {
  1079. timer := time.NewTimer(time.Minute)
  1080. defer timer.Stop()
  1081. select {
  1082. case <-t.drainChan:
  1083. case <-timer.C:
  1084. case <-t.ctx.Done():
  1085. return
  1086. }
  1087. t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
  1088. }()
  1089. return false, nil
  1090. }
  1091. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1092. t.czmu.RLock()
  1093. s := channelz.SocketInternalMetric{
  1094. StreamsStarted: t.streamsStarted,
  1095. StreamsSucceeded: t.streamsSucceeded,
  1096. StreamsFailed: t.streamsFailed,
  1097. MessagesSent: t.msgSent,
  1098. MessagesReceived: t.msgRecv,
  1099. KeepAlivesSent: t.kpCount,
  1100. LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
  1101. LastMessageSentTimestamp: t.lastMsgSent,
  1102. LastMessageReceivedTimestamp: t.lastMsgRecv,
  1103. LocalFlowControlWindow: int64(t.fc.getSize()),
  1104. SocketOptions: channelz.GetSocketOption(t.conn),
  1105. LocalAddr: t.localAddr,
  1106. RemoteAddr: t.remoteAddr,
  1107. // RemoteName :
  1108. }
  1109. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1110. s.Security = au.GetSecurityValue()
  1111. }
  1112. t.czmu.RUnlock()
  1113. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1114. return &s
  1115. }
  1116. func (t *http2Server) IncrMsgSent() {
  1117. t.czmu.Lock()
  1118. t.msgSent++
  1119. t.lastMsgSent = time.Now()
  1120. t.czmu.Unlock()
  1121. }
  1122. func (t *http2Server) IncrMsgRecv() {
  1123. t.czmu.Lock()
  1124. t.msgRecv++
  1125. t.lastMsgRecv = time.Now()
  1126. t.czmu.Unlock()
  1127. }
  1128. func (t *http2Server) getOutFlowWindow() int64 {
  1129. resp := make(chan uint32)
  1130. timer := time.NewTimer(time.Second)
  1131. defer timer.Stop()
  1132. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1133. select {
  1134. case sz := <-resp:
  1135. return int64(sz)
  1136. case <-t.ctxDone:
  1137. return -1
  1138. case <-timer.C:
  1139. return -2
  1140. }
  1141. }
  1142. func getJitter(v time.Duration) time.Duration {
  1143. if v == infinity {
  1144. return 0
  1145. }
  1146. // Generate a jitter between +/- 10% of the value.
  1147. r := int64(v / 10)
  1148. j := grpcrand.Int63n(2*r) - r
  1149. return time.Duration(j)
  1150. }