Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate -distributor.shard-by-all-labels #6022

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
* [CHANGE] Ingester: Remove `-querier.query-store-for-labels-enabled` flag. Querying long-term store for labels is always enabled. #5984
* [CHANGE] Distributor: Remove `-distributor.shard-by-all-labels` flag. Shard by all labels is always enabled now. The flag is ignored now and will be removed in cortex v1.19.0 #6021
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
auth_enabled: false

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
auth_enabled: false

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
1 change: 0 additions & 1 deletion development/tsdb-blocks-storage-s3/config/cortex.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
auth_enabled: false

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
auth_enabled: false

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
14 changes: 0 additions & 14 deletions docs/configuration/arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,6 @@ The next three options only apply when the querier is used together with the Que

## Distributor

- `-distributor.shard-by-all-labels`

In the original Cortex design, samples were sharded amongst distributors by the combination of (userid, metric name). Sharding by metric name was designed to reduce the number of ingesters you need to hit on the read path; the downside was that you could hotspot the write path.

In hindsight, this seems like the wrong choice: we do many orders of magnitude more writes than reads, and ingester reads are in-memory and cheap. It seems the right thing to do is to use all the labels to shard, improving load balancing and support for very high cardinality metrics.

Set this flag to `true` for the new behaviour.

Important to note is that when setting this flag to `true`, it has to be set on both the distributor and the querier (called `-distributor.shard-by-all-labels` on Querier as well). If the flag is only set on the distributor and not on the querier, you will get incomplete query results because not all ingesters are queried.

**Upgrade notes**: As this flag also makes all queries always read from all ingesters, the upgrade path is pretty trivial; just enable the flag. When you do enable it, you'll see a spike in the number of active series as the writes are "reshuffled" amongst the ingesters, but over the next stale period all the old series will be flushed, and you should end up with much better load balancing. With this flag enabled in the queriers, reads will always catch all the data from all ingesters.

**Warning**: disabling this flag can lead to a much less balanced distribution of load among the ingesters.

- `-distributor.extra-query-delay`
This is used by a component with an embedded distributor (Querier and Ruler) to control how long to wait until sending more than the minimum amount of queries needed for a successful response.

Expand Down
5 changes: 0 additions & 5 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2447,11 +2447,6 @@ ha_tracker:
# CLI flag: -distributor.sharding-strategy
[sharding_strategy: <string> | default = "default"]

# Distribute samples based on all labels, as opposed to solely by user and
# metric name.
# CLI flag: -distributor.shard-by-all-labels
[shard_by_all_labels: <boolean> | default = false]

# Try writing to an additional ingester in the presence of an ingester not in
# the ACTIVE state. It is useful to disable this along with
# -ingester.unregister-on-shutdown=false in order to not spread samples to extra
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ server:
grpc_server_max_concurrent_streams: 1000

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ server:
grpc_server_max_concurrent_streams: 1000

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
1 change: 0 additions & 1 deletion docs/configuration/single-process-config-blocks-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ server:
grpc_server_max_concurrent_streams: 1000

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
1 change: 0 additions & 1 deletion docs/configuration/single-process-config-blocks-tls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ server:


distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
1 change: 0 additions & 1 deletion docs/configuration/single-process-config-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ server:
grpc_server_max_concurrent_streams: 1000

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
1 change: 0 additions & 1 deletion docs/getting-started/cortex-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ server:
grpc_server_max_concurrent_streams: 1000

distributor:
shard_by_all_labels: true
pool:
health_check_ingesters: true

Expand Down
1 change: 0 additions & 1 deletion docs/getting-started/cortex-values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ config:
distributor:
# -- Distribute samples based on all labels, as opposed to solely by user and
# metric name.
shard_by_all_labels: true
pool:
health_check_ingesters: true
memberlist:
Expand Down
1 change: 0 additions & 1 deletion integration/ingester_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func TestIngesterGlobalLimits(t *testing.T) {

flags := BlocksStorageFlags()
flags["-distributor.replication-factor"] = "1"
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.sharding-strategy"] = testData.shardingStrategy
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)
flags["-ingester.max-series-per-user"] = "0"
Expand Down
1 change: 0 additions & 1 deletion integration/ingester_sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func TestIngesterSharding(t *testing.T) {
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.sharding-strategy"] = testData.shardingStrategy
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)

Expand Down
2 changes: 0 additions & 2 deletions integration/zone_aware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ func TestZoneAwareReplication(t *testing.T) {
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.replication-factor"] = "3"
flags["-distributor.zone-awareness-enabled"] = "true"

Expand Down Expand Up @@ -158,7 +157,6 @@ func TestZoneResultsQuorum(t *testing.T) {
defer s.Close()

flags := BlocksStorageFlags()
flags["-distributor.shard-by-all-labels"] = "true"
flags["-distributor.replication-factor"] = "3"
flags["-distributor.zone-awareness-enabled"] = "true"

Expand Down
3 changes: 0 additions & 3 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.BlocksStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid TSDB config")
}
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.Distributor.Validate(c.LimitsConfig); err != nil {
return errors.Wrap(err, "invalid distributor config")
}
Expand Down
1 change: 0 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig)
t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin
t.tsdbIngesterConfig()
Expand Down
22 changes: 5 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_math "github.com/cortexproject/cortex/pkg/util/math"
Expand Down Expand Up @@ -138,7 +138,6 @@ type Config struct {
ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`

Expand Down Expand Up @@ -178,14 +177,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
tesla59 marked this conversation as resolved.
Show resolved Hide resolved
f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names and values for now), only results from quorum number of zones will be included.")

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")

flagext.DeprecatedFlag(f, "distributor.shard-by-all-labels", "Deprecated: Setting this flag will not take any effect, shard by all labels is always enabled now.", util_log.Logger)
}

// Validate config and returns error on failure
Expand Down Expand Up @@ -455,23 +455,11 @@ func (d *Distributor) stopping(_ error) error {
}

func (d *Distributor) tokenForLabels(userID string, labels []cortexpb.LabelAdapter) (uint32, error) {
if d.cfg.ShardByAllLabels {
return shardByAllLabels(userID, labels), nil
}

unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(labels)
if err != nil {
return 0, err
}
return shardByMetricName(userID, unsafeMetricName), nil
return shardByAllLabels(userID, labels), nil
}

func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 {
if d.cfg.ShardByAllLabels {
return shardByMetricName(userID, metricName)
}

return shardByUser(userID)
return shardByMetricName(userID, metricName)
}

// shardByMetricName returns the token for the given metric. The provided metricName
Expand Down
Loading
Loading