chore: disabling merge before processor.Store(...) - default true #5338

Open · wants to merge 8 commits into master
2 changes: 2 additions & 0 deletions processor/processor.go
@@ -128,6 +128,7 @@ type Handle struct {
	isolationMode isolation.Mode
	mainLoopTimeout time.Duration
	enablePipelining bool
	disableStoreMerge config.ValueLoader[bool]
	pipelineBufferedItems int
	subJobSize int
	pingerSleep config.ValueLoader[time.Duration]
@@ -792,6 +793,7 @@ func (proc *Handle) loadConfig() {
	}

	proc.config.enablePipelining = config.GetBoolVar(true, "Processor.enablePipelining")
	proc.config.disableStoreMerge = config.GetReloadableBoolVar(false, "Processor.disableStoreMerge")
	proc.config.pipelineBufferedItems = config.GetIntVar(0, 1, "Processor.pipelineBufferedItems")
	proc.config.subJobSize = config.GetIntVar(defaultSubJobSize, 1, "Processor.subJobSize")
	// Enable dedup of incoming events by default
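Note on the config change above: Processor.enablePipelining is read once through config.GetBoolVar, while the new Processor.disableStoreMerge goes through config.GetReloadableBoolVar and is consulted via Load() inside the worker's store loop, so it can be toggled at runtime without a restart. A minimal sketch of that difference, assuming the rudder-go-kit config package (the github.com/rudderlabs/rudder-go-kit/config import path is an assumption and is not shown in this diff):

package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-go-kit/config" // assumed import path; not shown in the diff
)

func main() {
	// Read once at load time: later configuration changes are not observed.
	enablePipelining := config.GetBoolVar(true, "Processor.enablePipelining")

	// Reloadable: returns a config.ValueLoader[bool]; each Load() call reflects
	// the current value, so the flag can be flipped without restarting the processor.
	disableStoreMerge := config.GetReloadableBoolVar(false, "Processor.disableStoreMerge")

	fmt.Println("enablePipelining:", enablePipelining)
	fmt.Println("disableStoreMerge:", disableStoreMerge.Load())
}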
11 changes: 8 additions & 3 deletions processor/worker.go
@@ -105,9 +105,14 @@ func (w *worker) start() {
		defer w.lifecycle.wg.Done()
		defer w.logger.Debugf("store routine stopped for worker: %s", w.partition)
		for subJob := range w.channel.store {

			if firstSubJob && !subJob.hasMore {
				w.handle.Store(w.partition, subJob)
			if w.handle.config().disableStoreMerge.Load() {
				if firstSubJob {
					w.handle.Store(w.partition, subJob)
					continue
				}
				mergedJob.merge(subJob)
				w.handle.Store(w.partition, mergedJob)
				firstSubJob = true
				continue
			}
Comment on lines +108 to 117
Collaborator

@Sidddddarth I'm a bit confused by this. Shouldn't feature flags be backwards compatible? Meaning, given the default of the new setting, nothing should change in terms of behaviour.

I see that the default for the new setting is true. In that case we get into the new if on line 108 but we're not doing what we have on main. The logic is different now.

Member Author

You're right.
I wanted to make this the new behaviour, but it might not make sense in all cases.
Will change the default.


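To make the behaviour under discussion concrete: with the eventual default of false, the store goroutine keeps merging sub-jobs and issues a single Store per batch, exactly as on main; only when the flag is flipped to true is each sub-job stored as soon as it arrives. A minimal, self-contained sketch of that gating, using simplified stand-in names (subJob, storeLoop) rather than the actual processor types:

package main

import "fmt"

type subJob struct {
	events  int
	hasMore bool // true while more sub-jobs of the same batch are still coming
}

func storeLoop(disableStoreMerge func() bool, store func(events int), jobs <-chan subJob) {
	merged, first := 0, true
	for sj := range jobs {
		if disableStoreMerge() {
			if first {
				store(sj.events) // store each sub-job individually
				continue
			}
			store(merged + sj.events) // flush whatever was merged so far plus the current sub-job
			merged, first = 0, true
			continue
		}
		if first && !sj.hasMore { // single sub-job in the batch: nothing to merge
			store(sj.events)
			continue
		}
		merged += sj.events // keep merging until the last sub-job
		first = false
		if !sj.hasMore {
			store(merged)
			merged, first = 0, true
		}
	}
}

func main() {
	jobs := make(chan subJob, 3)
	jobs <- subJob{events: 33, hasMore: true}
	jobs <- subJob{events: 33, hasMore: true}
	jobs <- subJob{events: 34, hasMore: false}
	close(jobs)
	storeLoop(
		func() bool { return false }, // merge enabled (the eventual default)
		func(n int) { fmt.Println("stored", n) },
		jobs,
	)
	// Prints "stored 100": one merged Store, as on main.
}

Flipping the flag between sub-jobs is what produces the 66+34 and 33+67 splits asserted in TestReloadableMergeBeforeStore further down.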
1 change: 1 addition & 0 deletions processor/worker_handle.go
@@ -33,6 +33,7 @@ type workerHandle interface {
type workerHandleConfig struct {
	maxEventsToProcess config.ValueLoader[int]

	disableStoreMerge config.ValueLoader[bool]
	enablePipelining bool
	enableParallelScan bool
	pipelineBufferedItems int
1 change: 1 addition & 0 deletions processor/worker_handle_adapter.go
@@ -17,6 +17,7 @@ func (h *workerHandleAdapter) logger() logger.Logger {

func (h *workerHandleAdapter) config() workerHandleConfig {
	return workerHandleConfig{
		disableStoreMerge: h.Handle.config.disableStoreMerge,
		enablePipelining: h.Handle.config.enablePipelining,
		pipelineBufferedItems: h.Handle.config.pipelineBufferedItems,
		maxEventsToProcess: h.Handle.config.maxEventsToProcess,
121 changes: 119 additions & 2 deletions processor/worker_test.go
@@ -4,6 +4,7 @@ import (
"context"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

@@ -247,6 +248,7 @@ func (m *mockWorkerHandle) logger() logger.Logger {
func (m *mockWorkerHandle) config() workerHandleConfig {
	return workerHandleConfig{
		enablePipelining: m.pipelining,
		disableStoreMerge: config.SingleValueLoader(true),
		maxEventsToProcess: config.SingleValueLoader(m.loopEvents),
		pipelineBufferedItems: 1,
		subJobSize: 10,
@@ -351,12 +353,12 @@ func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsourc
			rsourcesStats: rsourcesStats,
		},
		{
			subJobs: jobs[len(jobs)/3 : 2*len(jobs)/2],
			subJobs: jobs[len(jobs)/3 : 2*len(jobs)/3],
			hasMore: true,
			rsourcesStats: rsourcesStats,
		},
		{
			subJobs: jobs[2*len(jobs)/2:],
			subJobs: jobs[2*len(jobs)/3:],
			hasMore: false,
			rsourcesStats: rsourcesStats,
		},
@@ -414,3 +416,118 @@ func (m *mockWorkerHandle) Store(partition string, in *storeMessage) {
	m.partitionStats[partition] = s
	m.log.Infof("Store partition: %s stats: %+v", partition, s)
}

type mockWorkerHandle2 struct {
	*mockWorkerHandle
	doTransform chan struct{}
	storeResult []int
	disableStoreMerge *atomic.Bool
}

func (m *mockWorkerHandle2) getJobs(partition string) jobsdb.JobsResult {
	j := m.mockWorkerHandle.getJobs(partition)
	for i := range j.Jobs {
		j.Jobs[i].JobID = int64(i)
	}
	return j
}

func (m *mockWorkerHandle2) transformations(partition string, in *transformationMessage) *storeMessage {
	<-m.doTransform
	return m.mockWorkerHandle.transformations(partition, in)
}

func (m *mockWorkerHandle2) Store(partition string, in *storeMessage) {
	m.mockWorkerHandle.Store(partition, in)
	m.statsMu.Lock()
	defer m.statsMu.Unlock()
	m.storeResult = append(m.storeResult, in.totalEvents)
}

func (m *mockWorkerHandle2) config() workerHandleConfig {
	return workerHandleConfig{
		enablePipelining: m.pipelining,
		disableStoreMerge: m.disableStoreMerge,
		maxEventsToProcess: config.SingleValueLoader(m.loopEvents),
		pipelineBufferedItems: 1,
		subJobSize: 10,
		readLoopSleep: config.SingleValueLoader(1 * time.Millisecond),
		maxLoopSleep: config.SingleValueLoader(100 * time.Millisecond),
		enableParallelScan: m.enableParallelScan,
	}
}

func TestReloadableMergeBeforeStore(t *testing.T) {
	wh1 := &mockWorkerHandle{
		pipelining: true,
		log: logger.NOP,
		loopEvents: 100,
		partitionStats: map[string]struct {
			queried int
			marked int
			processed int
			transformed int
			stored int
			subBatches int
			trackedUsers int
		}{},
		limitsReached: true,
		shouldProcessMultipleSubJobs: true,
		enableParallelScan: true,
	}
	wh := &mockWorkerHandle2{
		mockWorkerHandle: wh1,
		doTransform: make(chan struct{}),
		disableStoreMerge: &atomic.Bool{},
	}

	ctx, cancel := context.WithCancel(context.Background())
	poolCtx, poolCancel := context.WithCancel(ctx)
	var limiterWg sync.WaitGroup
	wh.limiters.query = kitsync.NewLimiter(poolCtx, &limiterWg, "query", 2, stats.Default)
	wh.limiters.process = kitsync.NewLimiter(poolCtx, &limiterWg, "process", 2, stats.Default)
	wh.limiters.store = kitsync.NewLimiter(poolCtx, &limiterWg, "store", 2, stats.Default)
	wh.limiters.transform = kitsync.NewLimiter(poolCtx, &limiterWg, "transform", 2, stats.Default)
	defer limiterWg.Wait()
	defer poolCancel()
	wp := workerpool.New(poolCtx, func(partition string) workerpool.Worker { return newProcessorWorker(partition, wh) }, logger.NOP)
	wp.PingWorker("somePartition")

	wh.doTransform <- struct{}{}
	wh.doTransform <- struct{}{}
	wh.doTransform <- struct{}{}
	time.Sleep(1 * time.Second)
	wh.validate(t)
	wh.statsMu.Lock()
	require.Equal(t, []int{100}, wh.storeResult)
	wh.storeResult = nil
	wh.statsMu.Unlock()

	wp.PingWorker("somePartition")
	wh.doTransform <- struct{}{}
	time.Sleep(time.Second)
	wh.disableStoreMerge.Store(true)
	wh.doTransform <- struct{}{}
	wh.doTransform <- struct{}{}
	time.Sleep(1 * time.Second)
	wh.validate(t)
	wh.statsMu.Lock()
	require.Equal(t, []int{66, 34}, wh.storeResult)
	wh.storeResult = nil
	wh.statsMu.Unlock()

	wp.PingWorker("somePartition")
	wh.doTransform <- struct{}{}
	time.Sleep(time.Second)
	wh.disableStoreMerge.Store(false)
	wh.doTransform <- struct{}{}
	wh.doTransform <- struct{}{}
	time.Sleep(1 * time.Second)
	wh.validate(t)
	wh.statsMu.Lock()
	require.Equal(t, []int{33, 67}, wh.storeResult)
	wh.storeResult = nil
	wh.statsMu.Unlock()

	cancel()
	wp.Shutdown()
}
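A note on the expected values in TestReloadableMergeBeforeStore: the mock jobSplitter above divides the 100 generated jobs into thirds, so the three sub-jobs carry 33, 33 and 34 events. With merging left on, they are flushed as a single Store of 100. In the second run the flag flips to true only after the first sub-job has already been buffered for merging, so the second sub-job flushes 33+33=66 and the third is stored alone (34). In the third run the flag starts out true, so the first sub-job is stored immediately (33) and the remaining two are merged once the flag is back to false (33+34=67). This assumes the one-second sleeps give each sub-job time to reach the store goroutine before the flag changes, which is what the test relies on. The arithmetic, as a tiny standalone snippet:

package main

import "fmt"

func main() {
	n := 100
	first, second, third := n/3, 2*n/3-n/3, n-2*n/3 // the jobSplitter thirds: 33, 33, 34

	fmt.Println([]int{first + second + third}) // merging on throughout:           [100]
	fmt.Println([]int{first + second, third})  // flag flips after sub-job 1:      [66 34]
	fmt.Println([]int{first, second + third})  // flag flips back after sub-job 1: [33 67]
}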