Dynamic splitting by interval for range queries #6458

Open · wants to merge 15 commits into base: master
Changes from 6 commits
17 changes: 17 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4193,6 +4193,23 @@ The `query_range_config` configures the query splitting and caching in the Cortex
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]

dynamic_query_splits:
# [EXPERIMENTAL] Maximum number of shards for a query, 0 disables it.
# Dynamically uses a multiple of `split-queries-by-interval` to maintain the
# number of splits below the limit. If vertical sharding is enabled for a
# query, the combined total number of vertical and interval shards is kept
# below this limit.
# CLI flag: -querier.max-shards-per-query
[max_shards_per_query: <int> | default = 0]

# [EXPERIMENTAL] Max total duration of data fetched by all query shards from
# storage, 0 disables it. Dynamically uses a multiple of
# `split-queries-by-interval` to ensure the total fetched duration of data is
# lower than the value set. It takes into account additional data fetched by
# matrix selectors and subqueries.
# CLI flag: -querier.max-duration-of-data-fetched-from-storage-per-query
[max_duration_of_data_fetched_from_storage_per_query: <duration> | default = 0s]

# Mutate incoming queries to align their start and end with their step.
# CLI flag: -querier.align-querier-with-step
[align_queries_with_step: <boolean> | default = false]
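For illustration, the two experimental options above map onto the Go configuration struct introduced by this PR. A minimal sketch with hypothetical values; only the field names are taken from the change:

package main

import (
	"time"

	"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func main() {
	// split_queries_by_interval must be non-zero for the dynamic limits to apply.
	cfg := queryrange.Config{
		SplitQueriesByInterval: 24 * time.Hour,
		DynamicQuerySplitsConfig: queryrange.DynamicQuerySplitsConfig{
			MaxShardsPerQuery:                           100,                 // combined interval splits x vertical shards
			MaxDurationOfDataFetchedFromStoragePerQuery: 90 * 24 * time.Hour, // cap on total duration of data fetched
		},
	}
	_ = cfg
}

When both limits are set, the splitting logic picks the more restrictive of the two (see dynamicIntervalFn below).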
3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -118,3 +118,6 @@ Currently experimental features are:
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
- Querier: protobuf codec (`-api.querier-default-codec`)
- Query-frontend: dynamic query splits
  - `querier.max-shards-per-query` (int) CLI flag
  - `querier.max-duration-of-data-fetched-from-storage-per-query` (duration) CLI flag
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -487,6 +487,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
prometheusCodec,
shardedPrometheusCodec,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.QueryStoreAfter,
)
if err != nil {
return nil, err
5 changes: 5 additions & 0 deletions pkg/frontend/transport/handler.go
@@ -351,6 +351,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
splitQueries := stats.LoadSplitQueries()
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
dataSelectMinTime := stats.LoadDataSelectMinTime()
splitInterval := stats.LoadSplitInterval()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
@@ -425,6 +426,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
}

if splitInterval > 0 {
logMessage = append(logMessage, "split_interval", splitInterval.String())
}

if error != nil {
s, ok := status.FromError(error)
if !ok {
9 changes: 9 additions & 0 deletions pkg/querier/stats/stats.go
@@ -21,6 +21,7 @@ type QueryStats struct {
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
SplitInterval time.Duration
m sync.Mutex
}

@@ -287,6 +288,14 @@ func (s *QueryStats) LoadDataSelectMinTime() int64 {
return atomic.LoadInt64(&s.DataSelectMinTime)
}

func (s *QueryStats) LoadSplitInterval() time.Duration {
if s == nil {
return 0
}

return s.SplitInterval
}

func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) {
if s == nil {
return
17 changes: 17 additions & 0 deletions pkg/querier/tripperware/queryrange/dynamic_query_splits.go
@@ -0,0 +1,17 @@
package queryrange

import (
"flag"
"time"
)

type DynamicQuerySplitsConfig struct {
MaxShardsPerQuery int `yaml:"max_shards_per_query"`
MaxDurationOfDataFetchedFromStoragePerQuery time.Duration `yaml:"max_duration_of_data_fetched_from_storage_per_query"`
}

// RegisterFlags registers flags for dynamic query splits.
func (cfg *DynamicQuerySplitsConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxShardsPerQuery, "querier.max-shards-per-query", 0, "[EXPERIMENTAL] Maximum number of shards for a query, 0 disables it. Dynamically uses a multiple of `split-queries-by-interval` to maintain the number of splits below the limit. If vertical sharding is enabled for a query, the combined total number of vertical and interval shards is kept below this limit.")
f.DurationVar(&cfg.MaxDurationOfDataFetchedFromStoragePerQuery, "querier.max-duration-of-data-fetched-from-storage-per-query", 0, "[EXPERIMENTAL] Max total duration of data fetched by all query shards from storage, 0 disables it. Dynamically uses a multiple of `split-queries-by-interval` to ensure the total fetched duration of data is lower than the value set. It takes into account additional data fetched by matrix selectors and subqueries.")
}
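A minimal usage sketch of RegisterFlags, assuming the package import path and a throwaway flag set; the flag names and defaults are the ones registered above:

package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	var cfg queryrange.DynamicQuerySplitsConfig
	cfg.RegisterFlags(fs)

	// Both flags default to 0 (disabled); setting either one enables dynamic splitting.
	_ = fs.Parse([]string{
		"-querier.max-shards-per-query=100",
		"-querier.max-duration-of-data-fetched-from-storage-per-query=2160h",
	})
	fmt.Println(cfg.MaxShardsPerQuery, cfg.MaxDurationOfDataFetchedFromStoragePerQuery) // 100 2160h0m0s
}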
27 changes: 20 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
@@ -34,11 +34,14 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// Query splits config
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
DynamicQuerySplitsConfig DynamicQuerySplitsConfig `yaml:"dynamic_query_splits"`

AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

@@ -54,6 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
cfg.DynamicQuerySplitsConfig.RegisterFlags(f)
}

// Validate validates the config.
@@ -66,6 +70,11 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
if cfg.SplitQueriesByInterval <= 0 {
return errors.New("configs under dynamic-query-splits require that a value for split-queries-by-interval is set")
}
}
return nil
}
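A quick test-style sketch of the new check, with hypothetical values; it assumes no earlier validation in Validate rejects this minimal config first:

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/querier"
	"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
)

func main() {
	cfg := queryrange.Config{
		// split_queries_by_interval deliberately left at its 0 default.
		DynamicQuerySplitsConfig: queryrange.DynamicQuerySplitsConfig{MaxShardsPerQuery: 100},
	}
	// Expected to return the error added above, because a dynamic limit is set
	// without a base split interval.
	fmt.Println(cfg.Validate(querier.Config{}))
}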

@@ -80,6 +89,7 @@ func Middlewares(
prometheusCodec tripperware.Codec,
shardedPrometheusCodec tripperware.Codec,
lookbackDelta time.Duration,
queryStoreAfter time.Duration,
) ([]tripperware.Middleware, cache.Cache, error) {
// Metric used to keep track of each middleware execution duration.
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -89,8 +99,11 @@
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := staticIntervalFn(cfg)
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, queryStoreAfter, lookbackDelta)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, queryStoreAfter, lookbackDelta))
}

var c cache.Cache
@@ -64,6 +64,7 @@ func TestRoundTrip(t *testing.T) {
PrometheusCodec,
ShardedPrometheusCodec,
5*time.Minute,
24*time.Hour,
)
require.NoError(t, err)

159 changes: 156 additions & 3 deletions pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -8,15 +8,20 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/httpgrpc"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type IntervalFn func(r tripperware.Request) time.Duration
type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error)

// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer) tripperware.Middleware {
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, queryStoreAfter time.Duration, lookbackDelta time.Duration) tripperware.Middleware {
return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
return splitByInterval{
next: next,
@@ -28,6 +33,8 @@ func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, m
Name: "frontend_split_queries_total",
Help: "Total number of underlying query requests after the split by interval is applied",
}),
queryStoreAfter: queryStoreAfter,
lookbackDelta: lookbackDelta,
}
})
}
@@ -40,17 +47,28 @@ type splitByInterval struct {

// Metrics.
splitByCounter prometheus.Counter

queryStoreAfter time.Duration
lookbackDelta time.Duration
}

func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
reqs, err := splitQuery(r, s.interval(r))
interval, err := s.interval(ctx, r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, err.Error())
}
reqs, err := splitQuery(r, interval)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
s.splitByCounter.Add(float64(len(reqs)))

stats := querier_stats.FromContext(ctx)

(Resolved review thread, author reply): Yes, it is only used to log the interval size used for splitting the query.

if stats != nil {
stats.SplitInterval = interval
}
reqResps, err := tripperware.DoRequests(ctx, s.next, reqs, s.limits)
if err != nil {
return nil, err
@@ -135,3 +153,138 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
}
return target
}

func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(_ context.Context, _ tripperware.Request) (time.Duration, error) {
return cfg.SplitQueriesByInterval, nil
}
}

func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, queryStoreAfter time.Duration, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return cfg.SplitQueriesByInterval, err
}

analysis, err := queryAnalyzer.Analyze(r.GetQuery())
if err != nil {
return cfg.SplitQueriesByInterval, err
}

queryVerticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
if queryVerticalShardSize <= 0 || !analysis.IsShardable() {
queryVerticalShardSize = 1
}

queryExpr, err := parser.ParseExpr(r.GetQuery())
if err != nil {
return cfg.SplitQueriesByInterval, err
}

// Calculates: duration of data fetched if the query was not sharded, the original range covered by the query start and end times,
// and the duration of data fetched by lookbackDelta for the first split
durationFetchedWithoutSharding, originalRangeCount, firstSplitLookbackDeltaCompensation := durationFetchedByQuery(queryExpr, r, queryStoreAfter, lookbackDelta, cfg.SplitQueriesByInterval, time.Now())
extraDaysFetchedPerShard := durationFetchedWithoutSharding - originalRangeCount

// Calculate the extra duration of data fetched by lookbackDelta for each split except the first one
nextIntervalStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), cfg.SplitQueriesByInterval) + r.GetStep()
nextIntervalReq := r.WithStartEnd(nextIntervalStart, r.GetEnd())
_, _, lookbackDeltaCompensation := durationFetchedByQuery(queryExpr, nextIntervalReq, queryStoreAfter, lookbackDelta, cfg.SplitQueriesByInterval, time.Now())

var maxSplitsByFetchedDaysOfData int
if cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
if extraDaysFetchedPerShard == 0 {
extraDaysFetchedPerShard = 1 // prevent division by 0
}
maxIntervalsFetchedByQuery := int(cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery / cfg.SplitQueriesByInterval)
maxSplitsByFetchedDaysOfData = ((maxIntervalsFetchedByQuery / queryVerticalShardSize) - originalRangeCount + firstSplitLookbackDeltaCompensation) / (extraDaysFetchedPerShard + lookbackDeltaCompensation)
if maxSplitsByFetchedDaysOfData <= 0 {
maxSplitsByFetchedDaysOfData = 1
}
}

var maxSplitsByConfig int
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 {
maxSplitsByConfig = cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery / queryVerticalShardSize
if maxSplitsByConfig <= 0 {
maxSplitsByConfig = 1
}
}

var maxSplits time.Duration
switch {
case maxSplitsByFetchedDaysOfData <= 0 && maxSplitsByConfig <= 0:
return cfg.SplitQueriesByInterval, nil
case maxSplitsByFetchedDaysOfData <= 0:
maxSplits = time.Duration(maxSplitsByConfig)
case maxSplitsByConfig <= 0:
maxSplits = time.Duration(maxSplitsByFetchedDaysOfData)
default:
// Use the more restrictive shard limit
maxSplits = time.Duration(min(maxSplitsByConfig, maxSplitsByFetchedDaysOfData))
}

queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
baseInterval := cfg.SplitQueriesByInterval

// Calculate the multiple of interval needed to split the query into at most maxSplits splits
n1 := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
if n1 <= 0 {
n1 = 1
}

// The first split can be truncated and not cover the full length of n*interval.
// So we exclude it and calculate the multiple of interval needed to keep the remaining splits <= maxSplits-1
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n1*baseInterval) + r.GetStep()
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
var n2 time.Duration
if maxSplits > 1 {
n2 = (queryRangeWithoutFirstSplit + baseInterval*(maxSplits-1) - 1) / (baseInterval * (maxSplits - 1))
} else {
// If maxSplits is <= 1 then we should not shard at all
n1 += (queryRangeWithoutFirstSplit + baseInterval - 1) / baseInterval
}
n := max(n1, n2)
return n * cfg.SplitQueriesByInterval, nil
}
}
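To make the arithmetic above concrete, here is a standalone sketch of the first estimate n1 (a hypothetical helper, not part of the PR): with a 30-day range, a 24h base interval and maxSplits = 3 (for example max-shards-per-query = 10 with a vertical shard size of 3, so 10/3 = 3), n1 comes out to 10, i.e. a 10-day split interval; the n2 pass then only increases this if a truncated first split would push the count above 3.

package main

import (
	"fmt"
	"time"
)

// ceilIntervalMultiple mirrors the n1 calculation: the smallest multiple n of baseInterval
// such that queryRange fits into at most maxSplits splits of n*baseInterval.
func ceilIntervalMultiple(queryRange, baseInterval, maxSplits time.Duration) time.Duration {
	n := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
	if n <= 0 {
		n = 1
	}
	return n
}

func main() {
	queryRange := 30 * 24 * time.Hour // query end - start
	baseInterval := 24 * time.Hour    // split-queries-by-interval
	maxSplits := time.Duration(3)     // e.g. max-shards-per-query=10 with vertical shard size 3

	n := ceilIntervalMultiple(queryRange, baseInterval, maxSplits)
	fmt.Println(int64(n), n*baseInterval) // 10 240h0m0s, i.e. a 10-day split interval
}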

// durationFetchedByQuery calculates the total duration of data the query will have to fetch from storage,
// expressed as a multiple of baseInterval. It also returns the time range covered by the original query
// start and end times, and a count used to compensate for the extra data fetched by lookbackDelta.
func durationFetchedByQuery(expr parser.Expr, req tripperware.Request, queryStoreAfter, lookbackDelta time.Duration, baseInterval time.Duration, now time.Time) (durationFetchedCount int, originalRangeCount int, lookbackDeltaCount int) {
(Resolved review thread)
Reviewer: Nit: Follow the convention for adding comments. https://tip.golang.org/doc/comment
Author: I rewrote the comment to be clearer. The convention seems to be general guidelines rather than a specific format, but I tried to follow it as much as possible.
durationFetchedCount = 0
originalRangeCount = 0
lookbackDeltaCount = 0
baseIntervalMillis := util.DurationMilliseconds(baseInterval)
(Review thread)
Reviewer: Is there a reason why the func returns multiples of baseInterval rather than just a duration? Since the function is called durationFetchedByQuery, I'd expect it to return a duration.
Author: The calculations done with these are all based on integers, and rounding down is important for the result. I refactored the function to be more readable and changed its name. Let me know if you think we should make more changes to it.

queryStoreMaxT := util.TimeToMillis(now.Add(-queryStoreAfter))
var evalRange time.Duration

parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
case *parser.VectorSelector:
originalRangeCount += int((req.GetEnd()/baseIntervalMillis)-(req.GetStart()/baseIntervalMillis)) + 1
start, end := util.GetTimeRangesForSelector(req.GetStart(), req.GetEnd(), 0, n, path, evalRange)
// Query shouldn't touch Store Gateway.
if start > queryStoreMaxT {
return nil
} else {
// If the query split needs to query store, cap the max time to now - queryStoreAfter.
end = min(end, queryStoreMaxT)
}

startIntervalIndex := start / baseIntervalMillis
endIntervalIndex := end / baseIntervalMillis
durationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1

if evalRange == 0 && (start-util.DurationMilliseconds(lookbackDelta))/baseIntervalMillis == start/baseIntervalMillis {
lookbackDeltaCount += 1
}
evalRange = 0
case *parser.MatrixSelector:
evalRange = n.Range
}
return nil
})
return durationFetchedCount, originalRangeCount, lookbackDeltaCount
}
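A small worked example of the bucket counting used above, with hypothetical numbers and only the index arithmetic reproduced (the real code derives start and end via util.GetTimeRangesForSelector):

package main

import "fmt"

func main() {
	const baseIntervalMillis = int64(24 * 3600 * 1000) // 24h buckets, in milliseconds

	// A selector whose start is on day 1 and whose end is on day 4 touches buckets 1..4.
	start := int64(1 * 24 * 3600 * 1000)
	end := int64(4 * 24 * 3600 * 1000)
	fmt.Println(int(end/baseIntervalMillis-start/baseIntervalMillis) + 1) // 4

	// A matrix selector such as metric[8h] pulls the start back by its range, which can
	// cross a bucket boundary and add one more base interval of fetched data.
	extendedStart := start - int64(8*3600*1000)
	fmt.Println(int(end/baseIntervalMillis-extendedStart/baseIntervalMillis) + 1) // 5
}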