From 6358f3d69528aba53527f0902a5260db7b31899c Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 4 Jun 2024 21:11:01 -0700 Subject: [PATCH] change block populator to accept postings index function Signed-off-by: Ben Ye --- tsdb/compact.go | 28 +++++++++++++------- tsdb/compact_test.go | 62 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 79 insertions(+), 11 deletions(-) diff --git a/tsdb/compact.go b/tsdb/compact.go index c2ae23b2e40..567552390f4 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -652,7 +652,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl } closers = append(closers, indexw) - if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil { + if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw, AllSortedPostings); err != nil { return fmt.Errorf("populate block: %w", err) } @@ -718,7 +718,20 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator Bl } type BlockPopulator interface { - PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error + PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) error +} + +// IndexReaderPostingsFunc is a function to get a posting iterator from a given index reader. +type IndexReaderPostingsFunc func(ctx context.Context, reader IndexReader) index.Postings + +// AllSortedPostings returns a sorted all posting iterator from the input index reader. +func AllSortedPostings(ctx context.Context, reader IndexReader) index.Postings { + k, v := index.AllPostingsKey() + all, err := reader.Postings(ctx, k, v) + if err != nil { + return index.ErrPostings(err) + } + return reader.SortedPostings(all) } type DefaultBlockPopulator struct{} @@ -726,7 +739,7 @@ type DefaultBlockPopulator struct{} // PopulateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. // It expects sorted blocks input by mint. -func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { +func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter, postingsFunc IndexReaderPostingsFunc) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } @@ -784,14 +797,9 @@ func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *Compa } closers = append(closers, tombsr) - k, v := index.AllPostingsKey() - all, err := indexr.Postings(ctx, k, v) - if err != nil { - return err - } - all = indexr.SortedPostings(all) + postings := postingsFunc(ctx, indexr) // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. - sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false)) + sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, postings, meta.MinTime, meta.MaxTime-1, false)) syms := indexr.Symbols() if i == 0 { symbols = syms diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 7a353a556a4..c0ba0281497 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/prometheus/tsdb/index" "math" "math/rand" "os" @@ -493,6 +494,7 @@ func TestCompaction_populateBlock(t *testing.T) { inputSeriesSamples [][]seriesSamples compactMinTime int64 compactMaxTime int64 // When not defined the test runner sets a default of math.MaxInt64. + irPostingsFunc IndexReaderPostingsFunc expSeriesSamples []seriesSamples expErr error }{ @@ -961,6 +963,60 @@ func TestCompaction_populateBlock(t *testing.T) { }, }, }, + { + title: "Populate from single block with index reader postings function selecting different series. Expect empty block.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, + irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings { + p, err := reader.Postings(ctx, "a", "c") + if err != nil { + return index.EmptyPostings() + } + return reader.SortedPostings(p) + }, + }, + { + title: "Populate from single block with index reader postings function selecting one series. Expect partial block.", + inputSeriesSamples: [][]seriesSamples{ + { + { + lset: map[string]string{"a": "b"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "d"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, + irPostingsFunc: func(ctx context.Context, reader IndexReader) index.Postings { + p, err := reader.Postings(ctx, "a", "c", "d") + if err != nil { + return index.EmptyPostings() + } + return reader.SortedPostings(p) + }, + expSeriesSamples: []seriesSamples{ + { + lset: map[string]string{"a": "c"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + { + lset: map[string]string{"a": "d"}, + chunks: [][]sample{{{t: 0}, {t: 10}}, {{t: 11}, {t: 20}}}, + }, + }, + }, } { t.Run(tc.title, func(t *testing.T) { blocks := make([]BlockReader, 0, len(tc.inputSeriesSamples)) @@ -982,7 +1038,11 @@ func TestCompaction_populateBlock(t *testing.T) { iw := &mockIndexWriter{} blockPopulator := DefaultBlockPopulator{} - err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}) + irPostingsFunc := AllSortedPostings + if tc.irPostingsFunc != nil { + irPostingsFunc = tc.irPostingsFunc + } + err = blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, iw, nopChunkWriter{}, irPostingsFunc) if tc.expErr != nil { require.Error(t, err) require.Equal(t, tc.expErr.Error(), err.Error())