diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt index 8b62d4e9edf..eb3f271076c 100644 --- a/go/flags/endtoend/vtcombo.txt +++ b/go/flags/endtoend/vtcombo.txt @@ -166,6 +166,7 @@ Flags: --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. --health_check_interval duration Interval between health checks (default 20s) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) --heartbeat_enable If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the sidecar database's heartbeat table. The result is used to inform the serving state of the vttablet via healthchecks. diff --git a/go/flags/endtoend/vtctld.txt b/go/flags/endtoend/vtctld.txt index da0cef558e5..6056d16c4ac 100644 --- a/go/flags/endtoend/vtctld.txt +++ b/go/flags/endtoend/vtctld.txt @@ -83,6 +83,7 @@ Flags: --grpc_server_initial_window_size int gRPC server initial window size --grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s) --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) -h, --help help for vtctld --jaeger-agent-host string host and port to send spans to. if empty, no tracing will be done --keep_logs duration keep logs for this long (using ctime) (zero to keep forever) diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index 0d296c35e7c..12f83bcbcee 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -94,6 +94,7 @@ Flags: --grpc_server_keepalive_enforcement_policy_min_time duration gRPC server minimum keepalive time (default 10s) --grpc_server_keepalive_enforcement_policy_permit_without_stream gRPC server permit client keepalive pings even when there are no active streams (RPCs) --grpc_use_effective_callerid If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal. + --healthcheck-dial-concurrency int Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000. (default 1024) --healthcheck_retry_delay duration health check retry delay (default 2ms) --healthcheck_timeout duration the health check timeout period (default 1m0s) -h, --help help for vtgate diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 5d6a5e32662..8f0c70f27b8 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -46,6 +46,7 @@ import ( "github.com/google/safehtml/template" "github.com/google/safehtml/template/uncheckedconversions" "github.com/spf13/pflag" + "golang.org/x/sync/semaphore" "vitess.io/vitess/go/netutil" "vitess.io/vitess/go/stats" @@ -87,6 +88,9 @@ var ( // refreshKnownTablets tells us whether to process all tablets or only new tablets. refreshKnownTablets = true + // healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000. + healthCheckDialConcurrency int64 = 1024 + // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -168,6 +172,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") + fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.") ParseTabletURLTemplateFromFlag() } @@ -287,6 +292,8 @@ type HealthCheckImpl struct { subscribers map[chan *TabletHealth]struct{} // loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted loadTabletsTrigger chan struct{} + // healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once. + healthCheckDialSem *semaphore.Weighted } // NewHealthCheck creates a new HealthCheck object. @@ -321,6 +328,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur cell: localCell, retryDelay: retryDelay, healthCheckTimeout: healthCheckTimeout, + healthCheckDialSem: semaphore.NewWeighted(healthCheckDialConcurrency), healthByAlias: make(map[tabletAliasString]*tabletHealthCheck), healthData: make(map[KeyspaceShardTabletType]map[tabletAliasString]*TabletHealth), healthy: make(map[KeyspaceShardTabletType][]*TabletHealth), @@ -850,7 +858,7 @@ func (hc *HealthCheckImpl) TabletConnection(alias *topodata.TabletAlias, target // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } - return thc.Connection(), nil + return thc.Connection(hc), nil } // getAliasByCell should only be called while holding hc.mu diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 24496155e74..fc3ab242210 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -19,6 +19,7 @@ package discovery import ( "context" "fmt" + "net" "strings" "sync" "sync/atomic" @@ -33,12 +34,16 @@ import ( "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "google.golang.org/grpc" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" ) +// withDialerContextOnce ensures grpc.WithDialContext() is added once to the options. +var withDialerContextOnce sync.Once + // tabletHealthCheck maintains the health status of a tablet. A map of this // structure is maintained in HealthCheck. type tabletHealthCheck struct { @@ -122,8 +127,8 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) { } // stream streams healthcheck responses to callback. -func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.StreamHealthResponse) error) error { - conn := thc.Connection() +func (thc *tabletHealthCheck) stream(ctx context.Context, hc *HealthCheckImpl, callback func(*query.StreamHealthResponse) error) error { + conn := thc.Connection(hc) if conn == nil { // This signals the caller to retry return nil @@ -136,14 +141,34 @@ func (thc *tabletHealthCheck) stream(ctx context.Context, callback func(*query.S return err } -func (thc *tabletHealthCheck) Connection() queryservice.QueryService { +func (thc *tabletHealthCheck) Connection(hc *HealthCheckImpl) queryservice.QueryService { thc.connMu.Lock() defer thc.connMu.Unlock() - return thc.connectionLocked() + return thc.connectionLocked(hc) +} + +func healthCheckDialerFactory(hc *HealthCheckImpl) func(ctx context.Context, addr string) (net.Conn, error) { + return func(ctx context.Context, addr string) (net.Conn, error) { + // Limit the number of healthcheck connections opened in parallel to avoid high OS-thread + // usage due to blocking networking syscalls (eg: DNS lookups, TCP connection opens, + // etc). Without this limit it is possible for vtgates watching >10k tablets to hit + // the panic: 'runtime: program exceeds 10000-thread limit'. + if err := hc.healthCheckDialSem.Acquire(ctx, 1); err != nil { + return nil, err + } + defer hc.healthCheckDialSem.Release(1) + var dialer net.Dialer + return dialer.DialContext(ctx, "tcp", addr) + } } -func (thc *tabletHealthCheck) connectionLocked() queryservice.QueryService { +func (thc *tabletHealthCheck) connectionLocked(hc *HealthCheckImpl) queryservice.QueryService { if thc.Conn == nil { + withDialerContextOnce.Do(func() { + grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { + return append(opts, grpc.WithContextDialer(healthCheckDialerFactory(hc))), nil + }) + }) conn, err := tabletconn.GetDialer()(thc.Tablet, grpcclient.FailFast(true)) if err != nil { thc.LastError = err @@ -272,7 +297,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { }() // Read stream health responses. - err := thc.stream(streamCtx, func(shr *query.StreamHealthResponse) error { + err := thc.stream(streamCtx, hc, func(shr *query.StreamHealthResponse) error { // We received a message. Reset the back-off. retryDelay = hc.retryDelay // Don't block on send to avoid deadlocks. diff --git a/go/vt/grpcclient/client.go b/go/vt/grpcclient/client.go index b2ef0d4fb28..7524298514e 100644 --- a/go/vt/grpcclient/client.go +++ b/go/vt/grpcclient/client.go @@ -21,6 +21,7 @@ package grpcclient import ( "context" "crypto/tls" + "sync" "time" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -39,6 +40,7 @@ import ( ) var ( + grpcDialOptionsMu sync.Mutex keepaliveTime = 10 * time.Second keepaliveTimeout = 10 * time.Second initialConnWindowSize int @@ -86,6 +88,8 @@ var grpcDialOptions []func(opts []grpc.DialOption) ([]grpc.DialOption, error) // RegisterGRPCDialOptions registers an implementation of AuthServer. func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([]grpc.DialOption, error)) { + grpcDialOptionsMu.Lock() + defer grpcDialOptionsMu.Unlock() grpcDialOptions = append(grpcDialOptions, grpcDialOptionsFunc) } @@ -134,12 +138,14 @@ func DialContext(ctx context.Context, target string, failFast FailFast, opts ... newopts = append(newopts, opts...) var err error + grpcDialOptionsMu.Lock() for _, grpcDialOptionInitializer := range grpcDialOptions { newopts, err = grpcDialOptionInitializer(newopts) if err != nil { log.Fatalf("There was an error initializing client grpc.DialOption: %v", err) } } + grpcDialOptionsMu.Unlock() newopts = append(newopts, interceptors()...)