Skip to content

Commit

Permalink
add unit tests for getIntervalFromMaxSplits
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmed Hassan <[email protected]>
  • Loading branch information
afhassan committed Jan 27, 2025
1 parent 3e95b45 commit cfc5078
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 65 deletions.
17 changes: 0 additions & 17 deletions pkg/querier/tripperware/queryrange/dynamic_query_splits.go

This file was deleted.

9 changes: 5 additions & 4 deletions pkg/querier/tripperware/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
}

type mockLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
queryVerticalShardSize int
}

func (m mockLimits) MaxQueryLookback(string) time.Duration {
Expand All @@ -255,7 +256,7 @@ func (m mockLimits) MaxCacheFreshness(string) time.Duration {
}

func (m mockLimits) QueryVerticalShardSize(userID string) int {
return 0
return m.queryVerticalShardSize
}

func (m mockLimits) QueryPriority(userID string) validation.QueryPriority {
Expand Down
15 changes: 13 additions & 2 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,25 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
if cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery > 0 {
if cfg.SplitQueriesByInterval <= 0 {
return errors.New("configs under dynamic-query-splits requires that a value for split-queries-by-interval is set.")
}
}
return nil
}

type DynamicQuerySplitsConfig struct {
MaxSplitsPerQuery int `yaml:"max_splits_per_query"`
MaxFetchedStorageDataDurationPerQuery time.Duration `yaml:"max_fetched_storage_data_duration_per_query"`
}

// RegisterFlags registers flags foy dynamic query splits
func (cfg *DynamicQuerySplitsConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxSplitsPerQuery, "querier.max-splits-per-query", 0, "[EXPERIMENTAL] Maximum number of splits for a query, 0 disables it. Dynamically uses a multiple of split interval to maintain a total number of splits below the set value. If vertical sharding is enabled for a query, the combined total number of vertical and interval splits is kept below this value.")
f.DurationVar(&cfg.MaxFetchedStorageDataDurationPerQuery, "querier.max-fetched-storage-data-duration-per-query", 0, "[EXPERIMENTAL] Max total duration of data fetched from storage by all query splits, 0 disables it. Dynamically uses a multiple of split interval to maintain a total fetched duration of data lower than the value set. It takes into account additional duration fetched by matrix selectors and subqueries.")
}

// Middlewares returns list of middlewares that should be applied for range query.
func Middlewares(
cfg Config,
Expand All @@ -99,7 +110,7 @@ func Middlewares(
}
if cfg.SplitQueriesByInterval != 0 {
intervalFn := staticIntervalFn(cfg)
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery > 0 {
if cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, lookbackDelta)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, lookbackDelta))
Expand Down
77 changes: 45 additions & 32 deletions pkg/querier/tripperware/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Reques
func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
baseInterval := cfg.SplitQueriesByInterval
maxDurationFetchedConfig := cfg.DynamicQuerySplitsConfig.MaxDurationOfDataFetchedFromStoragePerQuery
maxSplitsConfig := cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery
maxDurationFetchedConfig := cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery
maxSplitsConfig := cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery

queryVerticalShardSize, err := getMaxVerticalShardSizeForQuery(ctx, r, limits, queryAnalyzer)
if err != nil {
Expand All @@ -173,33 +173,37 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer
return baseInterval, err
}

// First we analyze the query using original start-end time. Additional duration fetched by lookbackDelta here only reflects the start time of first split
queryRangeIntervalsCount, extraIntervalsPerSplitCount, firstSplitLookbackDeltaIntervals := analyzeDurationFetchedByQuery(queryExpr, r, baseInterval, lookbackDelta)
// First analyze the query using original start-end time. Additional duration fetched by lookbackDelta here only reflects the start time of first split
queryRangeIntervalsCount, extraIntervalsPerSplitCount, firstSplitLookbackDeltaIntervals := analyzeDurationFetchedByQuery(queryExpr, r.GetStart(), r.GetEnd(), baseInterval, lookbackDelta)
if extraIntervalsPerSplitCount == 0 {
extraIntervalsPerSplitCount = 1 // prevent division by 0
extraIntervalsPerSplitCount = 1 // avoid division by 0
}

// Next we analyze the query using the next split start time to find the additional duration fetched by lookbackDelta for all subsequent splits
// Next analyze the query using the next split start time to find the additional duration fetched by lookbackDelta for other subsequent splits
nextIntervalStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), baseInterval) + r.GetStep()
nextIntervalReq := r.WithStartEnd(nextIntervalStart, r.GetEnd())
_, _, otherSplitsLookbackDeltaIntervals := analyzeDurationFetchedByQuery(queryExpr, nextIntervalReq, baseInterval, lookbackDelta)
_, _, otherSplitsLookbackDeltaIntervals := analyzeDurationFetchedByQuery(queryExpr, nextIntervalStart, r.GetEnd(), baseInterval, lookbackDelta)

// By default we subtract the 'first split' duration fetched by loookbackDelta, and divide by the 'other splits' duration fetched by loookbackDelta.
// By default subtract the 'first split' duration fetched by loookbackDelta, and divide by the 'other splits' duration fetched by loookbackDelta.
if firstSplitLookbackDeltaIntervals > 0 && otherSplitsLookbackDeltaIntervals > 0 {
firstSplitLookbackDeltaIntervals = 0 // Dividing is enough if additional duration is fetched by loookbackDelta for all splits
} else if otherSplitsLookbackDeltaIntervals > 0 {
firstSplitLookbackDeltaIntervals = otherSplitsLookbackDeltaIntervals * -1 // Adding instead of subtracting for first split, if additional duration is fetched by loookbackDelta for all splits except first one
}

// Find the max number of splits that will fetch less than MaxFetchedStorageDataDurationPerQuery
var maxSplitsByDurationFetched int
if maxDurationFetchedConfig > 0 {
maxIntervalsFetchedByQuery := int(maxDurationFetchedConfig / baseInterval)
// Equation for max duration fetched by example query: up[15d:1h] with a range of 30 days, a base split interval of 24 hours, and 5 min lookbackDelta
// MaxFetchedStorageDataDurationPerQuery > (30 + ((15 + 1) x horizontal splits)) x vertical shards
// Rearranging the equation to find the max horizontal splits
maxSplitsByDurationFetched = ((maxIntervalsFetchedByQuery / queryVerticalShardSize) - queryRangeIntervalsCount - firstSplitLookbackDeltaIntervals) / (extraIntervalsPerSplitCount + otherSplitsLookbackDeltaIntervals)
if maxSplitsByDurationFetched <= 0 {
maxSplitsByDurationFetched = 1
}
}

// Find max number of splits from MaxSplitsPerQuery after accounting for vertical sharding
var maxSplitsByConfig int
if maxSplitsConfig > 0 {
maxSplitsByConfig = maxSplitsConfig / queryVerticalShardSize
Expand Down Expand Up @@ -244,34 +248,43 @@ func getMaxVerticalShardSizeForQuery(ctx context.Context, r tripperware.Request,
return queryVerticalShardSize, nil
}

func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, maxSplits int) time.Duration {
maxSplitsDuration := time.Duration(maxSplits)
func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, maxSplitsInt int) time.Duration {
maxSplits := time.Duration(maxSplitsInt)
queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))

// Calculate the multiple of interval needed to shard query to <= maxSplits
n1 := (queryRange + baseInterval*maxSplitsDuration - 1) / (baseInterval * maxSplitsDuration)
if n1 <= 0 {
n1 = 1
// Calculate the multiple n of interval needed to shard query to <= maxSplits
n := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits)
if n <= 0 {
n = 1
}

// The first split can be truncated and not cover the full length of n*interval.
// So we remove it and calculate the multiple of interval needed to shard <= maxSplits-1
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n1*baseInterval) + r.GetStep()
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
var n2 time.Duration
if maxSplitsDuration > 1 {
n2 = (queryRangeWithoutFirstSplit + baseInterval*(maxSplitsDuration-1) - 1) / (baseInterval * (maxSplitsDuration - 1))
if maxSplits == 1 || queryRange < baseInterval {
// No splitting, interval should be long enough to result in 1 split only
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n*baseInterval) + r.GetStep()
if nextSplitStart < r.GetEnd() {
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
n += (queryRangeWithoutFirstSplit + baseInterval - 1) / baseInterval
}
} else {
// If maxSplits is <= 1 then we should not shard at all
n1 += (queryRangeWithoutFirstSplit + baseInterval - 1) / baseInterval
for n <= 2*(queryRange/baseInterval) {
// The first split can be truncated and shorter than other splits.
// So it is removed to check if a larger interval is needed to shard <= maxSplits-1
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n*baseInterval) + r.GetStep()
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
n_temp := (queryRangeWithoutFirstSplit + baseInterval*(maxSplits-1) - 1) / (baseInterval * (maxSplits - 1))
if n >= n_temp {
break
}
n++
}
}
n := max(n1, n2)
return n * baseInterval
}

// Analyzes the query to identify variables useful for calculating the duration of data
// that will be fetched from storage when the query is executed after being split.
// All variables are expressed as a count of multiples of the base split interval.
// analyzeDurationFetchedByQuery analyzes the query to identify variables useful for
// calculating the duration of data that will be fetched from storage when the query
// is executed after being split. All variables are expressed as a count of multiples
// of the base split interval.
//
// Returns:
// - queryRangeIntervalsCount: The total count of intervals fetched by the original start-end
Expand All @@ -282,11 +295,11 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
// for the specified start time.
//
// Example:
// Query: up[15d:1h] with a range of 30 days, a base split interval of 24 hours, and 5 min lookbackDelta
// Query up[15d:1h] with a range of 30 days, a base split interval of 24 hours, and 5 min lookbackDelta
// - queryRangeIntervalsCount = 30
// - extraIntervalsPerSplitCount = 15
// - lookbackDeltaIntervalsCount = 1
func analyzeDurationFetchedByQuery(expr parser.Expr, req tripperware.Request, baseInterval time.Duration, lookbackDelta time.Duration) (queryRangeIntervalsCount int, extraIntervalsPerSplitCount int, lookbackDeltaIntervalsCount int) {
func analyzeDurationFetchedByQuery(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (queryRangeIntervalsCount int, extraIntervalsPerSplitCount int, lookbackDeltaIntervalsCount int) {
queryRangeIntervalsCount = 0
lookbackDeltaIntervalsCount = 0
baseIntervalMillis := util.DurationMilliseconds(baseInterval)
Expand All @@ -297,10 +310,10 @@ func analyzeDurationFetchedByQuery(expr parser.Expr, req tripperware.Request, ba
switch n := node.(type) {
case *parser.VectorSelector:
// Increment count of intervals fetched by the original start-end time range
queryRangeIntervalsCount += int((req.GetEnd()/baseIntervalMillis)-(req.GetStart()/baseIntervalMillis)) + 1
queryRangeIntervalsCount += int((queryEnd/baseIntervalMillis)-(queryStart/baseIntervalMillis)) + 1

// Adjust start and end time based on matrix selectors or subquery, this excludes lookbackDelta
start, end := util.GetTimeRangesForSelector(req.GetStart(), req.GetEnd(), 0, n, path, evalRange)
start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, 0, n, path, evalRange)
startIntervalIndex := floorDiv(start, baseIntervalMillis)
endIntervalIndex := floorDiv(end, baseIntervalMillis)
totalDurationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1
Expand Down
Loading

0 comments on commit cfc5078

Please sign in to comment.