dialoptions.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "net"
  22. "time"
  23. "golang.org/x/net/context"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/internal"
  27. "google.golang.org/grpc/internal/backoff"
  28. "google.golang.org/grpc/internal/envconfig"
  29. "google.golang.org/grpc/internal/transport"
  30. "google.golang.org/grpc/keepalive"
  31. "google.golang.org/grpc/resolver"
  32. "google.golang.org/grpc/stats"
  33. )
  34. // dialOptions configure a Dial call. dialOptions are set by the DialOption
  35. // values passed to Dial.
  36. type dialOptions struct {
  37. unaryInt UnaryClientInterceptor
  38. streamInt StreamClientInterceptor
  39. cp Compressor
  40. dc Decompressor
  41. bs backoff.Strategy
  42. block bool
  43. insecure bool
  44. timeout time.Duration
  45. scChan <-chan ServiceConfig
  46. authority string
  47. copts transport.ConnectOptions
  48. callOptions []CallOption
  49. // This is used by v1 balancer dial option WithBalancer to support v1
  50. // balancer, and also by WithBalancerName dial option.
  51. balancerBuilder balancer.Builder
  52. // This is to support grpclb.
  53. resolverBuilder resolver.Builder
  54. waitForHandshake bool
  55. channelzParentID int64
  56. disableServiceConfig bool
  57. disableRetry bool
  58. }
  59. // DialOption configures how we set up the connection.
  60. type DialOption func(*dialOptions)
  61. // WithWaitForHandshake blocks until the initial settings frame is received from
  62. // the server before assigning RPCs to the connection. Experimental API.
  63. func WithWaitForHandshake() DialOption {
  64. return func(o *dialOptions) {
  65. o.waitForHandshake = true
  66. }
  67. }
  68. // WithWriteBufferSize determines how much data can be batched before doing a
  69. // write on the wire. The corresponding memory allocation for this buffer will
  70. // be twice the size to keep syscalls low. The default value for this buffer is
  71. // 32KB.
  72. //
  73. // Zero will disable the write buffer such that each write will be on underlying
  74. // connection. Note: A Send call may not directly translate to a write.
  75. func WithWriteBufferSize(s int) DialOption {
  76. return func(o *dialOptions) {
  77. o.copts.WriteBufferSize = s
  78. }
  79. }
  80. // WithReadBufferSize lets you set the size of read buffer, this determines how
  81. // much data can be read at most for each read syscall.
  82. //
  83. // The default value for this buffer is 32KB. Zero will disable read buffer for
  84. // a connection so data framer can access the underlying conn directly.
  85. func WithReadBufferSize(s int) DialOption {
  86. return func(o *dialOptions) {
  87. o.copts.ReadBufferSize = s
  88. }
  89. }
  90. // WithInitialWindowSize returns a DialOption which sets the value for initial
  91. // window size on a stream. The lower bound for window size is 64K and any value
  92. // smaller than that will be ignored.
  93. func WithInitialWindowSize(s int32) DialOption {
  94. return func(o *dialOptions) {
  95. o.copts.InitialWindowSize = s
  96. }
  97. }
  98. // WithInitialConnWindowSize returns a DialOption which sets the value for
  99. // initial window size on a connection. The lower bound for window size is 64K
  100. // and any value smaller than that will be ignored.
  101. func WithInitialConnWindowSize(s int32) DialOption {
  102. return func(o *dialOptions) {
  103. o.copts.InitialConnWindowSize = s
  104. }
  105. }
  106. // WithMaxMsgSize returns a DialOption which sets the maximum message size the
  107. // client can receive.
  108. //
  109. // Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
  110. func WithMaxMsgSize(s int) DialOption {
  111. return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
  112. }
  113. // WithDefaultCallOptions returns a DialOption which sets the default
  114. // CallOptions for calls over the connection.
  115. func WithDefaultCallOptions(cos ...CallOption) DialOption {
  116. return func(o *dialOptions) {
  117. o.callOptions = append(o.callOptions, cos...)
  118. }
  119. }
  120. // WithCodec returns a DialOption which sets a codec for message marshaling and
  121. // unmarshaling.
  122. //
  123. // Deprecated: use WithDefaultCallOptions(CallCustomCodec(c)) instead.
  124. func WithCodec(c Codec) DialOption {
  125. return WithDefaultCallOptions(CallCustomCodec(c))
  126. }
  127. // WithCompressor returns a DialOption which sets a Compressor to use for
  128. // message compression. It has lower priority than the compressor set by the
  129. // UseCompressor CallOption.
  130. //
  131. // Deprecated: use UseCompressor instead.
  132. func WithCompressor(cp Compressor) DialOption {
  133. return func(o *dialOptions) {
  134. o.cp = cp
  135. }
  136. }
  137. // WithDecompressor returns a DialOption which sets a Decompressor to use for
  138. // incoming message decompression. If incoming response messages are encoded
  139. // using the decompressor's Type(), it will be used. Otherwise, the message
  140. // encoding will be used to look up the compressor registered via
  141. // encoding.RegisterCompressor, which will then be used to decompress the
  142. // message. If no compressor is registered for the encoding, an Unimplemented
  143. // status error will be returned.
  144. //
  145. // Deprecated: use encoding.RegisterCompressor instead.
  146. func WithDecompressor(dc Decompressor) DialOption {
  147. return func(o *dialOptions) {
  148. o.dc = dc
  149. }
  150. }
  151. // WithBalancer returns a DialOption which sets a load balancer with the v1 API.
  152. // Name resolver will be ignored if this DialOption is specified.
  153. //
  154. // Deprecated: use the new balancer APIs in balancer package and
  155. // WithBalancerName.
  156. func WithBalancer(b Balancer) DialOption {
  157. return func(o *dialOptions) {
  158. o.balancerBuilder = &balancerWrapperBuilder{
  159. b: b,
  160. }
  161. }
  162. }
  163. // WithBalancerName sets the balancer that the ClientConn will be initialized
  164. // with. Balancer registered with balancerName will be used. This function
  165. // panics if no balancer was registered by balancerName.
  166. //
  167. // The balancer cannot be overridden by balancer option specified by service
  168. // config.
  169. //
  170. // This is an EXPERIMENTAL API.
  171. func WithBalancerName(balancerName string) DialOption {
  172. builder := balancer.Get(balancerName)
  173. if builder == nil {
  174. panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
  175. }
  176. return func(o *dialOptions) {
  177. o.balancerBuilder = builder
  178. }
  179. }
  180. // withResolverBuilder is only for grpclb.
  181. func withResolverBuilder(b resolver.Builder) DialOption {
  182. return func(o *dialOptions) {
  183. o.resolverBuilder = b
  184. }
  185. }
  186. // WithServiceConfig returns a DialOption which has a channel to read the
  187. // service configuration.
  188. //
  189. // Deprecated: service config should be received through name resolver, as
  190. // specified here.
  191. // https://github.com/grpc/grpc/blob/master/doc/service_config.md
  192. func WithServiceConfig(c <-chan ServiceConfig) DialOption {
  193. return func(o *dialOptions) {
  194. o.scChan = c
  195. }
  196. }
  197. // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
  198. // when backing off after failed connection attempts.
  199. func WithBackoffMaxDelay(md time.Duration) DialOption {
  200. return WithBackoffConfig(BackoffConfig{MaxDelay: md})
  201. }
  202. // WithBackoffConfig configures the dialer to use the provided backoff
  203. // parameters after connection failures.
  204. //
  205. // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
  206. // for use.
  207. func WithBackoffConfig(b BackoffConfig) DialOption {
  208. return withBackoff(backoff.Exponential{
  209. MaxDelay: b.MaxDelay,
  210. })
  211. }
  212. // withBackoff sets the backoff strategy used for connectRetryNum after a failed
  213. // connection attempt.
  214. //
  215. // This can be exported if arbitrary backoff strategies are allowed by gRPC.
  216. func withBackoff(bs backoff.Strategy) DialOption {
  217. return func(o *dialOptions) {
  218. o.bs = bs
  219. }
  220. }
  221. // WithBlock returns a DialOption which makes caller of Dial blocks until the
  222. // underlying connection is up. Without this, Dial returns immediately and
  223. // connecting the server happens in background.
  224. func WithBlock() DialOption {
  225. return func(o *dialOptions) {
  226. o.block = true
  227. }
  228. }
  229. // WithInsecure returns a DialOption which disables transport security for this
  230. // ClientConn. Note that transport security is required unless WithInsecure is
  231. // set.
  232. func WithInsecure() DialOption {
  233. return func(o *dialOptions) {
  234. o.insecure = true
  235. }
  236. }
  237. // WithTransportCredentials returns a DialOption which configures a connection
  238. // level security credentials (e.g., TLS/SSL).
  239. func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
  240. return func(o *dialOptions) {
  241. o.copts.TransportCredentials = creds
  242. }
  243. }
  244. // WithPerRPCCredentials returns a DialOption which sets credentials and places
  245. // auth state on each outbound RPC.
  246. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
  247. return func(o *dialOptions) {
  248. o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
  249. }
  250. }
  251. // WithTimeout returns a DialOption that configures a timeout for dialing a
  252. // ClientConn initially. This is valid if and only if WithBlock() is present.
  253. //
  254. // Deprecated: use DialContext and context.WithTimeout instead.
  255. func WithTimeout(d time.Duration) DialOption {
  256. return func(o *dialOptions) {
  257. o.timeout = d
  258. }
  259. }
  260. func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
  261. return func(o *dialOptions) {
  262. o.copts.Dialer = f
  263. }
  264. }
  265. func init() {
  266. internal.WithContextDialer = withContextDialer
  267. internal.WithResolverBuilder = withResolverBuilder
  268. }
  269. // WithDialer returns a DialOption that specifies a function to use for dialing
  270. // network addresses. If FailOnNonTempDialError() is set to true, and an error
  271. // is returned by f, gRPC checks the error's Temporary() method to decide if it
  272. // should try to reconnect to the network address.
  273. func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
  274. return withContextDialer(
  275. func(ctx context.Context, addr string) (net.Conn, error) {
  276. if deadline, ok := ctx.Deadline(); ok {
  277. return f(addr, deadline.Sub(time.Now()))
  278. }
  279. return f(addr, 0)
  280. })
  281. }
  282. // WithStatsHandler returns a DialOption that specifies the stats handler for
  283. // all the RPCs and underlying network connections in this ClientConn.
  284. func WithStatsHandler(h stats.Handler) DialOption {
  285. return func(o *dialOptions) {
  286. o.copts.StatsHandler = h
  287. }
  288. }
  289. // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on
  290. // non-temporary dial errors. If f is true, and dialer returns a non-temporary
  291. // error, gRPC will fail the connection to the network address and won't try to
  292. // reconnect. The default value of FailOnNonTempDialError is false.
  293. //
  294. // This is an EXPERIMENTAL API.
  295. func FailOnNonTempDialError(f bool) DialOption {
  296. return func(o *dialOptions) {
  297. o.copts.FailOnNonTempDialError = f
  298. }
  299. }
  300. // WithUserAgent returns a DialOption that specifies a user agent string for all
  301. // the RPCs.
  302. func WithUserAgent(s string) DialOption {
  303. return func(o *dialOptions) {
  304. o.copts.UserAgent = s
  305. }
  306. }
  307. // WithKeepaliveParams returns a DialOption that specifies keepalive parameters
  308. // for the client transport.
  309. func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
  310. return func(o *dialOptions) {
  311. o.copts.KeepaliveParams = kp
  312. }
  313. }
  314. // WithUnaryInterceptor returns a DialOption that specifies the interceptor for
  315. // unary RPCs.
  316. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
  317. return func(o *dialOptions) {
  318. o.unaryInt = f
  319. }
  320. }
  321. // WithStreamInterceptor returns a DialOption that specifies the interceptor for
  322. // streaming RPCs.
  323. func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
  324. return func(o *dialOptions) {
  325. o.streamInt = f
  326. }
  327. }
  328. // WithAuthority returns a DialOption that specifies the value to be used as the
  329. // :authority pseudo-header. This value only works with WithInsecure and has no
  330. // effect if TransportCredentials are present.
  331. func WithAuthority(a string) DialOption {
  332. return func(o *dialOptions) {
  333. o.authority = a
  334. }
  335. }
  336. // WithChannelzParentID returns a DialOption that specifies the channelz ID of
  337. // current ClientConn's parent. This function is used in nested channel creation
  338. // (e.g. grpclb dial).
  339. func WithChannelzParentID(id int64) DialOption {
  340. return func(o *dialOptions) {
  341. o.channelzParentID = id
  342. }
  343. }
  344. // WithDisableServiceConfig returns a DialOption that causes grpc to ignore any
  345. // service config provided by the resolver and provides a hint to the resolver
  346. // to not fetch service configs.
  347. func WithDisableServiceConfig() DialOption {
  348. return func(o *dialOptions) {
  349. o.disableServiceConfig = true
  350. }
  351. }
  352. // WithDisableRetry returns a DialOption that disables retries, even if the
  353. // service config enables them. This does not impact transparent retries, which
  354. // will happen automatically if no data is written to the wire or if the RPC is
  355. // unprocessed by the remote server.
  356. //
  357. // Retry support is currently disabled by default, but will be enabled by
  358. // default in the future. Until then, it may be enabled by setting the
  359. // environment variable "GRPC_GO_RETRY" to "on".
  360. //
  361. // This API is EXPERIMENTAL.
  362. func WithDisableRetry() DialOption {
  363. return func(o *dialOptions) {
  364. o.disableRetry = true
  365. }
  366. }
  367. // WithMaxHeaderListSize returns a DialOption that specifies the maximum
  368. // (uncompressed) size of header list that the client is prepared to accept.
  369. func WithMaxHeaderListSize(s uint32) DialOption {
  370. return func(o *dialOptions) {
  371. o.copts.MaxHeaderListSize = &s
  372. }
  373. }
  374. func defaultDialOptions() dialOptions {
  375. return dialOptions{
  376. disableRetry: !envconfig.Retry,
  377. copts: transport.ConnectOptions{
  378. WriteBufferSize: defaultWriteBufSize,
  379. ReadBufferSize: defaultReadBufSize,
  380. },
  381. }
  382. }