diff --git a/loadtester/gen_strategies.go b/loadtester/gen_strategies.go index d53c448..7fb081d 100644 --- a/loadtester/gen_strategies.go +++ b/loadtester/gen_strategies.go @@ -1088,7 +1088,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -1127,8 +1133,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, + // IntervalID: intervalID, // not required unless in a debug context + Lag: lag, }, } @@ -1533,7 +1539,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -1576,8 +1588,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsEnabled(ctx context lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, + // IntervalID: intervalID, // not required unless in a debug context + Lag: lag, }, } @@ -1849,7 +1861,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -2268,7 +2286,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksGTZero_metricsDisabled(ctx contex lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -2562,7 +2586,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -2601,8 +2631,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, + // IntervalID: intervalID, // not required unless in a debug context + Lag: lag, }, } @@ -2992,7 +3022,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -3035,8 +3071,8 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsEnabled(ctx cont lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, + // IntervalID: intervalID, // not required unless in a debug context + Lag: lag, }, } @@ -3270,7 +3306,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -3674,7 +3716,13 @@ func (lt *Loadtest) run_retriesEnabled_maxTasksNotGTZero_metricsDisabled(ctx con lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -4161,7 +4209,13 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex return nil } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -4204,8 +4258,8 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsEnabled(ctx contex lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, + // IntervalID: intervalID, // not required unless in a debug context + Lag: lag, }, } @@ -4620,7 +4674,13 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksGTZero_metricsDisabled(ctx conte return nil } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -5090,7 +5150,13 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con return nil } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -5133,8 +5199,8 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsEnabled(ctx con lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, - Lag: lag, + // IntervalID: intervalID, // not required unless in a debug context + Lag: lag, }, } @@ -5532,7 +5598,13 @@ func (lt *Loadtest) run_retriesDisabled_maxTasksNotGTZero_metricsDisabled(ctx co return nil } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize + 1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -6555,1441 +6627,1729 @@ func (lt *Loadtest) writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentile } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled + mr.latencies = lt.latencies + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled() + writeRow = func() { - mr.totalNumTasks += mr.numTasks + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - mr.numTasks++ + mr.numTasks++ - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { - writeRow() - mr.reset() + writeRow() + mr.reset() + + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled + mr.latencies = lt.latencies + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled() + writeRow = func() { - mr.totalNumTasks += mr.numTasks + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + mr.numTasks++ - mr.numTasks++ + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + if mr.numTasks >= ss[ssReadIdx] { - if mr.numTasks >= mr.numIntervalTasks { + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled + mr.reset() - mr.totalNumTasks += mr.numTasks + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled() + writeRow = func() { + + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - mr.numTasks++ + mr.numTasks++ - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled() + writeRow = func() { - mr.totalNumTasks += mr.numTasks + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + mr.numTasks++ - mr.numTasks++ + if mr.numTasks >= ss[ssReadIdx] { - if mr.numTasks >= mr.numIntervalTasks { + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled + mr.latencies = lt.latencies + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() + writeRow = func() { - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - mr.numTasks++ + mr.numTasks++ - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { - writeRow() - mr.reset() + writeRow() + mr.reset() + + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled + mr.latencies = lt.latencies + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() + writeRow = func() { - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + mr.numTasks++ - mr.numTasks++ + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + if mr.numTasks >= ss[ssReadIdx] { - if mr.numTasks >= mr.numIntervalTasks { + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled + mr.reset() - f(mr) + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() + writeRow = func() { + + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.numTasks++ + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - if mr.numTasks >= mr.numIntervalTasks { + mr.numTasks++ - writeRow() - mr.reset() + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) - } - } - } -} - -func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() { + if mr.numTasks >= ss[ssReadIdx] { - cd := <.csvData - var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled - mr.reset() + writeRow() + mr.reset() - var writeRow func() - { - f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() - writeRow = func() { + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - f(mr) + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) + } + } } } +} - cd.flushDeadline = time.Now().Add(cd.flushInterval) +func (lt *Loadtest) resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() func() { - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int + + return func() { + cd := <.csvData + var mr metricRecord_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() + writeRow = func() { + + f(mr) } - return } - lt.resultWaitGroup.Done() + cd.flushDeadline = time.Now().Add(cd.flushInterval) - if cd.writeErr != nil { - continue - } + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return + } - if tr.taskResultFlags.isZero() { + lt.resultWaitGroup.Done() - mr.sumLag += tr.Meta.Lag + if cd.writeErr != nil { + continue + } - continue - } + if tr.taskResultFlags.isZero() { - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + mr.sumLag += tr.Meta.Lag - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + continue + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } + + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } + + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) - mr.numRetry += int(tr.RetryQueued) + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.numTasks++ + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - if mr.numTasks >= mr.numIntervalTasks { + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + mr.numRetry += int(tr.RetryQueued) - writeRow() - mr.reset() + mr.numTasks++ - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() + + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled + mr.latencies = lt.latencies + mr.reset() - mr.totalNumTasks += mr.numTasks + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled() + writeRow = func() { + + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - mr.numTasks++ + mr.numTasks++ - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { - writeRow() - mr.reset() + writeRow() + mr.reset() + + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled + mr.latencies = lt.latencies + mr.reset() - mr.totalNumTasks += mr.numTasks + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled() + writeRow = func() { + + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + mr.numTasks++ - mr.numTasks++ + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + if mr.numTasks >= ss[ssReadIdx] { - if mr.numTasks >= mr.numIntervalTasks { + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled() + writeRow = func() { - mr.totalNumTasks += mr.numTasks + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) + + mr.numTasks++ - mr.numTasks++ + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + if mr.numTasks >= ss[ssReadIdx] { - if mr.numTasks >= mr.numIntervalTasks { + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled() + writeRow = func() { - mr.totalNumTasks += mr.numTasks + mr.totalNumTasks += mr.numTasks - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - mr.numTasks++ + mr.numTasks++ - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled + mr.latencies = lt.latencies + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() + writeRow = func() { - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } + + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - mr.numTasks++ + mr.numTasks++ - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled - mr.latencies = lt.latencies - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled + mr.latencies = lt.latencies + mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() + writeRow = func() { - f(mr) + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.numTasks++ + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - if mr.numTasks >= mr.numIntervalTasks { + mr.numTasks++ - writeRow() - mr.reset() + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() + + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled + mr.reset() - f(mr) + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() + writeRow = func() { + + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.numTasks++ + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - if mr.numTasks >= mr.numIntervalTasks { + mr.numTasks++ - writeRow() - mr.reset() + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() + + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } } -func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() { +func (lt *Loadtest) resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() func() { - cd := <.csvData - var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled - mr.reset() + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int - var writeRow func() - { - f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() - writeRow = func() { + return func() { + cd := <.csvData + var mr metricRecord_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled + mr.reset() - f(mr) + var writeRow func() + { + f := lt.writeOutputCsvRow_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() + writeRow = func() { + + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } + + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked) + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } + + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked) - mr.numTasks++ + mr.numTasks++ - if mr.numTasks >= mr.numIntervalTasks { + if mr.numTasks >= ss[ssReadIdx] { - writeRow() - mr.reset() + writeRow() + mr.reset() - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize + + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } diff --git a/loadtester/internal/cmd/generate/resultsHandler.go.tmpl b/loadtester/internal/cmd/generate/resultsHandler.go.tmpl index 5113c15..8f2d4a0 100644 --- a/loadtester/internal/cmd/generate/resultsHandler.go.tmpl +++ b/loadtester/internal/cmd/generate/resultsHandler.go.tmpl @@ -1,97 +1,115 @@ -func (lt *Loadtest) resultsHandler_retry{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if not .MaxTasksGTZero}}Not{{end}}GTZero_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}_variance{{if .VarianceEnabled}}Enabled{{else}}Disabled{{end}}() { - - cd := <.csvData - var mr metricRecord_retry{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if not .MaxTasksGTZero}}Not{{end}}GTZero_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}_variance{{if .VarianceEnabled}}Enabled{{else}}Disabled{{end}} - {{if .PercentileEnabled}}mr.latencies = lt.latencies - {{end}}mr.reset() - - var writeRow func() - { - f := lt.writeOutputCsvRow_retry{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if not .MaxTasksGTZero}}Not{{end}}GTZero_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}_variance{{if .VarianceEnabled}}Enabled{{else}}Disabled{{end}}() - writeRow = func() { - {{if .MaxTasksGTZero}} - mr.totalNumTasks += mr.numTasks - {{end}} - f(mr) +func (lt *Loadtest) resultsHandler_retry{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if not .MaxTasksGTZero}}Not{{end}}GTZero_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}_variance{{if .VarianceEnabled}}Enabled{{else}}Disabled{{end}}() func() { + + // construct ring buffer of sample sizes (ss) + ssSize := lt.maxLiveSamples + ss := make([]int, ssSize) + var ssNextWriteIdx int + var ssReadIdx int + + return func() { + cd := <.csvData + var mr metricRecord_retry{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if not .MaxTasksGTZero}}Not{{end}}GTZero_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}_variance{{if .VarianceEnabled}}Enabled{{else}}Disabled{{end}} + {{if .PercentileEnabled}}mr.latencies = lt.latencies + {{end}}mr.reset() + + var writeRow func() + { + f := lt.writeOutputCsvRow_retry{{if .RetriesEnabled}}Enabled{{else}}Disabled{{end}}_maxTasks{{if not .MaxTasksGTZero}}Not{{end}}GTZero_percentile{{if .PercentileEnabled}}Enabled{{else}}Disabled{{end}}_variance{{if .VarianceEnabled}}Enabled{{else}}Disabled{{end}}() + writeRow = func() { + {{if .MaxTasksGTZero}} + mr.totalNumTasks += mr.numTasks + {{end}} + f(mr) + } } - } - cd.flushDeadline = time.Now().Add(cd.flushInterval) + cd.flushDeadline = time.Now().Add(cd.flushInterval) - for { - tr, ok := <-lt.resultsChan - if !ok { - if cd.writeErr == nil && mr.numTasks > 0 { - writeRow() + for { + tr, ok := <-lt.resultsChan + if !ok { + if cd.writeErr == nil && mr.numTasks > 0 { + writeRow() + } + return } - return - } - lt.resultWaitGroup.Done() + lt.resultWaitGroup.Done() - if cd.writeErr != nil { - continue - } + if cd.writeErr != nil { + continue + } - if tr.taskResultFlags.isZero() { + if tr.taskResultFlags.isZero() { - mr.sumLag += tr.Meta.Lag + mr.sumLag += tr.Meta.Lag - continue - } + if tr.Meta.SampleSize > 0 { + ss[ssNextWriteIdx] = tr.Meta.SampleSize - if mr.intervalID.Before(tr.Meta.IntervalID) { - mr.intervalID = tr.Meta.IntervalID - mr.numIntervalTasks = tr.Meta.NumIntervalTasks - mr.lag = tr.Meta.Lag - } + // advance write pointer forward + ssNextWriteIdx = (ssNextWriteIdx + 1) % ssSize + } - if mr.minQueueDuration > tr.QueueDuration { - mr.minQueueDuration = tr.QueueDuration - } + continue + } - if mr.minTaskDuration > tr.TaskDuration { - mr.minTaskDuration = tr.TaskDuration - } + if mr.intervalID.Before(tr.Meta.IntervalID) { + mr.intervalID = tr.Meta.IntervalID + mr.numIntervalTasks = tr.Meta.NumIntervalTasks + mr.lag = tr.Meta.Lag + } - if mr.maxTaskDuration < tr.TaskDuration { - mr.maxTaskDuration = tr.TaskDuration - } + if mr.minQueueDuration > tr.QueueDuration { + mr.minQueueDuration = tr.QueueDuration + } - if mr.maxQueueDuration < tr.QueueDuration { - mr.maxQueueDuration = tr.QueueDuration - } + if mr.minTaskDuration > tr.TaskDuration { + mr.minTaskDuration = tr.TaskDuration + } - mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) - mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) - mr.numPass += int(tr.Passed) - mr.numFail += int(tr.Errored) - mr.numPanic += int(tr.Panicked){{if .RetriesEnabled}} - mr.numRetry += int(tr.RetryQueued){{end}} + if mr.maxTaskDuration < tr.TaskDuration { + mr.maxTaskDuration = tr.TaskDuration + } - mr.numTasks++ + if mr.maxQueueDuration < tr.QueueDuration { + mr.maxQueueDuration = tr.QueueDuration + } - {{if .VarianceEnabled}} - mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) - mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) - {{end}} + mr.sumQueueDuration.Add(&mr.sumQueueDuration, big.NewInt(int64(tr.QueueDuration))) + mr.sumTaskDuration.Add(&mr.sumTaskDuration, big.NewInt(int64(tr.TaskDuration))) + mr.numPass += int(tr.Passed) + mr.numFail += int(tr.Errored) + mr.numPanic += int(tr.Panicked){{if .RetriesEnabled}} + mr.numRetry += int(tr.RetryQueued){{end}} - {{if .PercentileEnabled -}} - mr.latencies.queue.add(tr.QueueDuration) - mr.latencies.task.add(tr.TaskDuration) - {{end}} + mr.numTasks++ - if mr.numTasks >= mr.numIntervalTasks { + {{if .VarianceEnabled}} + mr.welfords.queue.Update(mr.numTasks, float64(tr.QueueDuration)) + mr.welfords.task.Update(mr.numTasks, float64(tr.TaskDuration)) + {{end}} + + {{if .PercentileEnabled -}} + mr.latencies.queue.add(tr.QueueDuration) + mr.latencies.task.add(tr.TaskDuration) + {{end}} + + if mr.numTasks >= ss[ssReadIdx] { + + writeRow() + mr.reset() - writeRow() - mr.reset() + // advance read pointer forward + ssReadIdx = (ssReadIdx + 1) % ssSize - if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { - cd.writer.Flush() - if err := cd.writer.Error(); err != nil { - cd.setErr(err) // sets error state in multiple goroutine safe way + if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) { + cd.writer.Flush() + if err := cd.writer.Error(); err != nil { + cd.setErr(err) // sets error state in multiple goroutine safe way + } + cd.flushDeadline = time.Now().Add(cd.flushInterval) } - cd.flushDeadline = time.Now().Add(cd.flushInterval) } } } diff --git a/loadtester/internal/cmd/generate/run.go.tmpl b/loadtester/internal/cmd/generate/run.go.tmpl index b4b77a6..4ec279f 100644 --- a/loadtester/internal/cmd/generate/run.go.tmpl +++ b/loadtester/internal/cmd/generate/run.go.tmpl @@ -301,7 +301,13 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize)) } - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize+1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -341,7 +347,7 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, + // IntervalID: intervalID, // not required unless in a debug context Lag: lag, }, } @@ -771,7 +777,13 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en } {{end}} - lt.resultWaitGroup.Add(taskBufSize) + lt.resultWaitGroup.Add(taskBufSize+1) // +1 because we're sending the expected Sample Size immediately to the results handler before queueing tasks + lt.resultsChan <- taskResult{ + Meta: taskMeta{ + // IntervalID: intervalID, // not required unless in a debug context + SampleSize: taskBufSize, + }, + } meta.IntervalID = intervalID @@ -815,7 +827,7 @@ func (lt *Loadtest) run_retries{{if .RetriesEnabled}}Enabled{{else}}Disabled{{en lt.resultWaitGroup.Add(1) lt.resultsChan <- taskResult{ Meta: taskMeta{ - IntervalID: intervalID, + // IntervalID: intervalID, // not required unless in a debug context Lag: lag, }, } diff --git a/loadtester/loadtest.go b/loadtester/loadtest.go index 7091447..5b24438 100644 --- a/loadtester/loadtest.go +++ b/loadtester/loadtest.go @@ -60,10 +60,12 @@ type taskResult struct { } type Loadtest struct { - taskReader TaskReader - maxTasks int - maxWorkers int - numWorkers int + taskReader TaskReader + maxTasks int + maxWorkers int + numWorkers int + // maxLiveSamples is the max buffer size of sample sizes that exist at any possible point in time + maxLiveSamples int workers []chan struct{} workerWaitGroup sync.WaitGroup resultWaitGroup sync.WaitGroup @@ -122,8 +124,10 @@ func NewLoadtest(options ...LoadtestOption) (*Loadtest, error) { // config buffers var resultsChan chan taskResult + var maxLiveSamples int if cfg.csvOutputEnabled { resultsChan = make(chan taskResult, cfg.resultsChanSize) + maxLiveSamples = cfg.outputBufferingFactor } var retryTaskChan chan *retryTask @@ -150,15 +154,16 @@ func NewLoadtest(options ...LoadtestOption) (*Loadtest, error) { } lt := &Loadtest{ - taskReader: cfg.taskReader, - maxTasks: cfg.maxTasks, - maxWorkers: cfg.maxWorkers, - numWorkers: cfg.numWorkers, - workers: make([]chan struct{}, 0, cfg.maxWorkers), - taskChan: make(chan taskWithMeta, cfg.maxIntervalTasks), - resultsChan: resultsChan, - cfgUpdateChan: make(chan ConfigUpdate), - retryTaskChan: retryTaskChan, + taskReader: cfg.taskReader, + maxTasks: cfg.maxTasks, + maxWorkers: cfg.maxWorkers, + numWorkers: cfg.numWorkers, + maxLiveSamples: maxLiveSamples, + workers: make([]chan struct{}, 0, cfg.maxWorkers), + taskChan: make(chan taskWithMeta, cfg.maxIntervalTasks), + resultsChan: resultsChan, + cfgUpdateChan: make(chan ConfigUpdate), + retryTaskChan: retryTaskChan, maxIntervalTasks: cfg.maxIntervalTasks, numIntervalTasks: cfg.numIntervalTasks, @@ -223,29 +228,29 @@ func NewLoadtest(options ...LoadtestOption) (*Loadtest, error) { if cfg.percentilesEnabled { if cfg.variancesEnabled { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceEnabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceEnabled() } } else { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileEnabled_varianceDisabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileEnabled_varianceDisabled() } } } else { if cfg.variancesEnabled { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceEnabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceEnabled() } } else { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksGTZero_percentileDisabled_varianceDisabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksGTZero_percentileDisabled_varianceDisabled() } } } @@ -253,29 +258,29 @@ func NewLoadtest(options ...LoadtestOption) (*Loadtest, error) { if cfg.percentilesEnabled { if cfg.variancesEnabled { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceEnabled() } } else { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileEnabled_varianceDisabled() } } } else { if cfg.variancesEnabled { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceEnabled() } } else { if cfg.retry { - lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryEnabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() } else { - lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled + lt.resultsHandler = lt.resultsHandler_retryDisabled_maxTasksNotGTZero_percentileDisabled_varianceDisabled() } } } diff --git a/loadtester/loadtest_options.go b/loadtester/loadtest_options.go index 2881866..31a9d17 100644 --- a/loadtester/loadtest_options.go +++ b/loadtester/loadtest_options.go @@ -120,12 +120,13 @@ func newLoadtestConfig(options ...LoadtestOption) (loadtestConfig, error) { // check for integer overflows from user input when computing metrics if cfg.csvOutputEnabled { const intervalPossibleLagResultCount = 1 + const intervalSampleSizeResultCount = 1 // note: if intervalPossibleLagResultCount is ever adjusted, then the bellow if statement needs to change - if cfg.maxIntervalTasks == math.MaxInt { + if cfg.maxIntervalTasks > (math.MaxInt - intervalPossibleLagResultCount - intervalSampleSizeResultCount) { return result, errors.New("MaxIntervalTasks value is too large") } - maxIntervalResultCount := cfg.maxIntervalTasks + intervalPossibleLagResultCount + maxIntervalResultCount := cfg.maxIntervalTasks + intervalPossibleLagResultCount + intervalSampleSizeResultCount if maxIntervalResultCount > (math.MaxInt / cfg.outputBufferingFactor) { return result, errors.New("MaxIntervalTasks and OutputBufferingFactor values combination is too large") } diff --git a/loadtester/task.go b/loadtester/task.go index 3601ee1..8a61498 100644 --- a/loadtester/task.go +++ b/loadtester/task.go @@ -58,6 +58,8 @@ func (rt *retryTask) Do(ctx context.Context, workerID int) error { type taskMeta struct { IntervalID time.Time + SampleSize int + // // rate gauges: //