Fix: expanded postings can cache wrong data when queries are issued "in the future" #6562

Merged 7 commits on Jan 29, 2025
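In short, per the linked issue (https://github.com/cortexproject/cortex/issues/6556): when a query's minimum time lies past the newest sample in the TSDB head, tsdb.PostingsForMatchers can return invalid postings, and caching that result poisons later queries. Below is a hedged, self-contained illustration of the poisoning sequence; the map-based cache and the label-set key are stand-ins, not Cortex types.

package main

import "fmt"

func main() {
	// Assumed example values: the head covers samples up to headMaxTime (ms).
	headMaxTime := int64(1_000_000)
	cache := map[string][]uint64{} // matcher string -> cached posting IDs (stand-in)

	// 1. A query arrives whose range starts "in the future".
	mint := headMaxTime + 60_000
	if mint > headMaxTime {
		// Here tsdb.PostingsForMatchers may return invalid (e.g. empty) data;
		// storing it puts a wrong entry under the matcher key.
		cache[`{test_label="test_label_value_0"}`] = nil
	}

	// 2. New samples arrive; a later, valid query reuses the poisoned entry.
	postings, hit := cache[`{test_label="test_label_value_0"}`]
	fmt.Println(hit, len(postings)) // true 0 -> series silently missed
}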
67 changes: 52 additions & 15 deletions integration/query_fuzz_test.go
@@ -433,7 +433,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
scrapeInterval,
i*numSamples,
numSamples,
prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)},
)
ss[i*numberOfLabelsPerSeries+j] = series

@@ -453,11 +453,18 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
ps := promqlsmith.New(rnd, lbls, opts...)

// Create the queries with the original labels
testRun := 100
testRun := 300
queries := make([]string, testRun)
matchers := make([]string, testRun)
for i := 0; i < testRun; i++ {
expr := ps.WalkRangeQuery()
queries[i] = expr.Pretty(0)
matchers[i] = storepb.PromMatchersToString(
append(
ps.WalkSelectors(),
labels.MustNewMatcher(labels.MatchEqual, "__name__", fmt.Sprintf("test_series_%d", i%numSeries)),
)...,
)
}

// Let's run multiple iterations and create new series every iteration
@@ -472,7 +479,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
scrapeInterval,
i*numSamples,
numSamples,
prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)},
prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)},
)
}
@@ -485,42 +492,72 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
}

type testCase struct {
query string
res1, res2 model.Value
err1, err2 error
query string
qt string
res1, res2 model.Value
sres1, sres2 []model.LabelSet
err1, err2 error
}

queryStart := time.Now().Add(-time.Hour * 24)
queryEnd := time.Now()
cases := make([]*testCase, 0, 200)
cases := make([]*testCase, 0, len(queries)*3)

for _, query := range queries {
res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
queryEnd := start.Add(fuzzyTime * time.Millisecond)
res1, err1 := c1.Query(query, queryEnd)
res2, err2 := c2.Query(query, queryEnd)
cases = append(cases, &testCase{
query: query,
qt: "instant",
res1: res1,
res2: res2,
err1: err1,
err2: err2,
})
res1, err1 = c1.QueryRange(query, start, queryEnd, scrapeInterval)
res2, err2 = c2.QueryRange(query, start, queryEnd, scrapeInterval)
cases = append(cases, &testCase{
query: query,
qt: "range query",
res1: res1,
res2: res2,
err1: err1,
err2: err2,
})
}

for _, m := range matchers {
fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
queryEnd := start.Add(fuzzyTime * time.Millisecond)
res1, err := c1.Series([]string{m}, start, queryEnd)
require.NoError(t, err)
res2, err := c2.Series([]string{m}, start, queryEnd)
require.NoError(t, err)
cases = append(cases, &testCase{
query: m,
qt: "get series",
sres1: res1,
sres2: res2,
})
}

failures := 0
for i, tc := range cases {
qt := "range query"
if tc.err1 != nil || tc.err2 != nil {
if !cmp.Equal(tc.err1, tc.err2) {
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, tc.qt, tc.query, tc.err1, tc.err2)
failures++
}
} else if shouldUseSampleNumComparer(tc.query) {
if !cmp.Equal(tc.res1, tc.res2, sampleNumComparer) {
t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
}
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
} else if !cmp.Equal(tc.sres1, tc.sres2, labelSetsComparer) {
t.Logf("case %d results mismatch.\n%s: %s\nsres1: %s\nsres2: %s\n", i, tc.qt, tc.query, tc.sres1, tc.sres2)
failures++
}
}
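The loops above pick a random evaluation time between the series start and now, so some instant, range, and series queries land close to, or past, the newest ingested samples. Here is a hedged, standalone sketch of that fuzzy-time trick; start and the printed evaluation step stand in for the test's series fixtures and client calls.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	start := time.Now().Add(-time.Hour) // when the test series begin (assumed)

	// Mirror the fuzzyTime logic: a random millisecond offset in [0, now-start),
	// added to start, becomes the query evaluation time.
	fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
	queryEnd := start.Add(fuzzyTime * time.Millisecond)

	fmt.Println("evaluate instant/range/series queries at:", queryEnd)
}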
28 changes: 22 additions & 6 deletions pkg/ingester/ingester.go
@@ -2283,6 +2283,27 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
return db, nil
}

func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFunc {
return func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
db := i.getTSDB(userId)

var postingCache cortex_tsdb.ExpandedPostingsCache
if db != nil {
postingCache = db.postingCache
}

// Caching expanded postings for queries that are "in the future" may lead to incorrect results being cached.
// This occurs because the tsdb.PostingsForMatchers function can return invalid data in such scenarios.
// For more details, see: https://github.com/cortexproject/cortex/issues/6556
// TODO: alanprot: Consider removing this logic when prometheus is updated as this logic is "fixed" upstream.
if postingCache == nil || mint > db.Head().MaxTime() {
return tsdb.NewBlockChunkQuerier(b, mint, maxt)
}

return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
}
}

// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()
@@ -2346,12 +2367,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
if postingCache != nil {
return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
}
return tsdb.NewBlockChunkQuerier(b, mint, maxt)
},
BlockChunkQuerierFunc: i.blockChunkQuerierFunc(userID),
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
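The new blockChunkQuerierFunc reduces to one predicate: use the cached querier only when a postings cache exists and the query does not start past the head's newest sample. A hedged restatement as a standalone helper follows; shouldBypassPostingsCache is hypothetical, not a Cortex function.

package main

import "fmt"

// shouldBypassPostingsCache restates the guard above: skip the cached querier
// when no cache is configured or when the query range starts "in the future"
// relative to the newest sample in the head (headMaxTime, in milliseconds).
func shouldBypassPostingsCache(cacheConfigured bool, mint, headMaxTime int64) bool {
	if !cacheConfigured {
		return true // fall back to tsdb.NewBlockChunkQuerier
	}
	return mint > headMaxTime // never populate the cache for future queries
}

func main() {
	fmt.Println(shouldBypassPostingsCache(true, 2_000, 1_000)) // true: bypass cache
	fmt.Println(shouldBypassPostingsCache(true, 500, 1_000))   // false: cached querier
}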
195 changes: 195 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -5605,6 +5605,201 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
wg.Wait()
}

func TestExpendedPostingsCacheMatchers(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
cfg.QueryIngestersWithin = 24 * time.Hour

ctx := user.InjectOrgID(context.Background(), userID)

r := prometheus.NewRegistry()
ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ing.lifecycler.GetState()
})

numberOfMetricNames := 10
seriesPerMetricsNames := 25
timeStamp := int64(60 * 1000)
seriesCreated := map[string]labels.Labels{}

for i := 0; i < numberOfMetricNames; i++ {
metricName := fmt.Sprintf("metric_%v", i)
for j := 0; j < seriesPerMetricsNames; j++ {
s := labels.FromStrings(labels.MetricName, metricName, "labelA", fmt.Sprintf("series_%v", j))
_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{s}, []cortexpb.Sample{{Value: 2, TimestampMs: timeStamp}}, nil, nil, cortexpb.API))
seriesCreated[s.String()] = s
require.NoError(t, err)
}
}

db := ing.getTSDB(userID)

type testCase struct {
matchers []*client.LabelMatcher
}

cases := []testCase{}

nameMatcher := &client.LabelMatcher{
Type: client.EQUAL,
Name: labels.MetricName,
Value: "metric_0",
}

for i := 0; i < 4; i++ {
tc := testCase{
matchers: []*client.LabelMatcher{nameMatcher},
}

switch client.MatchType(i) {
case client.EQUAL, client.NOT_EQUAL:
tc.matchers = append(tc.matchers, &client.LabelMatcher{
Type: client.MatchType(i),
Name: "labelA",
Value: "series_0",
})
default:
tc.matchers = append(tc.matchers, &client.LabelMatcher{
Type: client.MatchType(i),
Name: "labelA",
Value: "series_.*",
})
}
cases = append(cases, tc)
}

for _, v := range []string{".*", "", ".+"} {
cases = append(cases,
testCase{
matchers: []*client.LabelMatcher{
nameMatcher,
{
Type: client.REGEX_MATCH,
Name: "labelA",
Value: v,
},
},
},
testCase{
matchers: []*client.LabelMatcher{
nameMatcher,
{
Type: client.REGEX_NO_MATCH,
Name: "labelA",
Value: v,
},
},
},
)
}

ranges := []struct {
startTs, endTs int64
hasSamples bool
}{
// Totally in the past
{
startTs: 0,
endTs: timeStamp / 2,
hasSamples: false,
},
// Ends exactly at the sample timestamp
{
startTs: timeStamp / 2,
endTs: timeStamp,
hasSamples: true,
},
// Spans the sample timestamp
{
startTs: timeStamp / 2,
endTs: timeStamp * 2,
hasSamples: true,
},
// Totally in the future
{
startTs: timeStamp + 1,
endTs: timeStamp * 2,
hasSamples: false,
},
}

verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {

expectedCount := len(seriesCreated)
matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
require.NoError(t, err)
for _, s := range seriesCreated {
for _, m := range matchers {
if !m.Matches(s.Get(m.Name)) {
expectedCount--
break
}
}
}

seriesResponse, err := ing.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{
StartTimestampMs: startTs,
EndTimestampMs: endTs,
MatchersSet: []*client.LabelMatchers{
{
Matchers: tc.matchers,
},
},
})
require.NoError(t, err)
if hasSamples {
require.Len(t, seriesResponse.Metric, expectedCount)
} else {
require.Len(t, seriesResponse.Metric, 0)
}

s := &mockQueryStreamServer{ctx: ctx}
err = ing.QueryStream(&client.QueryRequest{
StartTimestampMs: startTs,
EndTimestampMs: endTs,
Matchers: tc.matchers,
}, s)
require.NoError(t, err)
if hasSamples {
require.Equal(t, expectedCount, len(s.series))
} else {
require.Equal(t, 0, len(s.series))
}
}

for _, tc := range cases {
testName := ""
for _, matcher := range tc.matchers {
t, _ := matcher.MatcherType()
testName += matcher.Name + t.String() + matcher.Value + "|"
}
t.Run(fmt.Sprintf("%v", testName), func(t *testing.T) {
for _, r := range ranges {
t.Run(fmt.Sprintf("start=%v,end=%v", r.startTs, r.endTs), func(t *testing.T) {
db.postingCache.Clear()

// run twice so the second pass hits the cache
for i := 0; i < 2; i++ {
verify(t, tc, r.startTs, r.endTs, r.hasSamples)
}

// run the test again with all other ranges
for _, r1 := range ranges {
verify(t, tc, r1.startTs, r1.endTs, r1.hasSamples)
}
})
}
})
}
}
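The nested loops above amount to a compact regression check: clear the cache, query each range twice so the second pass is served from the cache, then re-verify every other range so an entry cached for a "future" range can never leak into a valid one. Below is a hedged distillation with stand-in fixtures; clearCache and verify mimic db.postingCache.Clear() and the test's verify helper.

package main

import "fmt"

func main() {
	type tsRange struct {
		start, end int64
		hasSamples bool
	}
	// Assumed ranges echoing the test table: past, overlapping, and future.
	ranges := []tsRange{
		{0, 500, false},
		{500, 1_000, true},
		{1_001, 2_000, false},
	}
	clearCache := func() { fmt.Println("cache cleared") }
	verify := func(r tsRange) {
		fmt.Printf("query [%d, %d], expect samples=%v\n", r.start, r.end, r.hasSamples)
	}

	for _, r := range ranges {
		clearCache()
		for i := 0; i < 2; i++ { // second pass must hit the cache
			verify(r)
		}
		for _, other := range ranges { // a cached entry must not leak across ranges
			verify(other)
		}
	}
}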

func TestExpendedPostingsCache(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second