Dynamic splitting by interval for range queries #6458

Open
wants to merge 13 commits into master
28 changes: 21 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
@@ -17,6 +17,7 @@ package queryrange

import (
"flag"
"math"
"time"

"github.com/go-kit/log"
@@ -34,11 +35,12 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
SplitQueriesByIntervalMaxSplits int `yaml:"split_queries_by_interval_max_splits"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

@@ -50,6 +52,7 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.IntVar(&cfg.SplitQueriesByIntervalMaxSplits, "querier.split-queries-by-interval-max-splits", 0, "Maximum number of splits by interval for a query, 0 disables it. Uses a multiple of `split-queries-by-interval` to ensure the number of splits remains below the limit.")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
@@ -66,6 +69,9 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.SplitQueriesByIntervalMaxSplits > 0 && cfg.SplitQueriesByInterval <= 0 {
return errors.New("split-queries-by-interval-max-splits requires that a value for split-queries-by-interval is set.")
}
return nil
}

@@ -89,8 +95,16 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
if cfg.SplitQueriesByIntervalMaxSplits != 0 {
Contributor

Shouldn't the limit be applied to both range splits and vertical splits?

func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {

Contributor Author
@afhassan Dec 30, 2024

Technically this sets a limit on the total number of range and vertical splits for a given query. The number of vertical shards is static, so the maximum number of splits for a query becomes split_queries_by_interval_max_splits x query_vertical_shard_size. Because of this, a separate limit for vertical sharding would be redundant while the number of vertical shards is a static config, since it is already bounded by this limit.
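For illustration only (the values below are assumed, not taken from the PR): with the interval limit set to 10 and 3 vertical shards, one range query fans out to at most 10 x 3 = 30 downstream requests.

// Hypothetical bound calculation; names and values are illustrative, not the PR's API.
package main

import "fmt"

func main() {
	maxIntervalSplits := 10 // querier.split-queries-by-interval-max-splits (assumed)
	verticalShards := 3     // query_vertical_shard_size (assumed)

	// Each interval split is further sharded into the static number of vertical shards,
	// so the total fan-out is bounded by their product.
	fmt.Println(maxIntervalSplits * verticalShards) // 30
}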

intervalFn = func(r tripperware.Request) time.Duration {
queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
baseInterval := cfg.SplitQueriesByInterval
n := int(math.Ceil(float64(queryRange) / float64(baseInterval*time.Duration(cfg.SplitQueriesByIntervalMaxSplits))))
return time.Duration(n) * cfg.SplitQueriesByInterval
}
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer))
}

var c cache.Cache
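For readers skimming the diff, here is a minimal, self-contained sketch of the interval calculation introduced above, run with assumed inputs (a 10-day query range, a 1-day base interval, and a limit of 4 splits). It mirrors the math in intervalFn but is not part of the PR.

// Standalone sketch of the dynamic split-interval math shown in the diff above.
// All input values are assumed examples, not Cortex defaults.
package main

import (
	"fmt"
	"math"
	"time"
)

// dynamicInterval grows the split interval in whole multiples of baseInterval
// so that the number of interval splits stays at or below maxSplits.
func dynamicInterval(queryRange, baseInterval time.Duration, maxSplits int) time.Duration {
	n := int(math.Ceil(float64(queryRange) / float64(baseInterval*time.Duration(maxSplits))))
	return time.Duration(n) * baseInterval
}

func main() {
	queryRange := 10 * 24 * time.Hour // 10-day range query (assumed)
	baseInterval := 24 * time.Hour    // querier.split-queries-by-interval (assumed)
	maxSplits := 4                    // querier.split-queries-by-interval-max-splits (assumed)

	interval := dynamicInterval(queryRange, baseInterval, maxSplits)
	splits := int(math.Ceil(float64(queryRange) / float64(interval)))
	fmt.Println(interval, splits) // 72h0m0s 4
}

With these inputs the base interval is scaled up to 3 days, so the query is split into 4 pieces instead of 10, keeping the split count at or below the configured maximum.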