diff --git a/mocks/services/dedup/mock_dedup.go b/mocks/services/dedup/mock_dedup.go index f6015a4629..b3cb90175f 100644 --- a/mocks/services/dedup/mock_dedup.go +++ b/mocks/services/dedup/mock_dedup.go @@ -66,13 +66,12 @@ func (mr *MockDedupMockRecorder) Commit(arg0 any) *gomock.Call { } // Get mocks base method. -func (m *MockDedup) Get(arg0 types.KeyValue) (bool, int64, error) { +func (m *MockDedup) Get(arg0 types.KeyValue) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Get", arg0) ret0, _ := ret[0].(bool) - ret1, _ := ret[1].(int64) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(error) + return ret0, ret1 } // Get indicates an expected call of Get. @@ -82,13 +81,12 @@ func (mr *MockDedupMockRecorder) Get(arg0 any) *gomock.Call { } // GetBatch mocks base method. -func (m *MockDedup) GetBatch(arg0 []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) { +func (m *MockDedup) GetBatch(arg0 []types.KeyValue) (map[types.KeyValue]bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetBatch", arg0) ret0, _ := ret[0].(map[types.KeyValue]bool) - ret1, _ := ret[1].(map[types.KeyValue]int64) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].(error) + return ret0, ret1 } // GetBatch indicates an expected call of GetBatch. 
diff --git a/processor/processor.go b/processor/processor.go index 2d5be097f2..ad306af1d1 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1585,8 +1585,7 @@ func getDiffMetrics( } type dupStatKey struct { - sourceID string - equalSize bool + sourceID string } func (proc *Handle) eventAuditEnabled(workspaceID string) bool { @@ -1771,7 +1770,6 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra }) dedupKey := dedupTypes.KeyValue{ Key: fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId), - Value: int64(len(payloadFunc())), WorkspaceID: batchEvent.WorkspaceId, JobID: batchEvent.JobID, } @@ -1798,10 +1796,9 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra } var keyMap map[dedupTypes.KeyValue]bool - var sizeMap map[dedupTypes.KeyValue]int64 var err error if proc.config.enableDedup { - keyMap, sizeMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID) + keyMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID) if err != nil { return nil, err } @@ -1813,13 +1810,11 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra } if proc.config.enableDedup { - p := event.payloadFunc() - messageSize := int64(len(p)) dedupKey := event.dedupKey - ok, previousSize := keyMap[dedupKey], sizeMap[dedupKey] + ok := keyMap[dedupKey] if !ok { proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) - sourceDupStats[dupStatKey{sourceID: event.eventParams.SourceId, equalSize: messageSize == previousSize}] += 1 + sourceDupStats[dupStatKey{sourceID: event.eventParams.SourceId}] += 1 continue } dedupKeys[dedupKey.Key] = struct{}{} @@ -2334,16 +2329,14 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans }) if proc.config.enableDedup { - p := payloadFunc() - messageSize := int64(len(p)) dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId) - ok, previousSize, err := proc.dedup.Get(dedupTypes.KeyValue{Key: 
dedupKey, Value: messageSize, WorkspaceID: batchEvent.WorkspaceId}) + ok, err := proc.dedup.Get(dedupTypes.KeyValue{Key: dedupKey, WorkspaceID: batchEvent.WorkspaceId}) if err != nil { return nil, err } if !ok { proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey) - sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1 + sourceDupStats[dupStatKey{sourceID: source.ID}] += 1 continue } dedupKeys[dedupKey] = struct{}{} @@ -3624,8 +3617,7 @@ func (proc *Handle) crashRecover() { func (proc *Handle) updateSourceStats(sourceStats map[dupStatKey]int, bucket string) { for dupStat, count := range sourceStats { tags := map[string]string{ - "source": dupStat.sourceID, - "equalSize": strconv.FormatBool(dupStat.equalSize), + "source": dupStat.sourceID, } sourceStatsD := proc.statsFactory.NewTaggedStat(bucket, stats.CountType, tags) sourceStatsD.Count(count) diff --git a/processor/processor_test.go b/processor/processor_test.go index 94c7787dca..e70d3b183b 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -3374,9 +3374,8 @@ var _ = Describe("Processor", Ordered, func() { callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1) c.MockDedup.EXPECT().GetBatch(gomock.Any()).Return(map[dedupTypes.KeyValue]bool{ - {Key: "message-some-id", Value: 230, JobID: 1010, WorkspaceID: ""}: true, - {Key: "message-some-id", Value: 246, JobID: 1010, WorkspaceID: ""}: true, - {Key: "message-some-id", Value: 246, JobID: 2010, WorkspaceID: ""}: true, + {Key: "message-some-id", JobID: 1010, WorkspaceID: ""}: true, + {Key: "message-some-id", JobID: 2010, WorkspaceID: ""}: true, - }, nil, nil).After(callUnprocessed).AnyTimes() + }, nil).After(callUnprocessed).AnyTimes() c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1) diff --git a/services/dedup/badger/badger.go b/services/dedup/badger/badger.go index 7a15d49736..a9d74502dc 100644 --- 
a/services/dedup/badger/badger.go +++ b/services/dedup/badger/badger.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "sync" "time" @@ -73,30 +72,28 @@ func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *Dedup { } return &Dedup{ badgerDB: db, - cache: make(map[string]int64), + cache: make(map[string]bool), } } -func (d *BadgerDB) Get(key string) (int64, bool, error) { +func (d *BadgerDB) Get(key string) (bool, error) { defer d.stats.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "badger"}).RecordDuration()() - var payloadSize int64 var found bool err := d.badgerDB.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(key)) if err != nil { return err } - if itemValue, err := item.ValueCopy(nil); err == nil { - payloadSize, _ = strconv.ParseInt(string(itemValue), 10, 64) + if _, err = item.ValueCopy(nil); err == nil { found = true } return nil }) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { - return 0, false, err + return false, err } - return payloadSize, found, nil + return found, nil } func (d *BadgerDB) Set(kvs []types.KeyValue) error { @@ -105,8 +102,7 @@ func (d *BadgerDB) Set(kvs []types.KeyValue) error { defer wb.Cancel() for i := range kvs { message := kvs[i] - value := strconv.FormatInt(message.Value, 10) - e := badger.NewEntry([]byte(message.Key), []byte(value)).WithTTL(d.window.Load()) + e := badger.NewEntry([]byte(message.Key), []byte("")).WithTTL(d.window.Load()) if err := wb.SetEntry(e); err != nil { return err } @@ -173,53 +169,51 @@ func (d *BadgerDB) gcLoop() { type Dedup struct { badgerDB *BadgerDB cacheMu sync.Mutex - cache map[string]int64 + cache map[string]bool } -func (d *Dedup) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) { +func (d *Dedup) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error) { err := d.badgerDB.init() if err != nil { - return nil, nil, err + return nil, err } found := 
make(map[types.KeyValue]bool) - previous := make(map[types.KeyValue]int64) for _, kv := range kvs { - foundKey, size, err := d.Get(kv) + foundKey, err := d.Get(kv) if err != nil { - return nil, nil, err + return nil, err } found[kv] = foundKey - previous[kv] = size } - return found, previous, nil + return found, nil } -func (d *Dedup) Get(kv types.KeyValue) (bool, int64, error) { +func (d *Dedup) Get(kv types.KeyValue) (bool, error) { err := d.badgerDB.init() if err != nil { - return false, 0, err + return false, err } d.cacheMu.Lock() - previous, found := d.cache[kv.Key] + _, found := d.cache[kv.Key] d.cacheMu.Unlock() if found { - return false, previous, nil + return false, nil } - previous, found, err = d.badgerDB.Get(kv.Key) + found, err = d.badgerDB.Get(kv.Key) if err != nil { - return false, 0, err + return false, err } d.cacheMu.Lock() defer d.cacheMu.Unlock() if !found { // still not in the cache, but it's in the DB so let's refresh the cache - d.cache[kv.Key] = kv.Value + d.cache[kv.Key] = true } - return !found, previous, nil + return !found, nil } func (d *Dedup) Commit(keys []string) error { @@ -230,12 +224,12 @@ func (d *Dedup) Commit(keys []string) error { kvs := make([]types.KeyValue, len(keys)) d.cacheMu.Lock() for i, key := range keys { - value, ok := d.cache[key] + _, ok := d.cache[key] if !ok { d.cacheMu.Unlock() return fmt.Errorf("key %v has not been previously set", key) } - kvs[i] = types.KeyValue{Key: key, Value: value} + kvs[i] = types.KeyValue{Key: key} } d.cacheMu.Unlock() diff --git a/services/dedup/badger/badger_test.go b/services/dedup/badger/badger_test.go index dfce12ed4c..525245cdb3 100644 --- a/services/dedup/badger/badger_test.go +++ b/services/dedup/badger/badger_test.go @@ -24,34 +24,34 @@ func Test_Badger(t *testing.T) { require.NotNil(t, badger) defer badger.Close() t.Run("Same messageID should be deduped from badger", func(t *testing.T) { - key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} 
- key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} - notAvailable, _, err := badger.Get(key1) + key1 := types.KeyValue{Key: "a", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "a", WorkspaceID: "test"} + notAvailable, err := badger.Get(key1) require.NoError(t, err) require.True(t, notAvailable) err = badger.Commit([]string{key1.Key}) require.NoError(t, err) - notAvailable, _, err = badger.Get(key2) + notAvailable, err = badger.Get(key2) require.NoError(t, err) require.False(t, notAvailable) }) t.Run("Same messageID should be deduped from cache", func(t *testing.T) { - key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - found, _, err := badger.Get(key1) + key1 := types.KeyValue{Key: "b", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "b", WorkspaceID: "test"} + found, err := badger.Get(key1) require.NoError(t, err) require.True(t, found) - found, _, err = badger.Get(key2) + found, err = badger.Get(key2) require.NoError(t, err) require.False(t, found) }) t.Run("different messageID should not be deduped for batch", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "c", Value: 1, WorkspaceID: "test"}, - {Key: "d", Value: 1, WorkspaceID: "test"}, - {Key: "e", Value: 1, WorkspaceID: "test"}, + {Key: "c", WorkspaceID: "test"}, + {Key: "d", WorkspaceID: "test"}, + {Key: "e", WorkspaceID: "test"}, } - found, _, err := badger.GetBatch(keys) + found, err := badger.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { @@ -60,16 +60,16 @@ func Test_Badger(t *testing.T) { }) t.Run("same messageID should be deduped for batch", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 3}, - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 4}, - {Key: "g", Value: 1, WorkspaceID: "test", JobID: 5}, + {Key: "f", WorkspaceID: "test", JobID: 3}, + {Key: "f", WorkspaceID: "test", JobID: 4}, + {Key: "g", 
WorkspaceID: "test", JobID: 5}, } expected := map[types.KeyValue]bool{ keys[0]: true, keys[1]: false, keys[2]: true, } - found, _, err := badger.GetBatch(keys) + found, err := badger.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { diff --git a/services/dedup/dedup.go b/services/dedup/dedup.go index 2ca232f3dc..0fac0ac1c1 100644 --- a/services/dedup/dedup.go +++ b/services/dedup/dedup.go @@ -49,10 +49,10 @@ func New(conf *config.Config, stats stats.Stats) (Dedup, error) { // Dedup is the interface for deduplication service type Dedup interface { - // Get returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value - Get(kv types.KeyValue) (bool, int64, error) + // Get returns [true] if it was the first time the key was encountered, otherwise it returns [false] + Get(kv types.KeyValue) (bool, error) // GetBatch - GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) + GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error) // Commit commits a list of previously set keys to the DB Commit(keys []string) error diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index 08efd03503..3843967c5d 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -67,33 +67,31 @@ func Test_Dedup(t *testing.T) { defer d.Close() t.Run("if message id is not present in cache and badger db", func(t *testing.T) { - found, _, err := d.Get(types.KeyValue{Key: "a", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err := d.Get(types.KeyValue{Key: "a", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.NoError(t, err) require.Equal(t, true, found) // Checking it again should give us the previous value from the cache - found, value, err := d.Get(types.KeyValue{Key: "a", Value: 2, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err = d.Get(types.KeyValue{Key: "a", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) require.Equal(t, false, found) - require.Equal(t, int64(1), value) }) 
t.Run("if message is committed, previous value should always return", func(t *testing.T) { - found, _, err := d.Get(types.KeyValue{Key: "b", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err := d.Get(types.KeyValue{Key: "b", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) require.Equal(t, true, found) err = d.Commit([]string{"a"}) require.NoError(t, err) - found, value, err := d.Get(types.KeyValue{Key: "b", Value: 2, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err = d.Get(types.KeyValue{Key: "b", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) require.Equal(t, false, found) - require.Equal(t, int64(1), value) }) t.Run("committing a messageid not present in cache", func(t *testing.T) { - found, _, err := d.Get(types.KeyValue{Key: "c", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err := d.Get(types.KeyValue{Key: "c", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) require.Equal(t, true, found) @@ -103,11 +101,11 @@ func Test_Dedup(t *testing.T) { t.Run("test GetBatch with unique keys", func(t *testing.T) { kvs := []types.KeyValue{ - {Key: "e", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}, - {Key: "f", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}, - {Key: "g", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}, + {Key: "e", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}, + {Key: "f", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}, + {Key: "g", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}, } - found, _, err := d.GetBatch(kvs) + found, err := d.GetBatch(kvs) require.Nil(t, err) require.Len(t, found, 3) for _, kv := range kvs { @@ -119,16 +117,16 @@ func Test_Dedup(t *testing.T) { t.Run("test GetBatch with non-unique keys", func(t *testing.T) { kvs := []types.KeyValue{ - {Key: "g", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", JobID: 3}, - {Key: "h", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", JobID: 4}, - {Key: "h", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", 
JobID: 5}, + {Key: "g", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", JobID: 3}, + {Key: "h", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", JobID: 4}, + {Key: "h", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", JobID: 5}, } expected := map[types.KeyValue]bool{ kvs[0]: false, kvs[1]: true, kvs[2]: false, } - found, _, err := d.GetBatch(kvs) + found, err := d.GetBatch(kvs) require.Nil(t, err) require.Len(t, found, 3) for _, kv := range kvs { @@ -156,19 +154,19 @@ func Test_Dedup_Window(t *testing.T) { require.Nil(t, err) defer d.Close() - found, _, err := d.Get(types.KeyValue{Key: "to be deleted", Value: 1, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err := d.Get(types.KeyValue{Key: "to be deleted", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) require.Equal(t, true, found) err = d.Commit([]string{"to be deleted"}) require.NoError(t, err) - found, _, err = d.Get(types.KeyValue{Key: "to be deleted", Value: 2, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err = d.Get(types.KeyValue{Key: "to be deleted", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) require.Equal(t, false, found) require.Eventually(t, func() bool { - found, _, err = d.Get(types.KeyValue{Key: "to be deleted", Value: 3, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + found, err = d.Get(types.KeyValue{Key: "to be deleted", WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.Nil(t, err) return found }, 2*time.Second, 100*time.Millisecond) @@ -180,7 +178,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) { misc.Init() dbPath := os.TempDir() + "/dedup_test_errtxntoobig" - defer os.RemoveAll(dbPath) + defer func() { _ = os.RemoveAll(dbPath) }() conf := config.New() t.Setenv("RUDDER_TMPDIR", dbPath) d, err := dedup.New(conf, stats.Default) @@ -192,7 +190,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) { for i := 0; i < size; i++ { key := uuid.New().String() messages[i] = key - _, _, _ = d.Get(types.KeyValue{Key: key, Value: int64(i + 1), WorkspaceID: "test"}) + _, _ = 
d.Get(types.KeyValue{Key: key, WorkspaceID: "test"}) } err = d.Commit(messages) require.NoError(t, err) @@ -221,13 +219,12 @@ func Benchmark_Dedup(b *testing.B) { key := uuid.New().String() msgIDs[i%batchSize] = types.KeyValue{ Key: key, - Value: int64(i + 1), WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh", } keys = append(keys, key) if i%batchSize == batchSize-1 || i == b.N-1 { for _, msgID := range msgIDs[:i%batchSize] { - _, _, _ = d.Get(msgID) + _, _ = d.Get(msgID) } err := d.Commit(keys) require.NoError(b, err) @@ -295,7 +292,7 @@ func Benchmark_DedupModes(b *testing.B) { b.Run(tc.name, func(b *testing.B) { for i := 0; i < b.N; i++ { key := uuid.New().String() - _, _, err = d.Get(types.KeyValue{Key: key, Value: int64(i + 1), WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + _, err = d.Get(types.KeyValue{Key: key, WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) require.NoError(b, err) err = d.Commit([]string{key}) require.NoError(b, err) diff --git a/services/dedup/mirrorBadger/mirrorBadger.go b/services/dedup/mirrorBadger/mirrorBadger.go index 26837a6269..505925754e 100644 --- a/services/dedup/mirrorBadger/mirrorBadger.go +++ b/services/dedup/mirrorBadger/mirrorBadger.go @@ -32,19 +32,19 @@ func (mb *MirrorBadger) Close() { mb.badger.Close() } -func (mb *MirrorBadger) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) { +func (mb *MirrorBadger) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error) { defer mb.stat.NewTaggedStat("dedup_get_batch_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_badger"}).RecordDuration()() - _, _, err := mb.scylla.GetBatch(kvs) + _, err := mb.scylla.GetBatch(kvs) if err != nil { mb.stat.NewTaggedStat("dedup_mirror_badger_get_batch_error", stats.CountType, stats.Tags{}).Increment() } return mb.badger.GetBatch(kvs) } -func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, int64, error) { +func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, error) { defer 
mb.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_badger"}).RecordDuration()() - _, _, err := mb.scylla.Get(kv) + _, err := mb.scylla.Get(kv) if err != nil { mb.stat.NewTaggedStat("dedup_mirror_badger_get_error", stats.CountType, stats.Tags{}).Increment() } diff --git a/services/dedup/mirrorBadger/mirrorBadger_test.go b/services/dedup/mirrorBadger/mirrorBadger_test.go index 40a23c8d6e..acbc48d655 100644 --- a/services/dedup/mirrorBadger/mirrorBadger_test.go +++ b/services/dedup/mirrorBadger/mirrorBadger_test.go @@ -41,40 +41,40 @@ func Test_MirrorBadger(t *testing.T) { require.NotNil(t, mirrorBadger) defer mirrorBadger.Close() t.Run("Same messageID should be deduped from badger", func(t *testing.T) { - key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} - key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} - found, _, err := mirrorBadger.Get(key1) + key1 := types.KeyValue{Key: "a", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "a", WorkspaceID: "test"} + found, err := mirrorBadger.Get(key1) require.Nil(t, err) require.True(t, found) err = mirrorBadger.Commit([]string{key1.Key}) require.NoError(t, err) - found, _, err = mirrorBadger.Get(key2) + found, err = mirrorBadger.Get(key2) require.Nil(t, err) require.False(t, found) - found, _, err = mirrorBadger.scylla.Get(key1) + found, err = mirrorBadger.scylla.Get(key1) require.Nil(t, err) require.False(t, found) - found, _, err = mirrorBadger.badger.Get(key1) + found, err = mirrorBadger.badger.Get(key1) require.Nil(t, err) require.False(t, found) }) t.Run("Same messageID should be deduped from cache", func(t *testing.T) { - key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - found, _, err := mirrorBadger.Get(key1) + key1 := types.KeyValue{Key: "b", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "b", WorkspaceID: "test"} + found, err := mirrorBadger.Get(key1) 
require.Nil(t, err) require.True(t, found) - found, _, err = mirrorBadger.Get(key2) + found, err = mirrorBadger.Get(key2) require.Nil(t, err) require.False(t, found) }) t.Run("different messageID should not be deduped for batch", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "c", Value: 1, WorkspaceID: "test"}, - {Key: "d", Value: 1, WorkspaceID: "test"}, - {Key: "e", Value: 1, WorkspaceID: "test"}, + {Key: "c", WorkspaceID: "test"}, + {Key: "d", WorkspaceID: "test"}, + {Key: "e", WorkspaceID: "test"}, } - found, _, err := mirrorBadger.GetBatch(keys) + found, err := mirrorBadger.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { @@ -83,16 +83,16 @@ func Test_MirrorBadger(t *testing.T) { }) t.Run("same messageID should be deduped for batch", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 3}, - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 4}, - {Key: "g", Value: 1, WorkspaceID: "test", JobID: 5}, + {Key: "f", WorkspaceID: "test", JobID: 3}, + {Key: "f", WorkspaceID: "test", JobID: 4}, + {Key: "g", WorkspaceID: "test", JobID: 5}, } expected := map[types.KeyValue]bool{ keys[0]: true, keys[1]: false, keys[2]: true, } - found, _, err := mirrorBadger.GetBatch(keys) + found, err := mirrorBadger.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { diff --git a/services/dedup/mirrorScylla/mirrorScylla.go b/services/dedup/mirrorScylla/mirrorScylla.go index 624a60c4ca..b60b6a9102 100644 --- a/services/dedup/mirrorScylla/mirrorScylla.go +++ b/services/dedup/mirrorScylla/mirrorScylla.go @@ -32,19 +32,19 @@ func (ms *MirrorScylla) Close() { ms.badger.Close() } -func (ms *MirrorScylla) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) { +func (ms *MirrorScylla) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error) { defer ms.stat.NewTaggedStat("dedup_get_batch_duration_seconds", 
stats.TimerType, stats.Tags{"mode": "mirror_scylla"}).RecordDuration()() - _, _, err := ms.badger.GetBatch(kvs) + _, err := ms.badger.GetBatch(kvs) if err != nil { ms.stat.NewTaggedStat("dedup_mirror_scylla_get_batch_error", stats.CountType, stats.Tags{}).Increment() } return ms.scylla.GetBatch(kvs) } -func (ms *MirrorScylla) Get(kv types.KeyValue) (bool, int64, error) { +func (ms *MirrorScylla) Get(kv types.KeyValue) (bool, error) { defer ms.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_scylla"}).RecordDuration()() - _, _, err := ms.badger.Get(kv) + _, err := ms.badger.Get(kv) if err != nil { ms.stat.NewTaggedStat("dedup_mirror_scylla_get_error", stats.CountType, stats.Tags{}).Increment() } diff --git a/services/dedup/mirrorScylla/mirrorScylla_test.go b/services/dedup/mirrorScylla/mirrorScylla_test.go index fd9ede15c0..d056a8ef88 100644 --- a/services/dedup/mirrorScylla/mirrorScylla_test.go +++ b/services/dedup/mirrorScylla/mirrorScylla_test.go @@ -41,54 +41,54 @@ func Test_MirrorBadger(t *testing.T) { require.NotNil(t, mirrorScylla) defer mirrorScylla.Close() t.Run("Same messageID should not be deduped for different workspace", func(t *testing.T) { - key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test1"} - key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test2"} - found, _, err := mirrorScylla.Get(key1) + key1 := types.KeyValue{Key: "a", WorkspaceID: "test1"} + key2 := types.KeyValue{Key: "a", WorkspaceID: "test2"} + found, err := mirrorScylla.Get(key1) require.Nil(t, err) require.True(t, found) err = mirrorScylla.Commit([]string{key1.Key}) require.NoError(t, err) - found, _, err = mirrorScylla.Get(key2) + found, err = mirrorScylla.Get(key2) require.Nil(t, err) require.True(t, found) err = mirrorScylla.Commit([]string{key2.Key}) require.NoError(t, err) }) t.Run("Same messageID should be deduped for same workspace", func(t *testing.T) { - key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: 
"test"} - key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} - found, _, err := mirrorScylla.Get(key1) + key1 := types.KeyValue{Key: "a", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "a", WorkspaceID: "test"} + found, err := mirrorScylla.Get(key1) require.Nil(t, err) require.True(t, found) err = mirrorScylla.Commit([]string{key1.Key}) require.NoError(t, err) - found, _, err = mirrorScylla.Get(key2) + found, err = mirrorScylla.Get(key2) require.Nil(t, err) require.False(t, found) - found, _, err = mirrorScylla.scylla.Get(key1) + found, err = mirrorScylla.scylla.Get(key1) require.Nil(t, err) require.False(t, found) - found, _, err = mirrorScylla.badger.Get(key1) + found, err = mirrorScylla.badger.Get(key1) require.Nil(t, err) require.False(t, found) }) t.Run("Same messageID should be deduped for same workspace from cache", func(t *testing.T) { - key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - found, _, err := mirrorScylla.Get(key1) + key1 := types.KeyValue{Key: "b", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "b", WorkspaceID: "test"} + found, err := mirrorScylla.Get(key1) require.Nil(t, err) require.True(t, found) - found, _, err = mirrorScylla.Get(key2) + found, err = mirrorScylla.Get(key2) require.Nil(t, err) require.False(t, found) }) t.Run("different messageID should not be deduped for batch", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "c", Value: 1, WorkspaceID: "test"}, - {Key: "d", Value: 1, WorkspaceID: "test"}, - {Key: "e", Value: 1, WorkspaceID: "test"}, + {Key: "c", WorkspaceID: "test"}, + {Key: "d", WorkspaceID: "test"}, + {Key: "e", WorkspaceID: "test"}, } - found, _, err := mirrorScylla.GetBatch(keys) + found, err := mirrorScylla.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { @@ -97,16 +97,16 @@ func Test_MirrorBadger(t *testing.T) { }) t.Run("same messageID should be deduped for batch", 
func(t *testing.T) { keys := []types.KeyValue{ - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 3}, - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 4}, - {Key: "g", Value: 1, WorkspaceID: "test", JobID: 5}, + {Key: "f", WorkspaceID: "test", JobID: 3}, + {Key: "f", WorkspaceID: "test", JobID: 4}, + {Key: "g", WorkspaceID: "test", JobID: 5}, } expected := map[types.KeyValue]bool{ keys[0]: true, keys[1]: false, keys[2]: true, } - found, _, err := mirrorScylla.GetBatch(keys) + found, err := mirrorScylla.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { diff --git a/services/dedup/scylla/scylla.go b/services/dedup/scylla/scylla.go index b36e8aa2ce..c82bc2d2a0 100644 --- a/services/dedup/scylla/scylla.go +++ b/services/dedup/scylla/scylla.go @@ -34,26 +34,22 @@ func (d *ScyllaDB) Close() { d.scylla.Close() } -func (d *ScyllaDB) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) { +func (d *ScyllaDB) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error) { defer d.stat.NewTaggedStat("dedup_get_batch_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).RecordDuration()() // Prepare a map to store results for each job (true = accept, false = reject) results := make(map[types.KeyValue]bool) - sizes := make(map[types.KeyValue]int64) - d.stat.NewTaggedStat("dedup_get_batch_size", stats.GaugeType, stats.Tags{"mode": "scylla"}).Gauge(len(kvs)) // Group jobs by workspaceID for batch querying workspaceJobsMap := make(map[string][]types.KeyValue) d.cacheMu.Lock() for _, kv := range kvs { - if previous, found := d.cache[kv.Key]; found { + if _, found := d.cache[kv.Key]; found { results[kv] = false - sizes[kv] = previous.Value continue } d.cache[kv.Key] = kv results[kv] = true - sizes[kv] = kv.Value workspaceJobsMap[kv.WorkspaceID] = append(workspaceJobsMap[kv.WorkspaceID], kv) } d.cacheMu.Unlock() @@ -79,22 +75,21 @@ func (d *ScyllaDB) GetBatch(kvs []types.KeyValue) 
(map[types.KeyValue]bool, map[ val, ok := d.cache[dbMessageID] if ok { results[val] = false - sizes[val] = size } delete(d.cache, dbMessageID) d.cacheMu.Unlock() } if err := iter.Close(); err != nil { - return nil, nil, fmt.Errorf("error closing iterator: %v", err) + return nil, fmt.Errorf("error closing iterator: %v", err) } d.stat.NewTaggedStat("dedup_get_batch_query_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).Since(startTime) } } - return results, sizes, nil + return results, nil } -func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) { +func (d *ScyllaDB) Get(kv types.KeyValue) (bool, error) { defer d.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).RecordDuration()() var err error @@ -103,21 +98,21 @@ func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) { // Check if the key exists in the cache // This is essential if we get the same key multiple times in the same batch // Since we are not committing the keys immediately, we need to keep track of the keys in the cache - if previous, found := d.cache[kv.Key]; found { - return false, previous.Value, nil + if _, found := d.cache[kv.Key]; found { + return false, nil } // Check if the key exists in the DB - var value int64 - err = d.scylla.Query(fmt.Sprintf("SELECT size FROM %s.%q WHERE id = ? and workspaceId = ?", d.keyspace, d.tableName), kv.Key, kv.WorkspaceID).Scan(&value) + var id string + err = d.scylla.Query(fmt.Sprintf("SELECT id FROM %s.%q WHERE id = ? 
and workspaceId = ?", d.keyspace, d.tableName), kv.Key, kv.WorkspaceID).Scan(&id) if err != nil && !errors.Is(err, gocql.ErrNotFound) { - return false, 0, fmt.Errorf("error getting key %s: %v", kv.Key, err) + return false, fmt.Errorf("error getting key %s: %v", kv.Key, err) } exists := errors.Is(err, gocql.ErrNotFound) if exists { d.cache[kv.Key] = kv } - return exists, kv.Value, nil + return exists, nil } func (d *ScyllaDB) Commit(keys []string) error { @@ -131,7 +126,7 @@ func (d *ScyllaDB) Commit(keys []string) error { d.cacheMu.Unlock() return fmt.Errorf("key %v has not been previously set", key) } - kvs[i] = types.KeyValue{Key: key, Value: value.Value, WorkspaceID: value.WorkspaceID} + kvs[i] = types.KeyValue{Key: key, WorkspaceID: value.WorkspaceID} } d.cacheMu.Unlock() batches := lo.Chunk(kvs, d.writeBatchSize) @@ -139,8 +134,8 @@ func (d *ScyllaDB) Commit(keys []string) error { scyllaBatch := d.scylla.NewBatch(gocql.LoggedBatch) for _, key := range batch { scyllaBatch.Entries = append(scyllaBatch.Entries, gocql.BatchEntry{ - Stmt: fmt.Sprintf("INSERT INTO %s.%q (id,size,workspaceId,ts) VALUES (?,?,?,?) USING TTL %d", d.keyspace, d.tableName, d.ttl), - Args: []interface{}{key.Key, key.Value, key.WorkspaceID, time.Now()}, + Stmt: fmt.Sprintf("INSERT INTO %s.%q (id,workspaceId) VALUES (?,?) 
USING TTL %d", d.keyspace, d.tableName, d.ttl), + Args: []interface{}{key.Key, key.WorkspaceID}, }) } if err := d.scylla.ExecuteBatch(scyllaBatch); err != nil { @@ -173,7 +168,7 @@ func New(conf *config.Config, stats stats.Stats) (*ScyllaDB, error) { keySpace := conf.GetString("Scylla.Keyspace", "rudder") table := conf.GetString("Scylla.TableName", "dedup") - err = session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%q ( id text,size bigint, workspaceId text, ts timestamp,PRIMARY KEY ((id, workspaceId), ts)) WITH bloom_filter_fp_chance = 0.005;", keySpace, table)).Exec() + err = session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%q ( id text, workspaceId text, PRIMARY KEY (id, workspaceId)) WITH bloom_filter_fp_chance = 0.005;", keySpace, table)).Exec() if err != nil { return nil, err } @@ -183,7 +178,7 @@ func New(conf *config.Config, stats stats.Stats) (*ScyllaDB, error) { conf: conf, keyspace: keySpace, stat: stats, - ttl: conf.GetInt("Scylla.TTL", 1209600), // TTL is defaulted to seconds + ttl: conf.GetInt("Scylla.TTL", 864000), // TTL is defaulted to seconds readBatchSize: conf.GetInt("Scylla.ReadBatchSize", 100), writeBatchSize: conf.GetInt("Scylla.WriteBatchSize", 100), tableName: table, diff --git a/services/dedup/scylla/scylla_test.go b/services/dedup/scylla/scylla_test.go index 98e96580fa..6ddbe8ff78 100644 --- a/services/dedup/scylla/scylla_test.go +++ b/services/dedup/scylla/scylla_test.go @@ -32,53 +32,53 @@ func Test_Scylla(t *testing.T) { require.NotNil(t, scylla) defer scylla.Close() t.Run("Same messageID should not be deduped for different workspace", func(t *testing.T) { - key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test1"} - key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test2"} - found, _, err := scylla.Get(key1) + key1 := types.KeyValue{Key: "a", WorkspaceID: "test1"} + key2 := types.KeyValue{Key: "a", WorkspaceID: "test2"} + found, err := scylla.Get(key1) require.Nil(t, err) require.True(t, found) err = 
scylla.Commit([]string{key1.Key}) require.NoError(t, err) - found, _, err = scylla.Get(key2) + found, err = scylla.Get(key2) require.Nil(t, err) require.True(t, found) err = scylla.Commit([]string{key2.Key}) require.NoError(t, err) }) t.Run("Same messageID should be deduped for same workspace", func(t *testing.T) { - key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} - key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"} - found, _, err := scylla.Get(key1) + key1 := types.KeyValue{Key: "a", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "a", WorkspaceID: "test"} + found, err := scylla.Get(key1) require.NoError(t, err) require.True(t, found) err = scylla.Commit([]string{key1.Key}) require.NoError(t, err) - found, _, err = scylla.Get(key2) + found, err = scylla.Get(key2) require.NoError(t, err) require.False(t, found) }) t.Run("Same messageID should be deduped for same workspace from cache", func(t *testing.T) { - key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"} - found, _, err := scylla.Get(key1) + key1 := types.KeyValue{Key: "b", WorkspaceID: "test"} + key2 := types.KeyValue{Key: "b", WorkspaceID: "test"} + found, err := scylla.Get(key1) require.NoError(t, err) require.True(t, found) - found, _, err = scylla.Get(key2) + found, err = scylla.Get(key2) require.NoError(t, err) require.False(t, found) }) t.Run("Same messageID should be deduped for same workspace from cache for Batch call", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "c", Value: 1, WorkspaceID: "test", JobID: 1}, - {Key: "c", Value: 1, WorkspaceID: "test", JobID: 2}, - {Key: "d", Value: 1, WorkspaceID: "test", JobID: 3}, + {Key: "c", WorkspaceID: "test", JobID: 1}, + {Key: "c", WorkspaceID: "test", JobID: 2}, + {Key: "d", WorkspaceID: "test", JobID: 3}, } expected := map[types.KeyValue]bool{ keys[0]: true, keys[1]: false, keys[2]: true, } - found, _, err := scylla.GetBatch(keys) + 
found, err := scylla.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { @@ -89,11 +89,11 @@ func Test_Scylla(t *testing.T) { }) t.Run("Different messageID should not be deduped for same workspace", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "e", Value: 1, WorkspaceID: "test", JobID: 1}, - {Key: "f", Value: 1, WorkspaceID: "test", JobID: 2}, - {Key: "g", Value: 1, WorkspaceID: "test", JobID: 3}, + {Key: "e", WorkspaceID: "test", JobID: 1}, + {Key: "f", WorkspaceID: "test", JobID: 2}, + {Key: "g", WorkspaceID: "test", JobID: 3}, } - found, _, err := scylla.GetBatch(keys) + found, err := scylla.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 3) for _, key := range keys { @@ -102,9 +102,9 @@ func Test_Scylla(t *testing.T) { }) t.Run("Same messageID should not be deduped for different workspace", func(t *testing.T) { keys := []types.KeyValue{ - {Key: "h", Value: 1, WorkspaceID: "test1", JobID: 1}, + {Key: "h", WorkspaceID: "test1", JobID: 1}, } - found, _, err := scylla.GetBatch(keys) + found, err := scylla.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 1) for _, key := range keys { @@ -113,9 +113,9 @@ func Test_Scylla(t *testing.T) { err = scylla.Commit([]string{"h"}) require.NoError(t, err) keys = []types.KeyValue{ - {Key: "h", Value: 1, WorkspaceID: "test2", JobID: 1}, + {Key: "h", WorkspaceID: "test2", JobID: 1}, } - found, _, err = scylla.GetBatch(keys) + found, err = scylla.GetBatch(keys) require.NoError(t, err) require.Len(t, found, 1) for _, key := range keys { @@ -147,7 +147,7 @@ func Benchmark_ScyllaGet(b *testing.B) { for j := 0; j < 100; j++ { key := uuid.New().String() keys = append(keys, key) - _, _, err = scylla.Get(types.KeyValue{Key: key, Value: 1, WorkspaceID: "test", JobID: int64(i*100 + j)}) + _, err = scylla.Get(types.KeyValue{Key: key, WorkspaceID: "test", JobID: int64(i*100 + j)}) require.NoError(b, err) } err = scylla.Commit(keys) @@ -160,10 +160,10 @@ func 
Benchmark_ScyllaGet(b *testing.B) { var commitKeys []string for j := 0; j < 100; j++ { key := uuid.New().String() - keys = append(keys, types.KeyValue{Key: key, Value: 1, WorkspaceID: "test", JobID: int64(i*100 + j)}) + keys = append(keys, types.KeyValue{Key: key, WorkspaceID: "test", JobID: int64(i*100 + j)}) commitKeys = append(commitKeys, key) } - _, _, err = scylla.GetBatch(keys) + _, err = scylla.GetBatch(keys) require.NoError(b, err) err = scylla.Commit(commitKeys) require.NoError(b, err) diff --git a/services/dedup/types/types.go b/services/dedup/types/types.go index 418686b48f..221620b0b5 100644 --- a/services/dedup/types/types.go +++ b/services/dedup/types/types.go @@ -2,7 +2,6 @@ package types type KeyValue struct { Key string - Value int64 WorkspaceID string JobID int64 }