Skip to content

Commit

Permalink
latency percentiles support
Browse files Browse the repository at this point in the history
  • Loading branch information
josephcopenhaver committed Sep 28, 2023
1 parent eb256a6 commit 37c19a7
Show file tree
Hide file tree
Showing 13 changed files with 2,022 additions and 1,527 deletions.
194 changes: 16 additions & 178 deletions loadtester/csv_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type metricRecord struct {
// totalNumTasks is only modified if the loadtest's maxTasks setting is > 0
totalNumTasks int

queuedDurations latencyList
taskDurations latencyList
metricRecordResetables
}

Expand All @@ -94,6 +96,11 @@ func (mr *metricRecord) reset() {
}
}

func (mr *metricRecord) resetLatencySlices() {
mr.queuedDurations = mr.queuedDurations[:0]
mr.taskDurations = mr.taskDurations[:0]
}

func (lt *Loadtest) writeOutputCsvHeaders() error {

cd := &lt.csvData
Expand All @@ -118,6 +125,15 @@ func (lt *Loadtest) writeOutputCsvHeaders() error {
"max_task_duration",
"sum_task_duration",
"",
"",
"",
}

if lt.latencyPercentile != 0 {
fields[len(fields)-3] = "p" + strconv.Itoa(int(lt.latencyPercentile)) + "_queued_duration"
fields[len(fields)-2] = "p" + strconv.Itoa(int(lt.latencyPercentile)) + "_task_duration"
} else {
fields = fields[:len(fields)-2]
}

if lt.maxTasks > 0 {
Expand All @@ -137,95 +153,6 @@ func (lt *Loadtest) writeOutputCsvHeaders() error {
return cd.writer.Error()
}

// writeOutputCsvRow_maxTasksGTZero writes the metric record to the target csv file when maxTasks is > 0
func (lt *Loadtest) writeOutputCsvRow_maxTasksGTZero(mr metricRecord) {

cd := &lt.csvData
if cd.writeErr != nil {
return
}

nowStr := timeToString(time.Now())

var percent string
{
high := mr.totalNumTasks * 10000 / lt.maxTasks
low := high % 100
high /= 100

var sep string
if low < 10 {
sep = ".0"
} else {
sep = "."
}

percent = strconv.Itoa(high) + sep + strconv.Itoa(low)
}

fields := []string{
nowStr,
timeToString(mr.intervalID),
strconv.Itoa(mr.numIntervalTasks),
mr.lag.String(),
mr.sumLag.String(),
strconv.Itoa(mr.numTasks),
strconv.Itoa(mr.numPass),
strconv.Itoa(mr.numFail),
strconv.Itoa(mr.numRetry),
strconv.Itoa(mr.numPanic),
mr.minQueuedDuration.String(),
(mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(),
mr.maxQueuedDuration.String(),
mr.sumQueuedDuration.String(),
mr.minTaskDuration.String(),
(mr.sumTaskDuration / time.Duration(mr.numTasks)).String(),
mr.maxTaskDuration.String(),
mr.sumTaskDuration.String(),
percent,
}

if err := cd.writer.Write(fields); err != nil {
cd.setErr(err) // sets error state in multiple goroutine safe way
}
}

// writeOutputCsvRow_maxTasksNotGTZero writes the metric record to the target csv file when maxTasks is <= 0
func (lt *Loadtest) writeOutputCsvRow_maxTasksNotGTZero(mr metricRecord) {

cd := &lt.csvData
if cd.writeErr != nil {
return
}

nowStr := timeToString(time.Now())

fields := []string{
nowStr,
timeToString(mr.intervalID),
strconv.Itoa(mr.numIntervalTasks),
mr.lag.String(),
mr.sumLag.String(),
strconv.Itoa(mr.numTasks),
strconv.Itoa(mr.numPass),
strconv.Itoa(mr.numFail),
strconv.Itoa(mr.numRetry),
strconv.Itoa(mr.numPanic),
mr.minQueuedDuration.String(),
(mr.sumQueuedDuration / time.Duration(mr.numTasks)).String(),
mr.maxQueuedDuration.String(),
mr.sumQueuedDuration.String(),
mr.minTaskDuration.String(),
(mr.sumTaskDuration / time.Duration(mr.numTasks)).String(),
mr.maxTaskDuration.String(),
mr.sumTaskDuration.String(),
}

if err := cd.writer.Write(fields); err != nil {
cd.setErr(err) // sets error state in multiple goroutine safe way
}
}

func (lt *Loadtest) writeOutputCsvFooterAndClose(csvFile *os.File) {

cd := &lt.csvData
Expand Down Expand Up @@ -255,95 +182,6 @@ func (lt *Loadtest) writeOutputCsvFooterAndClose(csvFile *os.File) {
_, cd.writeErr = csvFile.Write([]byte("\n# {\"done\":{\"end_time\":\"" + timeToString(time.Now()) + "\"}}\n"))
}

func (lt *Loadtest) resultsHandler() {

cd := &lt.csvData
var mr metricRecord
mr.reset()

var writeRow func()
if lt.maxTasks > 0 {
writeRow = func() {
mr.totalNumTasks += mr.numTasks
lt.writeOutputCsvRow(mr)
}
} else {
writeRow = func() {
lt.writeOutputCsvRow(mr)
}
}

cd.flushDeadline = time.Now().Add(cd.flushInterval)

for {
tr, ok := <-lt.resultsChan
if !ok {
if cd.writeErr == nil && mr.numTasks > 0 {
writeRow()
}
return
}

lt.resultWaitGroup.Done()

if cd.writeErr != nil {
continue
}

if tr.taskResultFlags.isZero() {

mr.sumLag += tr.Meta.Lag

continue
}

if mr.intervalID.Before(tr.Meta.IntervalID) {
mr.intervalID = tr.Meta.IntervalID
mr.numIntervalTasks = tr.Meta.NumIntervalTasks
mr.lag = tr.Meta.Lag
}

if mr.minQueuedDuration > tr.QueuedDuration {
mr.minQueuedDuration = tr.QueuedDuration
}

if mr.minTaskDuration > tr.TaskDuration {
mr.minTaskDuration = tr.TaskDuration
}

if mr.maxTaskDuration < tr.TaskDuration {
mr.maxTaskDuration = tr.TaskDuration
}

if mr.maxQueuedDuration < tr.QueuedDuration {
mr.maxQueuedDuration = tr.QueuedDuration
}

mr.sumQueuedDuration += tr.QueuedDuration
mr.sumTaskDuration += tr.TaskDuration
mr.numPass += int(tr.Passed)
mr.numFail += int(tr.Errored)
mr.numPanic += int(tr.Panicked)
mr.numRetry += int(tr.RetryQueued)

mr.numTasks++

if mr.numTasks >= mr.numIntervalTasks {

writeRow()
mr.reset()

if cd.writeErr == nil && !cd.flushDeadline.After(time.Now()) {
cd.writer.Flush()
if err := cd.writer.Error(); err != nil {
cd.setErr(err) // sets error state in multiple goroutine safe way
}
cd.flushDeadline = time.Now().Add(cd.flushInterval)
}
}
}
}

//
// helpers
//
Expand Down
1 change: 1 addition & 0 deletions loadtester/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func main() {
loadtester.NumWorkers(numWorkers),
loadtester.NumIntervalTasks(25),
loadtester.Interval(1*time.Second),
// loadtester.LatencyPercentileUint8(95), // default is 0 (disabled/not-calculated)
// loadtester.FlushRetriesOnShutdown(true), // default is false
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions loadtester/example_http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func main() {
loadtester.NumWorkers(parallelism),
loadtester.NumIntervalTasks(parallelism),
loadtester.Interval(5*time.Second),
// loadtester.LatencyPercentileUint8(95), // default is 0 (disabled/not-calculated)
// loadtester.FlushRetriesOnShutdown(true), // default is false
)
if err != nil {
Expand Down
Loading

0 comments on commit 37c19a7

Please sign in to comment.