allow customizing TSDB postings decoder
Signed-off-by: Ben Ye <[email protected]>
yeya24 committed May 17, 2024
1 parent edf5ebd commit 08e538a
Showing 13 changed files with 85 additions and 54 deletions.
2 changes: 1 addition & 1 deletion cmd/promtool/tsdb.go
@@ -405,7 +405,7 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
         }
     }
 
-    b, err := db.Block(blockID)
+    b, err := db.Block(blockID, tsdb.DefaultPostingsDecoderFactory)
     if err != nil {
         return nil, nil, err
     }
8 changes: 6 additions & 2 deletions tsdb/block.go
@@ -329,7 +329,7 @@ type Block struct {
 
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
 // to instantiate chunk structs.
-func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
+func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (pb *Block, err error) {
     if logger == nil {
         logger = log.NewNopLogger()
     }
@@ -350,7 +350,11 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
     }
     closers = append(closers, cr)
 
-    ir, err := index.NewFileReader(filepath.Join(dir, indexFilename))
+    decoder := index.DecodePostingsRaw
+    if postingsDecoderFactory != nil {
+        decoder = postingsDecoderFactory(meta)
+    }
+    ir, err := index.NewFileReader(filepath.Join(dir, indexFilename), decoder)
     if err != nil {
         return nil, err
     }
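To make the new hook concrete, here is a minimal sketch of opening a block with the added parameter. The block path is hypothetical; per the hunk above, a nil factory falls back to index.DecodePostingsRaw, and a nil chunk pool is accepted (the tests below pass nil for both).

package main

import (
    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/tsdb"
)

func main() {
    // Hypothetical on-disk TSDB block directory.
    blockDir := "data/01GTDVZZF52NSWB5SXQF0P2PGF"

    // DefaultPostingsDecoderFactory ignores the BlockMeta and always
    // returns the raw decoder, matching the pre-change behavior.
    b, err := tsdb.OpenBlock(log.NewNopLogger(), blockDir, nil, tsdb.DefaultPostingsDecoderFactory)
    if err != nil {
        panic(err)
    }
    defer b.Close()
}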
22 changes: 11 additions & 11 deletions tsdb/block_test.go
@@ -58,22 +58,22 @@ func TestSetCompactionFailed(t *testing.T) {
     tmpdir := t.TempDir()
 
     blockDir := createBlock(t, tmpdir, genSeries(1, 1, 0, 1))
-    b, err := OpenBlock(nil, blockDir, nil)
+    b, err := OpenBlock(nil, blockDir, nil, nil)
     require.NoError(t, err)
     require.False(t, b.meta.Compaction.Failed)
     require.NoError(t, b.setCompactionFailed())
     require.True(t, b.meta.Compaction.Failed)
     require.NoError(t, b.Close())
 
-    b, err = OpenBlock(nil, blockDir, nil)
+    b, err = OpenBlock(nil, blockDir, nil, nil)
     require.NoError(t, err)
     require.True(t, b.meta.Compaction.Failed)
     require.NoError(t, b.Close())
 }
 
 func TestCreateBlock(t *testing.T) {
     tmpdir := t.TempDir()
-    b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil)
+    b, err := OpenBlock(nil, createBlock(t, tmpdir, genSeries(1, 1, 0, 10)), nil, nil)
     require.NoError(t, err)
     require.NoError(t, b.Close())
 }
@@ -83,7 +83,7 @@ func BenchmarkOpenBlock(b *testing.B) {
     blockDir := createBlock(b, tmpdir, genSeries(1e6, 20, 0, 10))
     b.Run("benchmark", func(b *testing.B) {
         for i := 0; i < b.N; i++ {
-            block, err := OpenBlock(nil, blockDir, nil)
+            block, err := OpenBlock(nil, blockDir, nil, nil)
             require.NoError(b, err)
             require.NoError(b, block.Close())
         }
@@ -189,7 +189,7 @@ func TestCorruptedChunk(t *testing.T) {
             require.NoError(t, f.Close())
 
             // Check open err.
-            b, err := OpenBlock(nil, blockDir, nil)
+            b, err := OpenBlock(nil, blockDir, nil, nil)
             if tc.openErr != nil {
                 require.Equal(t, tc.openErr.Error(), err.Error())
                 return
@@ -244,7 +244,7 @@ func TestLabelValuesWithMatchers(t *testing.T) {
     require.NotEmpty(t, files, "No chunk created.")
 
     // Check open err.
-    block, err := OpenBlock(nil, blockDir, nil)
+    block, err := OpenBlock(nil, blockDir, nil, nil)
     require.NoError(t, err)
     defer func() { require.NoError(t, block.Close()) }()
 
@@ -324,7 +324,7 @@ func TestBlockSize(t *testing.T) {
     // Create a block and compare the reported size vs actual disk size.
     {
         blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
-        blockInit, err = OpenBlock(nil, blockDirInit, nil)
+        blockInit, err = OpenBlock(nil, blockDirInit, nil, nil)
         require.NoError(t, err)
         defer func() {
             require.NoError(t, blockInit.Close())
@@ -348,7 +348,7 @@ func TestBlockSize(t *testing.T) {
         require.NoError(t, err)
        blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
         require.NoError(t, err)
-        blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil)
+        blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil, nil)
         require.NoError(t, err)
         defer func() {
             require.NoError(t, blockAfterCompact.Close())
@@ -379,7 +379,7 @@ func TestReadIndexFormatV1(t *testing.T) {
     */
 
     blockDir := filepath.Join("testdata", "index_format_v1")
-    block, err := OpenBlock(nil, blockDir, nil)
+    block, err := OpenBlock(nil, blockDir, nil, nil)
     require.NoError(t, err)
 
     q, err := NewBlockQuerier(block, 0, 1000)
@@ -416,7 +416,7 @@ func BenchmarkLabelValuesWithMatchers(b *testing.B) {
     require.NotEmpty(b, files, "No chunk created.")
 
     // Check open err.
-    block, err := OpenBlock(nil, blockDir, nil)
+    block, err := OpenBlock(nil, blockDir, nil, nil)
     require.NoError(b, err)
     defer func() { require.NoError(b, block.Close()) }()
 
@@ -468,7 +468,7 @@ func TestLabelNamesWithMatchers(t *testing.T) {
     require.NotEmpty(t, files, "No chunk created.")
 
     // Check open err.
-    block, err := OpenBlock(nil, blockDir, nil)
+    block, err := OpenBlock(nil, blockDir, nil, nil)
     require.NoError(t, err)
     t.Cleanup(func() { require.NoError(t, block.Close()) })
 
2 changes: 1 addition & 1 deletion tsdb/blockwriter_test.go
@@ -46,7 +46,7 @@ func TestBlockWriter(t *testing.T) {
 
     // Confirm the block has the correct data.
     blockpath := filepath.Join(outputDir, id.String())
-    b, err := OpenBlock(nil, blockpath, nil)
+    b, err := OpenBlock(nil, blockpath, nil, nil)
     require.NoError(t, err)
     defer func() { require.NoError(t, b.Close()) }()
     q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64)
17 changes: 16 additions & 1 deletion tsdb/compact.go
@@ -83,6 +83,7 @@ type LeveledCompactor struct {
     maxBlockChunkSegmentSize    int64
     mergeFunc                   storage.VerticalChunkSeriesMergeFunc
     postingsEncoder             index.PostingsEncoder
+    postingsDecoderFactory      PostingsDecoderFactory
     enableOverlappingCompaction bool
 }
 
@@ -154,6 +155,9 @@ type LeveledCompactorOptions struct {
     // PE specifies the postings encoder. It is called when compactor is writing out the postings for a label name/value pair during compaction.
     // If it is nil then the default encoder is used. At the moment that is the "raw" encoder. See index.EncodePostingsRaw for more.
     PE index.PostingsEncoder
+    // PD specifies the postings decoder factory, which returns a different postings decoder based on BlockMeta. It is called when opening a block or opening the index file.
+    // If it is nil then the default decoder is used. At the moment that is the "raw" decoder. See index.DecodePostingsRaw for more.
+    PD PostingsDecoderFactory
     // MaxBlockChunkSegmentSize is the max block chunk segment size. If it is 0 then the default chunks.DefaultChunkSegmentSize is used.
     MaxBlockChunkSegmentSize int64
     // MergeFunc is used for merging series together in vertical compaction. By default storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is used.
@@ -163,6 +167,12 @@ type LeveledCompactorOptions struct {
     EnableOverlappingCompaction bool
 }
 
+type PostingsDecoderFactory func(meta *BlockMeta) index.PostingsDecoder
+
+func DefaultPostingsDecoderFactory(_ *BlockMeta) index.PostingsDecoder {
+    return index.DecodePostingsRaw
+}
+
 func NewLeveledCompactorWithChunkSize(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool, maxBlockChunkSegmentSize int64, mergeFunc storage.VerticalChunkSeriesMergeFunc) (*LeveledCompactor, error) {
     return NewLeveledCompactorWithOptions(ctx, r, l, ranges, pool, LeveledCompactorOptions{
         MaxBlockChunkSegmentSize: maxBlockChunkSegmentSize,
@@ -200,6 +210,10 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
     if pe == nil {
         pe = index.EncodePostingsRaw
     }
+    pd := opts.PD
+    if pd == nil {
+        pd = DefaultPostingsDecoderFactory
+    }
     return &LeveledCompactor{
         ranges:                      ranges,
         chunkPool:                   pool,
@@ -209,6 +223,7 @@ func NewLeveledCompactorWithOptions(ctx context.Context, r prometheus.Registerer
         maxBlockChunkSegmentSize:    maxBlockChunkSegmentSize,
         mergeFunc:                   mergeFunc,
         postingsEncoder:             pe,
+        postingsDecoderFactory:      pd,
         enableOverlappingCompaction: opts.EnableOverlappingCompaction,
     }, nil
 }
@@ -473,7 +488,7 @@ func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string,
 
         if b == nil {
             var err error
-            b, err = OpenBlock(c.logger, d, c.chunkPool)
+            b, err = OpenBlock(c.logger, d, c.chunkPool, c.postingsDecoderFactory)
             if err != nil {
                 return uid, err
             }
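As a sketch of the compactor wiring: the factory body below is illustrative (only the raw decoder exists at this commit), and leaving PD nil falls back to DefaultPostingsDecoderFactory, as the constructor hunk above shows.

package main

import (
    "context"

    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/tsdb"
    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/index"
)

func main() {
    // Hypothetical factory: a real one could dispatch on fields of the
    // BlockMeta to pick a different decoder per block.
    factory := func(meta *tsdb.BlockMeta) index.PostingsDecoder {
        return index.DecodePostingsRaw
    }

    c, err := tsdb.NewLeveledCompactorWithOptions(
        context.Background(), nil, log.NewNopLogger(),
        tsdb.ExponentialBlockRanges(2*3600*1000, 3, 5), // 2h base block range
        chunkenc.NewPool(),
        tsdb.LeveledCompactorOptions{PD: factory},
    )
    if err != nil {
        panic(err)
    }
    _ = c // c.Compact(...) now opens source blocks through the factory.
}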
4 changes: 2 additions & 2 deletions tsdb/compact_test.go
@@ -1093,7 +1093,7 @@ func BenchmarkCompaction(b *testing.B) {
             blockDirs := make([]string, 0, len(c.ranges))
             var blocks []*Block
             for _, r := range c.ranges {
-                block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil)
+                block, err := OpenBlock(nil, createBlock(b, dir, genSeries(nSeries, 10, r[0], r[1])), nil, nil)
                 require.NoError(b, err)
                 blocks = append(blocks, block)
                 defer func() {
@@ -1489,7 +1489,7 @@ func TestHeadCompactionWithHistograms(t *testing.T) {
             require.NotEqual(t, ulid.ULID{}, id)
 
             // Open the block and query it and check the histograms.
-            block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil)
+            block, err := OpenBlock(nil, path.Join(head.opts.ChunkDirRoot, id.String()), nil, nil)
             require.NoError(t, err)
             t.Cleanup(func() {
                 require.NoError(t, block.Close())
18 changes: 12 additions & 6 deletions tsdb/db.go
@@ -84,6 +84,7 @@ func DefaultOptions() *Options {
         OutOfOrderCapMax:            DefaultOutOfOrderCapMax,
         EnableOverlappingCompaction: true,
         EnableSharding:              false,
+        PostingsDecoderFactory:      DefaultPostingsDecoderFactory,
     }
 }
 
@@ -189,6 +190,10 @@ type Options struct {
 
     // EnableSharding enables query sharding support in TSDB.
     EnableSharding bool
+
+    // PostingsDecoderFactory allows users to customize postings decoders based on BlockMeta.
+    // By default, DefaultPostingsDecoderFactory will be used to create a raw postings decoder.
+    PostingsDecoderFactory PostingsDecoderFactory
 }
 
 type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -571,7 +576,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
         return nil, ErrClosed
     default:
     }
-    loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil)
+    loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil, DefaultPostingsDecoderFactory)
     if err != nil {
         return nil, err
     }
@@ -669,7 +674,7 @@ func (db *DBReadOnly) LastBlockID() (string, error) {
 }
 
 // Block returns a block reader by given block id.
-func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
+func (db *DBReadOnly) Block(blockID string, postingsDecoderFactory PostingsDecoderFactory) (BlockReader, error) {
     select {
     case <-db.closed:
         return nil, ErrClosed
@@ -681,7 +686,7 @@ func (db *DBReadOnly) Block(blockID string) (BlockReader, error) {
         return nil, fmt.Errorf("invalid block ID %s", blockID)
     }
 
-    block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil)
+    block, err := OpenBlock(db.logger, filepath.Join(db.dir, blockID), nil, postingsDecoderFactory)
     if err != nil {
         return nil, err
     }
@@ -831,6 +836,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
     db.compactor, err = NewLeveledCompactorWithOptions(ctx, r, l, rngs, db.chunkPool, LeveledCompactorOptions{
         MaxBlockChunkSegmentSize:    opts.MaxBlockChunkSegmentSize,
         EnableOverlappingCompaction: opts.EnableOverlappingCompaction,
+        PD:                          opts.PostingsDecoderFactory,
     })
     if err != nil {
         cancel()
@@ -1437,7 +1443,7 @@ func (db *DB) reloadBlocks() (err error) {
     db.mtx.Lock()
     defer db.mtx.Unlock()
 
-    loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
+    loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool, db.opts.PostingsDecoderFactory)
     if err != nil {
         return err
     }
@@ -1529,7 +1535,7 @@ func (db *DB) reloadBlocks() (err error) {
     return nil
 }
 
-func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool, postingsDecoderFactory PostingsDecoderFactory) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
     bDirs, err := blockDirs(dir)
     if err != nil {
         return nil, nil, fmt.Errorf("find blocks: %w", err)
@@ -1546,7 +1552,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
         // See if we already have the block in memory or open it otherwise.
         block, open := getBlock(loaded, meta.ULID)
         if !open {
-            block, err = OpenBlock(l, bDir, chunkPool)
+            block, err = OpenBlock(l, bDir, chunkPool, postingsDecoderFactory)
             if err != nil {
                 corrupted[meta.ULID] = err
                 continue
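End to end, a caller customizes decoding through Options. A minimal sketch, assuming the tsdb.Open signature at this commit (dir, logger, registerer, options, stats) and an illustrative factory that just returns the raw decoder:

package main

import (
    "github.com/go-kit/log"
    "github.com/prometheus/prometheus/tsdb"
    "github.com/prometheus/prometheus/tsdb/index"
)

func main() {
    opts := tsdb.DefaultOptions() // already sets DefaultPostingsDecoderFactory

    // Override with a custom factory; db.reloadBlocks passes it down
    // to OpenBlock for every block it loads.
    opts.PostingsDecoderFactory = func(meta *tsdb.BlockMeta) index.PostingsDecoder {
        return index.DecodePostingsRaw
    }

    // "data" is a hypothetical storage directory.
    db, err := tsdb.Open("data", log.NewNopLogger(), nil, opts, nil)
    if err != nil {
        panic(err)
    }
    defer db.Close()
}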
10 changes: 5 additions & 5 deletions tsdb/db_test.go
@@ -1319,7 +1319,7 @@ func TestTombstoneCleanFail(t *testing.T) {
     totalBlocks := 2
     for i := 0; i < totalBlocks; i++ {
         blockDir := createBlock(t, db.Dir(), genSeries(1, 1, int64(i), int64(i)+1))
-        block, err := OpenBlock(nil, blockDir, nil)
+        block, err := OpenBlock(nil, blockDir, nil, nil)
         require.NoError(t, err)
         // Add some fake tombstones to trigger the compaction.
         tomb := tombstones.NewMemTombstones()
@@ -1374,7 +1374,7 @@ func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
         // Generate some blocks with old mint (near epoch).
         for j := 0; j < totalBlocks; j++ {
             blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
-            block, err := OpenBlock(nil, blockDir, nil)
+            block, err := OpenBlock(nil, blockDir, nil, nil)
             require.NoError(t, err)
             // Cover block with tombstones so it can be deleted with CleanTombstones() as well.
             tomb := tombstones.NewMemTombstones()
@@ -1435,7 +1435,7 @@ func (c *mockCompactorFailing) Write(dest string, _ BlockReader, _, _ int64, _ *
         return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
     }
 
-    block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil)
+    block, err := OpenBlock(nil, createBlock(c.t, dest, genSeries(1, 1, 0, 1)), nil, nil)
     require.NoError(c.t, err)
     require.NoError(c.t, block.Close()) // Close block as we won't be using anywhere.
     c.blocks = append(c.blocks, block)
@@ -2508,13 +2508,13 @@ func TestDBReadOnly(t *testing.T) {
     })
     t.Run("block", func(t *testing.T) {
         blockID := expBlock.meta.ULID.String()
-        block, err := dbReadOnly.Block(blockID)
+        block, err := dbReadOnly.Block(blockID, nil)
         require.NoError(t, err)
         require.Equal(t, expBlock.Meta(), block.Meta(), "block meta mismatch")
     })
     t.Run("invalid block ID", func(t *testing.T) {
         blockID := "01GTDVZZF52NSWB5SXQF0P2PGF"
-        _, err := dbReadOnly.Block(blockID)
+        _, err := dbReadOnly.Block(blockID, nil)
         require.Error(t, err)
     })
     t.Run("last block ID", func(t *testing.T) {
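On the read-only path the factory is now a per-call argument, as the TestDBReadOnly hunk above shows. A small hypothetical helper (a nil factory again means the raw decoder):

package blockinfo

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb"
)

// InspectBlock opens one block of a read-only DB. Passing a nil
// PostingsDecoderFactory makes DBReadOnly.Block fall back to
// index.DecodePostingsRaw inside OpenBlock.
func InspectBlock(db *tsdb.DBReadOnly, blockID string) error {
    b, err := db.Block(blockID, nil)
    if err != nil {
        return err
    }
    fmt.Printf("block %s: %d series\n", b.Meta().ULID, b.Meta().Stats.NumSeries)
    return nil
}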
[Diffs for the remaining 5 changed files not shown.]
