Skip to content

Commit

Permalink
Implementing HealthCheck grpc handlers
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot committed Sep 20, 2024
1 parent 0f848d8 commit 6e1bfa5
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 7 deletions.
18 changes: 18 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,24 @@ querier:
# CLI flag: -querier.store-gateway-client.grpc-compression
[grpc_compression: <string> | default = ""]

# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each
# target and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before
# considering a target unhealthy. 0 means disabled.
# CLI flag: -querier.store-gateway-client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]

# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -querier.store-gateway-client.interval
[interval: <duration> | default = 1s]

# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -querier.store-gateway-client.timeout
[timeout: <duration> | default = 1s]

# If enabled, store gateway query stats will be logged using `info` log level.
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]
Expand Down
36 changes: 36 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3080,6 +3080,24 @@ grpc_client_config:
# CLI flag: -ingester.client.tls-insecure-skip-verify
[tls_insecure_skip_verify: <boolean> | default = false]
# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target
# and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before considering
# a target unhealthy. 0 means disabled.
# CLI flag: -ingester.client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]
# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -ingester.client.interval
[interval: <duration> | default = 1s]
# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -ingester.client.timeout
[timeout: <duration> | default = 1s]
# Max inflight push requests that this ingester client can handle. This limit is
# per-ingester-client. Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.client.max-inflight-push-requests
Expand Down Expand Up @@ -3787,6 +3805,24 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-client.grpc-compression
[grpc_compression: <string> | default = ""]
# EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target
# and fail the request if the target is marked as unhealthy.
healthcheck_config:
# The number of consecutive failed health checks required before considering
# a target unhealthy. 0 means disabled.
# CLI flag: -querier.store-gateway-client.unhealthy-threshold
[unhealthy_threshold: <int> | default = 0]
# The approximate amount of time between health checks of an individual
# target.
# CLI flag: -querier.store-gateway-client.interval
[interval: <duration> | default = 1s]
# The amount of time during which no response from a target means a failed
# health check.
# CLI flag: -querier.store-gateway-client.timeout
[timeout: <duration> | default = 1s]
# If enabled, store gateway query stats will be logged using `info` log level.
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]
Expand Down
20 changes: 18 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"net/http"

"github.com/go-kit/log"
Expand Down Expand Up @@ -65,6 +66,7 @@ const (
Server string = "server"
Distributor string = "distributor"
DistributorService string = "distributor-service"
GrpcClientService string = "grpcclient-service"
Ingester string = "ingester"
IngesterService string = "ingester-service"
Flusher string = "flusher"
Expand Down Expand Up @@ -216,6 +218,19 @@ func (t *Cortex) initDistributorService() (serv services.Service, err error) {
return t.Distributor, nil
}

// initGrpcClientServices creates a single set of gRPC health-check
// interceptors and attaches it to each client configuration (ingester client
// and store-gateway client) whose unhealthy threshold is greater than zero.
// The interceptors value is returned as this module's service; the returned
// error is always nil.
func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
	interceptors := grpcclient.NewHealthCheckInterceptors(util_log.Logger)

	// A threshold of 0 leaves the client config without interceptors.
	ingesterHealthCheck := &t.Cfg.IngesterClient.GRPCClientConfig.HealthCheckConfig
	if ingesterHealthCheck.UnhealthyThreshold > 0 {
		ingesterHealthCheck.HealthCheckInterceptors = interceptors
	}

	storeGatewayHealthCheck := &t.Cfg.Querier.StoreGatewayClient.HealthCheckConfig
	if storeGatewayHealthCheck.UnhealthyThreshold > 0 {
		storeGatewayHealthCheck.HealthCheckInterceptors = interceptors
	}

	return interceptors, nil
}

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor)

Expand Down Expand Up @@ -740,6 +755,7 @@ func (t *Cortex) setupModuleManager() error {
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule)
mm.RegisterModule(GrpcClientService, t.initGrpcClientServices, modules.UserInvisibleModule)
mm.RegisterModule(Ingester, t.initIngester)
mm.RegisterModule(IngesterService, t.initIngesterService, modules.UserInvisibleModule)
mm.RegisterModule(Flusher, t.initFlusher)
Expand Down Expand Up @@ -768,14 +784,14 @@ func (t *Cortex) setupModuleManager() error {
Ring: {API, RuntimeConfig, MemberlistKV},
Overrides: {RuntimeConfig},
OverridesExporter: {RuntimeConfig},
Distributor: {DistributorService, API},
Distributor: {DistributorService, API, GrpcClientService},
DistributorService: {Ring, Overrides},
Ingester: {IngesterService, Overrides, API},
IngesterService: {Overrides, RuntimeConfig, MemberlistKV},
Flusher: {Overrides, API},
Queryable: {Overrides, DistributorService, Overrides, Ring, API, StoreQueryable, MemberlistKV},
Querier: {TenantFederation},
StoreQueryable: {Overrides, Overrides, MemberlistKV},
StoreQueryable: {Overrides, Overrides, MemberlistKV, GrpcClientService},
QueryFrontendTripperware: {API, Overrides},
QueryFrontend: {QueryFrontendTripperware},
QueryScheduler: {API, Overrides},
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (c *closableHealthAndIngesterClient) Close() error {


// Config is the configuration struct for the ingester client.
// GRPCClientConfig is read from the grpc_client_config YAML section and
// MaxInflightPushRequests from max_inflight_push_requests.
type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
GRPCClientConfig grpcclient.ConfigWithHealthCheck `yaml:"grpc_client_config"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
Expand Down
9 changes: 6 additions & 3 deletions pkg/querier/store_gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf
MaxRecvMsgSize: 100 << 20,
MaxSendMsgSize: 16 << 20,
GRPCCompression: clientConfig.GRPCCompression,
HealthCheckConfig: clientConfig.HealthCheckConfig,
RateLimit: 0,
RateLimitBurst: 0,
BackoffOnRatelimits: false,
Expand All @@ -96,13 +97,15 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf
}

// ClientConfig holds the TLS, gRPC compression and health-check settings of
// the gRPC client connecting to store-gateway.
type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
GRPCCompression string `yaml:"grpc_compression"`
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
GRPCCompression string `yaml:"grpc_compression"`
HealthCheckConfig grpcclient.HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."`
}

// RegisterFlagsWithPrefix registers the store-gateway client flags on f. Every
// flag name is built by prepending prefix to a fixed suffix; the nested TLS and
// health-check configs register their own flags under the same prefix.
func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	const (
		tlsEnabledSuffix  = ".tls-enabled"
		compressionSuffix = ".grpc-compression"
	)

	// Options owned directly by ClientConfig.
	f.BoolVar(&cfg.TLSEnabled, prefix+tlsEnabledSuffix, cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.")
	f.StringVar(&cfg.GRPCCompression, prefix+compressionSuffix, "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy' and '' (disable compression)")

	// Options owned by the nested configs.
	cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
	cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f)
}
16 changes: 16 additions & 0 deletions pkg/util/grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,28 @@ type Config struct {
BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
BackoffConfig backoff.Config `yaml:"backoff_config"`

HealthCheckConfig HealthCheckConfig `yaml:"-"`

TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
SignWriteRequestsEnabled bool `yaml:"-"`
}

// ConfigWithHealthCheck embeds Config (inlined in YAML) and adds a
// healthcheck_config section holding the client health-check settings.
//
// NOTE(review): the embedded Config also declares a HealthCheckConfig field
// (tagged yaml:"-"), which this outer field shadows for selector access.
// Config.DialOption reads the embedded field — confirm the outer value is
// propagated to it before dialing.
type ConfigWithHealthCheck struct {
Config `yaml:",inline"`
HealthCheckConfig HealthCheckConfig `yaml:"healthcheck_config" doc:"description=EXPERIMENTAL: If enabled, gRPC clients perform health checks for each target and fail the request if the target is marked as unhealthy."`
}

// RegisterFlags registers the gRPC client flags on f, using an empty flag-name
// prefix and an empty default gRPC compression.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", "", f)
}

// RegisterFlagsWithPrefix registers the embedded Config flags followed by the
// health-check flags on f, both under the given prefix.
// defaultGrpcCompression is forwarded unchanged to
// Config.RegisterFlagsWithPrefix.
func (cfg *ConfigWithHealthCheck) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
cfg.Config.RegisterFlagsWithPrefix(prefix, defaultGrpcCompression, f)
cfg.HealthCheckConfig.RegisterFlagsWithPrefix(prefix, f)
}

// RegisterFlagsWithPrefix registers flags with prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
Expand Down Expand Up @@ -91,6 +103,10 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep
if cfg.RateLimit > 0 {
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewRateLimiter(cfg)}, unaryClientInterceptors...)
}
if cfg.HealthCheckConfig.HealthCheckInterceptors != nil {
unaryClientInterceptors = append(unaryClientInterceptors, cfg.HealthCheckConfig.UnaryHealthCheckInterceptor(*cfg))
streamClientInterceptors = append(streamClientInterceptors, cfg.HealthCheckConfig.StreamClientInterceptor(*cfg))
}

if cfg.SignWriteRequestsEnabled {
unaryClientInterceptors = append(unaryClientInterceptors, UnarySigningClientInterceptor)
Expand Down
Loading

0 comments on commit 6e1bfa5

Please sign in to comment.