From 81940c86d5c64874c02420a618eb60ed9dc146b7 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 22 Jan 2025 19:08:31 +0530 Subject: [PATCH] feat: dedupe duplicate delete requests while loading them for processing in compactor (#15852) --- pkg/compactor/deletion/delete_request.go | 28 ++++ pkg/compactor/deletion/delete_request_test.go | 141 ++++++++++++++++++ .../deletion/delete_requests_manager.go | 27 ++++ .../deletion/delete_requests_manager_test.go | 39 +++++ 4 files changed, 235 insertions(+) diff --git a/pkg/compactor/deletion/delete_request.go b/pkg/compactor/deletion/delete_request.go index 9ce7f381fb105..592ae810b2a98 100644 --- a/pkg/compactor/deletion/delete_request.go +++ b/pkg/compactor/deletion/delete_request.go @@ -4,6 +4,7 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -160,6 +161,33 @@ func (d *DeleteRequest) IsDeleted(entry retention.ChunkEntry) (bool, filter.Func return true, ff } +func (d *DeleteRequest) IsDuplicate(o *DeleteRequest) (bool, error) { + // we would never have duplicates from same request + if d.RequestID == o.RequestID { + return false, nil + } + if d.UserID != o.UserID || d.StartTime != o.StartTime || d.EndTime != o.EndTime { + return false, nil + } + + if d.logSelectorExpr == nil { + if err := d.SetQuery(d.Query); err != nil { + return false, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", d.RequestID, d.UserID) + } + } + if o.logSelectorExpr == nil { + if err := o.SetQuery(o.Query); err != nil { + return false, errors.Wrapf(err, "failed to init log selector expr for request_id=%s, user_id=%s", o.RequestID, o.UserID) + } + } + + if d.logSelectorExpr.String() != o.logSelectorExpr.String() { + return false, nil + } + + return true, nil +} + func intervalsOverlap(interval1, interval2 model.Interval) bool { if interval1.Start > interval2.End || interval2.Start > interval1.End { return false diff --git a/pkg/compactor/deletion/delete_request_test.go b/pkg/compactor/deletion/delete_request_test.go index 899e83f802e37..d8b64f2031f2d 100644 --- a/pkg/compactor/deletion/delete_request_test.go +++ b/pkg/compactor/deletion/delete_request_test.go @@ -432,3 +432,144 @@ func TestDeleteRequest_FilterFunction(t *testing.T) { require.Panics(t, func() { testutil.ToFloat64(dr.Metrics.deletedLinesTotal) }) }) } + +func TestDeleteRequest_IsDuplicate(t *testing.T) { + query1 := `{foo="bar", fizz="buzz"} |= "foo"` + query2 := `{foo="bar", fizz="buzz2"} |= "foo"` + + for _, tc := range []struct { + name string + req1, req2 DeleteRequest + expIsDuplicate bool + }{ + { + name: "not duplicate - different user id", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "1", + UserID: user2, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + expIsDuplicate: false, + }, + { + name: "not duplicate - same request id", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + expIsDuplicate: false, + }, + { + name: "not duplicate - different start time", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "2", + UserID: user1, + StartTime: now.Add(-13 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + }, + { + name: "not duplicate - different end time", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "2", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-11 * time.Hour), + Query: query1, + }, + }, + { + name: "not duplicate - different labels", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "2", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query2, + }, + }, + { + name: "duplicate - same request", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "2", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + expIsDuplicate: true, + }, + { + name: "duplicate - same request with irregularities in query", + req1: DeleteRequest{ + RequestID: "1", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: query1, + }, + req2: DeleteRequest{ + RequestID: "2", + UserID: user1, + StartTime: now.Add(-12 * time.Hour), + EndTime: now.Add(-10 * time.Hour), + Query: "{foo=\"bar\", fizz=`buzz`} |= `foo`", + }, + expIsDuplicate: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + isDuplicate, err := tc.req1.IsDuplicate(&tc.req2) + require.NoError(t, err) + require.Equal(t, tc.expIsDuplicate, isDuplicate) + }) + } +} diff --git a/pkg/compactor/deletion/delete_requests_manager.go b/pkg/compactor/deletion/delete_requests_manager.go index c9d03b354fc7a..97b8c73c9f7f6 100644 --- a/pkg/compactor/deletion/delete_requests_manager.go +++ b/pkg/compactor/deletion/delete_requests_manager.go @@ -35,6 +35,7 @@ type DeleteRequestsManager struct { deleteRequestsToProcess map[string]*userDeleteRequests deleteRequestsToProcessMtx sync.Mutex + duplicateRequests []DeleteRequest metrics *deleteRequestsManagerMetrics wg sync.WaitGroup done chan struct{} @@ -153,6 +154,23 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { continue } } + if ur, ok := d.deleteRequestsToProcess[deleteRequest.UserID]; ok { + for _, requestLoadedForProcessing := range ur.requests { + isDuplicate, err := requestLoadedForProcessing.IsDuplicate(&deleteRequest) + if err != nil { + return err + } + if isDuplicate { + level.Info(util_log.Logger).Log( + "msg", "found duplicate request of one of the requests loaded for processing", + "loaded_request_id", requestLoadedForProcessing.RequestID, + "duplicate_request_id", deleteRequest.RequestID, + "user", deleteRequest.UserID, + ) + d.duplicateRequests = append(d.duplicateRequests, deleteRequest) + } + } + } if reqCount >= d.batchSize { logBatchTruncation(reqCount, len(deleteRequests)) break @@ -366,6 +384,15 @@ func (d *DeleteRequestsManager) MarkPhaseFinished() { d.markRequestAsProcessed(*deleteRequest) } } + + for _, req := range d.duplicateRequests { + level.Info(util_log.Logger).Log("msg", "marking duplicate delete request as processed", + "delete_request_id", req.RequestID, + "sequence_num", req.SequenceNum, + "user", req.UserID, + ) + d.markRequestAsProcessed(req) + } } func (d *DeleteRequestsManager) IntervalMayHaveExpiredChunks(_ model.Interval, userID string) bool { diff --git a/pkg/compactor/deletion/delete_requests_manager_test.go b/pkg/compactor/deletion/delete_requests_manager_test.go index 6eabf2de38799..baa7b60b312db 100644 --- a/pkg/compactor/deletion/delete_requests_manager_test.go +++ b/pkg/compactor/deletion/delete_requests_manager_test.go @@ -48,6 +48,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { expectedResp resp expectedDeletionRangeByUser map[string]model.Interval expectedRequestsMarkedAsProcessed []int + expectedDuplicateRequestsCount int }{ { name: "no delete requests", @@ -895,6 +896,43 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { }, expectedRequestsMarkedAsProcessed: []int{0, 1}, }, + { + name: "duplicate delete request marked as processed with loaded request", + deletionMode: deletionmode.FilterAndDelete, + batchSize: 1, + deleteRequestsFromStore: []DeleteRequest{ + { + RequestID: "1", + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + { + RequestID: "2", + UserID: testUserID, + Query: streamSelectorWithLineFilters, + StartTime: now.Add(-24 * time.Hour), + EndTime: now, + Status: StatusReceived, + }, + }, + expectedResp: resp{ + isExpired: true, + expectedFilter: func(_ time.Time, s string, _ ...labels.Label) bool { + return strings.Contains(s, "fizz") + }, + }, + expectedDeletionRangeByUser: map[string]model.Interval{ + testUserID: { + Start: now.Add(-24 * time.Hour), + End: now, + }, + }, + expectedRequestsMarkedAsProcessed: []int{0, 1}, + expectedDuplicateRequestsCount: 1, + }, } { t.Run(tc.name, func(t *testing.T) { mockDeleteRequestsStore := &mockDeleteRequestsStore{deleteRequests: tc.deleteRequestsFromStore} @@ -947,6 +985,7 @@ func TestDeleteRequestsManager_Expired(t *testing.T) { for i, reqIdx := range tc.expectedRequestsMarkedAsProcessed { require.True(t, requestsAreEqual(tc.deleteRequestsFromStore[reqIdx], processedRequests[i])) } + require.Len(t, mgr.duplicateRequests, tc.expectedDuplicateRequestsCount) }) } }