diff --git a/loadtester/csv_metrics.go b/loadtester/csv_metrics.go
index 12edf49..8bd4075 100644
--- a/loadtester/csv_metrics.go
+++ b/loadtester/csv_metrics.go
@@ -84,6 +84,8 @@ type metricRecord struct {
 	// totalNumTasks is only modified if the loadtest's maxTasks setting is > 0
 	totalNumTasks int
+	queuedDurations latencyList
+	taskDurations   latencyList
 
 	metricRecordResetables
 }
 
@@ -94,6 +96,11 @@ func (mr *metricRecord) reset() {
 	}
 }
 
+func (mr *metricRecord) resetLatencySlices() {
+	mr.queuedDurations = mr.queuedDurations[:0]
+	mr.taskDurations = mr.taskDurations[:0]
+}
+
 func (lt *Loadtest) writeOutputCsvHeaders() error {
 
 	cd := &lt.csvData
@@ -118,6 +125,15 @@ func (lt *Loadtest) writeOutputCsvHeaders() error {
 		"max_task_duration",
 		"sum_task_duration",
 		"",
+		"",
+		"",
+	}
+
+	if lt.latencyPercentile != 0 {
+		fields[len(fields)-3] = "p" + strconv.Itoa(int(lt.latencyPercentile)) + "_queued_duration"
+		fields[len(fields)-2] = "p" + strconv.Itoa(int(lt.latencyPercentile)) + "_task_duration"
+	} else {
+		fields = fields[:len(fields)-2]
 	}
 
 	if lt.maxTasks > 0 {
@@ -137,95 +153,6 @@ func (lt *Loadtest) writeOutputCsvHeaders() error {
 	return cd.writer.Error()
 }
 
-// writeOutputCsvRow_maxTasksGTZero writes the metric record to the target csv file when maxTasks is > 0
-func (lt *Loadtest) writeOutputCsvRow_maxTasksGTZero(mr metricRecord) {
-
-	cd := &lt.csvData
-	if cd.writeErr != nil {
-		return
-	}
-
-	nowStr := timeToString(time.Now())
-
-	var percent string
-	{
-		high := mr.totalNumTasks * 10000 / lt.maxTasks
-		low := high % 100
-		high /= 100
-
-		var sep string
-		if low < 10 {
-			sep = ".0"
-		} else {
-			sep = "."
-		}
-
-		percent = strconv.Itoa(high) + sep + strconv.Itoa(low)
-	}
-
-	fields := []string{
-		nowStr,
-		timeToString(mr.intervalID),
-		strconv.Itoa(mr.numIntervalTasks),
-		mr.lag.String(),
-		mr.sumLag.String(),
-		strconv.Itoa(mr.numTasks),
-		strconv.Itoa(mr.numPass),
-		strconv.Itoa(mr.numFail),
-		strconv.Itoa(mr.numRetry),
-		strconv.Itoa(mr.numPanic),
-		mr.minQueuedDuration.String(),
-		(mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(),
-		mr.maxQueuedDuration.String(),
-		mr.sumQueuedDuration.String(),
-		mr.minTaskDuration.String(),
-		(mr.sumTaskDuration / time.Duration(mr.numTasks)).String(),
-		mr.maxTaskDuration.String(),
-		mr.sumTaskDuration.String(),
-		percent,
-	}
-
-	if err := cd.writer.Write(fields); err != nil {
-		cd.setErr(err) // sets error state in multiple goroutine safe way
-	}
-}
-
-// writeOutputCsvRow_maxTasksNotGTZero writes the metric record to the target csv file when maxTasks is <= 0
-func (lt *Loadtest) writeOutputCsvRow_maxTasksNotGTZero(mr metricRecord) {
-
-	cd := &lt.csvData
-	if cd.writeErr != nil {
-		return
-	}
-
-	nowStr := timeToString(time.Now())
-
-	fields := []string{
-		nowStr,
-		timeToString(mr.intervalID),
-		strconv.Itoa(mr.numIntervalTasks),
-		mr.lag.String(),
-		mr.sumLag.String(),
-		strconv.Itoa(mr.numTasks),
-		strconv.Itoa(mr.numPass),
-		strconv.Itoa(mr.numFail),
-		strconv.Itoa(mr.numRetry),
-		strconv.Itoa(mr.numPanic),
-		mr.minQueuedDuration.String(),
-		(mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(),
-		mr.maxQueuedDuration.String(),
-		mr.sumQueuedDuration.String(),
-		mr.minTaskDuration.String(),
-		(mr.sumTaskDuration / time.Duration(mr.numTasks)).String(),
-		mr.maxTaskDuration.String(),
-		mr.sumTaskDuration.String(),
-	}
-
-	if err := cd.writer.Write(fields); err != nil {
-		cd.setErr(err) // sets error state in multiple goroutine safe way
-	}
-}
-
 func (lt *Loadtest) writeOutputCsvFooterAndClose(csvFile *os.File) {
 
 	cd := &lt.csvData
@@
-255,95 +182,6 @@ func (lt *Loadtest) writeOutputCsvFooterAndClose(csvFile *os.File) { _, cd.writeErr = csvFile.Write([]byte("\n# {\"done\":{\"end_time\":\"" + timeToString(time.Now()) + "\"}}\n")) } -func (lt *Loadtest) resultsHandler() { - - cd := <.csvData - var mr metricRecord - mr.reset() - - var writeRow func() - if lt.maxTasks > 0 { - writeRow = func() { - mr.totalNumTasks += mr.numTasks - lt.writeOutputCsvRow(mr) - } - } else { - writeRow = func() { - lt.writeOutputCsvRow(mr) - } - } - - cd.flushDeadline = time.Now().Add(cd.flushInterval) - - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() - } - return - } - - lt.resultWaitGroup.Done() - - if cd.writeErr != nil { - continue - } - - if tr.taskResultFlags.isZero() { - - mr.sumLag += tr.Meta.Lag - - continue - } - - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } - - if mr.minQueuedDuration > tr.QueuedDuration { - mr.minQueuedDuration = tr.QueuedDuration - } - - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } - - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } - - if mr.maxQueuedDuration < tr.QueuedDuration { - mr.maxQueuedDuration = tr.QueuedDuration - } - - mr.sumQueuedDuration += tr.QueuedDuration - mr.sumTaskDuration += tr.TaskDuration - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) - - mr.numTasks++ - - if mr.numTasks >= mr.numIntervalTasks { - - writeRow() - mr.reset() - - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) - } - } - } -} - // // helpers // diff --git a/loadtester/example/main.go b/loadtester/example/main.go index 418cf83..70b71fe 100644 --- a/loadtester/example/main.go +++ b/loadtester/example/main.go @@ -75,6 +75,7 @@ func main() { loadtester.NumWorkers(numWorkers), loadtester.NumIntervalTasks(25), loadtester.Interval(1*time.Second), + // loadtester.LatencyPercentileUint8(95), // default is 0 (disabled/not-calculated) // loadtester.FlushRetriesOnShutdown(true), // default is false ) if err != nil { diff --git a/loadtester/example_http/main.go b/loadtester/example_http/main.go index 91202fb..7c44bb2 100644 --- a/loadtester/example_http/main.go +++ b/loadtester/example_http/main.go @@ -144,6 +144,7 @@ func main() { loadtester.NumWorkers(parallelism), loadtester.NumIntervalTasks(parallelism), loadtester.Interval(5*time.Second), + // loadtester.LatencyPercentileUint8(95), // default is 0 (disabled/not-calculated) // loadtester.FlushRetriesOnShutdown(true), // default is false ) if err != nil { diff --git a/loadtester/gen_strategies.go b/loadtester/gen_strategies.go index f0fa512..6df4fdb 100644 --- a/loadtester/gen_strategies.go +++ b/loadtester/gen_strategies.go @@ -7,6 +7,7 @@ import ( "encoding/csv" "fmt" "os" + "strconv" "sync" "time" ) @@ -125,7 +126,242 @@ func (lt *Loadtest) doTask_retriesEnabled_metricsEnabled(ctx context.Context, wo return } -func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx context.Context, shutdownErrResp *error) error { +func (lt *Loadtest) doTask_retriesEnabled_metricsDisabled(ctx context.Context, workerID int, twm taskWithMeta) { + + defer 
lt.resultWaitGroup.Done() + + // phase is the name of the step which has possibly caused a panic + phase := "do" + + var rt *retryTask + if v, ok := twm.doer.(*retryTask); ok { + rt = v + phase = "retry" + defer func() { + *rt = retryTask{} + lt.retryTaskPool.Put(v) + }() + } + defer func() { + + if r := recover(); r != nil { + + switch v := r.(type) { + case error: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + case []byte: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", string(v), + ) + case string: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + default: + const msg = "unknown cause" + + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", msg, + ) + } + } + }() + err := twm.doer.Do(ctx, workerID) + phase = "" // done, no panic occurred + if err == nil { + + return + } + + lt.logger.WarnContext(ctx, + "task error", + "worker_id", workerID, + "error", err, + ) + + var dr DoRetryer + if rt != nil { + dr = rt.DoRetryer + } else if v, ok := twm.doer.(DoRetryer); ok { + dr = v + } else { + return + } + + phase = "can-retry" + if v, ok := dr.(DoRetryChecker); ok && !v.CanRetry(ctx, workerID, err) { + phase = "" // done, no panic occurred + return + } + phase = "" // done, no panic occurred + + // queue a new retry task + { + rt := lt.retryTaskPool.Get().(*retryTask) + + *rt = retryTask{dr, err} + + lt.retryTaskChan <- rt + } + + return +} + +func (lt *Loadtest) doTask_retriesDisabled_metricsEnabled(ctx context.Context, workerID int, twm taskWithMeta) { + + var respFlags taskResultFlags + { + taskStart := time.Now() + defer func() { + taskEnd := time.Now() + + lt.resultsChan <- taskResult{ + taskResultFlags: respFlags, + QueuedDuration: taskStart.Sub(twm.enqueueTime), + TaskDuration: taskEnd.Sub(taskStart), + Meta: twm.meta, + } + }() + } + + // phase is the name of the step which has possibly caused a panic + phase := "do" + + defer func() { + + if r := recover(); r != nil { + + respFlags.Panicked = 1 + respFlags.Errored = 1 + + switch v := r.(type) { + case error: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + case []byte: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", string(v), + ) + case string: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + default: + const msg = "unknown cause" + + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", msg, + ) + } + } + }() + err := twm.doer.Do(ctx, workerID) + phase = "" // done, no panic occurred + if err == nil { + respFlags.Passed = 1 + return + } + + lt.logger.WarnContext(ctx, + "task error", + "worker_id", workerID, + "error", err, + ) + + respFlags.Errored = 1 + + return +} + +func (lt *Loadtest) doTask_retriesDisabled_metricsDisabled(ctx context.Context, workerID int, twm taskWithMeta) { + + defer lt.resultWaitGroup.Done() + + // phase is the name of the step which has possibly caused a panic + phase := "do" + + defer func() { + + if r := recover(); r != nil { + + switch v := r.(type) { + case error: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", 
workerID, + "phase", phase, + "error", v, + ) + case []byte: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", string(v), + ) + case string: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + default: + const msg = "unknown cause" + + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", msg, + ) + } + } + }() + err := twm.doer.Do(ctx, workerID) + phase = "" // done, no panic occurred + if err == nil { + + return + } + + lt.logger.WarnContext(ctx, + "task error", + "worker_id", workerID, + "error", err, + ) + + return +} + +func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context.Context, shutdownErrResp *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) @@ -173,6 +409,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont intervalID := time.Now() + maxTasks := lt.maxTasks + interval := lt.interval numNewTasks := lt.numIntervalTasks ctxDone := ctx.Done() @@ -321,23 +559,59 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont return ErrRetriesFailedToFlush } - select { - case <-ctxDone: - lt.logger.WarnContext(ctx, - "user stopped loadtest while attempting to flush retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) - return nil - default: - // continue with load generating retries - } - - // acquire load generation opportunity slots ( smooths bursts ) - // - // in the shutdown retry flow we always want to acquire before reading retries - // to avoid a deadlock edge case of the retry queue being full, all workers tasks failed and need to be retried - if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil { + // if maxTasks > 0 + { + if numTasks >= maxTasks { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "reason", "reached max tasks", + ) + return ErrRetriesFailedToFlush + } + + // 1. the below looks off/odd, why not use?: + // + // ``` + // if n := maxTasks - numTasks; n < numNewTasks { + // numNewTasks = n + // } + // ``` + // + // 2. And for that matter, why not keep meta.NumIntervalTasks in sync with numNewTasks? + // + // --- + // + // 1. The implementation would be exactly the same, just using another variable + // 2. the meta.NumIntervalTasks value is used in RATE calculations, if we keep it in sync + // with BOUNDS values then the last tasks could run at a lower RATE than intended. It + // is only kept in sync when a user adjusts the RATE via a ConfigUpdate. Don't confuse + // bounds purpose values with rate purpose values. 
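As an aside for readers of this hunk, the rate-versus-bounds distinction described in the comment above can be seen in a small standalone sketch. The clamp mirrors the patch; the per-task spacing formula (interval divided by NumIntervalTasks) is an assumed stand-in for the loadtest's real pacing computation, used only to make the effect visible.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		interval         = 1 * time.Second
		numIntervalTasks = 25  // RATE: tasks emitted per interval
		maxTasks         = 110 // BOUNDS: total task budget
		numTasks         = 100 // tasks already scheduled
	)

	// BOUNDS clamp, as in the patch: only 10 tasks remain for the final interval.
	numNewTasks := maxTasks - numTasks
	if numNewTasks > numIntervalTasks {
		numNewTasks = numIntervalTasks
	}

	// Spacing derived from the RATE value keeps the last 10 tasks at the same
	// cadence as every other interval.
	fmt.Println("spacing from rate value:", interval/time.Duration(numIntervalTasks)) // 40ms

	// If the bound were copied back into the rate value, the final partial batch
	// would be spread across the whole interval, i.e. run at a lower rate.
	fmt.Println("spacing if bound synced:", interval/time.Duration(numNewTasks)) // 100ms
}
```

With the rate value left untouched, the tail of the run keeps its 40ms cadence; syncing it with the bound would stretch the spacing of the last batch to 100ms, which is exactly the distortion the comment warns against.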
+ // + numNewTasks = maxTasks - numTasks + if numNewTasks > meta.NumIntervalTasks { + numNewTasks = meta.NumIntervalTasks + } + } + + select { + case <-ctxDone: + lt.logger.WarnContext(ctx, + "user stopped loadtest while attempting to flush retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + return nil + default: + // continue with load generating retries + } + + // acquire load generation opportunity slots ( smooths bursts ) + // + // in the shutdown retry flow we always want to acquire before reading retries + // to avoid a deadlock edge case of the retry queue being full, all workers tasks failed and need to be retried + if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil { lt.logger.ErrorContext(ctx, "failed to flush all retries", "preflush_num_tasks", preflushNumTasks, @@ -717,6 +991,21 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont // main task scheduling loop for { + // if maxTasks > 0 + { + if numTasks >= maxTasks { + lt.logger.WarnContext(ctx, + "loadtest finished: max task count reached", + "max_tasks", maxTasks, + ) + return nil + } + + numNewTasks = maxTasks - numTasks + if numNewTasks > meta.NumIntervalTasks { + numNewTasks = meta.NumIntervalTasks + } + } // duplicating short-circuit signal control processing to give it priority over the randomizing nature of the multi-select // that follows @@ -862,125 +1151,18 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont } } -func (lt *Loadtest) doTask_retriesDisabled_metricsEnabled(ctx context.Context, workerID int, twm taskWithMeta) { - - var respFlags taskResultFlags - { - taskStart := time.Now() - defer func() { - taskEnd := time.Now() - - lt.resultsChan <- taskResult{ - taskResultFlags: respFlags, - QueuedDuration: taskStart.Sub(twm.enqueueTime), - TaskDuration: taskEnd.Sub(taskStart), - Meta: twm.meta, - } - }() - } - - // phase is the name of the step which has possibly caused a panic - phase := "do" - - defer func() { - - if r := recover(); r != nil { - - respFlags.Panicked = 1 - respFlags.Errored = 1 - - switch v := r.(type) { - case error: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - case []byte: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", string(v), - ) - case string: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - default: - const msg = "unknown cause" - - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", msg, - ) - } - } - }() - err := twm.doer.Do(ctx, workerID) - phase = "" // done, no panic occurred - if err == nil { - respFlags.Passed = 1 - return - } - - lt.logger.WarnContext(ctx, - "task error", - "worker_id", workerID, - "error", err, - ) - - respFlags.Errored = 1 - - return -} - -func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx context.Context, _ *error) error { +func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx context.Context, shutdownErrResp *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) lt.startTime = time.Now() - // if lt.csvData.writeErr == nil // a.k.a. !cfg.csvOutputDisabled // (a.k.a. 
metrics enabled) - { - - csvFile, err := os.Create(lt.csvData.outputFilename) - if err != nil { - return fmt.Errorf("failed to open output csv metrics file for writing: %w", err) - } - defer lt.writeOutputCsvFooterAndClose(csvFile) - - lt.csvData.writeErr = lt.writeOutputCsvConfigComment(csvFile) - - if lt.csvData.writeErr == nil { - - lt.csvData.writer = csv.NewWriter(csvFile) - - lt.csvData.writeErr = lt.writeOutputCsvHeaders() - } - } - lt.logger.InfoContext(ctx, "starting loadtest", "config", lt.loadtestConfigAsJson(), ) - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - - lt.resultsHandler() - }() - numWorkers := lt.numWorkers // numTasks is the total number of tasks @@ -989,6 +1171,8 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con intervalID := time.Now() + maxTasks := lt.maxTasks + interval := lt.interval numNewTasks := lt.numIntervalTasks ctxDone := ctx.Done() @@ -1034,48 +1218,274 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con var delay time.Duration + readRetries := func(p []Doer) int { + // make sure you only fill up to len + + var i int + for i < len(p) { + select { + case task := <-lt.retryTaskChan: + p[i] = task + default: + return i + } + i++ + } + + return i + } + // stopping routine runs on return // flushing as much as possible defer func() { - lt.logger.DebugContext(ctx, - "waiting for running tasks to stop", - ) - lt.resultWaitGroup.Wait() + err := func(flushRetries bool) error { + if !flushRetries { - lt.logger.DebugContext(ctx, - "stopping result handler routine", - ) + lt.logger.DebugContext(ctx, + "not waiting on retries to flush on shutdown", + "reason", "retries disabled or flush retries on shutdown disabled", + "num_tasks", numTasks, + ) - // signal for result handler routines to stop - close(lt.resultsChan) + return nil + } - // signal for workers to stop - lt.logger.DebugContext(ctx, - "stopping workers", - ) - for i := 0; i < len(lt.workers); i++ { - close(lt.workers[i]) - } + if err := ctx.Err(); err != nil { + lt.logger.WarnContext(ctx, + "not waiting on retries to flush on shutdown", + "reason", "user stopped loadtest", + "num_tasks", numTasks, + "error", err, + ) + return nil + } - // wait for result handler routines to stop - lt.logger.DebugContext(ctx, - "waiting for result handler routines to stop", - ) - wg.Wait() + lt.logger.DebugContext(ctx, + "waiting on retries to flush", + "num_tasks", numTasks, + ) - // wait for workers to stop - lt.logger.DebugContext(ctx, - "waiting for workers to stop", - ) - lt.workerWaitGroup.Wait() + if meta.NumIntervalTasks <= 0 || numWorkers <= 0 { - lt.logger.InfoContext(ctx, - "loadtest stopped", - ) - }() + lt.logger.ErrorContext(ctx, + "retry flushing could not be attempted", + "num_tasks", numTasks, + "num_interval_tasks", meta.NumIntervalTasks, + "num_workers", numWorkers, + ) - // getTaskSlotCount is the task emission back pressure + return ErrRetriesFailedToFlush + } + + preflushNumTasks := numTasks + + lt.logger.WarnContext(ctx, + "shutting down: flushing retries", + "num_tasks", numTasks, + "flush_retries_timeout", lt.flushRetriesTimeout.String(), + ) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), lt.flushRetriesTimeout) + defer cancel() + + intervalID = time.Now() + taskBuf = taskBuf[:0] + + for { + + if err := shutdownCtx.Err(); err != nil { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "error", err, + ) + + 
return ErrRetriesFailedToFlush + } + + lt.resultWaitGroup.Wait() + + for { + + if err := shutdownCtx.Err(); err != nil { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "error", err, + ) + + return ErrRetriesFailedToFlush + } + + // if maxTasks > 0 + { + if numTasks >= maxTasks { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "reason", "reached max tasks", + ) + return ErrRetriesFailedToFlush + } + + // 1. the below looks off/odd, why not use?: + // + // ``` + // if n := maxTasks - numTasks; n < numNewTasks { + // numNewTasks = n + // } + // ``` + // + // 2. And for that matter, why not keep meta.NumIntervalTasks in sync with numNewTasks? + // + // --- + // + // 1. The implementation would be exactly the same, just using another variable + // 2. the meta.NumIntervalTasks value is used in RATE calculations, if we keep it in sync + // with BOUNDS values then the last tasks could run at a lower RATE than intended. It + // is only kept in sync when a user adjusts the RATE via a ConfigUpdate. Don't confuse + // bounds purpose values with rate purpose values. + // + numNewTasks = maxTasks - numTasks + if numNewTasks > meta.NumIntervalTasks { + numNewTasks = meta.NumIntervalTasks + } + } + + select { + case <-ctxDone: + lt.logger.WarnContext(ctx, + "user stopped loadtest while attempting to flush retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + return nil + default: + // continue with load generating retries + } + + // acquire load generation opportunity slots ( smooths bursts ) + // + // in the shutdown retry flow we always want to acquire before reading retries + // to avoid a deadlock edge case of the retry queue being full, all workers tasks failed and need to be retried + if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "error", err, + "reason", "shutdown timeout likely reached while waiting for semaphore acquisition", + ) + return ErrRetriesFailedToFlush + } + + // read up to numNewTasks from retry slice + taskBufSize := readRetries(taskBuf[:numNewTasks:numNewTasks]) + if taskBufSize <= 0 { + // wait for any pending tasks to flush and try read again + + lt.logger.DebugContext(ctx, + "verifying all retries have flushed", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + + lt.resultWaitGroup.Wait() + + // read up to numNewTasks from retry slice again + taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) + if taskBufSize <= 0 { + + lt.logger.InfoContext(ctx, + "all retries flushed", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + return nil + } + } + taskBuf = taskBuf[:taskBufSize] + + // re-release any extra load slots we allocated beyond what really remains to do + if numNewTasks > taskBufSize { + lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) + } + + lt.resultWaitGroup.Add(taskBufSize) + + meta.IntervalID = intervalID + + enqueueTasks() + + taskBuf = taskBuf[:0] + + numTasks += taskBufSize + + // wait for next interval time to exist + nextIntervalID := intervalID.Add(interval) + realNow := time.Now() + delay = nextIntervalID.Sub(realNow) + if delay > 0 { + time.Sleep(delay) + intervalID = nextIntervalID + + if taskBufSize < numNewTasks { + // just 
finished this iteration of retry enqueuing + // + // break to loop through retry drain context again + break + } + + continue + } + + if delay < 0 { + intervalID = realNow + + } + + if taskBufSize < numNewTasks { + // just finished this iteration of retry enqueuing + // + // break to loop through retry drain context again + break + } + } + } + }(lt.flushRetriesOnShutdown) + if err != nil { + *shutdownErrResp = err + } + + lt.logger.DebugContext(ctx, + "waiting for running tasks to stop", + ) + lt.resultWaitGroup.Wait() + + // signal for workers to stop + lt.logger.DebugContext(ctx, + "stopping workers", + ) + for i := 0; i < len(lt.workers); i++ { + close(lt.workers[i]) + } + + // wait for workers to stop + lt.logger.DebugContext(ctx, + "waiting for workers to stop", + ) + lt.workerWaitGroup.Wait() + + lt.logger.InfoContext(ctx, + "loadtest stopped", + ) + }() + + // getTaskSlotCount is the task emission back pressure // throttle that conveys the number of tasks that // are allowed to be un-finished for the performance // interval under normal circumstances @@ -1316,6 +1726,21 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con // main task scheduling loop for { + // if maxTasks > 0 + { + if numTasks >= maxTasks { + lt.logger.WarnContext(ctx, + "loadtest finished: max task count reached", + "max_tasks", maxTasks, + ) + return nil + } + + numNewTasks = maxTasks - numTasks + if numNewTasks > meta.NumIntervalTasks { + numNewTasks = meta.NumIntervalTasks + } + } // duplicating short-circuit signal control processing to give it priority over the randomizing nature of the multi-select // that follows @@ -1343,7 +1768,22 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con // continue with load generation } + // read up to numNewTasks from retry slice taskBufSize := 0 + + // acquire load generation opportunity slots ( smooths bursts ) + // + // do this early conditionally to allow retries to settle in the retry channel + // so we can pick them up when enough buffer space has cleared + // + // thus we avoid a possible deadlock where the retry queue is full and the workers + // all have failed tasks that wish to be retried + if lt.intervalTasksSema.Acquire(ctx, int64(numNewTasks)) != nil { + return nil + } + + taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) + taskBuf = taskBuf[:taskBufSize] if taskBufSize < numNewTasks { @@ -1354,12 +1794,31 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con } if n == 0 { - lt.logger.WarnContext(ctx, - "stopping loadtest: ReadTasks did not load enough tasks", - "final_task_delta", 0, - ) + // if !lt.retriesDisabled + { + // iteration is technically done now + // but there could be straggling retries + // queued after this, those should continue + // to be flushed if and only if maxTasks + // has not been reached and if it is greater + // than zero + if taskBufSize == 0 { + // return immediately if there is nothing + // new to enqueue - return nil + lt.logger.WarnContext(ctx, + "stopping loadtest: ReadTasks did not load enough tasks", + "final_task_delta", 0, + ) + + return nil + } + + lt.logger.DebugContext(ctx, + "scheduled: stopping loadtest: ReadTasks did not load enough tasks", + "retry_count", taskBufSize, + ) + } } taskBufSize += n @@ -1368,8 +1827,11 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con // acquire load generation opportunity slots ( smooths bursts ) // if not done already - if 
lt.intervalTasksSema.Acquire(ctx, int64(taskBufSize)) != nil { - return nil + // + // but if we allocated too many in our retry prep phase then release the + // difference + if numNewTasks > taskBufSize { + lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } lt.resultWaitGroup.Add(taskBufSize) @@ -1382,7 +1844,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con // must have hit the end of ReadTasks iterator // increase numTasks total by actual number queued // and stop traffic generation - // numTasks += taskBufSize // this line only has an effect except in a retries enabled context + numTasks += taskBufSize lt.logger.WarnContext(ctx, "stopping loadtest: ReadTasks did not load enough tasks", "final_task_delta", taskBufSize, @@ -1394,8 +1856,6 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con numTasks += taskBufSize - meta.Lag = 0 - // wait for next interval time to exist nextIntervalID := intervalID.Add(interval) realNow := time.Now() @@ -1409,128 +1869,50 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con if delay < 0 { intervalID = realNow - lag := -delay - meta.Lag = lag - - lt.resultWaitGroup.Add(1) - lt.resultsChan <- taskResult{ - Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, - }, - } - } } } -func (lt *Loadtest) doTask_retriesEnabled_metricsDisabled(ctx context.Context, workerID int, twm taskWithMeta) { - - defer lt.resultWaitGroup.Done() - - // phase is the name of the step which has possibly caused a panic - phase := "do" +func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx context.Context, shutdownErrResp *error) error { - var rt *retryTask - if v, ok := twm.doer.(*retryTask); ok { - rt = v - phase = "retry" - defer func() { - *rt = retryTask{} - lt.retryTaskPool.Put(v) - }() - } - defer func() { + cfgUpdateChan := lt.cfgUpdateChan + defer close(cfgUpdateChan) - if r := recover(); r != nil { + lt.startTime = time.Now() - switch v := r.(type) { - case error: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - case []byte: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", string(v), - ) - case string: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - default: - const msg = "unknown cause" + // if lt.csvData.writeErr == nil // a.k.a. !cfg.csvOutputDisabled // (a.k.a. 
metrics enabled) + { - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", msg, - ) - } + csvFile, err := os.Create(lt.csvData.outputFilename) + if err != nil { + return fmt.Errorf("failed to open output csv metrics file for writing: %w", err) } - }() - err := twm.doer.Do(ctx, workerID) - phase = "" // done, no panic occurred - if err == nil { - - return - } - - lt.logger.WarnContext(ctx, - "task error", - "worker_id", workerID, - "error", err, - ) - - var dr DoRetryer - if rt != nil { - dr = rt.DoRetryer - } else if v, ok := twm.doer.(DoRetryer); ok { - dr = v - } else { - return - } + defer lt.writeOutputCsvFooterAndClose(csvFile) - phase = "can-retry" - if v, ok := dr.(DoRetryChecker); ok && !v.CanRetry(ctx, workerID, err) { - phase = "" // done, no panic occurred - return - } - phase = "" // done, no panic occurred + lt.csvData.writeErr = lt.writeOutputCsvConfigComment(csvFile) - // queue a new retry task - { - rt := lt.retryTaskPool.Get().(*retryTask) + if lt.csvData.writeErr == nil { - *rt = retryTask{dr, err} + lt.csvData.writer = csv.NewWriter(csvFile) - lt.retryTaskChan <- rt + lt.csvData.writeErr = lt.writeOutputCsvHeaders() + } } - return -} - -func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx context.Context, shutdownErrResp *error) error { - - cfgUpdateChan := lt.cfgUpdateChan - defer close(cfgUpdateChan) - - lt.startTime = time.Now() - lt.logger.InfoContext(ctx, "starting loadtest", "config", lt.loadtestConfigAsJson(), ) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + lt.resultsHandler() + }() + numWorkers := lt.numWorkers // numTasks is the total number of tasks @@ -1657,6 +2039,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con intervalID = time.Now() taskBuf = taskBuf[:0] + meta.Lag = 0 for { @@ -1755,6 +2138,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con numTasks += taskBufSize + meta.Lag = 0 + // wait for next interval time to exist nextIntervalID := intervalID.Add(interval) realNow := time.Now() @@ -1776,6 +2161,17 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con if delay < 0 { intervalID = realNow + lag := -delay + meta.Lag = lag + + lt.resultWaitGroup.Add(1) + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + IntervalID: intervalID, + Lag: lag, + }, + } + } if taskBufSize < numNewTasks { @@ -1796,6 +2192,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con ) lt.resultWaitGroup.Wait() + lt.logger.DebugContext(ctx, + "stopping result handler routine", + ) + + // signal for result handler routines to stop + close(lt.resultsChan) + // signal for workers to stop lt.logger.DebugContext(ctx, "stopping workers", @@ -1804,6 +2207,12 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con close(lt.workers[i]) } + // wait for result handler routines to stop + lt.logger.DebugContext(ctx, + "waiting for result handler routines to stop", + ) + wg.Wait() + // wait for workers to stop lt.logger.DebugContext(ctx, "waiting for workers to stop", @@ -2171,6 +2580,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con numTasks += taskBufSize + meta.Lag = 0 + // wait for next interval time to exist nextIntervalID := intervalID.Add(interval) realNow := time.Now() @@ -2184,72 +2595,22 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con if delay < 0 
{ intervalID = realNow - } - } -} - -func (lt *Loadtest) doTask_retriesDisabled_metricsDisabled(ctx context.Context, workerID int, twm taskWithMeta) { - - defer lt.resultWaitGroup.Done() - - // phase is the name of the step which has possibly caused a panic - phase := "do" - - defer func() { - - if r := recover(); r != nil { - - switch v := r.(type) { - case error: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - case []byte: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", string(v), - ) - case string: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - default: - const msg = "unknown cause" + lag := -delay + meta.Lag = lag - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", msg, - ) + lt.resultWaitGroup.Add(1) + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + IntervalID: intervalID, + Lag: lag, + }, } - } - }() - err := twm.doer.Do(ctx, workerID) - phase = "" // done, no panic occurred - if err == nil { - return + } } - - lt.logger.WarnContext(ctx, - "task error", - "worker_id", workerID, - "error", err, - ) - - return } -func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx context.Context, _ *error) error { +func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx context.Context, shutdownErrResp *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) @@ -2314,11 +2675,214 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co var delay time.Duration - // stopping routine runs on return - // flushing as much as possible - defer func() { + readRetries := func(p []Doer) int { + // make sure you only fill up to len - lt.logger.DebugContext(ctx, + var i int + for i < len(p) { + select { + case task := <-lt.retryTaskChan: + p[i] = task + default: + return i + } + i++ + } + + return i + } + + // stopping routine runs on return + // flushing as much as possible + defer func() { + + err := func(flushRetries bool) error { + if !flushRetries { + + lt.logger.DebugContext(ctx, + "not waiting on retries to flush on shutdown", + "reason", "retries disabled or flush retries on shutdown disabled", + "num_tasks", numTasks, + ) + + return nil + } + + if err := ctx.Err(); err != nil { + lt.logger.WarnContext(ctx, + "not waiting on retries to flush on shutdown", + "reason", "user stopped loadtest", + "num_tasks", numTasks, + "error", err, + ) + return nil + } + + lt.logger.DebugContext(ctx, + "waiting on retries to flush", + "num_tasks", numTasks, + ) + + if meta.NumIntervalTasks <= 0 || numWorkers <= 0 { + + lt.logger.ErrorContext(ctx, + "retry flushing could not be attempted", + "num_tasks", numTasks, + "num_interval_tasks", meta.NumIntervalTasks, + "num_workers", numWorkers, + ) + + return ErrRetriesFailedToFlush + } + + preflushNumTasks := numTasks + + lt.logger.WarnContext(ctx, + "shutting down: flushing retries", + "num_tasks", numTasks, + "flush_retries_timeout", lt.flushRetriesTimeout.String(), + ) + + shutdownCtx, cancel := context.WithTimeout(context.Background(), lt.flushRetriesTimeout) + defer cancel() + + intervalID = time.Now() + taskBuf = taskBuf[:0] + + for { + + if err := shutdownCtx.Err(); err != nil { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + 
"num_tasks", numTasks, + "error", err, + ) + + return ErrRetriesFailedToFlush + } + + lt.resultWaitGroup.Wait() + + for { + + if err := shutdownCtx.Err(); err != nil { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "error", err, + ) + + return ErrRetriesFailedToFlush + } + + select { + case <-ctxDone: + lt.logger.WarnContext(ctx, + "user stopped loadtest while attempting to flush retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + return nil + default: + // continue with load generating retries + } + + // acquire load generation opportunity slots ( smooths bursts ) + // + // in the shutdown retry flow we always want to acquire before reading retries + // to avoid a deadlock edge case of the retry queue being full, all workers tasks failed and need to be retried + if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil { + lt.logger.ErrorContext(ctx, + "failed to flush all retries", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + "error", err, + "reason", "shutdown timeout likely reached while waiting for semaphore acquisition", + ) + return ErrRetriesFailedToFlush + } + + // read up to numNewTasks from retry slice + taskBufSize := readRetries(taskBuf[:numNewTasks:numNewTasks]) + if taskBufSize <= 0 { + // wait for any pending tasks to flush and try read again + + lt.logger.DebugContext(ctx, + "verifying all retries have flushed", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + + lt.resultWaitGroup.Wait() + + // read up to numNewTasks from retry slice again + taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) + if taskBufSize <= 0 { + + lt.logger.InfoContext(ctx, + "all retries flushed", + "preflush_num_tasks", preflushNumTasks, + "num_tasks", numTasks, + ) + return nil + } + } + taskBuf = taskBuf[:taskBufSize] + + // re-release any extra load slots we allocated beyond what really remains to do + if numNewTasks > taskBufSize { + lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) + } + + lt.resultWaitGroup.Add(taskBufSize) + + meta.IntervalID = intervalID + + enqueueTasks() + + taskBuf = taskBuf[:0] + + numTasks += taskBufSize + + // wait for next interval time to exist + nextIntervalID := intervalID.Add(interval) + realNow := time.Now() + delay = nextIntervalID.Sub(realNow) + if delay > 0 { + time.Sleep(delay) + intervalID = nextIntervalID + + if taskBufSize < numNewTasks { + // just finished this iteration of retry enqueuing + // + // break to loop through retry drain context again + break + } + + continue + } + + if delay < 0 { + intervalID = realNow + + } + + if taskBufSize < numNewTasks { + // just finished this iteration of retry enqueuing + // + // break to loop through retry drain context again + break + } + } + } + }(lt.flushRetriesOnShutdown) + if err != nil { + *shutdownErrResp = err + } + + lt.logger.DebugContext(ctx, "waiting for running tasks to stop", ) lt.resultWaitGroup.Wait() @@ -2610,7 +3174,22 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co // continue with load generation } + // read up to numNewTasks from retry slice taskBufSize := 0 + + // acquire load generation opportunity slots ( smooths bursts ) + // + // do this early conditionally to allow retries to settle in the retry channel + // so we can pick them up when enough buffer space has cleared + // + // thus we avoid a possible deadlock where the retry queue is full and 
the workers + // all have failed tasks that wish to be retried + if lt.intervalTasksSema.Acquire(ctx, int64(numNewTasks)) != nil { + return nil + } + + taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) + taskBuf = taskBuf[:taskBufSize] if taskBufSize < numNewTasks { @@ -2621,12 +3200,31 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co } if n == 0 { - lt.logger.WarnContext(ctx, - "stopping loadtest: ReadTasks did not load enough tasks", - "final_task_delta", 0, - ) + // if !lt.retriesDisabled + { + // iteration is technically done now + // but there could be straggling retries + // queued after this, those should continue + // to be flushed if and only if maxTasks + // has not been reached and if it is greater + // than zero + if taskBufSize == 0 { + // return immediately if there is nothing + // new to enqueue - return nil + lt.logger.WarnContext(ctx, + "stopping loadtest: ReadTasks did not load enough tasks", + "final_task_delta", 0, + ) + + return nil + } + + lt.logger.DebugContext(ctx, + "scheduled: stopping loadtest: ReadTasks did not load enough tasks", + "retry_count", taskBufSize, + ) + } } taskBufSize += n @@ -2635,8 +3233,11 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co // acquire load generation opportunity slots ( smooths bursts ) // if not done already - if lt.intervalTasksSema.Acquire(ctx, int64(taskBufSize)) != nil { - return nil + // + // but if we allocated too many in our retry prep phase then release the + // difference + if numNewTasks > taskBufSize { + lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } lt.resultWaitGroup.Add(taskBufSize) @@ -2649,7 +3250,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co // must have hit the end of ReadTasks iterator // increase numTasks total by actual number queued // and stop traffic generation - // numTasks += taskBufSize // this line only has an effect except in a retries enabled context + numTasks += taskBufSize lt.logger.WarnContext(ctx, "stopping loadtest: ReadTasks did not load enough tasks", "final_task_delta", taskBufSize, @@ -2678,7 +3279,7 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co } } -func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context.Context, shutdownErrResp *error) error { +func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx context.Context, _ *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) @@ -2773,382 +3374,129 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context var delay time.Duration - readRetries := func(p []Doer) int { - // make sure you only fill up to len - - var i int - for i < len(p) { - select { - case task := <-lt.retryTaskChan: - p[i] = task - default: - return i - } - i++ - } - - return i - } - // stopping routine runs on return // flushing as much as possible defer func() { - err := func(flushRetries bool) error { - if !flushRetries { + lt.logger.DebugContext(ctx, + "waiting for running tasks to stop", + ) + lt.resultWaitGroup.Wait() - lt.logger.DebugContext(ctx, - "not waiting on retries to flush on shutdown", - "reason", "retries disabled or flush retries on shutdown disabled", - "num_tasks", numTasks, - ) + lt.logger.DebugContext(ctx, + "stopping result handler routine", + ) - return nil - } + // signal for result handler routines to stop + close(lt.resultsChan) - if err := ctx.Err(); err != nil { - 
lt.logger.WarnContext(ctx, - "not waiting on retries to flush on shutdown", - "reason", "user stopped loadtest", - "num_tasks", numTasks, - "error", err, - ) - return nil - } + // signal for workers to stop + lt.logger.DebugContext(ctx, + "stopping workers", + ) + for i := 0; i < len(lt.workers); i++ { + close(lt.workers[i]) + } - lt.logger.DebugContext(ctx, - "waiting on retries to flush", - "num_tasks", numTasks, - ) + // wait for result handler routines to stop + lt.logger.DebugContext(ctx, + "waiting for result handler routines to stop", + ) + wg.Wait() - if meta.NumIntervalTasks <= 0 || numWorkers <= 0 { + // wait for workers to stop + lt.logger.DebugContext(ctx, + "waiting for workers to stop", + ) + lt.workerWaitGroup.Wait() - lt.logger.ErrorContext(ctx, - "retry flushing could not be attempted", - "num_tasks", numTasks, - "num_interval_tasks", meta.NumIntervalTasks, - "num_workers", numWorkers, - ) + lt.logger.InfoContext(ctx, + "loadtest stopped", + ) + }() - return ErrRetriesFailedToFlush - } + // getTaskSlotCount is the task emission back pressure + // throttle that conveys the number of tasks that + // are allowed to be un-finished for the performance + // interval under normal circumstances + getTaskSlotCount := func() int { + return maxPendingTasks(numWorkers, numNewTasks) + } - preflushNumTasks := numTasks + // apply initial task buffer limits to the interval semaphore + taskSlotCount := getTaskSlotCount() + lt.intervalTasksSema.Release(int64(taskSlotCount)) - lt.logger.WarnContext(ctx, - "shutting down: flushing retries", - "num_tasks", numTasks, - "flush_retries_timeout", lt.flushRetriesTimeout.String(), - ) + configCausesPause := func() bool { + return meta.NumIntervalTasks <= 0 || numWorkers <= 0 + } - shutdownCtx, cancel := context.WithTimeout(context.Background(), lt.flushRetriesTimeout) - defer cancel() + var paused bool + var pauseStart time.Time - intervalID = time.Now() - taskBuf = taskBuf[:0] - meta.Lag = 0 + handleConfigUpdateAndPauseState := func(cu ConfigUpdate) error { + for { + var prepSemaErr error + var recomputeInterTaskInterval, recomputeTaskSlots bool - for { + if cu.numWorkers.set { + recomputeTaskSlots = true - if err := shutdownCtx.Err(); err != nil { + n := cu.numWorkers.val + + // prevent over committing on the maxWorkers count + if n < 0 { lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "error", err, + "config update not within loadtest boundary conditions: numWorkers", + "reason", "update tried to set numWorkers too low", + "remediation_taken", "using min value", + "requested", n, + "min", 0, ) - - return ErrRetriesFailedToFlush + n = 0 + } else if n > lt.maxWorkers { + lt.logger.ErrorContext(ctx, + "config update not within loadtest boundary conditions: numWorkers", + "reason", "update tried to set numWorkers too high", + "remediation_hint", "increase the loadtest MaxWorkers setting", + "remediation_taken", "using max value", + "requested", n, + "max", lt.maxWorkers, + ) + n = lt.maxWorkers } - lt.resultWaitGroup.Wait() - - for { - - if err := shutdownCtx.Err(); err != nil { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "error", err, - ) - - return ErrRetriesFailedToFlush - } - - // if maxTasks > 0 - { - if numTasks >= maxTasks { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "reason", "reached max 
tasks", - ) - return ErrRetriesFailedToFlush - } + if n > numWorkers { - // 1. the below looks off/odd, why not use?: - // - // ``` - // if n := maxTasks - numTasks; n < numNewTasks { - // numNewTasks = n - // } - // ``` - // - // 2. And for that matter, why not keep meta.NumIntervalTasks in sync with numNewTasks? - // - // --- - // - // 1. The implementation would be exactly the same, just using another variable - // 2. the meta.NumIntervalTasks value is used in RATE calculations, if we keep it in sync - // with BOUNDS values then the last tasks could run at a lower RATE than intended. It - // is only kept in sync when a user adjusts the RATE via a ConfigUpdate. Don't confuse - // bounds purpose values with rate purpose values. - // - numNewTasks = maxTasks - numTasks - if numNewTasks > meta.NumIntervalTasks { - numNewTasks = meta.NumIntervalTasks - } + // unpause workers + for i := numWorkers; i < len(lt.workers); i++ { + lt.workers[i] <- struct{}{} } - select { - case <-ctxDone: - lt.logger.WarnContext(ctx, - "user stopped loadtest while attempting to flush retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) - return nil - default: - // continue with load generating retries + // spawn new workers if needed + for i := len(lt.workers); i < n; i++ { + lt.addWorker(ctx, i) } + } else if n < numWorkers { - // acquire load generation opportunity slots ( smooths bursts ) - // - // in the shutdown retry flow we always want to acquire before reading retries - // to avoid a deadlock edge case of the retry queue being full, all workers tasks failed and need to be retried - if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "error", err, - "reason", "shutdown timeout likely reached while waiting for semaphore acquisition", - ) - return ErrRetriesFailedToFlush + // pause workers if needed + for i := numWorkers - 1; i >= n; i-- { + lt.workers[i] <- struct{}{} } + } - // read up to numNewTasks from retry slice - taskBufSize := readRetries(taskBuf[:numNewTasks:numNewTasks]) - if taskBufSize <= 0 { - // wait for any pending tasks to flush and try read again - - lt.logger.DebugContext(ctx, - "verifying all retries have flushed", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) + configChanges = append(configChanges, + "old_num_workers", numWorkers, + "new_num_workers", n, + ) + numWorkers = n + } - lt.resultWaitGroup.Wait() + if cu.numIntervalTasks.set { + recomputeInterTaskInterval = true + recomputeTaskSlots = true - // read up to numNewTasks from retry slice again - taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) - if taskBufSize <= 0 { - - lt.logger.InfoContext(ctx, - "all retries flushed", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) - return nil - } - } - taskBuf = taskBuf[:taskBufSize] - - // re-release any extra load slots we allocated beyond what really remains to do - if numNewTasks > taskBufSize { - lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) - } - - lt.resultWaitGroup.Add(taskBufSize) - - meta.IntervalID = intervalID - - enqueueTasks() - - taskBuf = taskBuf[:0] - - numTasks += taskBufSize - - meta.Lag = 0 - - // wait for next interval time to exist - nextIntervalID := intervalID.Add(interval) - realNow := time.Now() - delay = nextIntervalID.Sub(realNow) - if delay > 0 { - time.Sleep(delay) - intervalID = nextIntervalID - 
- if taskBufSize < numNewTasks { - // just finished this iteration of retry enqueuing - // - // break to loop through retry drain context again - break - } - - continue - } - - if delay < 0 { - intervalID = realNow - - lag := -delay - meta.Lag = lag - - lt.resultWaitGroup.Add(1) - lt.resultsChan <- taskResult{ - Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, - }, - } - - } - - if taskBufSize < numNewTasks { - // just finished this iteration of retry enqueuing - // - // break to loop through retry drain context again - break - } - } - } - }(lt.flushRetriesOnShutdown) - if err != nil { - *shutdownErrResp = err - } - - lt.logger.DebugContext(ctx, - "waiting for running tasks to stop", - ) - lt.resultWaitGroup.Wait() - - lt.logger.DebugContext(ctx, - "stopping result handler routine", - ) - - // signal for result handler routines to stop - close(lt.resultsChan) - - // signal for workers to stop - lt.logger.DebugContext(ctx, - "stopping workers", - ) - for i := 0; i < len(lt.workers); i++ { - close(lt.workers[i]) - } - - // wait for result handler routines to stop - lt.logger.DebugContext(ctx, - "waiting for result handler routines to stop", - ) - wg.Wait() - - // wait for workers to stop - lt.logger.DebugContext(ctx, - "waiting for workers to stop", - ) - lt.workerWaitGroup.Wait() - - lt.logger.InfoContext(ctx, - "loadtest stopped", - ) - }() - - // getTaskSlotCount is the task emission back pressure - // throttle that conveys the number of tasks that - // are allowed to be un-finished for the performance - // interval under normal circumstances - getTaskSlotCount := func() int { - return maxPendingTasks(numWorkers, numNewTasks) - } - - // apply initial task buffer limits to the interval semaphore - taskSlotCount := getTaskSlotCount() - lt.intervalTasksSema.Release(int64(taskSlotCount)) - - configCausesPause := func() bool { - return meta.NumIntervalTasks <= 0 || numWorkers <= 0 - } - - var paused bool - var pauseStart time.Time - - handleConfigUpdateAndPauseState := func(cu ConfigUpdate) error { - for { - var prepSemaErr error - var recomputeInterTaskInterval, recomputeTaskSlots bool - - if cu.numWorkers.set { - recomputeTaskSlots = true - - n := cu.numWorkers.val - - // prevent over committing on the maxWorkers count - if n < 0 { - lt.logger.ErrorContext(ctx, - "config update not within loadtest boundary conditions: numWorkers", - "reason", "update tried to set numWorkers too low", - "remediation_taken", "using min value", - "requested", n, - "min", 0, - ) - n = 0 - } else if n > lt.maxWorkers { - lt.logger.ErrorContext(ctx, - "config update not within loadtest boundary conditions: numWorkers", - "reason", "update tried to set numWorkers too high", - "remediation_hint", "increase the loadtest MaxWorkers setting", - "remediation_taken", "using max value", - "requested", n, - "max", lt.maxWorkers, - ) - n = lt.maxWorkers - } - - if n > numWorkers { - - // unpause workers - for i := numWorkers; i < len(lt.workers); i++ { - lt.workers[i] <- struct{}{} - } - - // spawn new workers if needed - for i := len(lt.workers); i < n; i++ { - lt.addWorker(ctx, i) - } - } else if n < numWorkers { - - // pause workers if needed - for i := numWorkers - 1; i >= n; i-- { - lt.workers[i] <- struct{}{} - } - } - - configChanges = append(configChanges, - "old_num_workers", numWorkers, - "new_num_workers", n, - ) - numWorkers = n - } - - if cu.numIntervalTasks.set { - recomputeInterTaskInterval = true - recomputeTaskSlots = true - - n := cu.numIntervalTasks.val + n := cu.numIntervalTasks.val // prevent over 
committing on the maxIntervalTasks count if n < 0 { @@ -3350,22 +3698,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context // continue with load generation } - // read up to numNewTasks from retry slice taskBufSize := 0 - - // acquire load generation opportunity slots ( smooths bursts ) - // - // do this early conditionally to allow retries to settle in the retry channel - // so we can pick them up when enough buffer space has cleared - // - // thus we avoid a possible deadlock where the retry queue is full and the workers - // all have failed tasks that wish to be retried - if lt.intervalTasksSema.Acquire(ctx, int64(numNewTasks)) != nil { - return nil - } - - taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) - taskBuf = taskBuf[:taskBufSize] if taskBufSize < numNewTasks { @@ -3376,31 +3709,12 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context } if n == 0 { - // if !lt.retriesDisabled - { - // iteration is technically done now - // but there could be straggling retries - // queued after this, those should continue - // to be flushed if and only if maxTasks - // has not been reached and if it is greater - // than zero - if taskBufSize == 0 { - // return immediately if there is nothing - // new to enqueue - - lt.logger.WarnContext(ctx, - "stopping loadtest: ReadTasks did not load enough tasks", - "final_task_delta", 0, - ) - - return nil - } + lt.logger.WarnContext(ctx, + "stopping loadtest: ReadTasks did not load enough tasks", + "final_task_delta", 0, + ) - lt.logger.DebugContext(ctx, - "scheduled: stopping loadtest: ReadTasks did not load enough tasks", - "retry_count", taskBufSize, - ) - } + return nil } taskBufSize += n @@ -3409,11 +3723,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context // acquire load generation opportunity slots ( smooths bursts ) // if not done already - // - // but if we allocated too many in our retry prep phase then release the - // difference - if numNewTasks > taskBufSize { - lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) + if lt.intervalTasksSema.Acquire(ctx, int64(taskBufSize)) != nil { + return nil } lt.resultWaitGroup.Add(taskBufSize) @@ -3426,7 +3737,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context // must have hit the end of ReadTasks iterator // increase numTasks total by actual number queued // and stop traffic generation - numTasks += taskBufSize + // numTasks += taskBufSize // this line only has an effect except in a retries enabled context lt.logger.WarnContext(ctx, "stopping loadtest: ReadTasks did not load enough tasks", "final_task_delta", taskBufSize, @@ -3468,47 +3779,19 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context } } -func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx context.Context, _ *error) error { +func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx context.Context, _ *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) lt.startTime = time.Now() - // if lt.csvData.writeErr == nil // a.k.a. !cfg.csvOutputDisabled // (a.k.a. 
metrics enabled) - { - - csvFile, err := os.Create(lt.csvData.outputFilename) - if err != nil { - return fmt.Errorf("failed to open output csv metrics file for writing: %w", err) - } - defer lt.writeOutputCsvFooterAndClose(csvFile) + lt.logger.InfoContext(ctx, + "starting loadtest", + "config", lt.loadtestConfigAsJson(), + ) - lt.csvData.writeErr = lt.writeOutputCsvConfigComment(csvFile) - - if lt.csvData.writeErr == nil { - - lt.csvData.writer = csv.NewWriter(csvFile) - - lt.csvData.writeErr = lt.writeOutputCsvHeaders() - } - } - - lt.logger.InfoContext(ctx, - "starting loadtest", - "config", lt.loadtestConfigAsJson(), - ) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - - lt.resultsHandler() - }() - - numWorkers := lt.numWorkers + numWorkers := lt.numWorkers // numTasks is the total number of tasks // scheduled to run ( including retries ) @@ -3572,13 +3855,6 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex ) lt.resultWaitGroup.Wait() - lt.logger.DebugContext(ctx, - "stopping result handler routine", - ) - - // signal for result handler routines to stop - close(lt.resultsChan) - // signal for workers to stop lt.logger.DebugContext(ctx, "stopping workers", @@ -3587,12 +3863,6 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex close(lt.workers[i]) } - // wait for result handler routines to stop - lt.logger.DebugContext(ctx, - "waiting for result handler routines to stop", - ) - wg.Wait() - // wait for workers to stop lt.logger.DebugContext(ctx, "waiting for workers to stop", @@ -3938,8 +4208,6 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex numTasks += taskBufSize - meta.Lag = 0 - // wait for next interval time to exist nextIntervalID := intervalID.Add(interval) realNow := time.Now() @@ -3953,33 +4221,50 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex if delay < 0 { intervalID = realNow - lag := -delay - meta.Lag = lag - - lt.resultWaitGroup.Add(1) - lt.resultsChan <- taskResult{ - Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, - }, - } - } } } -func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx context.Context, shutdownErrResp *error) error { +func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx context.Context, _ *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) lt.startTime = time.Now() + // if lt.csvData.writeErr == nil // a.k.a. !cfg.csvOutputDisabled // (a.k.a. 
metrics enabled) + { + + csvFile, err := os.Create(lt.csvData.outputFilename) + if err != nil { + return fmt.Errorf("failed to open output csv metrics file for writing: %w", err) + } + defer lt.writeOutputCsvFooterAndClose(csvFile) + + lt.csvData.writeErr = lt.writeOutputCsvConfigComment(csvFile) + + if lt.csvData.writeErr == nil { + + lt.csvData.writer = csv.NewWriter(csvFile) + + lt.csvData.writeErr = lt.writeOutputCsvHeaders() + } + } + lt.logger.InfoContext(ctx, "starting loadtest", "config", lt.loadtestConfigAsJson(), ) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + + lt.resultsHandler() + }() + numWorkers := lt.numWorkers // numTasks is the total number of tasks @@ -3988,8 +4273,6 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex intervalID := time.Now() - maxTasks := lt.maxTasks - interval := lt.interval numNewTasks := lt.numIntervalTasks ctxDone := ctx.Done() @@ -4028,261 +4311,29 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex enqueueTasks = floodStrategy } else { enqueueTasks = staggerStrategy - } - } - } - updateEnqueueTasksStrategy() - - var delay time.Duration - - readRetries := func(p []Doer) int { - // make sure you only fill up to len - - var i int - for i < len(p) { - select { - case task := <-lt.retryTaskChan: - p[i] = task - default: - return i - } - i++ - } - - return i - } - - // stopping routine runs on return - // flushing as much as possible - defer func() { - - err := func(flushRetries bool) error { - if !flushRetries { - - lt.logger.DebugContext(ctx, - "not waiting on retries to flush on shutdown", - "reason", "retries disabled or flush retries on shutdown disabled", - "num_tasks", numTasks, - ) - - return nil - } - - if err := ctx.Err(); err != nil { - lt.logger.WarnContext(ctx, - "not waiting on retries to flush on shutdown", - "reason", "user stopped loadtest", - "num_tasks", numTasks, - "error", err, - ) - return nil - } - - lt.logger.DebugContext(ctx, - "waiting on retries to flush", - "num_tasks", numTasks, - ) - - if meta.NumIntervalTasks <= 0 || numWorkers <= 0 { - - lt.logger.ErrorContext(ctx, - "retry flushing could not be attempted", - "num_tasks", numTasks, - "num_interval_tasks", meta.NumIntervalTasks, - "num_workers", numWorkers, - ) - - return ErrRetriesFailedToFlush - } - - preflushNumTasks := numTasks - - lt.logger.WarnContext(ctx, - "shutting down: flushing retries", - "num_tasks", numTasks, - "flush_retries_timeout", lt.flushRetriesTimeout.String(), - ) - - shutdownCtx, cancel := context.WithTimeout(context.Background(), lt.flushRetriesTimeout) - defer cancel() - - intervalID = time.Now() - taskBuf = taskBuf[:0] - - for { - - if err := shutdownCtx.Err(); err != nil { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "error", err, - ) - - return ErrRetriesFailedToFlush - } - - lt.resultWaitGroup.Wait() - - for { - - if err := shutdownCtx.Err(); err != nil { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "error", err, - ) - - return ErrRetriesFailedToFlush - } - - // if maxTasks > 0 - { - if numTasks >= maxTasks { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "reason", "reached max tasks", - ) - return ErrRetriesFailedToFlush - } - - // 1. 
the below looks off/odd, why not use?: - // - // ``` - // if n := maxTasks - numTasks; n < numNewTasks { - // numNewTasks = n - // } - // ``` - // - // 2. And for that matter, why not keep meta.NumIntervalTasks in sync with numNewTasks? - // - // --- - // - // 1. The implementation would be exactly the same, just using another variable - // 2. the meta.NumIntervalTasks value is used in RATE calculations, if we keep it in sync - // with BOUNDS values then the last tasks could run at a lower RATE than intended. It - // is only kept in sync when a user adjusts the RATE via a ConfigUpdate. Don't confuse - // bounds purpose values with rate purpose values. - // - numNewTasks = maxTasks - numTasks - if numNewTasks > meta.NumIntervalTasks { - numNewTasks = meta.NumIntervalTasks - } - } - - select { - case <-ctxDone: - lt.logger.WarnContext(ctx, - "user stopped loadtest while attempting to flush retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) - return nil - default: - // continue with load generating retries - } - - // acquire load generation opportunity slots ( smooths bursts ) - // - // in the shutdown retry flow we always want to acquire before reading retries - // to avoid a deadlock edge case of the retry queue being full, all workers tasks failed and need to be retried - if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil { - lt.logger.ErrorContext(ctx, - "failed to flush all retries", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - "error", err, - "reason", "shutdown timeout likely reached while waiting for semaphore acquisition", - ) - return ErrRetriesFailedToFlush - } - - // read up to numNewTasks from retry slice - taskBufSize := readRetries(taskBuf[:numNewTasks:numNewTasks]) - if taskBufSize <= 0 { - // wait for any pending tasks to flush and try read again - - lt.logger.DebugContext(ctx, - "verifying all retries have flushed", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) - - lt.resultWaitGroup.Wait() - - // read up to numNewTasks from retry slice again - taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) - if taskBufSize <= 0 { - - lt.logger.InfoContext(ctx, - "all retries flushed", - "preflush_num_tasks", preflushNumTasks, - "num_tasks", numTasks, - ) - return nil - } - } - taskBuf = taskBuf[:taskBufSize] - - // re-release any extra load slots we allocated beyond what really remains to do - if numNewTasks > taskBufSize { - lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) - } - - lt.resultWaitGroup.Add(taskBufSize) - - meta.IntervalID = intervalID - - enqueueTasks() - - taskBuf = taskBuf[:0] - - numTasks += taskBufSize - - // wait for next interval time to exist - nextIntervalID := intervalID.Add(interval) - realNow := time.Now() - delay = nextIntervalID.Sub(realNow) - if delay > 0 { - time.Sleep(delay) - intervalID = nextIntervalID - - if taskBufSize < numNewTasks { - // just finished this iteration of retry enqueuing - // - // break to loop through retry drain context again - break - } - - continue - } - - if delay < 0 { - intervalID = realNow - - } - - if taskBufSize < numNewTasks { - // just finished this iteration of retry enqueuing - // - // break to loop through retry drain context again - break - } - } - } - }(lt.flushRetriesOnShutdown) - if err != nil { - *shutdownErrResp = err + } } + } + updateEnqueueTasksStrategy() + + var delay time.Duration + + // stopping routine runs on return + // flushing as much as possible + defer func() { 
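		// Shutdown ordering in this stopping routine: wait for all in-flight
		// task results via resultWaitGroup, close resultsChan so the results
		// handler goroutine can drain and exit, close each worker channel,
		// then wait on the results handler (wg) and finally on the workers
		// (workerWaitGroup).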
lt.logger.DebugContext(ctx, "waiting for running tasks to stop", ) lt.resultWaitGroup.Wait() + lt.logger.DebugContext(ctx, + "stopping result handler routine", + ) + + // signal for result handler routines to stop + close(lt.resultsChan) + // signal for workers to stop lt.logger.DebugContext(ctx, "stopping workers", @@ -4291,6 +4342,12 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex close(lt.workers[i]) } + // wait for result handler routines to stop + lt.logger.DebugContext(ctx, + "waiting for result handler routines to stop", + ) + wg.Wait() + // wait for workers to stop lt.logger.DebugContext(ctx, "waiting for workers to stop", @@ -4543,21 +4600,6 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex // main task scheduling loop for { - // if maxTasks > 0 - { - if numTasks >= maxTasks { - lt.logger.WarnContext(ctx, - "loadtest finished: max task count reached", - "max_tasks", maxTasks, - ) - return nil - } - - numNewTasks = maxTasks - numTasks - if numNewTasks > meta.NumIntervalTasks { - numNewTasks = meta.NumIntervalTasks - } - } // duplicating short-circuit signal control processing to give it priority over the randomizing nature of the multi-select // that follows @@ -4585,22 +4627,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex // continue with load generation } - // read up to numNewTasks from retry slice taskBufSize := 0 - - // acquire load generation opportunity slots ( smooths bursts ) - // - // do this early conditionally to allow retries to settle in the retry channel - // so we can pick them up when enough buffer space has cleared - // - // thus we avoid a possible deadlock where the retry queue is full and the workers - // all have failed tasks that wish to be retried - if lt.intervalTasksSema.Acquire(ctx, int64(numNewTasks)) != nil { - return nil - } - - taskBufSize = readRetries(taskBuf[:numNewTasks:numNewTasks]) - taskBuf = taskBuf[:taskBufSize] if taskBufSize < numNewTasks { @@ -4611,31 +4638,12 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex } if n == 0 { - // if !lt.retriesDisabled - { - // iteration is technically done now - // but there could be straggling retries - // queued after this, those should continue - // to be flushed if and only if maxTasks - // has not been reached and if it is greater - // than zero - if taskBufSize == 0 { - // return immediately if there is nothing - // new to enqueue - - lt.logger.WarnContext(ctx, - "stopping loadtest: ReadTasks did not load enough tasks", - "final_task_delta", 0, - ) - - return nil - } + lt.logger.WarnContext(ctx, + "stopping loadtest: ReadTasks did not load enough tasks", + "final_task_delta", 0, + ) - lt.logger.DebugContext(ctx, - "scheduled: stopping loadtest: ReadTasks did not load enough tasks", - "retry_count", taskBufSize, - ) - } + return nil } taskBufSize += n @@ -4644,11 +4652,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex // acquire load generation opportunity slots ( smooths bursts ) // if not done already - // - // but if we allocated too many in our retry prep phase then release the - // difference - if numNewTasks > taskBufSize { - lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) + if lt.intervalTasksSema.Acquire(ctx, int64(taskBufSize)) != nil { + return nil } lt.resultWaitGroup.Add(taskBufSize) @@ -4661,7 +4666,7 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex // must have hit 
the end of ReadTasks iterator // increase numTasks total by actual number queued // and stop traffic generation - numTasks += taskBufSize + // numTasks += taskBufSize // this line only has an effect except in a retries enabled context lt.logger.WarnContext(ctx, "stopping loadtest: ReadTasks did not load enough tasks", "final_task_delta", taskBufSize, @@ -4673,6 +4678,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex numTasks += taskBufSize + meta.Lag = 0 + // wait for next interval time to exist nextIntervalID := intervalID.Add(interval) realNow := time.Now() @@ -4686,11 +4693,22 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex if delay < 0 { intervalID = realNow + lag := -delay + meta.Lag = lag + + lt.resultWaitGroup.Add(1) + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + IntervalID: intervalID, + Lag: lag, + }, + } + } } } -func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx context.Context, _ *error) error { +func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx context.Context, _ *error) error { cfgUpdateChan := lt.cfgUpdateChan defer close(cfgUpdateChan) @@ -4710,8 +4728,6 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte intervalID := time.Now() - maxTasks := lt.maxTasks - interval := lt.interval numNewTasks := lt.numIntervalTasks ctxDone := ctx.Done() @@ -5026,21 +5042,6 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte // main task scheduling loop for { - // if maxTasks > 0 - { - if numTasks >= maxTasks { - lt.logger.WarnContext(ctx, - "loadtest finished: max task count reached", - "max_tasks", maxTasks, - ) - return nil - } - - numNewTasks = maxTasks - numTasks - if numNewTasks > meta.NumIntervalTasks { - numNewTasks = meta.NumIntervalTasks - } - } // duplicating short-circuit signal control processing to give it priority over the randomizing nature of the multi-select // that follows @@ -5135,3 +5136,372 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte } } } + +func (lt *Loadtest) writeOutputCsvRow_maxTasksGTZero_percentileEnabled(mr metricRecord) { + + cd := <.csvData + if cd.writeErr != nil { + return + } + + nowStr := timeToString(time.Now()) + + var percent string + { + // Note: this could integer overflow + // should be highly unlikely and only affects + // a percent complete metric which is likely not too critical + high := mr.totalNumTasks * 10000 / lt.maxTasks + low := high % 100 + high /= 100 + + var sep string + if low < 10 { + sep = ".0" + } else { + sep = "." 
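			// For example (illustrative values only): with lt.maxTasks = 800 and
			// mr.totalNumTasks = 123, high starts as 123*10000/800 = 1537, so
			// low = 37 and high becomes 15, and the percent string assembled
			// below is "15.37" (truncated, not rounded). When low < 10, the
			// ".0" separator pads the fraction, e.g. 3 of 100 tasks yields "3.00".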
+ } + + percent = strconv.Itoa(high) + sep + strconv.Itoa(low) + } + + fields := []string{ + nowStr, + timeToString(mr.intervalID), + strconv.Itoa(mr.numIntervalTasks), + mr.lag.String(), + mr.sumLag.String(), + strconv.Itoa(mr.numTasks), + strconv.Itoa(mr.numPass), + strconv.Itoa(mr.numFail), + strconv.Itoa(mr.numRetry), + strconv.Itoa(mr.numPanic), + mr.minQueuedDuration.String(), + (mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(), + mr.maxQueuedDuration.String(), + mr.sumQueuedDuration.String(), + mr.minTaskDuration.String(), + (mr.sumTaskDuration / time.Duration(mr.numTasks)).String(), + mr.maxTaskDuration.String(), + mr.sumTaskDuration.String(), + mr.queuedDurations.StringPercentile(lt.latencyPercentile), + mr.taskDurations.StringPercentile(lt.latencyPercentile), + percent, + } + + if err := cd.writer.Write(fields); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } +} + +func (lt *Loadtest) writeOutputCsvRow_maxTasksGTZero_percentileDisabled(mr metricRecord) { + + cd := <.csvData + if cd.writeErr != nil { + return + } + + nowStr := timeToString(time.Now()) + + var percent string + { + // Note: this could integer overflow + // should be highly unlikely and only affects + // a percent complete metric which is likely not too critical + high := mr.totalNumTasks * 10000 / lt.maxTasks + low := high % 100 + high /= 100 + + var sep string + if low < 10 { + sep = ".0" + } else { + sep = "." + } + + percent = strconv.Itoa(high) + sep + strconv.Itoa(low) + } + + fields := []string{ + nowStr, + timeToString(mr.intervalID), + strconv.Itoa(mr.numIntervalTasks), + mr.lag.String(), + mr.sumLag.String(), + strconv.Itoa(mr.numTasks), + strconv.Itoa(mr.numPass), + strconv.Itoa(mr.numFail), + strconv.Itoa(mr.numRetry), + strconv.Itoa(mr.numPanic), + mr.minQueuedDuration.String(), + (mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(), + mr.maxQueuedDuration.String(), + mr.sumQueuedDuration.String(), + mr.minTaskDuration.String(), + (mr.sumTaskDuration / time.Duration(mr.numTasks)).String(), + mr.maxTaskDuration.String(), + mr.sumTaskDuration.String(), + percent, + } + + if err := cd.writer.Write(fields); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } +} + +func (lt *Loadtest) writeOutputCsvRow_maxTasksNotGTZero_percentileEnabled(mr metricRecord) { + + cd := <.csvData + if cd.writeErr != nil { + return + } + + nowStr := timeToString(time.Now()) + + fields := []string{ + nowStr, + timeToString(mr.intervalID), + strconv.Itoa(mr.numIntervalTasks), + mr.lag.String(), + mr.sumLag.String(), + strconv.Itoa(mr.numTasks), + strconv.Itoa(mr.numPass), + strconv.Itoa(mr.numFail), + strconv.Itoa(mr.numRetry), + strconv.Itoa(mr.numPanic), + mr.minQueuedDuration.String(), + (mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(), + mr.maxQueuedDuration.String(), + mr.sumQueuedDuration.String(), + mr.minTaskDuration.String(), + (mr.sumTaskDuration / time.Duration(mr.numTasks)).String(), + mr.maxTaskDuration.String(), + mr.sumTaskDuration.String(), + mr.queuedDurations.StringPercentile(lt.latencyPercentile), + mr.taskDurations.StringPercentile(lt.latencyPercentile), + } + + if err := cd.writer.Write(fields); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } +} + +func (lt *Loadtest) writeOutputCsvRow_maxTasksNotGTZero_percentileDisabled(mr metricRecord) { + + cd := <.csvData + if cd.writeErr != nil { + return + } + + nowStr := timeToString(time.Now()) + + fields := []string{ + nowStr, + 
timeToString(mr.intervalID), + strconv.Itoa(mr.numIntervalTasks), + mr.lag.String(), + mr.sumLag.String(), + strconv.Itoa(mr.numTasks), + strconv.Itoa(mr.numPass), + strconv.Itoa(mr.numFail), + strconv.Itoa(mr.numRetry), + strconv.Itoa(mr.numPanic), + mr.minQueuedDuration.String(), + (mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(), + mr.maxQueuedDuration.String(), + mr.sumQueuedDuration.String(), + mr.minTaskDuration.String(), + (mr.sumTaskDuration / time.Duration(mr.numTasks)).String(), + mr.maxTaskDuration.String(), + mr.sumTaskDuration.String(), + } + + if err := cd.writer.Write(fields); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } +} + +func (lt *Loadtest) resultsHandler_percentileEnabled() { + + cd := <.csvData + var mr metricRecord + mr.reset() + mr.queuedDurations = []time.Duration{} + mr.taskDurations = []time.Duration{} + + var writeRow func() + if lt.maxTasks > 0 { + writeRow = func() { + mr.totalNumTasks += mr.numTasks + lt.writeOutputCsvRow(mr) + } + } else { + writeRow = func() { + lt.writeOutputCsvRow(mr) + } + } + + cd.flushDeadline = time.Now().Add(cd.flushInterval) + + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return + } + + lt.resultWaitGroup.Done() + + if cd.writeErr != nil { + continue + } + + if tr.taskResultFlags.isZero() { + + mr.sumLag += tr.Meta.Lag + + continue + } + + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } + + if mr.minQueuedDuration > tr.QueuedDuration { + mr.minQueuedDuration = tr.QueuedDuration + } + + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueuedDuration < tr.QueuedDuration { + mr.maxQueuedDuration = tr.QueuedDuration + } + + mr.sumQueuedDuration += tr.QueuedDuration + mr.sumTaskDuration += tr.TaskDuration + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) + + mr.numTasks++ + + mr.queuedDurations.Add(tr.QueuedDuration) + mr.taskDurations.Add(tr.TaskDuration) + + if mr.numTasks >= mr.numIntervalTasks { + + writeRow() + mr.reset() + + mr.resetLatencySlices() + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) + } + } + } +} + +func (lt *Loadtest) resultsHandler_percentileDisabled() { + + cd := <.csvData + var mr metricRecord + mr.reset() + + var writeRow func() + if lt.maxTasks > 0 { + writeRow = func() { + mr.totalNumTasks += mr.numTasks + lt.writeOutputCsvRow(mr) + } + } else { + writeRow = func() { + lt.writeOutputCsvRow(mr) + } + } + + cd.flushDeadline = time.Now().Add(cd.flushInterval) + + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return + } + + lt.resultWaitGroup.Done() + + if cd.writeErr != nil { + continue + } + + if tr.taskResultFlags.isZero() { + + mr.sumLag += tr.Meta.Lag + + continue + } + + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } + + if mr.minQueuedDuration > tr.QueuedDuration { + mr.minQueuedDuration 
= tr.QueuedDuration + } + + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueuedDuration < tr.QueuedDuration { + mr.maxQueuedDuration = tr.QueuedDuration + } + + mr.sumQueuedDuration += tr.QueuedDuration + mr.sumTaskDuration += tr.TaskDuration + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) + + mr.numTasks++ + + if mr.numTasks >= mr.numIntervalTasks { + + writeRow() + mr.reset() + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) + } + } + } +} diff --git a/loadtester/internal/cmd/generate/doTask.go.tmpl b/loadtester/internal/cmd/generate/doTask.go.tmpl new file mode 100644 index 0000000..c15b05c --- /dev/null +++ b/loadtester/internal/cmd/generate/doTask.go.tmpl @@ -0,0 +1,121 @@ +func (lt *Loadtest) doTask_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_metrics{{if .MetricsEnabled}}Enabled{{else}}Disabled{{end}}(ctx context.Context, workerID int, twm taskWithMeta) { + {{if .MetricsEnabled}} + var respFlags taskResultFlags + { + taskStart := time.Now() + defer func(){ + taskEnd := time.Now() + + lt.resultsChan <- taskResult{ + taskResultFlags: respFlags, + QueuedDuration: taskStart.Sub(twm.enqueueTime), + TaskDuration: taskEnd.Sub(taskStart), + Meta: twm.meta, + } + }() + } + {{else}} + defer lt.resultWaitGroup.Done() + {{end}} + + // phase is the name of the step which has possibly caused a panic + phase := "do" + {{if .RetriesEnabled}} + var rt *retryTask + if v, ok := twm.doer.(*retryTask); ok { + rt = v + phase = "retry" + defer func() { + *rt = retryTask{} + lt.retryTaskPool.Put(v) + }() + } + {{- end}} + defer func() { + + if r := recover(); r != nil { + {{if .MetricsEnabled}} + respFlags.Panicked = 1 + respFlags.Errored = 1 + {{end}} + + switch v := r.(type) { + case error: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + case []byte: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", string(v), + ) + case string: + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", v, + ) + default: + const msg = "unknown cause" + + lt.logger.ErrorContext(ctx, + "worker recovered from panic", + "worker_id", workerID, + "phase", phase, + "error", msg, + ) + } + } + }() + err := twm.doer.Do(ctx, workerID) + phase = "" // done, no panic occurred + if err == nil { + {{if .MetricsEnabled}}respFlags.Passed = 1{{end}} + return + } + + lt.logger.WarnContext(ctx, + "task error", + "worker_id", workerID, + "error", err, + ) + + {{if .MetricsEnabled}}respFlags.Errored = 1{{end}} + + {{if .RetriesEnabled}} + var dr DoRetryer + if rt != nil { + dr = rt.DoRetryer + } else if v, ok := twm.doer.(DoRetryer); ok { + dr = v + } else { + return + } + + phase = "can-retry" + if v, ok := dr.(DoRetryChecker); ok && !v.CanRetry(ctx, workerID, err) { + phase = "" // done, no panic occurred + return + } + phase = "" // done, no panic occurred + + // queue a new retry task + { + rt := lt.retryTaskPool.Get().(*retryTask) + + *rt = retryTask{dr, err} + + lt.retryTaskChan <- rt + } + + 
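	// Note: the *retryTask sent to retryTaskChan above is recycled: when it is
	// later dequeued and run back through this function, the retry branch near
	// the top detects it, resets it, and returns it to retryTaskPool via the
	// deferred cleanup.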
{{if .MetricsEnabled}}respFlags.RetryQueued = 1{{end}} + + {{- end}} + return +} diff --git a/loadtester/internal/cmd/generate/imports.go.tmpl b/loadtester/internal/cmd/generate/imports.go.tmpl new file mode 100644 index 0000000..b23398b --- /dev/null +++ b/loadtester/internal/cmd/generate/imports.go.tmpl @@ -0,0 +1,11 @@ +package loadtester + +import ( + "context" + "encoding/csv" + "fmt" + "os" + "strconv" + "sync" + "time" +) diff --git a/loadtester/internal/cmd/generate/main.go b/loadtester/internal/cmd/generate/main.go index ac02fe5..ca5af96 100644 --- a/loadtester/internal/cmd/generate/main.go +++ b/loadtester/internal/cmd/generate/main.go @@ -5,19 +5,58 @@ import ( "bytes" _ "embed" "go/format" + "io" "os" "text/template" ) -//go:embed gen_strategies.go.tmpl -var templateBody string +//go:embed imports.go.tmpl +var tsImports string -func main() { - tmpl, err := template.New("").Parse(templateBody) +//go:embed doTask.go.tmpl +var tsDoTask string + +//go:embed run.go.tmpl +var tsRun string + +//go:embed writeOutputCsvRow.go.tmpl +var tsWriteOutputCsvRow string + +//go:embed resultsHandler.go.tmpl +var tsResultsHandler string + +func parse(s string) *template.Template { + t, err := template.New("").Parse(s) if err != nil { panic(err) } + return t +} + +func renderer[T any](w io.Writer) func(*template.Template, []T) { + return func(t *template.Template, data []T) { + if len(data) == 0 { + if err := t.Execute(w, nil); err != nil { + panic(err) + } + if _, err := w.Write([]byte("\n")); err != nil { + panic(err) + } + return + } + + for _, d := range data { + if err := t.Execute(w, d); err != nil { + panic(err) + } + if _, err := w.Write([]byte("\n")); err != nil { + panic(err) + } + } + } +} +func main() { const dstFile = "../../../gen_strategies.go" defer func() { @@ -36,106 +75,104 @@ func main() { var buf bytes.Buffer - type data struct { - ImportsOnly bool - GenDoTask bool - RetriesEnabled bool - MaxTasksGTZero bool - MetricsEnabled bool - } - _, err = buf.WriteString(`// Code generated by ./internal/cmd/generate/main.go DO NOT EDIT.` + "\n\n") if err != nil { panic(err) } - err = tmpl.Execute(&buf, data{ - ImportsOnly: true, - }) - if err != nil { - panic(err) - } + // create imports section of source code + { + render := renderer[any](&buf) - err = tmpl.Execute(&buf, data{ - RetriesEnabled: true, - GenDoTask: true, - MaxTasksGTZero: false, - MetricsEnabled: true, - }) - if err != nil { - panic(err) + render(parse(tsImports), nil) } - err = tmpl.Execute(&buf, data{ - RetriesEnabled: false, - GenDoTask: true, - MaxTasksGTZero: false, - MetricsEnabled: true, - }) - if err != nil { - panic(err) - } + // render doTask strategies + { + t := parse(tsDoTask) - err = tmpl.Execute(&buf, data{ - RetriesEnabled: true, - GenDoTask: true, - MaxTasksGTZero: false, - MetricsEnabled: false, - }) - if err != nil { - panic(err) - } + type cfg struct { + RetriesEnabled bool + MetricsEnabled bool + } - err = tmpl.Execute(&buf, data{ - RetriesEnabled: false, - GenDoTask: true, - MaxTasksGTZero: false, - MetricsEnabled: false, - }) - if err != nil { - panic(err) - } + render := renderer[cfg](&buf) - err = tmpl.Execute(&buf, data{ - RetriesEnabled: true, - GenDoTask: false, - MaxTasksGTZero: true, - MetricsEnabled: true, - }) - if err != nil { - panic(err) + render(t, []cfg{ + {RetriesEnabled: true, MetricsEnabled: true}, + {RetriesEnabled: true, MetricsEnabled: false}, + {RetriesEnabled: false, MetricsEnabled: true}, + {RetriesEnabled: false, MetricsEnabled: false}, + }) } - err = tmpl.Execute(&buf, data{ 
-		RetriesEnabled: false,
-		GenDoTask: false,
-		MaxTasksGTZero: true,
-		MetricsEnabled: true,
-	})
-	if err != nil {
-		panic(err)
+	// render run strategies
+	{
+		t := parse(tsRun)
+
+		type cfg struct {
+			RetriesEnabled bool
+			MaxTasksGTZero bool
+			MetricsEnabled bool
+		}
+
+		render := renderer[cfg](&buf)
+
+		render(t, []cfg{
+			{RetriesEnabled: true, MaxTasksGTZero: true, MetricsEnabled: true},
+			{RetriesEnabled: true, MaxTasksGTZero: true, MetricsEnabled: false},
+			{RetriesEnabled: true, MaxTasksGTZero: false, MetricsEnabled: true},
+			{RetriesEnabled: true, MaxTasksGTZero: false, MetricsEnabled: false},
+			{RetriesEnabled: false, MaxTasksGTZero: true, MetricsEnabled: true},
+			{RetriesEnabled: false, MaxTasksGTZero: true, MetricsEnabled: false},
+			{RetriesEnabled: false, MaxTasksGTZero: false, MetricsEnabled: true},
+			{RetriesEnabled: false, MaxTasksGTZero: false, MetricsEnabled: false},
+		})
 	}
-	err = tmpl.Execute(&buf, data{
-		RetriesEnabled: true,
-		GenDoTask: false,
-		MaxTasksGTZero: true,
-		MetricsEnabled: false,
-	})
-	if err != nil {
-		panic(err)
+	// render writeOutputCsvRow strategies
+	{
+		t := parse(tsWriteOutputCsvRow)
+
+		type cfg struct {
+			MaxTasksGTZero bool
+			PercentileEnabled bool
+		}
+
+		render := renderer[cfg](&buf)
+
+		render(t, []cfg{
+			{MaxTasksGTZero: true, PercentileEnabled: true},
+			{MaxTasksGTZero: true, PercentileEnabled: false},
+			{MaxTasksGTZero: false, PercentileEnabled: true},
+			{MaxTasksGTZero: false, PercentileEnabled: false},
+		})
 	}
-	err = tmpl.Execute(&buf, data{
-		RetriesEnabled: false,
-		GenDoTask: false,
-		MaxTasksGTZero: true,
-		MetricsEnabled: false,
-	})
-	if err != nil {
-		panic(err)
+	// render resultsHandler strategies
+	{
+		t := parse(tsResultsHandler)
+
+		type cfg struct {
+			PercentileEnabled bool
+		}
+
+		render := renderer[cfg](&buf)
+
+		render(t, []cfg{
+			{PercentileEnabled: true},
+			{PercentileEnabled: false},
+		})
 	}
+	// // for debugging
+	// _, err = f.Write(buf.Bytes())
+	// if err != nil {
+	// 	panic(err)
+	// } else {
+	// 	return
+	// }
+
 	b, err := format.Source(buf.Bytes())
 	if err != nil {
 		panic(err)
diff --git a/loadtester/internal/cmd/generate/resultsHandler.go.tmpl b/loadtester/internal/cmd/generate/resultsHandler.go.tmpl
new file mode 100644
index 0000000..07f5b43
--- /dev/null
+++ b/loadtester/internal/cmd/generate/resultsHandler.go.tmpl
@@ -0,0 +1,101 @@
+func (lt *Loadtest) resultsHandler_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}() {
+
+	cd := &lt.csvData
+	var mr metricRecord
+	mr.reset()
+	{{if .PercentileEnabled -}}
+	mr.queuedDurations = []time.Duration{}
+	mr.taskDurations = []time.Duration{}
+	{{end}}
+
+	var writeRow func()
+	if lt.maxTasks > 0 {
+		writeRow = func() {
+			mr.totalNumTasks += mr.numTasks
+			lt.writeOutputCsvRow(mr)
+		}
+	} else {
+		writeRow = func() {
+			lt.writeOutputCsvRow(mr)
+		}
+	}
+
+	cd.flushDeadline = time.Now().Add(cd.flushInterval)
+
+	for {
+		tr, ok := <-lt.resultsChan
+		if !ok {
+			if cd.writeErr == nil && mr.numTasks > 0 {
+				writeRow()
+			}
+			return
+		}
+
+		lt.resultWaitGroup.Done()
+
+		if cd.writeErr != nil {
+			continue
+		}
+
+		if tr.taskResultFlags.isZero() {
+
+			mr.sumLag += tr.Meta.Lag
+
+			continue
+		}
+
+		if mr.intervalID.Before(tr.Meta.IntervalID) {
+			mr.intervalID = tr.Meta.IntervalID
+			mr.numIntervalTasks = tr.Meta.NumIntervalTasks
+			mr.lag = tr.Meta.Lag
+		}
+
+		if mr.minQueuedDuration > tr.QueuedDuration {
+			mr.minQueuedDuration = tr.QueuedDuration
+		}
+
+		if mr.minTaskDuration > tr.TaskDuration {
+			mr.minTaskDuration = tr.TaskDuration
+		}
+
+		if mr.maxTaskDuration 
< tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueuedDuration < tr.QueuedDuration { + mr.maxQueuedDuration = tr.QueuedDuration + } + + mr.sumQueuedDuration += tr.QueuedDuration + mr.sumTaskDuration += tr.TaskDuration + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) + + mr.numTasks++ + + {{if .PercentileEnabled -}} + mr.queuedDurations.Add(tr.QueuedDuration) + mr.taskDurations.Add(tr.TaskDuration) + {{end}} + + if mr.numTasks >= mr.numIntervalTasks { + + writeRow() + mr.reset() + + {{if .PercentileEnabled}} + mr.resetLatencySlices() + {{end}} + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) + } + } + } +} diff --git a/loadtester/internal/cmd/generate/gen_strategies.go.tmpl b/loadtester/internal/cmd/generate/run.go.tmpl similarity index 88% rename from loadtester/internal/cmd/generate/gen_strategies.go.tmpl rename to loadtester/internal/cmd/generate/run.go.tmpl index f9fe735..6ab47a2 100644 --- a/loadtester/internal/cmd/generate/gen_strategies.go.tmpl +++ b/loadtester/internal/cmd/generate/run.go.tmpl @@ -1,138 +1,3 @@ -{{if .ImportsOnly -}} -package loadtester - -import ( - "context" - "encoding/csv" - "fmt" - "os" - "sync" - "time" -) -{{else -}} -{{if .GenDoTask}} -func (lt *Loadtest) doTask_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_metrics{{if .MetricsEnabled}}Enabled{{else}}Disabled{{end}}(ctx context.Context, workerID int, twm taskWithMeta) { - {{if .MetricsEnabled}} - var respFlags taskResultFlags - { - taskStart := time.Now() - defer func(){ - taskEnd := time.Now() - - lt.resultsChan <- taskResult{ - taskResultFlags: respFlags, - QueuedDuration: taskStart.Sub(twm.enqueueTime), - TaskDuration: taskEnd.Sub(taskStart), - Meta: twm.meta, - } - }() - } - {{else}} - defer lt.resultWaitGroup.Done() - {{end}} - - // phase is the name of the step which has possibly caused a panic - phase := "do" - {{if .RetriesEnabled}} - var rt *retryTask - if v, ok := twm.doer.(*retryTask); ok { - rt = v - phase = "retry" - defer func() { - *rt = retryTask{} - lt.retryTaskPool.Put(v) - }() - } - {{- end}} - defer func() { - - if r := recover(); r != nil { - {{if .MetricsEnabled}} - respFlags.Panicked = 1 - respFlags.Errored = 1 - {{end}} - - switch v := r.(type) { - case error: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - case []byte: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", string(v), - ) - case string: - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", v, - ) - default: - const msg = "unknown cause" - - lt.logger.ErrorContext(ctx, - "worker recovered from panic", - "worker_id", workerID, - "phase", phase, - "error", msg, - ) - } - } - }() - err := twm.doer.Do(ctx, workerID) - phase = "" // done, no panic occurred - if err == nil { - {{if .MetricsEnabled}}respFlags.Passed = 1{{end}} - return - } - - lt.logger.WarnContext(ctx, - "task error", - "worker_id", workerID, - "error", err, - ) - - {{if .MetricsEnabled}}respFlags.Errored = 1{{end}} - - {{if .RetriesEnabled}} - var dr DoRetryer - if rt != nil { - dr = rt.DoRetryer - } else if v, ok := 
twm.doer.(DoRetryer); ok { - dr = v - } else { - return - } - - phase = "can-retry" - if v, ok := dr.(DoRetryChecker); ok && !v.CanRetry(ctx, workerID, err) { - phase = "" // done, no panic occurred - return - } - phase = "" // done, no panic occurred - - // queue a new retry task - { - rt := lt.retryTaskPool.Get().(*retryTask) - - *rt = retryTask{dr, err} - - lt.retryTaskChan <- rt - } - - {{if .MetricsEnabled}}respFlags.RetryQueued = 1{{end}} - - {{- end}} - return -} -{{end}} func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if .MaxTasksGTZero}}GTZero{{else}}NotGTZero{{end}}_metrics{{if .MetricsEnabled}}Enabled{{else}}Disabled{{end}}(ctx context.Context, {{if .RetriesEnabled}}shutdownErrResp{{else}}_{{end}} *error) error { cfgUpdateChan := lt.cfgUpdateChan @@ -958,4 +823,3 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en } } } -{{end}} diff --git a/loadtester/internal/cmd/generate/writeOutputCsvRow.go.tmpl b/loadtester/internal/cmd/generate/writeOutputCsvRow.go.tmpl new file mode 100644 index 0000000..3a19438 --- /dev/null +++ b/loadtester/internal/cmd/generate/writeOutputCsvRow.go.tmpl @@ -0,0 +1,58 @@ +func (lt *Loadtest) writeOutputCsvRow_maxTasks{{if .MaxTasksGTZero}}GTZero{{else}}NotGTZero{{end}}_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}(mr metricRecord) { + + cd := <.csvData + if cd.writeErr != nil { + return + } + + nowStr := timeToString(time.Now()) + + {{if .MaxTasksGTZero}} + var percent string + { + // Note: this could integer overflow + // should be highly unlikely and only affects + // a percent complete metric which is likely not too critical + high := mr.totalNumTasks * 10000 / lt.maxTasks + low := high % 100 + high /= 100 + + var sep string + if low < 10 { + sep = ".0" + } else { + sep = "." 
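		// To put the overflow note above into numbers: mr.totalNumTasks * 10000
		// only overflows a 64-bit int once totalNumTasks exceeds
		// math.MaxInt/10000 (roughly 9.2e14 completed tasks); on 32-bit builds
		// the bound drops to about 214k tasks.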
+ } + + percent = strconv.Itoa(high) + sep + strconv.Itoa(low) + } + {{end}} + + fields := []string{ + nowStr, + timeToString(mr.intervalID), + strconv.Itoa(mr.numIntervalTasks), + mr.lag.String(), + mr.sumLag.String(), + strconv.Itoa(mr.numTasks), + strconv.Itoa(mr.numPass), + strconv.Itoa(mr.numFail), + strconv.Itoa(mr.numRetry), + strconv.Itoa(mr.numPanic), + mr.minQueuedDuration.String(), + (mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(), + mr.maxQueuedDuration.String(), + mr.sumQueuedDuration.String(), + mr.minTaskDuration.String(), + (mr.sumTaskDuration / time.Duration(mr.numTasks)).String(), + mr.maxTaskDuration.String(), + mr.sumTaskDuration.String(),{{if .PercentileEnabled}} + mr.queuedDurations.StringPercentile(lt.latencyPercentile), + mr.taskDurations.StringPercentile(lt.latencyPercentile),{{end}}{{if .MaxTasksGTZero}} + percent,{{end}} + } + + if err := cd.writer.Write(fields); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } +} diff --git a/loadtester/latencylist.go b/loadtester/latencylist.go new file mode 100644 index 0000000..4ca75dd --- /dev/null +++ b/loadtester/latencylist.go @@ -0,0 +1,40 @@ +package loadtester + +import ( + "math" + "slices" + "time" +) + +type latencyList []time.Duration + +func (ll *latencyList) Add(d time.Duration) { + *ll = append(*ll, d) +} + +func (ll latencyList) StringPercentile(p uint8) string { + + maxIdx := len(ll) - 1 + if maxIdx < 0 { + return "" + } + + slices.Sort(ll) + + // check for integer overflow + if maxIdx == 0 || int(p) <= (math.MaxInt/maxIdx) { + // integer math multiplication operation will not overflow + + // using +50 in numerator because it is half of the denominator (100) and simulates a rounding operation + v := ll[(maxIdx*int(p)+50)/100] + + return v.String() + } + + // p * size would overflow, time to use expensive floats + fidx := math.Round(float64(p) / float64(100) * float64(maxIdx)) + idx := int(fidx) + v := ll[idx] + + return v.String() +} diff --git a/loadtester/loadtest.go b/loadtester/loadtest.go index 8ff4e69..a20322f 100644 --- a/loadtester/loadtest.go +++ b/loadtester/loadtest.go @@ -102,6 +102,8 @@ type Loadtest struct { run func(context.Context, *error) error doTask func(context.Context, int, taskWithMeta) writeOutputCsvRow func(metricRecord) + resultsHandler func() + latencyPercentile uint8 flushRetriesOnShutdown bool retriesDisabled bool metricsEnabled bool @@ -118,12 +120,9 @@ func NewLoadtest(taskReader TaskReader, options ...LoadtestOption) (*Loadtest, e // so it's important to account for them per interval when constructing max // config buffers - const intervalPossibleLagResultCount = 1 - resultsChanSize := (cfg.maxIntervalTasks + intervalPossibleLagResultCount) * cfg.outputBufferingFactor - var resultsChan chan taskResult if !cfg.csvOutputDisabled { - resultsChan = make(chan taskResult, resultsChanSize) + resultsChan = make(chan taskResult, cfg.resultsChanSize) } var retryTaskChan chan *retryTask @@ -173,12 +172,21 @@ func NewLoadtest(taskReader TaskReader, options ...LoadtestOption) (*Loadtest, e logger: cfg.logger, intervalTasksSema: sm, metricsEnabled: !cfg.csvOutputDisabled, + latencyPercentile: cfg.latencyPercentile, } if cfg.maxTasks > 0 { - lt.writeOutputCsvRow = lt.writeOutputCsvRow_maxTasksGTZero + if cfg.latencyPercentile != 0 { + lt.writeOutputCsvRow = lt.writeOutputCsvRow_maxTasksGTZero_percentileEnabled + } else { + lt.writeOutputCsvRow = lt.writeOutputCsvRow_maxTasksGTZero_percentileDisabled + } } else { - lt.writeOutputCsvRow = 
lt.writeOutputCsvRow_maxTasksNotGTZero
+		if cfg.latencyPercentile != 0 {
+			lt.writeOutputCsvRow = lt.writeOutputCsvRow_maxTasksNotGTZero_percentileEnabled
+		} else {
+			lt.writeOutputCsvRow = lt.writeOutputCsvRow_maxTasksNotGTZero_percentileDisabled
+		}
 	}
 
 	if !cfg.retriesDisabled {
@@ -217,6 +225,12 @@ func NewLoadtest(taskReader TaskReader, options ...LoadtestOption) (*Loadtest, e
 		}
 	}
 
+	if cfg.latencyPercentile != 0 {
+		lt.resultsHandler = lt.resultsHandler_percentileEnabled
+	} else {
+		lt.resultsHandler = lt.resultsHandler_percentileDisabled
+	}
+
 	return lt, nil
 }
 
@@ -260,6 +274,7 @@ func (lt *Loadtest) loadtestConfigAsJson() any {
 		FlushRetriesOnShutdown bool `json:"flush_retries_on_shutdown"`
 		FlushRetriesTimeout string `json:"flush_retries_timeout"`
 		RetriesDisabled bool `json:"retries_disabled"`
+		LatencyPercentile uint8 `json:"latency_percentile,omitempty"`
 	}
 
 	return Config{
@@ -275,6 +290,7 @@ func (lt *Loadtest) loadtestConfigAsJson() any {
 		FlushRetriesOnShutdown: lt.flushRetriesOnShutdown,
 		FlushRetriesTimeout: lt.flushRetriesTimeout.String(),
 		RetriesDisabled: lt.retriesDisabled,
+		LatencyPercentile: lt.latencyPercentile,
 	}
 }
diff --git a/loadtester/loadtest_options.go b/loadtester/loadtest_options.go
index 1c9bb9f..657b2f6 100644
--- a/loadtester/loadtest_options.go
+++ b/loadtester/loadtest_options.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"math"
 	"time"
 )
 
@@ -26,6 +27,8 @@ type loadtestConfig struct {
 	flushRetriesOnShutdown bool
 	retriesDisabled bool
 	logger StructuredLogger
+	latencyPercentile uint8
+	resultsChanSize int
 }
 
 func newLoadtestConfig(options ...LoadtestOption) (loadtestConfig, error) {
@@ -95,6 +98,10 @@ func newLoadtestConfig(options ...LoadtestOption) (loadtestConfig, error) {
 		return result, errors.New("loadtest misconfigured: flushRetriesTimeout < 0")
 	}
 
+	if !(cfg.latencyPercentile > 49 && cfg.latencyPercentile < 100) && cfg.latencyPercentile != 0 {
+		return result, errors.New("latency percentile must be between 50 and 99, or 0 to disable gathering percentile latencies")
+	}
+
 	if cfg.logger == nil {
 		logger, err := NewLogger(slog.LevelInfo)
 		if err != nil {
@@ -103,6 +110,30 @@ func newLoadtestConfig(options ...LoadtestOption) (loadtestConfig, error) {
 		cfg.logger = logger
 	}
 
+	// check for integer overflows from user input when computing resultsChanSize
+	if !cfg.csvOutputDisabled {
+		const intervalPossibleLagResultCount = 1
+		// note: if intervalPossibleLagResultCount is ever adjusted, then the below if statement needs to change
+		if cfg.maxIntervalTasks == math.MaxInt {
+			return result, errors.New("MaxIntervalTasks value is too large")
+		}
+
+		maxIntervalResultCount := cfg.maxIntervalTasks + intervalPossibleLagResultCount
+		if maxIntervalResultCount > (math.MaxInt / cfg.outputBufferingFactor) {
+			return result, errors.New("MaxIntervalTasks and OutputBufferingFactor values combination is too large")
+		}
+
+		cfg.resultsChanSize = maxIntervalResultCount * cfg.outputBufferingFactor
+	}
+
+	if cfg.maxIntervalTasks > (math.MaxInt / 2) {
+		return result, errors.New("MaxIntervalTasks value is too large")
+	}
+
+	if cfg.maxWorkers > (math.MaxInt / 2) {
+		return result, errors.New("MaxWorkers value is too large")
+	}
+
 	result = cfg
 	return result, nil
 }
@@ -174,6 +205,12 @@ func MetricsCsvWriterDisabled(b bool) LoadtestOption {
 	}
 }
 
+func LatencyPercentileUint8(v uint8) LoadtestOption {
+	return func(cfg *loadtestConfig) {
+		cfg.latencyPercentile = v
+	}
+}
+
 // FlushRetriesOnShutdown is useful when your loadtest is more like a smoke test
 // that must have all tasks flush and succeed
 func FlushRetriesOnShutdown(b bool) LoadtestOption {