transport.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package transport defines and implements message oriented communication
  19. // channel to complete various transactions (e.g., an RPC). It is meant for
  20. // grpc-internal usage and is not intended to be imported directly by users.
  21. package transport
  22. import (
  23. "errors"
  24. "fmt"
  25. "io"
  26. "net"
  27. "sync"
  28. "sync/atomic"
  29. "golang.org/x/net/context"
  30. "google.golang.org/grpc/codes"
  31. "google.golang.org/grpc/credentials"
  32. "google.golang.org/grpc/keepalive"
  33. "google.golang.org/grpc/metadata"
  34. "google.golang.org/grpc/stats"
  35. "google.golang.org/grpc/status"
  36. "google.golang.org/grpc/tap"
  37. )
  38. // recvMsg represents the received msg from the transport. All transport
  39. // protocol specific info has been removed.
  40. type recvMsg struct {
  41. data []byte
  42. // nil: received some data
  43. // io.EOF: stream is completed. data is nil.
  44. // other non-nil error: transport failure. data is nil.
  45. err error
  46. }
  47. // recvBuffer is an unbounded channel of recvMsg structs.
  48. // Note recvBuffer differs from controlBuffer only in that recvBuffer
  49. // holds a channel of only recvMsg structs instead of objects implementing "item" interface.
  50. // recvBuffer is written to much more often than
  51. // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
  52. type recvBuffer struct {
  53. c chan recvMsg
  54. mu sync.Mutex
  55. backlog []recvMsg
  56. err error
  57. }
  58. func newRecvBuffer() *recvBuffer {
  59. b := &recvBuffer{
  60. c: make(chan recvMsg, 1),
  61. }
  62. return b
  63. }
  64. func (b *recvBuffer) put(r recvMsg) {
  65. b.mu.Lock()
  66. if b.err != nil {
  67. b.mu.Unlock()
  68. // An error had occurred earlier, don't accept more
  69. // data or errors.
  70. return
  71. }
  72. b.err = r.err
  73. if len(b.backlog) == 0 {
  74. select {
  75. case b.c <- r:
  76. b.mu.Unlock()
  77. return
  78. default:
  79. }
  80. }
  81. b.backlog = append(b.backlog, r)
  82. b.mu.Unlock()
  83. }
  84. func (b *recvBuffer) load() {
  85. b.mu.Lock()
  86. if len(b.backlog) > 0 {
  87. select {
  88. case b.c <- b.backlog[0]:
  89. b.backlog[0] = recvMsg{}
  90. b.backlog = b.backlog[1:]
  91. default:
  92. }
  93. }
  94. b.mu.Unlock()
  95. }
  96. // get returns the channel that receives a recvMsg in the buffer.
  97. //
  98. // Upon receipt of a recvMsg, the caller should call load to send another
  99. // recvMsg onto the channel if there is any.
  100. func (b *recvBuffer) get() <-chan recvMsg {
  101. return b.c
  102. }
  103. //
  104. // recvBufferReader implements io.Reader interface to read the data from
  105. // recvBuffer.
  106. type recvBufferReader struct {
  107. ctx context.Context
  108. ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
  109. recv *recvBuffer
  110. last []byte // Stores the remaining data in the previous calls.
  111. err error
  112. }
  113. // Read reads the next len(p) bytes from last. If last is drained, it tries to
  114. // read additional data from recv. It blocks if there no additional data available
  115. // in recv. If Read returns any non-nil error, it will continue to return that error.
  116. func (r *recvBufferReader) Read(p []byte) (n int, err error) {
  117. if r.err != nil {
  118. return 0, r.err
  119. }
  120. n, r.err = r.read(p)
  121. return n, r.err
  122. }
  123. func (r *recvBufferReader) read(p []byte) (n int, err error) {
  124. if r.last != nil && len(r.last) > 0 {
  125. // Read remaining data left in last call.
  126. copied := copy(p, r.last)
  127. r.last = r.last[copied:]
  128. return copied, nil
  129. }
  130. select {
  131. case <-r.ctxDone:
  132. return 0, ContextErr(r.ctx.Err())
  133. case m := <-r.recv.get():
  134. r.recv.load()
  135. if m.err != nil {
  136. return 0, m.err
  137. }
  138. copied := copy(p, m.data)
  139. r.last = m.data[copied:]
  140. return copied, nil
  141. }
  142. }
  143. type streamState uint32
  144. const (
  145. streamActive streamState = iota
  146. streamWriteDone // EndStream sent
  147. streamReadDone // EndStream received
  148. streamDone // the entire stream is finished.
  149. )
  150. // Stream represents an RPC in the transport layer.
  151. type Stream struct {
  152. id uint32
  153. st ServerTransport // nil for client side Stream
  154. ctx context.Context // the associated context of the stream
  155. cancel context.CancelFunc // always nil for client side Stream
  156. done chan struct{} // closed at the end of stream to unblock writers. On the client side.
  157. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
  158. method string // the associated RPC method of the stream
  159. recvCompress string
  160. sendCompress string
  161. buf *recvBuffer
  162. trReader io.Reader
  163. fc *inFlow
  164. recvQuota uint32
  165. wq *writeQuota
  166. // Callback to state application's intentions to read data. This
  167. // is used to adjust flow control, if needed.
  168. requestRead func(int)
  169. headerChan chan struct{} // closed to indicate the end of header metadata.
  170. headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
  171. // hdrMu protects header and trailer metadata on the server-side.
  172. hdrMu sync.Mutex
  173. header metadata.MD // the received header metadata.
  174. trailer metadata.MD // the key-value map of trailer metadata.
  175. noHeaders bool // set if the client never received headers (set only after the stream is done).
  176. // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
  177. headerSent uint32
  178. state streamState
  179. // On client-side it is the status error received from the server.
  180. // On server-side it is unused.
  181. status *status.Status
  182. bytesReceived uint32 // indicates whether any bytes have been received on this stream
  183. unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
  184. // contentSubtype is the content-subtype for requests.
  185. // this must be lowercase or the behavior is undefined.
  186. contentSubtype string
  187. }
  188. // isHeaderSent is only valid on the server-side.
  189. func (s *Stream) isHeaderSent() bool {
  190. return atomic.LoadUint32(&s.headerSent) == 1
  191. }
  192. // updateHeaderSent updates headerSent and returns true
  193. // if it was alreay set. It is valid only on server-side.
  194. func (s *Stream) updateHeaderSent() bool {
  195. return atomic.SwapUint32(&s.headerSent, 1) == 1
  196. }
  197. func (s *Stream) swapState(st streamState) streamState {
  198. return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
  199. }
  200. func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
  201. return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
  202. }
  203. func (s *Stream) getState() streamState {
  204. return streamState(atomic.LoadUint32((*uint32)(&s.state)))
  205. }
  206. func (s *Stream) waitOnHeader() error {
  207. if s.headerChan == nil {
  208. // On the server headerChan is always nil since a stream originates
  209. // only after having received headers.
  210. return nil
  211. }
  212. select {
  213. case <-s.ctx.Done():
  214. return ContextErr(s.ctx.Err())
  215. case <-s.headerChan:
  216. return nil
  217. }
  218. }
  219. // RecvCompress returns the compression algorithm applied to the inbound
  220. // message. It is empty string if there is no compression applied.
  221. func (s *Stream) RecvCompress() string {
  222. if err := s.waitOnHeader(); err != nil {
  223. return ""
  224. }
  225. return s.recvCompress
  226. }
  227. // SetSendCompress sets the compression algorithm to the stream.
  228. func (s *Stream) SetSendCompress(str string) {
  229. s.sendCompress = str
  230. }
  231. // Done returns a channel which is closed when it receives the final status
  232. // from the server.
  233. func (s *Stream) Done() <-chan struct{} {
  234. return s.done
  235. }
  236. // Header acquires the key-value pairs of header metadata once it
  237. // is available. It blocks until i) the metadata is ready or ii) there is no
  238. // header metadata or iii) the stream is canceled/expired.
  239. func (s *Stream) Header() (metadata.MD, error) {
  240. err := s.waitOnHeader()
  241. // Even if the stream is closed, header is returned if available.
  242. select {
  243. case <-s.headerChan:
  244. if s.header == nil {
  245. return nil, nil
  246. }
  247. return s.header.Copy(), nil
  248. default:
  249. }
  250. return nil, err
  251. }
  252. // TrailersOnly blocks until a header or trailers-only frame is received and
  253. // then returns true if the stream was trailers-only. If the stream ends
  254. // before headers are received, returns true, nil. If a context error happens
  255. // first, returns it as a status error. Client-side only.
  256. func (s *Stream) TrailersOnly() (bool, error) {
  257. err := s.waitOnHeader()
  258. if err != nil {
  259. return false, err
  260. }
  261. // if !headerDone, some other connection error occurred.
  262. return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil
  263. }
  264. // Trailer returns the cached trailer metedata. Note that if it is not called
  265. // after the entire stream is done, it could return an empty MD. Client
  266. // side only.
  267. // It can be safely read only after stream has ended that is either read
  268. // or write have returned io.EOF.
  269. func (s *Stream) Trailer() metadata.MD {
  270. c := s.trailer.Copy()
  271. return c
  272. }
  273. // ContentSubtype returns the content-subtype for a request. For example, a
  274. // content-subtype of "proto" will result in a content-type of
  275. // "application/grpc+proto". This will always be lowercase. See
  276. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  277. // more details.
  278. func (s *Stream) ContentSubtype() string {
  279. return s.contentSubtype
  280. }
  281. // Context returns the context of the stream.
  282. func (s *Stream) Context() context.Context {
  283. return s.ctx
  284. }
  285. // Method returns the method for the stream.
  286. func (s *Stream) Method() string {
  287. return s.method
  288. }
  289. // Status returns the status received from the server.
  290. // Status can be read safely only after the stream has ended,
  291. // that is, after Done() is closed.
  292. func (s *Stream) Status() *status.Status {
  293. return s.status
  294. }
  295. // SetHeader sets the header metadata. This can be called multiple times.
  296. // Server side only.
  297. // This should not be called in parallel to other data writes.
  298. func (s *Stream) SetHeader(md metadata.MD) error {
  299. if md.Len() == 0 {
  300. return nil
  301. }
  302. if s.isHeaderSent() || s.getState() == streamDone {
  303. return ErrIllegalHeaderWrite
  304. }
  305. s.hdrMu.Lock()
  306. s.header = metadata.Join(s.header, md)
  307. s.hdrMu.Unlock()
  308. return nil
  309. }
  310. // SendHeader sends the given header metadata. The given metadata is
  311. // combined with any metadata set by previous calls to SetHeader and
  312. // then written to the transport stream.
  313. func (s *Stream) SendHeader(md metadata.MD) error {
  314. return s.st.WriteHeader(s, md)
  315. }
  316. // SetTrailer sets the trailer metadata which will be sent with the RPC status
  317. // by the server. This can be called multiple times. Server side only.
  318. // This should not be called parallel to other data writes.
  319. func (s *Stream) SetTrailer(md metadata.MD) error {
  320. if md.Len() == 0 {
  321. return nil
  322. }
  323. if s.getState() == streamDone {
  324. return ErrIllegalHeaderWrite
  325. }
  326. s.hdrMu.Lock()
  327. s.trailer = metadata.Join(s.trailer, md)
  328. s.hdrMu.Unlock()
  329. return nil
  330. }
  331. func (s *Stream) write(m recvMsg) {
  332. s.buf.put(m)
  333. }
  334. // Read reads all p bytes from the wire for this stream.
  335. func (s *Stream) Read(p []byte) (n int, err error) {
  336. // Don't request a read if there was an error earlier
  337. if er := s.trReader.(*transportReader).er; er != nil {
  338. return 0, er
  339. }
  340. s.requestRead(len(p))
  341. return io.ReadFull(s.trReader, p)
  342. }
  343. // tranportReader reads all the data available for this Stream from the transport and
  344. // passes them into the decoder, which converts them into a gRPC message stream.
  345. // The error is io.EOF when the stream is done or another non-nil error if
  346. // the stream broke.
  347. type transportReader struct {
  348. reader io.Reader
  349. // The handler to control the window update procedure for both this
  350. // particular stream and the associated transport.
  351. windowHandler func(int)
  352. er error
  353. }
  354. func (t *transportReader) Read(p []byte) (n int, err error) {
  355. n, err = t.reader.Read(p)
  356. if err != nil {
  357. t.er = err
  358. return
  359. }
  360. t.windowHandler(n)
  361. return
  362. }
  363. // BytesReceived indicates whether any bytes have been received on this stream.
  364. func (s *Stream) BytesReceived() bool {
  365. return atomic.LoadUint32(&s.bytesReceived) == 1
  366. }
  367. // Unprocessed indicates whether the server did not process this stream --
  368. // i.e. it sent a refused stream or GOAWAY including this stream ID.
  369. func (s *Stream) Unprocessed() bool {
  370. return atomic.LoadUint32(&s.unprocessed) == 1
  371. }
  372. // GoString is implemented by Stream so context.String() won't
  373. // race when printing %#v.
  374. func (s *Stream) GoString() string {
  375. return fmt.Sprintf("<stream: %p, %v>", s, s.method)
  376. }
  377. // state of transport
  378. type transportState int
  379. const (
  380. reachable transportState = iota
  381. closing
  382. draining
  383. )
  384. // ServerConfig consists of all the configurations to establish a server transport.
  385. type ServerConfig struct {
  386. MaxStreams uint32
  387. AuthInfo credentials.AuthInfo
  388. InTapHandle tap.ServerInHandle
  389. StatsHandler stats.Handler
  390. KeepaliveParams keepalive.ServerParameters
  391. KeepalivePolicy keepalive.EnforcementPolicy
  392. InitialWindowSize int32
  393. InitialConnWindowSize int32
  394. WriteBufferSize int
  395. ReadBufferSize int
  396. ChannelzParentID int64
  397. MaxHeaderListSize *uint32
  398. }
  399. // NewServerTransport creates a ServerTransport with conn or non-nil error
  400. // if it fails.
  401. func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
  402. return newHTTP2Server(conn, config)
  403. }
  404. // ConnectOptions covers all relevant options for communicating with the server.
  405. type ConnectOptions struct {
  406. // UserAgent is the application user agent.
  407. UserAgent string
  408. // Dialer specifies how to dial a network address.
  409. Dialer func(context.Context, string) (net.Conn, error)
  410. // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
  411. FailOnNonTempDialError bool
  412. // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
  413. PerRPCCredentials []credentials.PerRPCCredentials
  414. // TransportCredentials stores the Authenticator required to setup a client connection.
  415. TransportCredentials credentials.TransportCredentials
  416. // KeepaliveParams stores the keepalive parameters.
  417. KeepaliveParams keepalive.ClientParameters
  418. // StatsHandler stores the handler for stats.
  419. StatsHandler stats.Handler
  420. // InitialWindowSize sets the initial window size for a stream.
  421. InitialWindowSize int32
  422. // InitialConnWindowSize sets the initial window size for a connection.
  423. InitialConnWindowSize int32
  424. // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
  425. WriteBufferSize int
  426. // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
  427. ReadBufferSize int
  428. // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
  429. ChannelzParentID int64
  430. // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
  431. MaxHeaderListSize *uint32
  432. }
  433. // TargetInfo contains the information of the target such as network address and metadata.
  434. type TargetInfo struct {
  435. Addr string
  436. Metadata interface{}
  437. Authority string
  438. }
  439. // NewClientTransport establishes the transport with the required ConnectOptions
  440. // and returns it to the caller.
  441. func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
  442. return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
  443. }
  444. // Options provides additional hints and information for message
  445. // transmission.
  446. type Options struct {
  447. // Last indicates whether this write is the last piece for
  448. // this stream.
  449. Last bool
  450. }
  451. // CallHdr carries the information of a particular RPC.
  452. type CallHdr struct {
  453. // Host specifies the peer's host.
  454. Host string
  455. // Method specifies the operation to perform.
  456. Method string
  457. // SendCompress specifies the compression algorithm applied on
  458. // outbound message.
  459. SendCompress string
  460. // Creds specifies credentials.PerRPCCredentials for a call.
  461. Creds credentials.PerRPCCredentials
  462. // ContentSubtype specifies the content-subtype for a request. For example, a
  463. // content-subtype of "proto" will result in a content-type of
  464. // "application/grpc+proto". The value of ContentSubtype must be all
  465. // lowercase, otherwise the behavior is undefined. See
  466. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  467. // for more details.
  468. ContentSubtype string
  469. PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
  470. }
  471. // ClientTransport is the common interface for all gRPC client-side transport
  472. // implementations.
  473. type ClientTransport interface {
  474. // Close tears down this transport. Once it returns, the transport
  475. // should not be accessed any more. The caller must make sure this
  476. // is called only once.
  477. Close() error
  478. // GracefulClose starts to tear down the transport. It stops accepting
  479. // new RPCs and wait the completion of the pending RPCs.
  480. GracefulClose() error
  481. // Write sends the data for the given stream. A nil stream indicates
  482. // the write is to be performed on the transport as a whole.
  483. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  484. // NewStream creates a Stream for an RPC.
  485. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
  486. // CloseStream clears the footprint of a stream when the stream is
  487. // not needed any more. The err indicates the error incurred when
  488. // CloseStream is called. Must be called when a stream is finished
  489. // unless the associated transport is closing.
  490. CloseStream(stream *Stream, err error)
  491. // Error returns a channel that is closed when some I/O error
  492. // happens. Typically the caller should have a goroutine to monitor
  493. // this in order to take action (e.g., close the current transport
  494. // and create a new one) in error case. It should not return nil
  495. // once the transport is initiated.
  496. Error() <-chan struct{}
  497. // GoAway returns a channel that is closed when ClientTransport
  498. // receives the draining signal from the server (e.g., GOAWAY frame in
  499. // HTTP/2).
  500. GoAway() <-chan struct{}
  501. // GetGoAwayReason returns the reason why GoAway frame was received.
  502. GetGoAwayReason() GoAwayReason
  503. // IncrMsgSent increments the number of message sent through this transport.
  504. IncrMsgSent()
  505. // IncrMsgRecv increments the number of message received through this transport.
  506. IncrMsgRecv()
  507. }
  508. // ServerTransport is the common interface for all gRPC server-side transport
  509. // implementations.
  510. //
  511. // Methods may be called concurrently from multiple goroutines, but
  512. // Write methods for a given Stream will be called serially.
  513. type ServerTransport interface {
  514. // HandleStreams receives incoming streams using the given handler.
  515. HandleStreams(func(*Stream), func(context.Context, string) context.Context)
  516. // WriteHeader sends the header metadata for the given stream.
  517. // WriteHeader may not be called on all streams.
  518. WriteHeader(s *Stream, md metadata.MD) error
  519. // Write sends the data for the given stream.
  520. // Write may not be called on all streams.
  521. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  522. // WriteStatus sends the status of a stream to the client. WriteStatus is
  523. // the final call made on a stream and always occurs.
  524. WriteStatus(s *Stream, st *status.Status) error
  525. // Close tears down the transport. Once it is called, the transport
  526. // should not be accessed any more. All the pending streams and their
  527. // handlers will be terminated asynchronously.
  528. Close() error
  529. // RemoteAddr returns the remote network address.
  530. RemoteAddr() net.Addr
  531. // Drain notifies the client this ServerTransport stops accepting new RPCs.
  532. Drain()
  533. // IncrMsgSent increments the number of message sent through this transport.
  534. IncrMsgSent()
  535. // IncrMsgRecv increments the number of message received through this transport.
  536. IncrMsgRecv()
  537. }
  538. // streamErrorf creates an StreamError with the specified error code and description.
  539. func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
  540. return StreamError{
  541. Code: c,
  542. Desc: fmt.Sprintf(format, a...),
  543. }
  544. }
  545. // streamError creates an StreamError with the specified error code and description.
  546. func streamError(c codes.Code, desc string) StreamError {
  547. return StreamError{Code: c, Desc: desc}
  548. }
  549. // connectionErrorf creates an ConnectionError with the specified error description.
  550. func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
  551. return ConnectionError{
  552. Desc: fmt.Sprintf(format, a...),
  553. temp: temp,
  554. err: e,
  555. }
  556. }
  557. // ConnectionError is an error that results in the termination of the
  558. // entire connection and the retry of all the active streams.
  559. type ConnectionError struct {
  560. Desc string
  561. temp bool
  562. err error
  563. }
  564. func (e ConnectionError) Error() string {
  565. return fmt.Sprintf("connection error: desc = %q", e.Desc)
  566. }
  567. // Temporary indicates if this connection error is temporary or fatal.
  568. func (e ConnectionError) Temporary() bool {
  569. return e.temp
  570. }
  571. // Origin returns the original error of this connection error.
  572. func (e ConnectionError) Origin() error {
  573. // Never return nil error here.
  574. // If the original error is nil, return itself.
  575. if e.err == nil {
  576. return e
  577. }
  578. return e.err
  579. }
  580. var (
  581. // ErrConnClosing indicates that the transport is closing.
  582. ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
  583. // errStreamDrain indicates that the stream is rejected because the
  584. // connection is draining. This could be caused by goaway or balancer
  585. // removing the address.
  586. errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
  587. // errStreamDone is returned from write at the client side to indiacte application
  588. // layer of an error.
  589. errStreamDone = errors.New("the stream is done")
  590. // StatusGoAway indicates that the server sent a GOAWAY that included this
  591. // stream's ID in unprocessed RPCs.
  592. statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
  593. )
  594. // TODO: See if we can replace StreamError with status package errors.
  595. // StreamError is an error that only affects one stream within a connection.
  596. type StreamError struct {
  597. Code codes.Code
  598. Desc string
  599. }
  600. func (e StreamError) Error() string {
  601. return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
  602. }
  603. // GoAwayReason contains the reason for the GoAway frame received.
  604. type GoAwayReason uint8
  605. const (
  606. // GoAwayInvalid indicates that no GoAway frame is received.
  607. GoAwayInvalid GoAwayReason = 0
  608. // GoAwayNoReason is the default value when GoAway frame is received.
  609. GoAwayNoReason GoAwayReason = 1
  610. // GoAwayTooManyPings indicates that a GoAway frame with
  611. // ErrCodeEnhanceYourCalm was received and that the debug data said
  612. // "too_many_pings".
  613. GoAwayTooManyPings GoAwayReason = 2
  614. )