diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go index 1340093ab2..53f7762df8 100644 --- a/pkg/compactor/partition_compaction_grouper.go +++ b/pkg/compactor/partition_compaction_grouper.go @@ -499,9 +499,11 @@ func (g *PartitionCompactionGrouper) partitionBlocksGroup(partitionCount int, bl addToPartitionedGroups := func(blocks []*metadata.Meta, partitionID int) { if _, ok := partitionedGroups[partitionID]; !ok { partitionedGroups[partitionID] = blocksGroupWithPartition{ - rangeStart: rangeStart, - rangeEnd: rangeEnd, - blocks: []*metadata.Meta{}, + blocksGroup: blocksGroup{ + rangeStart: rangeStart, + rangeEnd: rangeEnd, + blocks: []*metadata.Meta{}, + }, } } partitionedGroup := partitionedGroups[partitionID] @@ -868,9 +870,6 @@ func (t *timeRangeStatus) previousTimeRangeDuration() time.Duration { type blocksGroupWithPartition struct { blocksGroup - rangeStart int64 // Included. - rangeEnd int64 // Excluded. - blocks []*metadata.Meta groupHash uint32 partitionedGroupInfo *PartitionedGroupInfo partition Partition diff --git a/pkg/compactor/partition_compaction_grouper_test.go b/pkg/compactor/partition_compaction_grouper_test.go index 2167a219ae..259981c33c 100644 --- a/pkg/compactor/partition_compaction_grouper_test.go +++ b/pkg/compactor/partition_compaction_grouper_test.go @@ -84,6 +84,64 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { {blocks: []ulid.ULID{block3, block4}, partitionCount: 1, partitionID: 0, rangeStart: 2 * H, rangeEnd: 4 * H}, }, }, + "only level 1 blocks with ingestion replication factor 3": { + ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, + blocks: map[ulid.ULID]mockBlock{ + block1: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block2: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block3: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block4: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block5: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block5, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block6: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block6, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 1}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + }, + existingPartitionedGroups: []mockExistingPartitionedGroup{}, + expected: []expectedCompactionJob{ + {blocks: []ulid.ULID{block1, block2, block3, block4, block5, block6}, partitionCount: 1, partitionID: 0, rangeStart: 0 * H, rangeEnd: 2 * H}, + }, + ingestionReplicationFactor: 3, + }, "only level 1 blocks, there is existing partitioned group file": { ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, blocks: map[ulid.ULID]mockBlock{ @@ -499,6 +557,65 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { {blocks: []ulid.ULID{block1, block2, block3}, partitionCount: 1, partitionID: 0, rangeStart: 0 * H, rangeEnd: 12 * H}, }, }, + "level 2 blocks with ingestion replication factor 3": { + ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, + blocks: map[ulid.ULID]mockBlock{ + block1: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block1, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 0}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block2: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block2, MinTime: 0 * H, MaxTime: 2 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 1}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block3: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block3, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 0}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block4: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block4, MinTime: 2 * H, MaxTime: 4 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 1}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block5: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block5, MinTime: 4 * H, MaxTime: 6 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 0}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + block6: { + meta: &metadata.Meta{ + BlockMeta: tsdb.BlockMeta{ULID: block6, MinTime: 4 * H, MaxTime: 6 * H, Compaction: tsdb.BlockMetaCompaction{Level: 2}, Stats: tsdb.BlockStats{NumSeries: 1}}, + Thanos: metadata.Thanos{Extensions: cortextsdb.CortexMetaExtensions{PartitionInfo: &cortextsdb.PartitionInfo{PartitionCount: 2, PartitionID: 1}}, Files: []metadata.File{{RelPath: thanosblock.IndexFilename, SizeBytes: 0}}}, + }, + timeRange: 2 * time.Hour, + hasNoCompactMark: false, + }, + }, + existingPartitionedGroups: []mockExistingPartitionedGroup{}, + expected: []expectedCompactionJob{ + {blocks: []ulid.ULID{block1, block3, block5}, partitionCount: 2, partitionID: 0, rangeStart: 0 * H, rangeEnd: 12 * H}, + {blocks: []ulid.ULID{block2, block4, block6}, partitionCount: 2, partitionID: 1, rangeStart: 0 * H, rangeEnd: 12 * H}, + }, + ingestionReplicationFactor: 3, + }, "level 2 blocks along with level 3 blocks from some of partitions, level 1 blocks in different time range, there are partitioned group files for all groups": { ranges: []time.Duration{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}, blocks: map[ulid.ULID]mockBlock{ @@ -1966,6 +2083,10 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ingestionReplicationFactor := 1 + if testCase.ingestionReplicationFactor > 1 { + ingestionReplicationFactor = testCase.ingestionReplicationFactor + } g := NewPartitionCompactionGrouper( ctx, nil, @@ -1988,7 +2109,7 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { false, visitMarkerTimeout, noCompactFilter, - 1, + ingestionReplicationFactor, ) actual, err := g.generateCompactionJobs(testCase.getBlocks()) require.NoError(t, err) @@ -2011,10 +2132,11 @@ func TestPartitionCompactionGrouper_GenerateCompactionJobs(t *testing.T) { } type generateCompactionJobsTestCase struct { - ranges []time.Duration - blocks map[ulid.ULID]mockBlock - existingPartitionedGroups []mockExistingPartitionedGroup - expected []expectedCompactionJob + ranges []time.Duration + blocks map[ulid.ULID]mockBlock + existingPartitionedGroups []mockExistingPartitionedGroup + expected []expectedCompactionJob + ingestionReplicationFactor int } func (g *generateCompactionJobsTestCase) setupBucketStore(t *testing.T, bkt *bucket.ClientMock, userID string, visitMarkerTimeout time.Duration) {