Skip to content

Commit

Permalink
Fixed blocksGroupWithPartition unable to reuse functions from blocksG…
Browse files Browse the repository at this point in the history
…roup (#6547)

* Fixed blocksGroupWithPartition unable to reuse functions from blocksGroup

Signed-off-by: Alex Le <[email protected]>

* update tests

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Jan 24, 2025
1 parent 6660ba1 commit 69f5fb4
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 11 deletions.
11 changes: 5 additions & 6 deletions pkg/compactor/partition_compaction_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,11 @@ func (g *PartitionCompactionGrouper) partitionBlocksGroup(partitionCount int, bl
addToPartitionedGroups := func(blocks []*metadata.Meta, partitionID int) {
if _, ok := partitionedGroups[partitionID]; !ok {
partitionedGroups[partitionID] = blocksGroupWithPartition{
rangeStart: rangeStart,
rangeEnd: rangeEnd,
blocks: []*metadata.Meta{},
blocksGroup: blocksGroup{
rangeStart: rangeStart,
rangeEnd: rangeEnd,
blocks: []*metadata.Meta{},
},
}
}
partitionedGroup := partitionedGroups[partitionID]
Expand Down Expand Up @@ -868,9 +870,6 @@ func (t *timeRangeStatus) previousTimeRangeDuration() time.Duration {

type blocksGroupWithPartition struct {
blocksGroup
rangeStart int64 // Included.
rangeEnd int64 // Excluded.
blocks []*metadata.Meta
groupHash uint32
partitionedGroupInfo *PartitionedGroupInfo
partition Partition
Expand Down
132 changes: 127 additions & 5 deletions pkg/compactor/partition_compaction_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,64 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) {
{blocks: []ulid.ULID{block3, block4}, partitionCount: 1, partitionID: 0, rangeStart: 2 * H, rangeEnd: 4 * H},
},
},
"only level 1 blocks with ingestion replication factor 3": {
ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
blocks: map[ulid.ULID]mockBlock{
block1: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block2: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block3: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block4: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block5: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block5, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block6: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block6, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
},
existingPartitionedGroups: []mockExistingPartitionedGroup{},
expected: []expectedCompactionJob{
{blocks: []ulid.ULID{block1, block2, block3, block4, block5, block6}, partitionCount: 1, partitionID: 0, rangeStart: 0 * H, rangeEnd: 2 * H},
},
ingestionReplicationFactor: 3,
},
"only level 1 blocks, there is existing partitioned group file": {
ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
blocks: map[ulid.ULID]mockBlock{
Expand Down Expand Up @@ -499,6 +557,65 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) {
{blocks: []ulid.ULID{block1, block2, block3}, partitionCount: 1, partitionID: 0, rangeStart: 0 * H, rangeEnd: 12 * H},
},
},
"level 2 blocks with ingestion replication factor 3": {
ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
blocks: map[ulid.ULID]mockBlock{
block1: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 0}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block2: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 1}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block3: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 0}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block4: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 1}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block5: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block5, MinTime: 4 * H, MaxTime: 6 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 0}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
block6: {
meta: &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: block6, MinTime: 4 * H, MaxTime: 6 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}},
Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 1}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}},
},
timeRange: 2 * time.Hour,
hasNoCompactMark: false,
},
},
existingPartitionedGroups: []mockExistingPartitionedGroup{},
expected: []expectedCompactionJob{
{blocks: []ulid.ULID{block1, block3, block5}, partitionCount: 2, partitionID: 0, rangeStart: 0 * H, rangeEnd: 12 * H},
{blocks: []ulid.ULID{block2, block4, block6}, partitionCount: 2, partitionID: 1, rangeStart: 0 * H, rangeEnd: 12 * H},
},
ingestionReplicationFactor: 3,
},
"level 2 blocks along with level 3 blocks from some of partitions, level 1 blocks in different time range, there are partitioned group files for all groups": {
ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour},
blocks: map[ulid.ULID]mockBlock{
Expand Down Expand Up @@ -1966,6 +2083,10 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ingestionReplicationFactor := 1
if testCase.ingestionReplicationFactor > 1 {
ingestionReplicationFactor = testCase.ingestionReplicationFactor
}
g := NewPartitionCompactionGrouper(
ctx,
nil,
Expand All @@ -1988,7 +2109,7 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) {
false,
visitMarkerTimeout,
noCompactFilter,
1,
ingestionReplicationFactor,
)
actual, err := g.generateCompactionJobs(testCase.getBlocks())
require.NoError(t, err)
Expand All @@ -2011,10 +2132,11 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) {
}

type generateCompactionJobsTestCase struct {
ranges []time.Duration
blocks map[ulid.ULID]mockBlock
existingPartitionedGroups []mockExistingPartitionedGroup
expected []expectedCompactionJob
ranges []time.Duration
blocks map[ulid.ULID]mockBlock
existingPartitionedGroups []mockExistingPartitionedGroup
expected []expectedCompactionJob
ingestionReplicationFactor int
}

func (g *generateCompactionJobsTestCase) setupBucketStore(t *testing.T, bkt *bucket.ClientMock, userID string, visitMarkerTimeout time.Duration) {
Expand Down

0 comments on commit 69f5fb4

Please sign in to comment.