From 76adacd686c001d4814e2c2dadb739e4ee603b1f Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 28 Jan 2025 09:43:45 -0800 Subject: [PATCH 1/7] improve fuzz test for expanded postings cache Signed-off-by: alanprot --- integration/query_fuzz_test.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index cb2e851425..4a15f3f2e8 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -433,7 +433,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { scrapeInterval, i*numSamples, numSamples, - prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)}, + prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)}, ) ss[i*numberOfLabelsPerSeries+j] = series @@ -472,7 +472,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { scrapeInterval, i*numSamples, numSamples, - prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)}, + prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)}, prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)}, ) } @@ -486,19 +486,31 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { type testCase struct { query string + qt string res1, res2 model.Value err1, err2 error } - queryStart := time.Now().Add(-time.Hour * 24) - queryEnd := time.Now() - cases := make([]*testCase, 0, 200) + cases := make([]*testCase, 0, len(queries)*2) for _, query := range queries { - res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval) - res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval) + fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli())) + queryEnd := start.Add(fuzzyTime * time.Millisecond) + res1, err1 := c1.Query(query, queryEnd) + res2, err2 := c2.Query(query, queryEnd) cases = append(cases, &testCase{ query: query, + qt: "instant", + res1: res1, + res2: res2, + err1: err1, + err2: err2, + }) + res1, err1 = c1.QueryRange(query, start, queryEnd, scrapeInterval) + res2, err2 = c2.QueryRange(query, start, queryEnd, scrapeInterval) + cases = append(cases, &testCase{ + query: query, + qt: "range query", res1: res1, res2: res2, err1: err1, @@ -508,19 +520,18 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { failures := 0 for i, tc := range cases { - qt := "range query" if tc.err1 != nil || tc.err2 != nil { if !cmp.Equal(tc.err1, tc.err2) { - t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2) + t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, tc.qt, tc.query, tc.err1, tc.err2) failures++ } } else if shouldUseSampleNumComparer(tc.query) { if !cmp.Equal(tc.res1, tc.res2, sampleNumComparer) { - t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String()) + t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String()) failures++ } } else if !cmp.Equal(tc.res1, tc.res2, comparer) { - t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String()) + t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String()) failures++ } } From 9e36a253c9674e5cc83a0e8d395e7019bca7ca24 Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 28 Jan 2025 11:00:57 -0800 Subject: [PATCH 2/7] create more tests on the expanded postings cache Signed-off-by: alanprot 
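The end-time fuzzing in patch 1/7 above runs each generated expression both as an instant query and as a range query, ending at a random point between the ingestion start and now. That selection reduces to the small pattern below, shown as a standalone sketch assuming only the standard library (randomQueryEnd is an illustrative name, not a helper from the patch):

    package main

    import (
        "fmt"
        "math/rand"
        "time"
    )

    // randomQueryEnd mirrors the fuzzyTime computation in the diff above:
    // rand.Int63n draws a millisecond offset in [0, now-start), which is then
    // added back onto start. Int63n panics on a non-positive bound, so start
    // must be strictly in the past.
    func randomQueryEnd(start time.Time) time.Time {
        offsetMs := rand.Int63n(time.Now().UnixMilli() - start.UnixMilli())
        return start.Add(time.Duration(offsetMs) * time.Millisecond)
    }

    func main() {
        start := time.Now().Add(-24 * time.Hour)
        fmt.Println(randomQueryEnd(start))
    }

The patch keeps the offset inside a time.Duration and multiplies by time.Millisecond; that is equivalent, because the Duration only ever holds a raw millisecond count.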
---
 pkg/ingester/ingester_test.go               | 177 ++++++++++++++++++++
 pkg/storage/tsdb/expanded_postings_cache.go |  14 ++
 2 files changed, 191 insertions(+)

diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 3d386947fa..d0b8e29182 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -5605,6 +5605,183 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
 	wg.Wait()
 }
 
+func TestExpendedPostingsCacheMatchers(t *testing.T) {
+	cfg := defaultIngesterTestConfig(t)
+	cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
+	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
+	cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
+	cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
+
+	ctx := user.InjectOrgID(context.Background(), userID)
+
+	r := prometheus.NewRegistry()
+	ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
+
+	// Wait until the ingester is ACTIVE
+	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
+		return ing.lifecycler.GetState()
+	})
+
+	numberOfMetricNames := 10
+	seriesPerMetricsNames := 25
+	timeStamp := int64(60 * 1000)
+	seriesCreated := map[string]labels.Labels{}
+
+	for i := 0; i < numberOfMetricNames; i++ {
+		metricName := fmt.Sprintf("metric_%v", i)
+		for j := 0; j < seriesPerMetricsNames; j++ {
+			s := labels.FromStrings(labels.MetricName, metricName, "labelA", fmt.Sprintf("series_%v", j))
+			_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{s}, []cortexpb.Sample{{Value: 2, TimestampMs: timeStamp}}, nil, nil, cortexpb.API))
+			seriesCreated[s.String()] = s
+			require.NoError(t, err)
+		}
+	}
+
+	db := ing.getTSDB(userID)
+
+	type testCase struct {
+		matchers []*client.LabelMatcher
+	}
+
+	cases := []testCase{}
+
+	nameMatcher := &client.LabelMatcher{
+		Type:  client.EQUAL,
+		Name:  labels.MetricName,
+		Value: "metric_0",
+	}
+
+	for i := 0; i < 4; i++ {
+		tc := testCase{
+			matchers: []*client.LabelMatcher{nameMatcher},
+		}
+
+		switch client.MatchType(i) {
+		case client.EQUAL, client.NOT_EQUAL:
+			tc.matchers = append(tc.matchers, &client.LabelMatcher{
+				Type:  client.MatchType(i),
+				Name:  "labelA",
+				Value: "series_0",
+			})
+		default:
+			tc.matchers = append(tc.matchers, &client.LabelMatcher{
+				Type:  client.MatchType(i),
+				Name:  "labelA",
+				Value: "series_.*",
+			})
+		}
+		cases = append(cases, tc)
+	}
+
+	for _, v := range []string{".*", "", ".+"} {
+		cases = append(cases,
+			testCase{
+				matchers: []*client.LabelMatcher{
+					nameMatcher,
+					{
+						Type:  client.REGEX_MATCH,
+						Name:  "labelA",
+						Value: v,
+					},
+				},
+			},
+			testCase{
+				matchers: []*client.LabelMatcher{
+					nameMatcher,
+					{
+						Type:  client.REGEX_NO_MATCH,
+						Name:  "labelA",
+						Value: v,
+					},
+				},
+			},
+		)
+	}
+
+	ranges := []struct {
+		startTs, endTs int64
+		hasSamples     bool
+	}{
+		// Totally in the past
+		{
+			startTs:    0,
+			endTs:      timeStamp / 2,
+			hasSamples: false,
+		},
+		{
+			startTs:    timeStamp / 2,
+			endTs:      timeStamp,
+			hasSamples: true,
+		},
+		{
+			startTs:    timeStamp / 2,
+			endTs:      timeStamp * 2,
+			hasSamples: true,
+		},
+		{
+			startTs:    timeStamp + 1,
+			endTs:      timeStamp * 2,
+			hasSamples: false,
+		},
+	}
+
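+	// The four ranges above bracket the single sample written at timeStamp
+	// (60s in milliseconds): hasSamples is true exactly when [startTs, endTs]
+	// overlaps that sample, and verify uses it to choose between an exact
+	// series-count assertion and an empty-response assertion.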
+	verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {
+		s := &mockQueryStreamServer{ctx: ctx}
+		err := ing.QueryStream(&client.QueryRequest{
+			StartTimestampMs: startTs,
+			EndTimestampMs:   endTs,
+			Matchers:         tc.matchers,
+		}, s)
+		require.NoError(t, err)
+		if hasSamples {
+			expectedCount := len(seriesCreated)
+			matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
+			require.NoError(t, err)
+			for _, s := range seriesCreated {
+				for _, m := range matchers {
+					if !m.Matches(s.Get(m.Name)) {
+						expectedCount--
+						break
+					}
+				}
+			}
+
+			require.Equal(t, expectedCount, len(s.series))
+		} else {
+			require.Equal(t, 0, len(s.series))
+		}
+	}
+
+	for _, tc := range cases {
+		testName := ""
+		for _, matcher := range tc.matchers {
+			mt, _ := matcher.MatcherType()
+			testName += matcher.Name + mt.String() + matcher.Value + "|"
+		}
+		t.Run(testName, func(t *testing.T) {
+			for _, r := range ranges {
+				t.Run(fmt.Sprintf("start=%v,end=%v", r.startTs, r.endTs), func(t *testing.T) {
+					db.postingCache.Clear()
+
+					// Run twice so the second pass is served from the cache.
+					for i := 0; i < 2; i++ {
+						verify(t, tc, r.startTs, r.endTs, r.hasSamples)
+					}
+
+					// Run the test again with all other ranges.
+					for _, r1 := range ranges {
+						verify(t, tc, r1.startTs, r1.endTs, r1.hasSamples)
+					}
+				})
+			}
+		})
+	}
+}
+
 func TestExpendedPostingsCache(t *testing.T) {
 	cfg := defaultIngesterTestConfig(t)
 	cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go
index a24087e824..a436350b61 100644
--- a/pkg/storage/tsdb/expanded_postings_cache.go
+++ b/pkg/storage/tsdb/expanded_postings_cache.go
@@ -125,6 +125,7 @@ type ExpandedPostingsCache interface {
 	PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
 	ExpireSeries(metric labels.Labels)
 	PurgeExpiredItems()
+	Clear()
 	Size() int
 }
 
@@ -140,6 +141,11 @@ type blocksPostingsForMatchersCache struct {
 	seedByHash *seedByHash
 }
 
+func (c *blocksPostingsForMatchersCache) Clear() {
+	c.headCache.clear()
+	c.blocksCache.clear()
+}
+
 func newBlocksPostingsForMatchersCache(userId string, cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics, seedByHash *seedByHash) ExpandedPostingsCache {
 	if cfg.PostingsForMatchers == nil {
 		cfg.PostingsForMatchers = tsdb.PostingsForMatchers
@@ -358,6 +364,14 @@ func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *Expanded
 	}
 }
 
+func (c *fifoCache[V]) clear() {
+	c.cachedMtx.Lock()
+	defer c.cachedMtx.Unlock()
+	c.cached = list.New()
+	c.cachedBytes = 0
+	c.cachedValues = new(sync.Map)
+}
+
 func (c *fifoCache[V]) expire() {
 	if c.cfg.Ttl <= 0 {
 		return

From c30c37760576658c73d4af3065023e4eb7532251 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 28 Jan 2025 12:00:52 -0800
Subject: [PATCH 3/7] adding get series call to the test

Signed-off-by: alanprot
---
 pkg/ingester/ingester_test.go | 44 ++++++++++++++++++++++-----------
 1 file changed, 31 insertions(+), 13 deletions(-)

diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index d0b8e29182..f3da262fe6 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -5611,6 +5611,7 @@ func TestExpendedPostingsCacheMatchers(t *testing.T) {
 	cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
 	cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
 	cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
+	cfg.QueryIngestersWithin = 24 * time.Hour
 
 	ctx := user.InjectOrgID(context.Background(), userID)
 
@@ -5729,26 +5730,43 @@ func TestExpendedPostingsCacheMatchers(t *testing.T) {
 	}
 
 	verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {
+
+		expectedCount := len(seriesCreated)
+		matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
+		require.NoError(t, err)
+		for _, s := range seriesCreated {
+			for _, m := range matchers {
+				if !m.Matches(s.Get(m.Name)) {
+					expectedCount--
+					break
+				}
+			}
+		}
+
+		seriesResponse, err := ing.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{
+			StartTimestampMs: startTs,
+			EndTimestampMs:   endTs,
+			MatchersSet: []*client.LabelMatchers{
+				{
+					Matchers: tc.matchers,
+				},
+			},
+		})
+		require.NoError(t, err)
+		if hasSamples {
+			require.Len(t, seriesResponse.Metric, expectedCount)
+		} else {
+			require.Len(t, seriesResponse.Metric, 0)
+		}
+
 		s := &mockQueryStreamServer{ctx: ctx}
-		err := ing.QueryStream(&client.QueryRequest{
+		err = ing.QueryStream(&client.QueryRequest{
 			StartTimestampMs: startTs,
 			EndTimestampMs:   endTs,
 			Matchers:         tc.matchers,
 		}, s)
 		require.NoError(t, err)
 		if hasSamples {
-			expectedCount := len(seriesCreated)
-			matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
-			require.NoError(t, err)
-			for _, s := range seriesCreated {
-				for _, m := range matchers {
-					if !m.Matches(s.Get(m.Name)) {
-						expectedCount--
-						break
-					}
-				}
-			}
-
 			require.Equal(t, expectedCount, len(s.series))
 		} else {
 			require.Equal(t, 0, len(s.series))

From 0e17f08b31487d7e605aeaeda43e350fee507234 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 28 Jan 2025 14:46:18 -0800
Subject: [PATCH 4/7] do not use CachedBlockChunkQuerier when the query time
 range is completely after the last sample added in the head

Signed-off-by: alanprot
---
 pkg/ingester/ingester.go | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index bf921243af..df90574d02 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -2283,6 +2283,22 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
 	return db, nil
 }
 
+func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFunc {
+	return func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
+		db := i.getTSDB(userId)
+
+		var postingCache cortex_tsdb.ExpandedPostingsCache
+		if db != nil {
+			postingCache = db.postingCache
+		}
+		if postingCache == nil || mint > db.Head().MaxTime() {
+			return tsdb.NewBlockChunkQuerier(b, mint, maxt)
+		}
+
+		return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
+	}
+}
+
 // createTSDB creates a TSDB for a given userID, and returns the created db.
 func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 	tsdbPromReg := prometheus.NewRegistry()
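The guard in the hunk above is the behavioral core of this patch, so it may be worth restating in isolation. A minimal sketch under assumed names (shouldUseCachedQuerier, havePostingsCache, and headMaxTime are illustrative; the real closure consults the user's TSDB and a tsdb.BlockReader):

    package main

    import "fmt"

    // shouldUseCachedQuerier restates the condition in blockChunkQuerierFunc:
    // fall back to the plain block chunk querier when no postings cache is
    // configured, or when the query's min time lies entirely past the head's
    // max time ("future" queries; see the issue linked in patch 5/7 below).
    func shouldUseCachedQuerier(havePostingsCache bool, mint, headMaxTime int64) bool {
        return havePostingsCache && mint <= headMaxTime
    }

    func main() {
        headMaxTime := int64(60_000) // timestamp of the newest head sample, in ms
        fmt.Println(shouldUseCachedQuerier(true, 30_000, headMaxTime))  // true: window can overlap data
        fmt.Println(shouldUseCachedQuerier(true, 60_001, headMaxTime))  // false: entirely in the future
        fmt.Println(shouldUseCachedQuerier(false, 30_000, headMaxTime)) // false: cache disabled
    }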
@@ -2346,12 +2362,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		OutOfOrderCapMax:            i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
 		EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
 		EnableNativeHistograms:      i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
-		BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
-			if postingCache != nil {
-				return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
-			}
-			return tsdb.NewBlockChunkQuerier(b, mint, maxt)
-		},
+		BlockChunkQuerierFunc: i.blockChunkQuerierFunc(userID),
 	}, nil)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

From 87175eb38aae979958add46a9134767701999e35 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 28 Jan 2025 15:03:17 -0800
Subject: [PATCH 5/7] adding comments

Signed-off-by: alanprot
---
 pkg/ingester/ingester.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index df90574d02..4c41a90a99 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -2291,6 +2291,11 @@ func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFu
 	if db != nil {
 		postingCache = db.postingCache
 	}
+
+	// Caching expanded postings for queries that are "in the future" may lead to incorrect results being cached.
+	// This occurs because the tsdb.PostingsForMatchers function can return invalid data in such scenarios.
+	// For more details, see: https://github.com/cortexproject/cortex/issues/6556
+	// TODO: alanprot: Consider removing this logic when prometheus is updated, as this logic is "fixed" upstream.
 	if postingCache == nil || mint > db.Head().MaxTime() {
 		return tsdb.NewBlockChunkQuerier(b, mint, maxt)
 	}
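Patch 7/7 below starts fuzzing the series endpoint as well, rendering each case as a PromQL-style selector string built from promqlsmith selectors plus an exact __name__ matcher. As a standalone illustration of what those strings look like, here is a hand-rolled approximation of the rendering step (toSelector stands in for storepb.PromMatchersToString, and the hardcoded matchers stand in for WalkSelectors output):

    package main

    import (
        "fmt"
        "strings"

        "github.com/prometheus/prometheus/model/labels"
    )

    // toSelector renders matchers as {m1, m2, ...}, the shape consumed by the
    // /api/v1/series client call in the diff below.
    func toSelector(ms ...*labels.Matcher) string {
        parts := make([]string, 0, len(ms))
        for _, m := range ms {
            parts = append(parts, m.String())
        }
        return "{" + strings.Join(parts, ", ") + "}"
    }

    func main() {
        sel := toSelector(
            labels.MustNewMatcher(labels.MatchRegexp, "job", "test.*"),
            labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_series_0"),
        )
        fmt.Println(sel) // {job=~"test.*", __name__="test_series_0"}
    }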
From 251248cb996e400683fd29be62f73bc723563375 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 28 Jan 2025 16:36:15 -0800
Subject: [PATCH 6/7] increase the number of fuzz test runs from 100 to 300

Signed-off-by: alanprot
---
 integration/query_fuzz_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go
index 4a15f3f2e8..60f6117688 100644
--- a/integration/query_fuzz_test.go
+++ b/integration/query_fuzz_test.go
@@ -453,7 +453,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
 	ps := promqlsmith.New(rnd, lbls, opts...)
 
 	// Create the queries with the original labels
-	testRun := 100
+	testRun := 300
 	queries := make([]string, testRun)
 	for i := 0; i < testRun; i++ {
 		expr := ps.WalkRangeQuery()

From 90c8dc28fcbe16311b5307941629e7adda3b56d4 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 28 Jan 2025 17:31:51 -0800
Subject: [PATCH 7/7] add get series fuzz testing

Signed-off-by: alanprot
---
 integration/query_fuzz_test.go | 36 +++++++++++++++++++++++++++++-----
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go
index 60f6117688..c7d28e9eb6 100644
--- a/integration/query_fuzz_test.go
+++ b/integration/query_fuzz_test.go
@@ -455,9 +455,16 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
 	// Create the queries with the original labels
 	testRun := 300
 	queries := make([]string, testRun)
+	matchers := make([]string, testRun)
 	for i := 0; i < testRun; i++ {
 		expr := ps.WalkRangeQuery()
 		queries[i] = expr.Pretty(0)
+		matchers[i] = storepb.PromMatchersToString(
+			append(
+				ps.WalkSelectors(),
+				labels.MustNewMatcher(labels.MatchEqual, "__name__", fmt.Sprintf("test_series_%d", i%numSeries)),
+			)...,
+		)
 	}
 
 	// Lets run multiples iterations and create new series every iteration
@@ -485,13 +492,14 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
 	}
 
 	type testCase struct {
-		query      string
-		qt         string
-		res1, res2 model.Value
-		err1, err2 error
+		query        string
+		qt           string
+		res1, res2   model.Value
+		sres1, sres2 []model.LabelSet
+		err1, err2   error
 	}
 
-	cases := make([]*testCase, 0, len(queries)*2)
+	cases := make([]*testCase, 0, len(queries)*3)
 
 	for _, query := range queries {
 		fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
@@ -518,6 +526,21 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
 		})
 	}
 
+	for _, m := range matchers {
+		fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
+		queryEnd := start.Add(fuzzyTime * time.Millisecond)
+		res1, err := c1.Series([]string{m}, start, queryEnd)
+		require.NoError(t, err)
+		res2, err := c2.Series([]string{m}, start, queryEnd)
+		require.NoError(t, err)
+		cases = append(cases, &testCase{
+			query: m,
+			qt:    "get series",
+			sres1: res1,
+			sres2: res2,
+		})
+	}
+
 	failures := 0
 	for i, tc := range cases {
 		if tc.err1 != nil || tc.err2 != nil {
@@ -533,6 +556,9 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
 		} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
 			t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
 			failures++
+		} else if !cmp.Equal(tc.sres1, tc.sres2, labelSetsComparer) {
+			t.Logf("case %d series mismatch.\n%s: %s\nsres1: %s\nsres2: %s\n", i, tc.qt, tc.query, tc.sres1, tc.sres2)
+			failures++
 		}
 	}
 	if failures > 0 {
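The new sres branch compares []model.LabelSet results with labelSetsComparer, which is defined elsewhere in query_fuzz_test.go and does not appear in this diff. For readers without the full file, a plausible stand-in consistent with its usage above is an order-insensitive comparison over sorted string renderings (the real implementation may differ):

    package main

    import (
        "fmt"
        "sort"

        "github.com/google/go-cmp/cmp"
        "github.com/prometheus/common/model"
    )

    // labelSetsComparer treats two slices of label sets as equal when they
    // contain the same label sets, regardless of order.
    var labelSetsComparer = cmp.Comparer(func(a, b []model.LabelSet) bool {
        if len(a) != len(b) {
            return false
        }
        as, bs := make([]string, len(a)), make([]string, len(b))
        for i := range a {
            as[i], bs[i] = a[i].String(), b[i].String()
        }
        sort.Strings(as)
        sort.Strings(bs)
        for i := range as {
            if as[i] != bs[i] {
                return false
            }
        }
        return true
    })

    func main() {
        a := []model.LabelSet{{"__name__": "up", "job": "a"}}
        b := []model.LabelSet{{"job": "a", "__name__": "up"}}
        fmt.Println(cmp.Equal(a, b, labelSetsComparer)) // true
    }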