From 66c2477af0fcc58ae06b6ee14b235e3b2afcab5a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 16 May 2024 16:25:53 -0700 Subject: [PATCH] expose hook for block querier Signed-off-by: Ben Ye --- tsdb/db.go | 44 +++++++++++++++++++++++------ tsdb/db_test.go | 75 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 9 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index c2e8904a251..387155e6252 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -189,10 +189,20 @@ type Options struct { // EnableSharding enables query sharding support in TSDB. EnableSharding bool + + // BlockQuerierFunc is a function to return storage.Querier from a BlockReader. + BlockQuerierFunc BlockQuerierFunc + + // BlockQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader. + BlockChunkQuerierFunc BlockChunkQuerierFunc } type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{} +type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error) + +type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) + // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { @@ -239,6 +249,10 @@ type DB struct { writeNotified wlog.WriteNotified registerer prometheus.Registerer + + blockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error) + + blockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) } type dbMetrics struct { @@ -838,6 +852,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs } db.compactCancel = cancel + if opts.BlockQuerierFunc == nil { + db.blockQuerierFunc = NewBlockQuerier + } else { + db.blockQuerierFunc = opts.BlockQuerierFunc + } + + if opts.BlockChunkQuerierFunc == nil { + db.blockChunkQuerierFunc = NewBlockChunkQuerier + } else { + db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc + } + var wal, wbl *wlog.WL segmentSize := wlog.DefaultSegmentSize // Wal is enabled. @@ -1928,7 +1954,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) var err error - inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head %s: %w", rh, err) } @@ -1945,7 +1971,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = NewBlockQuerier(rh, newMint, maxt) + inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err) } @@ -1959,9 +1985,9 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) var err error - outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt) if err != nil { - // If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead. + // If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead. rh.isoState.Close() return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err) @@ -1971,7 +1997,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) { } for _, b := range blocks { - q, err := NewBlockQuerier(b, mint, maxt) + q, err := db.blockQuerierFunc(b, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for block %s: %w", b, err) } @@ -2009,7 +2035,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if maxt >= db.head.MinTime() { rh := NewRangeHead(db.head, mint, maxt) - inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) + inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head %s: %w", rh, err) } @@ -2026,7 +2052,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } if getNew { rh := NewRangeHead(db.head, newMint, maxt) - inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt) + inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt) if err != nil { return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err) } @@ -2039,7 +2065,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) { rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef) - outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt) + outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt) if err != nil { return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err) } @@ -2048,7 +2074,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer } for _, b := range blocks { - q, err := NewBlockChunkQuerier(b, mint, maxt) + q, err := db.blockChunkQuerierFunc(b, mint, maxt) if err != nil { return nil, fmt.Errorf("open querier for block %s: %w", b, err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index a682f465549..7d154971064 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -7046,3 +7046,78 @@ func TestAbortBlockCompactions(t *testing.T) { require.True(t, db.head.compactable(), "head should be compactable") require.Equal(t, 4, compactions, "expected 4 compactions to be completed") } + +func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) { + opts := DefaultOptions() + opts.BlockQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.Querier, error) { + // Only block with hints can be queried. + if len(b.Meta().Compaction.Hints) > 0 { + return NewBlockQuerier(b, mint, maxt) + } + return storage.NoopQuerier(), nil + } + opts.BlockChunkQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + // Only level 4 compaction block can be queried. + if b.Meta().Compaction.Level == 4 { + return NewBlockChunkQuerier(b, mint, maxt) + } + return storage.NoopChunkedQuerier(), nil + } + + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + metas := []BlockMeta{ + {Compaction: BlockMetaCompaction{Hints: []string{"test-hint"}}}, + {Compaction: BlockMetaCompaction{Level: 4}}, + } + for i := range metas { + // Include blockID into series to identify which block got touched. + serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})} + blockDir := createBlock(t, db.Dir(), serieses) + b, err := OpenBlock(db.logger, blockDir, db.chunkPool) + require.NoError(t, err) + + // Overwrite meta.json with compaction section for testing purpose. + b.meta.Compaction = metas[i].Compaction + _, err = writeMetaFile(db.logger, blockDir, &b.meta) + require.NoError(t, err) + require.NoError(t, b.Close()) + } + require.NoError(t, db.reloadBlocks()) + require.Equal(t, 2, len(db.Blocks())) + + querier, err := db.Querier(0, 500) + require.NoError(t, err) + defer querier.Close() + matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric") + seriesSet := querier.Select(context.Background(), false, nil, matcher) + count := 0 + var lbls labels.Labels + for seriesSet.Next() { + count++ + lbls = seriesSet.At().Labels() + } + require.NoError(t, seriesSet.Err()) + require.Equal(t, 1, count) + // Make sure only block-0 is queried. + require.Equal(t, "block-0", lbls.Get("block")) + + chunkQuerier, err := db.ChunkQuerier(0, 500) + require.NoError(t, err) + defer chunkQuerier.Close() + css := chunkQuerier.Select(context.Background(), false, nil, matcher) + count = 0 + // Reset lbls variable. + lbls = labels.EmptyLabels() + for css.Next() { + count++ + lbls = css.At().Labels() + } + require.NoError(t, css.Err()) + require.Equal(t, 1, count) + // Make sure only block-1 is queried. + require.Equal(t, "block-1", lbls.Get("block")) +}