From e57d2b736afd651736211508fb48ca30fafca708 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 3 Dec 2024 16:19:14 +0530 Subject: [PATCH 1/6] chore: disabling merge before processor.Store(...) - default true --- processor/processor.go | 2 ++ processor/worker.go | 4 ++++ processor/worker_handle.go | 1 + processor/worker_handle_adapter.go | 1 + 4 files changed, 8 insertions(+) diff --git a/processor/processor.go b/processor/processor.go index cbf40de16b..a31d577ba0 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -128,6 +128,7 @@ type Handle struct { isolationMode isolation.Mode mainLoopTimeout time.Duration enablePipelining bool + disableStoreMerge bool pipelineBufferedItems int subJobSize int pingerSleep config.ValueLoader[time.Duration] @@ -792,6 +793,7 @@ func (proc *Handle) loadConfig() { } proc.config.enablePipelining = config.GetBoolVar(true, "Processor.enablePipelining") + proc.config.disableStoreMerge = config.GetBoolVar(true, "Processor.disableStoreMerge") proc.config.pipelineBufferedItems = config.GetIntVar(0, 1, "Processor.pipelineBufferedItems") proc.config.subJobSize = config.GetIntVar(defaultSubJobSize, 1, "Processor.subJobSize") // Enable dedup of incoming events by default diff --git a/processor/worker.go b/processor/worker.go index 0b25633e11..2cb1110d71 100644 --- a/processor/worker.go +++ b/processor/worker.go @@ -105,6 +105,10 @@ func (w *worker) start() { defer w.lifecycle.wg.Done() defer w.logger.Debugf("store routine stopped for worker: %s", w.partition) for subJob := range w.channel.store { + if w.handle.config().disableStoreMerge { + w.handle.Store(w.partition, subJob) + continue + } if firstSubJob && !subJob.hasMore { w.handle.Store(w.partition, subJob) diff --git a/processor/worker_handle.go b/processor/worker_handle.go index aa07eda6b4..a8d73e276d 100644 --- a/processor/worker_handle.go +++ b/processor/worker_handle.go @@ -33,6 +33,7 @@ type workerHandle interface { type workerHandleConfig struct { maxEventsToProcess config.ValueLoader[int] + disableStoreMerge bool enablePipelining bool enableParallelScan bool pipelineBufferedItems int diff --git a/processor/worker_handle_adapter.go b/processor/worker_handle_adapter.go index db57c0ae53..e2cdc4a669 100644 --- a/processor/worker_handle_adapter.go +++ b/processor/worker_handle_adapter.go @@ -17,6 +17,7 @@ func (h *workerHandleAdapter) logger() logger.Logger { func (h *workerHandleAdapter) config() workerHandleConfig { return workerHandleConfig{ + disableStoreMerge: h.Handle.config.disableStoreMerge, enablePipelining: h.Handle.config.enablePipelining, pipelineBufferedItems: h.Handle.config.pipelineBufferedItems, maxEventsToProcess: h.Handle.config.maxEventsToProcess, From d6673dabaca2490f89bd367609266a7c35bca576 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Sat, 7 Dec 2024 00:23:14 +0530 Subject: [PATCH 2/6] chore: accommodate a reloadable flag --- processor/processor.go | 4 ++-- processor/worker.go | 14 +++++++------- processor/worker_handle.go | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index a31d577ba0..25e8f571b7 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -128,7 +128,7 @@ type Handle struct { isolationMode isolation.Mode mainLoopTimeout time.Duration enablePipelining bool - disableStoreMerge bool + disableStoreMerge config.ValueLoader[bool] pipelineBufferedItems int subJobSize int pingerSleep config.ValueLoader[time.Duration] @@ -793,7 +793,7 @@ func (proc *Handle) loadConfig() { } proc.config.enablePipelining = config.GetBoolVar(true, "Processor.enablePipelining") - proc.config.disableStoreMerge = config.GetBoolVar(true, "Processor.disableStoreMerge") + proc.config.disableStoreMerge = config.GetReloadableBoolVar(true, "Processor.disableStoreMerge") proc.config.pipelineBufferedItems = config.GetIntVar(0, 1, "Processor.pipelineBufferedItems") proc.config.subJobSize = config.GetIntVar(defaultSubJobSize, 1, "Processor.subJobSize") // Enable dedup of incoming events by default diff --git a/processor/worker.go b/processor/worker.go index 2cb1110d71..4bba3cc24e 100644 --- a/processor/worker.go +++ b/processor/worker.go @@ -105,13 +105,13 @@ func (w *worker) start() { defer w.lifecycle.wg.Done() defer w.logger.Debugf("store routine stopped for worker: %s", w.partition) for subJob := range w.channel.store { - if w.handle.config().disableStoreMerge { - w.handle.Store(w.partition, subJob) - continue - } - - if firstSubJob && !subJob.hasMore { - w.handle.Store(w.partition, subJob) + if w.handle.config().disableStoreMerge.Load() { + if firstSubJob { + w.handle.Store(w.partition, subJob) + continue + } + mergedJob.merge(subJob) + w.handle.Store(w.partition, mergedJob) continue } diff --git a/processor/worker_handle.go b/processor/worker_handle.go index a8d73e276d..a1bd20edd1 100644 --- a/processor/worker_handle.go +++ b/processor/worker_handle.go @@ -33,7 +33,7 @@ type workerHandle interface { type workerHandleConfig struct { maxEventsToProcess config.ValueLoader[int] - disableStoreMerge bool + disableStoreMerge config.ValueLoader[bool] enablePipelining bool enableParallelScan bool pipelineBufferedItems int From a5d663db01fbe3d259131043b0cca8db66fe71dc Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Sat, 7 Dec 2024 19:28:42 +0530 Subject: [PATCH 3/6] fixup! chore: accommodate a reloadable flag --- processor/worker_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/processor/worker_test.go b/processor/worker_test.go index cf84696479..298b7a48ee 100644 --- a/processor/worker_test.go +++ b/processor/worker_test.go @@ -247,6 +247,7 @@ func (m *mockWorkerHandle) logger() logger.Logger { func (m *mockWorkerHandle) config() workerHandleConfig { return workerHandleConfig{ enablePipelining: m.pipelining, + disableStoreMerge: config.SingleValueLoader(true), maxEventsToProcess: config.SingleValueLoader(m.loopEvents), pipelineBufferedItems: 1, subJobSize: 10, From 8c8d57a18c3ab9c408958995704026edddf0562b Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Mon, 9 Dec 2024 11:22:08 +0530 Subject: [PATCH 4/6] fixup! chore: accommodate a reloadable flag --- processor/worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/processor/worker.go b/processor/worker.go index 4bba3cc24e..3feabe029f 100644 --- a/processor/worker.go +++ b/processor/worker.go @@ -112,6 +112,7 @@ func (w *worker) start() { } mergedJob.merge(subJob) w.handle.Store(w.partition, mergedJob) + firstSubJob = true continue } From 274bf8ac224261fd59bf98cb2dd85a79dde7d557 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Mon, 9 Dec 2024 16:53:05 +0530 Subject: [PATCH 5/6] chore: test reloadable behaviour --- processor/worker_test.go | 120 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 2 deletions(-) diff --git a/processor/worker_test.go b/processor/worker_test.go index 298b7a48ee..2fe83a6bd3 100644 --- a/processor/worker_test.go +++ b/processor/worker_test.go @@ -4,6 +4,7 @@ import ( "context" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -352,12 +353,12 @@ func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsourc rsourcesStats: rsourcesStats, }, { - subJobs: jobs[len(jobs)/3 : 2*len(jobs)/2], + subJobs: jobs[len(jobs)/3 : 2*len(jobs)/3], hasMore: true, rsourcesStats: rsourcesStats, }, { - subJobs: jobs[2*len(jobs)/2:], + subJobs: jobs[2*len(jobs)/3:], hasMore: false, rsourcesStats: rsourcesStats, }, @@ -415,3 +416,118 @@ func (m *mockWorkerHandle) Store(partition string, in *storeMessage) { m.partitionStats[partition] = s m.log.Infof("Store partition: %s stats: %+v", partition, s) } + +type mockWorkerHandle2 struct { + *mockWorkerHandle + doTransform chan struct{} + storeResult []int + disableStoreMerge *atomic.Bool +} + +func (m *mockWorkerHandle2) getJobs(partition string) jobsdb.JobsResult { + j := m.mockWorkerHandle.getJobs(partition) + for i := range j.Jobs { + j.Jobs[i].JobID = int64(i) + } + return j +} + +func (m *mockWorkerHandle2) transformations(partition string, in *transformationMessage) *storeMessage { + <-m.doTransform + return m.mockWorkerHandle.transformations(partition, in) +} + +func (m *mockWorkerHandle2) Store(partition string, in *storeMessage) { + m.mockWorkerHandle.Store(partition, in) + m.statsMu.Lock() + defer m.statsMu.Unlock() + m.storeResult = append(m.storeResult, in.totalEvents) +} + +func (m *mockWorkerHandle2) config() workerHandleConfig { + return workerHandleConfig{ + enablePipelining: m.pipelining, + disableStoreMerge: m.disableStoreMerge, + maxEventsToProcess: config.SingleValueLoader(m.loopEvents), + pipelineBufferedItems: 1, + subJobSize: 10, + readLoopSleep: config.SingleValueLoader(1 * time.Millisecond), + maxLoopSleep: config.SingleValueLoader(100 * time.Millisecond), + enableParallelScan: m.enableParallelScan, + } +} + +func TestReloadableMergeBeforeStore(t *testing.T) { + wh1 := &mockWorkerHandle{ + pipelining: true, + log: logger.NOP, + loopEvents: 100, + partitionStats: map[string]struct { + queried int + marked int + processed int + transformed int + stored int + subBatches int + trackedUsers int + }{}, + limitsReached: true, + shouldProcessMultipleSubJobs: true, + enableParallelScan: true, + } + wh := &mockWorkerHandle2{ + mockWorkerHandle: wh1, + doTransform: make(chan struct{}), + disableStoreMerge: &atomic.Bool{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + poolCtx, poolCancel := context.WithCancel(ctx) + var limiterWg sync.WaitGroup + wh.limiters.query = kitsync.NewLimiter(poolCtx, &limiterWg, "query", 2, stats.Default) + wh.limiters.process = kitsync.NewLimiter(poolCtx, &limiterWg, "process", 2, stats.Default) + wh.limiters.store = kitsync.NewLimiter(poolCtx, &limiterWg, "store", 2, stats.Default) + wh.limiters.transform = kitsync.NewLimiter(poolCtx, &limiterWg, "transform", 2, stats.Default) + defer limiterWg.Wait() + defer poolCancel() + wp := workerpool.New(poolCtx, func(partition string) workerpool.Worker { return newProcessorWorker(partition, wh) }, logger.NOP) + wp.PingWorker("somePartition") + + wh.doTransform <- struct{}{} + wh.doTransform <- struct{}{} + wh.doTransform <- struct{}{} + time.Sleep(1 * time.Second) + wh.validate(t) + wh.statsMu.Lock() + require.Equal(t, []int{100}, wh.storeResult) + wh.storeResult = nil + wh.statsMu.Unlock() + + wp.PingWorker("somePartition") + wh.doTransform <- struct{}{} + time.Sleep(time.Second) + wh.disableStoreMerge.Store(true) + wh.doTransform <- struct{}{} + wh.doTransform <- struct{}{} + time.Sleep(1 * time.Second) + wh.validate(t) + wh.statsMu.Lock() + require.Equal(t, []int{66, 34}, wh.storeResult) + wh.storeResult = nil + wh.statsMu.Unlock() + + wp.PingWorker("somePartition") + wh.doTransform <- struct{}{} + time.Sleep(time.Second) + wh.disableStoreMerge.Store(false) + wh.doTransform <- struct{}{} + wh.doTransform <- struct{}{} + time.Sleep(1 * time.Second) + wh.validate(t) + wh.statsMu.Lock() + require.Equal(t, []int{33, 67}, wh.storeResult) + wh.storeResult = nil + + cancel() + wp.Shutdown() +} From 2f244cc24800021d743f404a6879290c6fd200a3 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" <82795818+Sidddddarth@users.noreply.github.com> Date: Thu, 26 Dec 2024 16:50:31 +0530 Subject: [PATCH 6/6] revert default flag --- processor/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/processor.go b/processor/processor.go index 25e8f571b7..02aabc222e 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -793,7 +793,7 @@ func (proc *Handle) loadConfig() { } proc.config.enablePipelining = config.GetBoolVar(true, "Processor.enablePipelining") - proc.config.disableStoreMerge = config.GetReloadableBoolVar(true, "Processor.disableStoreMerge") + proc.config.disableStoreMerge = config.GetReloadableBoolVar(false, "Processor.disableStoreMerge") proc.config.pipelineBufferedItems = config.GetIntVar(0, 1, "Processor.pipelineBufferedItems") proc.config.subJobSize = config.GetIntVar(defaultSubJobSize, 1, "Processor.subJobSize") // Enable dedup of incoming events by default