From 2c66f1022b29f5f8fabd2c7e044035786bfc787e Mon Sep 17 00:00:00 2001 From: Joseph Patrick Copenhaver Date: Wed, 27 Sep 2023 09:42:12 -0500 Subject: [PATCH] removing a runtime check for a strategy pattern --- loadtester/gen_strategies.go | 412 +++++++++++------- .../cmd/generate/gen_strategies.go.tmpl | 59 +-- 2 files changed, 275 insertions(+), 196 deletions(-) diff --git a/loadtester/gen_strategies.go b/loadtester/gen_strategies.go index a8ad450..b538f4a 100644 --- a/loadtester/gen_strategies.go +++ b/loadtester/gen_strategies.go @@ -188,6 +188,34 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration readRetries := func(p []Doer) int { @@ -356,20 +384,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont meta.IntervalID = intervalID - if taskBufSize == 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() taskBuf = taskBuf[:0] @@ -600,6 +615,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -798,20 +814,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -1001,6 +1004,34 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration // stopping routine runs on return @@ -1183,6 +1214,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -1344,20 +1376,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -1535,6 +1554,34 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration readRetries := func(p []Doer) int { @@ -1702,20 +1749,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con meta.IntervalID = intervalID - if taskBufSize == 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() taskBuf = taskBuf[:0] @@ -1920,6 +1954,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -2118,20 +2153,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -2262,6 +2284,34 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration // stopping routine runs on return @@ -2431,6 +2481,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -2592,20 +2643,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -2705,6 +2743,34 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration readRetries := func(p []Doer) int { @@ -2909,20 +2975,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context meta.IntervalID = intervalID - if taskBufSize == 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() taskBuf = taskBuf[:0] @@ -3153,6 +3206,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -3366,20 +3420,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -3492,6 +3533,34 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration // stopping routine runs on return @@ -3674,6 +3743,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -3850,20 +3920,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -3948,6 +4005,34 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration readRetries := func(p []Doer) int { @@ -4151,20 +4236,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex meta.IntervalID = intervalID - if taskBufSize == 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() taskBuf = taskBuf[:0] @@ -4369,6 +4441,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -4582,20 +4655,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator @@ -4667,6 +4727,34 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func() { + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func() { + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration // stopping routine runs on return @@ -4836,6 +4924,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -5012,20 +5101,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator diff --git a/loadtester/internal/cmd/generate/gen_strategies.go.tmpl b/loadtester/internal/cmd/generate/gen_strategies.go.tmpl index e8e92e8..0fa16b4 100644 --- a/loadtester/internal/cmd/generate/gen_strategies.go.tmpl +++ b/loadtester/internal/cmd/generate/gen_strategies.go.tmpl @@ -203,6 +203,34 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en taskBuf := make([]Doer, 0, lt.maxIntervalTasks) + var enqueueTasks func() + var updateEnqueueTasksStrategy func() + { + staggerStrategy := func(){ + for _, task := range taskBuf { + lt.taskChan <- taskWithMeta{task, intervalID, meta} + } + } + + floodStrategy := func(){ + lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} + + for _, task := range taskBuf[1:] { + time.Sleep(interTaskInterval) + lt.taskChan <- taskWithMeta{task, time.Now(), meta} + } + } + + updateEnqueueTasksStrategy = func() { + if interTaskInterval <= skipInterTaskSchedulingThreshold { + enqueueTasks = floodStrategy + } else { + enqueueTasks = staggerStrategy + } + } + } + updateEnqueueTasksStrategy() + var delay time.Duration {{if .RetriesEnabled}} @@ -412,20 +440,7 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en meta.IntervalID = intervalID - if taskBufSize == 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() taskBuf = taskBuf[:0] @@ -662,6 +677,7 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en // && clause: protects against divide by zero if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 { interTaskInterval = interval / time.Duration(meta.NumIntervalTasks) + updateEnqueueTasksStrategy() } if recomputeTaskSlots { @@ -894,20 +910,7 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en meta.IntervalID = intervalID - if taskBufSize <= 1 || interTaskInterval <= skipInterTaskSchedulingThreshold { - - for _, task := range taskBuf { - lt.taskChan <- taskWithMeta{task, intervalID, meta} - } - } else { - - lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta} - - for _, task := range taskBuf[1:] { - time.Sleep(interTaskInterval) - lt.taskChan <- taskWithMeta{task, time.Now(), meta} - } - } + enqueueTasks() if numNewTasks > taskBufSize { // must have hit the end of ReadTasks iterator