Skip to content

Commit

Permalink
expose hook for block querier
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed May 16, 2024
1 parent edf5ebd commit 66c2477
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 9 deletions.
44 changes: 35 additions & 9 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,20 @@ type Options struct {

// EnableSharding enables query sharding support in TSDB.
EnableSharding bool

// BlockQuerierFunc is a function to return storage.Querier from a BlockReader.
BlockQuerierFunc BlockQuerierFunc

// BlockQuerierFunc is a function to return storage.ChunkQuerier from a BlockReader.
BlockChunkQuerierFunc BlockChunkQuerierFunc
}

type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}

type BlockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error)

type BlockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)

// DB handles reads and writes of time series falling into
// a hashed partition of a seriedb.
type DB struct {
Expand Down Expand Up @@ -239,6 +249,10 @@ type DB struct {
writeNotified wlog.WriteNotified

registerer prometheus.Registerer

blockQuerierFunc func(b BlockReader, mint, maxt int64) (storage.Querier, error)

blockChunkQuerierFunc func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error)
}

type dbMetrics struct {
Expand Down Expand Up @@ -838,6 +852,18 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}
db.compactCancel = cancel

if opts.BlockQuerierFunc == nil {
db.blockQuerierFunc = NewBlockQuerier
} else {
db.blockQuerierFunc = opts.BlockQuerierFunc
}

if opts.BlockChunkQuerierFunc == nil {
db.blockChunkQuerierFunc = NewBlockChunkQuerier
} else {
db.blockChunkQuerierFunc = opts.BlockChunkQuerierFunc
}

var wal, wbl *wlog.WL
segmentSize := wlog.DefaultSegmentSize
// Wal is enabled.
Expand Down Expand Up @@ -1928,7 +1954,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if maxt >= db.head.MinTime() {
rh := NewRangeHead(db.head, mint, maxt)
var err error
inOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
inOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open block querier for head %s: %w", rh, err)
}
Expand All @@ -1945,7 +1971,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
}
if getNew {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = NewBlockQuerier(rh, newMint, maxt)
inOrderHeadQuerier, err = db.blockQuerierFunc(rh, newMint, maxt)
if err != nil {
return nil, fmt.Errorf("open block querier for head while getting new querier %s: %w", rh, err)
}
Expand All @@ -1959,9 +1985,9 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
var err error
outOfOrderHeadQuerier, err := NewBlockQuerier(rh, mint, maxt)
outOfOrderHeadQuerier, err := db.blockQuerierFunc(rh, mint, maxt)
if err != nil {
// If NewBlockQuerier() failed, make sure to clean up the pending read created by NewOOORangeHead.
// If BlockQuerierFunc() failed, make sure to clean up the pending read created by NewOOORangeHead.
rh.isoState.Close()

return nil, fmt.Errorf("open block querier for ooo head %s: %w", rh, err)
Expand All @@ -1971,7 +1997,7 @@ func (db *DB) Querier(mint, maxt int64) (_ storage.Querier, err error) {
}

for _, b := range blocks {
q, err := NewBlockQuerier(b, mint, maxt)
q, err := db.blockQuerierFunc(b, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for block %s: %w", b, err)
}
Expand Down Expand Up @@ -2009,7 +2035,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer

if maxt >= db.head.MinTime() {
rh := NewRangeHead(db.head, mint, maxt)
inOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
inOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for head %s: %w", rh, err)
}
Expand All @@ -2026,7 +2052,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
}
if getNew {
rh := NewRangeHead(db.head, newMint, maxt)
inOrderHeadQuerier, err = NewBlockChunkQuerier(rh, newMint, maxt)
inOrderHeadQuerier, err = db.blockChunkQuerierFunc(rh, newMint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for head while getting new querier %s: %w", rh, err)
}
Expand All @@ -2039,7 +2065,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer

if overlapsClosedInterval(mint, maxt, db.head.MinOOOTime(), db.head.MaxOOOTime()) {
rh := NewOOORangeHead(db.head, mint, maxt, db.lastGarbageCollectedMmapRef)
outOfOrderHeadQuerier, err := NewBlockChunkQuerier(rh, mint, maxt)
outOfOrderHeadQuerier, err := db.blockChunkQuerierFunc(rh, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open block chunk querier for ooo head %s: %w", rh, err)
}
Expand All @@ -2048,7 +2074,7 @@ func (db *DB) blockChunkQuerierForRange(mint, maxt int64) (_ []storage.ChunkQuer
}

for _, b := range blocks {
q, err := NewBlockChunkQuerier(b, mint, maxt)
q, err := db.blockChunkQuerierFunc(b, mint, maxt)
if err != nil {
return nil, fmt.Errorf("open querier for block %s: %w", b, err)
}
Expand Down
75 changes: 75 additions & 0 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7046,3 +7046,78 @@ func TestAbortBlockCompactions(t *testing.T) {
require.True(t, db.head.compactable(), "head should be compactable")
require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
}

func TestBlockQuerierAndBlockChunkQuerier(t *testing.T) {
opts := DefaultOptions()
opts.BlockQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.Querier, error) {
// Only block with hints can be queried.
if len(b.Meta().Compaction.Hints) > 0 {
return NewBlockQuerier(b, mint, maxt)
}
return storage.NoopQuerier(), nil
}
opts.BlockChunkQuerierFunc = func(b BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
// Only level 4 compaction block can be queried.
if b.Meta().Compaction.Level == 4 {
return NewBlockChunkQuerier(b, mint, maxt)
}
return storage.NoopChunkedQuerier(), nil
}

db := openTestDB(t, opts, nil)
defer func() {
require.NoError(t, db.Close())
}()

metas := []BlockMeta{
{Compaction: BlockMetaCompaction{Hints: []string{"test-hint"}}},
{Compaction: BlockMetaCompaction{Level: 4}},
}
for i := range metas {
// Include blockID into series to identify which block got touched.
serieses := []storage.Series{storage.NewListSeries(labels.FromMap(map[string]string{"block": fmt.Sprintf("block-%d", i), labels.MetricName: "test_metric"}), []chunks.Sample{sample{t: 0, f: 1}})}
blockDir := createBlock(t, db.Dir(), serieses)
b, err := OpenBlock(db.logger, blockDir, db.chunkPool)
require.NoError(t, err)

// Overwrite meta.json with compaction section for testing purpose.
b.meta.Compaction = metas[i].Compaction
_, err = writeMetaFile(db.logger, blockDir, &b.meta)
require.NoError(t, err)
require.NoError(t, b.Close())
}
require.NoError(t, db.reloadBlocks())
require.Equal(t, 2, len(db.Blocks()))

Check failure on line 7090 in tsdb/db_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

len: use require.Len (testifylint)

querier, err := db.Querier(0, 500)
require.NoError(t, err)
defer querier.Close()
matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "test_metric")
seriesSet := querier.Select(context.Background(), false, nil, matcher)
count := 0
var lbls labels.Labels
for seriesSet.Next() {
count++
lbls = seriesSet.At().Labels()
}
require.NoError(t, seriesSet.Err())
require.Equal(t, 1, count)
// Make sure only block-0 is queried.
require.Equal(t, "block-0", lbls.Get("block"))

chunkQuerier, err := db.ChunkQuerier(0, 500)
require.NoError(t, err)
defer chunkQuerier.Close()
css := chunkQuerier.Select(context.Background(), false, nil, matcher)
count = 0
// Reset lbls variable.
lbls = labels.EmptyLabels()
for css.Next() {
count++
lbls = css.At().Labels()
}
require.NoError(t, css.Err())
require.Equal(t, 1, count)
// Make sure only block-1 is queried.
require.Equal(t, "block-1", lbls.Get("block"))
}

0 comments on commit 66c2477

Please sign in to comment.