Skip to content

Commit

Permalink
chore: remove unused columns from scylla
Browse files Browse the repository at this point in the history
  • Loading branch information
Rohith BCS authored and Rohith BCS committed Nov 4, 2024
1 parent 2aae3ec commit c033b50
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 193 deletions.
14 changes: 6 additions & 8 deletions mocks/services/dedup/mock_dedup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 7 additions & 15 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1585,8 +1585,7 @@ func getDiffMetrics(
}

type dupStatKey struct {
sourceID string
equalSize bool
sourceID string
}

func (proc *Handle) eventAuditEnabled(workspaceID string) bool {
Expand Down Expand Up @@ -1771,7 +1770,6 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
})
dedupKey := dedupTypes.KeyValue{
Key: fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId),
Value: int64(len(payloadFunc())),
WorkspaceID: batchEvent.WorkspaceId,
JobID: batchEvent.JobID,
}
Expand All @@ -1798,10 +1796,9 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
}

var keyMap map[dedupTypes.KeyValue]bool
var sizeMap map[dedupTypes.KeyValue]int64
var err error
if proc.config.enableDedup {
keyMap, sizeMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID)
keyMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID)
if err != nil {
return nil, err
}
Expand All @@ -1813,13 +1810,11 @@ func (proc *Handle) processJobsForDestV2(partition string, subJobs subJob) (*tra
}

if proc.config.enableDedup {
p := event.payloadFunc()
messageSize := int64(len(p))
dedupKey := event.dedupKey
ok, previousSize := keyMap[dedupKey], sizeMap[dedupKey]
ok := keyMap[dedupKey]
if !ok {
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey)
sourceDupStats[dupStatKey{sourceID: event.eventParams.SourceId, equalSize: messageSize == previousSize}] += 1
sourceDupStats[dupStatKey{sourceID: event.eventParams.SourceId}] += 1
continue
}
dedupKeys[dedupKey.Key] = struct{}{}
Expand Down Expand Up @@ -2334,16 +2329,14 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans
})

if proc.config.enableDedup {
p := payloadFunc()
messageSize := int64(len(p))
dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId)
ok, previousSize, err := proc.dedup.Get(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize, WorkspaceID: batchEvent.WorkspaceId})
ok, err := proc.dedup.Get(dedupTypes.KeyValue{Key: dedupKey, WorkspaceID: batchEvent.WorkspaceId})
if err != nil {
return nil, err
}
if !ok {
proc.logger.Debugf("Dropping event with duplicate dedupKey: %s", dedupKey)
sourceDupStats[dupStatKey{sourceID: source.ID, equalSize: messageSize == previousSize}] += 1
sourceDupStats[dupStatKey{sourceID: source.ID}] += 1
continue
}
dedupKeys[dedupKey] = struct{}{}
Expand Down Expand Up @@ -3624,8 +3617,7 @@ func (proc *Handle) crashRecover() {
func (proc *Handle) updateSourceStats(sourceStats map[dupStatKey]int, bucket string) {
for dupStat, count := range sourceStats {
tags := map[string]string{
"source": dupStat.sourceID,
"equalSize": strconv.FormatBool(dupStat.equalSize),
"source": dupStat.sourceID,
}
sourceStatsD := proc.statsFactory.NewTaggedStat(bucket, stats.CountType, tags)
sourceStatsD.Count(count)
Expand Down
5 changes: 2 additions & 3 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3374,9 +3374,8 @@ var _ = Describe("Processor", Ordered, func() {

callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1)
c.MockDedup.EXPECT().GetBatch(gomock.Any()).Return(map[dedupTypes.KeyValue]bool{
{Key: "message-some-id", Value: 230, JobID: 1010, WorkspaceID: ""}: true,
{Key: "message-some-id", Value: 246, JobID: 1010, WorkspaceID: ""}: true,
{Key: "message-some-id", Value: 246, JobID: 2010, WorkspaceID: ""}: true,
{Key: "message-some-id", JobID: 1010, WorkspaceID: ""}: true,
{Key: "message-some-id", JobID: 2010, WorkspaceID: ""}: true,
}, nil, nil).After(callUnprocessed).AnyTimes()
c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1)

Expand Down
52 changes: 23 additions & 29 deletions services/dedup/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -73,30 +72,28 @@ func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *Dedup {
}
return &Dedup{
badgerDB: db,
cache: make(map[string]int64),
cache: make(map[string]bool),
}
}

func (d *BadgerDB) Get(key string) (int64, bool, error) {
func (d *BadgerDB) Get(key string) (bool, error) {
defer d.stats.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "badger"}).RecordDuration()()

var payloadSize int64
var found bool
err := d.badgerDB.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err != nil {
return err
}
if itemValue, err := item.ValueCopy(nil); err == nil {
payloadSize, _ = strconv.ParseInt(string(itemValue), 10, 64)
if _, err = item.ValueCopy(nil); err == nil {
found = true
}
return nil
})
if err != nil && !errors.Is(err, badger.ErrKeyNotFound) {
return 0, false, err
return false, err
}
return payloadSize, found, nil
return found, nil
}

func (d *BadgerDB) Set(kvs []types.KeyValue) error {
Expand All @@ -105,8 +102,7 @@ func (d *BadgerDB) Set(kvs []types.KeyValue) error {
defer wb.Cancel()
for i := range kvs {
message := kvs[i]
value := strconv.FormatInt(message.Value, 10)
e := badger.NewEntry([]byte(message.Key), []byte(value)).WithTTL(d.window.Load())
e := badger.NewEntry([]byte(message.Key), []byte("")).WithTTL(d.window.Load())
if err := wb.SetEntry(e); err != nil {
return err
}
Expand Down Expand Up @@ -173,53 +169,51 @@ func (d *BadgerDB) gcLoop() {
type Dedup struct {
badgerDB *BadgerDB
cacheMu sync.Mutex
cache map[string]int64
cache map[string]bool
}

func (d *Dedup) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error) {
func (d *Dedup) GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error) {
err := d.badgerDB.init()
if err != nil {
return nil, nil, err
return nil, err
}

found := make(map[types.KeyValue]bool)
previous := make(map[types.KeyValue]int64)
for _, kv := range kvs {
foundKey, size, err := d.Get(kv)
foundKey, err := d.Get(kv)
if err != nil {
return nil, nil, err
return nil, err
}
found[kv] = foundKey
previous[kv] = size
}
return found, previous, nil
return found, nil
}

func (d *Dedup) Get(kv types.KeyValue) (bool, int64, error) {
func (d *Dedup) Get(kv types.KeyValue) (bool, error) {
err := d.badgerDB.init()
if err != nil {
return false, 0, err
return false, err
}

d.cacheMu.Lock()
previous, found := d.cache[kv.Key]
d.cacheMu.Unlock()
_, found := d.cache[kv.Key]
if found {
return false, previous, nil
return false, nil
}
d.cacheMu.Unlock()

previous, found, err = d.badgerDB.Get(kv.Key)
found, err = d.badgerDB.Get(kv.Key)
if err != nil {
return false, 0, err
return false, err
}

d.cacheMu.Lock()
defer d.cacheMu.Unlock()
if !found { // still not in the cache, but it's in the DB so let's refresh the cache
d.cache[kv.Key] = kv.Value
d.cache[kv.Key] = true
}

return !found, previous, nil
return !found, nil
}

func (d *Dedup) Commit(keys []string) error {
Expand All @@ -230,12 +224,12 @@ func (d *Dedup) Commit(keys []string) error {
kvs := make([]types.KeyValue, len(keys))
d.cacheMu.Lock()
for i, key := range keys {
value, ok := d.cache[key]
_, ok := d.cache[key]
if !ok {
d.cacheMu.Unlock()
return fmt.Errorf("key %v has not been previously set", key)
}
kvs[i] = types.KeyValue{Key: key, Value: value}
kvs[i] = types.KeyValue{Key: key}
}
d.cacheMu.Unlock()

Expand Down
32 changes: 16 additions & 16 deletions services/dedup/badger/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,34 @@ func Test_Badger(t *testing.T) {
require.NotNil(t, badger)
defer badger.Close()
t.Run("Same messageID should be deduped from badger", func(t *testing.T) {
key1 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"}
key2 := types.KeyValue{Key: "a", Value: 1, WorkspaceID: "test"}
notAvailable, _, err := badger.Get(key1)
key1 := types.KeyValue{Key: "a", WorkspaceID: "test"}
key2 := types.KeyValue{Key: "a", WorkspaceID: "test"}
notAvailable, err := badger.Get(key1)
require.NoError(t, err)
require.True(t, notAvailable)
err = badger.Commit([]string{key1.Key})
require.NoError(t, err)
notAvailable, _, err = badger.Get(key2)
notAvailable, err = badger.Get(key2)
require.NoError(t, err)
require.False(t, notAvailable)
})
t.Run("Same messageID should be deduped from cache", func(t *testing.T) {
key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"}
key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceID: "test"}
found, _, err := badger.Get(key1)
key1 := types.KeyValue{Key: "b", WorkspaceID: "test"}
key2 := types.KeyValue{Key: "b", WorkspaceID: "test"}
found, err := badger.Get(key1)
require.NoError(t, err)
require.True(t, found)
found, _, err = badger.Get(key2)
found, err = badger.Get(key2)
require.NoError(t, err)
require.False(t, found)
})
t.Run("different messageID should not be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "c", Value: 1, WorkspaceID: "test"},
{Key: "d", Value: 1, WorkspaceID: "test"},
{Key: "e", Value: 1, WorkspaceID: "test"},
{Key: "c", WorkspaceID: "test"},
{Key: "d", WorkspaceID: "test"},
{Key: "e", WorkspaceID: "test"},
}
found, _, err := badger.GetBatch(keys)
found, err := badger.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
Expand All @@ -60,16 +60,16 @@ func Test_Badger(t *testing.T) {
})
t.Run("same messageID should be deduped for batch", func(t *testing.T) {
keys := []types.KeyValue{
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 3},
{Key: "f", Value: 1, WorkspaceID: "test", JobID: 4},
{Key: "g", Value: 1, WorkspaceID: "test", JobID: 5},
{Key: "f", WorkspaceID: "test", JobID: 3},
{Key: "f", WorkspaceID: "test", JobID: 4},
{Key: "g", WorkspaceID: "test", JobID: 5},
}
expected := map[types.KeyValue]bool{
keys[0]: true,
keys[1]: false,
keys[2]: true,
}
found, _, err := badger.GetBatch(keys)
found, err := badger.GetBatch(keys)
require.NoError(t, err)
require.Len(t, found, 3)
for _, key := range keys {
Expand Down
4 changes: 2 additions & 2 deletions services/dedup/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func New(conf *config.Config, stats stats.Stats) (Dedup, error) {
// Dedup is the interface for deduplication service
type Dedup interface {
// Get returns [true] if it was the first time the key was encountered, otherwise it returns [false] along with the previous value
Get(kv types.KeyValue) (bool, int64, error)
Get(kv types.KeyValue) (bool, error)

// GetBatch
GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, map[types.KeyValue]int64, error)
GetBatch(kvs []types.KeyValue) (map[types.KeyValue]bool, error)

// Commit commits a list of previously set keys to the DB
Commit(keys []string) error
Expand Down
Loading

0 comments on commit c033b50

Please sign in to comment.