123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- package grpc
- import (
- "fmt"
- "net"
- "time"
- "golang.org/x/net/context"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/internal"
- "google.golang.org/grpc/internal/backoff"
- "google.golang.org/grpc/internal/envconfig"
- "google.golang.org/grpc/internal/transport"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/stats"
- )
- type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- cp Compressor
- dc Decompressor
- bs backoff.Strategy
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- authority string
- copts transport.ConnectOptions
- callOptions []CallOption
-
-
- balancerBuilder balancer.Builder
-
- resolverBuilder resolver.Builder
- waitForHandshake bool
- channelzParentID int64
- disableServiceConfig bool
- disableRetry bool
- }
- type DialOption func(*dialOptions)
- func WithWaitForHandshake() DialOption {
- return func(o *dialOptions) {
- o.waitForHandshake = true
- }
- }
- func WithWriteBufferSize(s int) DialOption {
- return func(o *dialOptions) {
- o.copts.WriteBufferSize = s
- }
- }
- func WithReadBufferSize(s int) DialOption {
- return func(o *dialOptions) {
- o.copts.ReadBufferSize = s
- }
- }
- func WithInitialWindowSize(s int32) DialOption {
- return func(o *dialOptions) {
- o.copts.InitialWindowSize = s
- }
- }
- func WithInitialConnWindowSize(s int32) DialOption {
- return func(o *dialOptions) {
- o.copts.InitialConnWindowSize = s
- }
- }
- func WithMaxMsgSize(s int) DialOption {
- return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
- }
- func WithDefaultCallOptions(cos ...CallOption) DialOption {
- return func(o *dialOptions) {
- o.callOptions = append(o.callOptions, cos...)
- }
- }
- func WithCodec(c Codec) DialOption {
- return WithDefaultCallOptions(CallCustomCodec(c))
- }
- func WithCompressor(cp Compressor) DialOption {
- return func(o *dialOptions) {
- o.cp = cp
- }
- }
- func WithDecompressor(dc Decompressor) DialOption {
- return func(o *dialOptions) {
- o.dc = dc
- }
- }
- func WithBalancer(b Balancer) DialOption {
- return func(o *dialOptions) {
- o.balancerBuilder = &balancerWrapperBuilder{
- b: b,
- }
- }
- }
- func WithBalancerName(balancerName string) DialOption {
- builder := balancer.Get(balancerName)
- if builder == nil {
- panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
- }
- return func(o *dialOptions) {
- o.balancerBuilder = builder
- }
- }
- func withResolverBuilder(b resolver.Builder) DialOption {
- return func(o *dialOptions) {
- o.resolverBuilder = b
- }
- }
- func WithServiceConfig(c <-chan ServiceConfig) DialOption {
- return func(o *dialOptions) {
- o.scChan = c
- }
- }
- func WithBackoffMaxDelay(md time.Duration) DialOption {
- return WithBackoffConfig(BackoffConfig{MaxDelay: md})
- }
- func WithBackoffConfig(b BackoffConfig) DialOption {
- return withBackoff(backoff.Exponential{
- MaxDelay: b.MaxDelay,
- })
- }
- func withBackoff(bs backoff.Strategy) DialOption {
- return func(o *dialOptions) {
- o.bs = bs
- }
- }
- func WithBlock() DialOption {
- return func(o *dialOptions) {
- o.block = true
- }
- }
- func WithInsecure() DialOption {
- return func(o *dialOptions) {
- o.insecure = true
- }
- }
- func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
- return func(o *dialOptions) {
- o.copts.TransportCredentials = creds
- }
- }
- func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
- return func(o *dialOptions) {
- o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
- }
- }
- func WithTimeout(d time.Duration) DialOption {
- return func(o *dialOptions) {
- o.timeout = d
- }
- }
- func withContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
- return func(o *dialOptions) {
- o.copts.Dialer = f
- }
- }
- func init() {
- internal.WithContextDialer = withContextDialer
- internal.WithResolverBuilder = withResolverBuilder
- }
- func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
- return withContextDialer(
- func(ctx context.Context, addr string) (net.Conn, error) {
- if deadline, ok := ctx.Deadline(); ok {
- return f(addr, deadline.Sub(time.Now()))
- }
- return f(addr, 0)
- })
- }
- func WithStatsHandler(h stats.Handler) DialOption {
- return func(o *dialOptions) {
- o.copts.StatsHandler = h
- }
- }
- func FailOnNonTempDialError(f bool) DialOption {
- return func(o *dialOptions) {
- o.copts.FailOnNonTempDialError = f
- }
- }
- func WithUserAgent(s string) DialOption {
- return func(o *dialOptions) {
- o.copts.UserAgent = s
- }
- }
- func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
- return func(o *dialOptions) {
- o.copts.KeepaliveParams = kp
- }
- }
- func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
- return func(o *dialOptions) {
- o.unaryInt = f
- }
- }
- func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
- return func(o *dialOptions) {
- o.streamInt = f
- }
- }
- func WithAuthority(a string) DialOption {
- return func(o *dialOptions) {
- o.authority = a
- }
- }
- func WithChannelzParentID(id int64) DialOption {
- return func(o *dialOptions) {
- o.channelzParentID = id
- }
- }
- func WithDisableServiceConfig() DialOption {
- return func(o *dialOptions) {
- o.disableServiceConfig = true
- }
- }
- func WithDisableRetry() DialOption {
- return func(o *dialOptions) {
- o.disableRetry = true
- }
- }
- func WithMaxHeaderListSize(s uint32) DialOption {
- return func(o *dialOptions) {
- o.copts.MaxHeaderListSize = &s
- }
- }
- func defaultDialOptions() dialOptions {
- return dialOptions{
- disableRetry: !envconfig.Retry,
- copts: transport.ConnectOptions{
- WriteBufferSize: defaultWriteBufSize,
- ReadBufferSize: defaultReadBufSize,
- },
- }
- }
|