Skip to content

Commit

Permalink
Performance Benchmarking Log monitor and TPS tests (#513)
Browse files Browse the repository at this point in the history
* added concurrent log monitoring/writing to the test, which now runs several combinations of number of logs monitored and TPS

* fixed call to StartLogWrite

* restructured json packet to send to database. CWA configs are now generated at runtime. log number is given as an environment variable so tests can run concurrently, so commented code is included for that change

* rewrote how database entry packets are crafted/created, deferred log stream/group deletions, addressed other small PR comments

* removed extra metrics from CWA config

* separated delete log group and streams so they can be called separately, but kept the original function definition so as not to break other code. removed bad calls to DeleteLogGroupAndStreams
  • Loading branch information
gmealy1 authored Jul 21, 2022
1 parent 74ba948 commit 06b61c9
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 178 deletions.
19 changes: 19 additions & 0 deletions integration/test/cwl_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func ValidateLogs(t *testing.T, logGroup, logStream string, numExpectedLogs int,
// DeleteLogGroupAndStream cleans up a log group and stream by name. This gracefully handles
// ResourceNotFoundException errors from calling the APIs.
//
// It is a convenience wrapper kept for existing callers: it calls DeleteLogStream for
// logStreamName inside logGroupName, and then DeleteLogGroup for logGroupName. Nothing is
// returned to the caller.
func DeleteLogGroupAndStream(logGroupName, logStreamName string) {
// delete the stream first, then the group that contains it
DeleteLogStream(logGroupName, logStreamName)
DeleteLogGroup(logGroupName)
}

// DeleteLogStream cleans up log stream by name
func DeleteLogStream(logGroupName, logStreamName string) {
cwlClient, clientContext, err := getCloudWatchLogsClient()
if err != nil {
log.Printf("Error occurred while creating CloudWatch Logs SDK client: %v", err)
Expand All @@ -96,6 +102,19 @@ func DeleteLogGroupAndStream(logGroupName, logStreamName string) {
if err != nil && !errors.As(err, &rnf) {
log.Printf("Error occurred while deleting log stream %s: %v", logStreamName, err)
}
}

// DeleteLogGroup cleans up log group by name
func DeleteLogGroup(logGroupName string) {
cwlClient, clientContext, err := getCloudWatchLogsClient()
if err != nil {
log.Printf("Error occurred while creating CloudWatch Logs SDK client: %v", err)
return // terminate gracefully so this alone doesn't cause integration test failures
}

// catch ResourceNotFoundException when deleting the log group and log stream, as these
// are not useful exceptions to log errors on during cleanup
var rnf *types.ResourceNotFoundException

_, err = cwlClient.DeleteLogGroup(*clientContext, &cloudwatchlogs.DeleteLogGroupInput{
LogGroupName: aws.String(logGroupName),
Expand Down
114 changes: 99 additions & 15 deletions integration/test/performancetest/performance_query_utils.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package performancetest

import (
"time"
"errors"
"context"
"encoding/json"
"os"
"errors"
"fmt"

"os"
"strconv"
"time"
"sort"
"math"
"log"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
Expand All @@ -19,14 +23,31 @@ const (
DimensionName = "InstanceId"
Stat = "Average"
Period = 10
configPath = "./resources/config.json"

METRIC_PERIOD = 5 * 60 // this const is in seconds , 5 mins
PARTITION_KEY ="Year"
HASH = "Hash"
COMMIT_DATE= "CommitDate"
SHA_ENV = "SHA"
SHA_DATE_ENV = "SHA_DATE"
)

// Stats holds summary statistics computed over one metric's returned data points.
// CalcStats fills in every field; an empty input yields the zero value of Stats.
type Stats struct {
Average float64 // arithmetic mean of Data
P99 float64 // 99th-percentile value of Data
Max float64 // largest value in Data
Min float64 // smallest value in Data
Period int // in seconds; CalcStats sets it to METRIC_PERIOD divided by the number of data points
Std float64 // standard deviation of Data (CalcStats divides by the number of data points)
Data []float64 // the data points the statistics were computed from, in the order they were passed in
}

/*
* GetConfigMetrics parses the cloudwatch agent config and returns the associated
* metrics that the cloudwatch agent is measuring on itself
*/
func GetConfigMetrics() ([]string, []string, error) {
func GetConfigMetrics(configPath string) ([]string, []string, error) {
//get metric measurements from config file
file, err := os.ReadFile(configPath)
if err != nil {
Expand Down Expand Up @@ -120,7 +141,7 @@ func ConstructMetricDataQuery(id, namespace, dimensionName, dimensionValue, metr
return query
}

func GetPerformanceMetrics(instanceId string, agentRuntime int, agentContext context.Context) ([]byte, error) {
func GetPerformanceMetrics(instanceId string, agentRuntime, logNum, tps int, agentContext context.Context, configPath string) (map[string]interface{}, error) {

//load default configuration
cfg, err := config.LoadDefaultConfig(agentContext)
Expand All @@ -131,7 +152,7 @@ func GetPerformanceMetrics(instanceId string, agentRuntime int, agentContext con
client := cloudwatch.NewFromConfig(cfg)

//fetch names of metrics to request and generate corresponding ids
metricNames, ids, err := GetConfigMetrics()
metricNames, ids, err := GetConfigMetrics(configPath)
if err != nil {
return nil, err
}
Expand All @@ -147,12 +168,75 @@ func GetPerformanceMetrics(instanceId string, agentRuntime int, agentContext con
if err != nil {
return nil, err
}

log.Println("Data successfully received from CloudWatch API")

//craft packet to be sent to database
packet := make(map[string]interface{})
//add information about current release/commit
packet[PARTITION_KEY] = time.Now().Year()
packet[HASH] = os.Getenv(SHA_ENV) //fmt.Sprintf("%d", time.Now().UnixNano())
packet[COMMIT_DATE],_ = strconv.Atoi(os.Getenv(SHA_DATE_ENV))

//add test metadata
packet["NumberOfLogsMonitored"] = logNum
packet["TPS"] = tps

//add actual test data with statistics
for _, result := range metrics.MetricDataResults {
packet[*result.Label] = CalcStats(result.Values)
}

//format data to json before passing output
outputData, err := json.MarshalIndent(metrics.MetricDataResults, "", " ")
if err != nil {
return nil, err
}
return packet, nil
}

return outputData, nil
}
/*
 * CalcStats takes in a slice of data and returns the average, min, max, p99, and
 * standard deviation of the data in a Stats struct.
 *
 * Statistics are calculated this way instead of using the GetMetricStatistics API because
 * GetMetricStatistics would require multiple API calls, as only one metric can be
 * requested/processed at a time, whereas all metrics can be requested in one GetMetricData
 * request.
 *
 * Parameters:
 *   data - the values to summarize; it is never modified, and the returned Stats.Data
 *          refers to this same slice in its original order.
 *
 * Returns:
 *   The populated Stats. An empty (or nil) input returns the zero value Stats{}.
 *
 * The p99 value uses the nearest-rank method (rank = ceil(0.99 * n)), so any input with
 * fewer than 99 values, including a single value, reports its maximum as p99.
 * The standard deviation is the population form (divides by n, not n-1).
 */
func CalcStats(data []float64) Stats {
	length := len(data)
	if length == 0 {
		return Stats{}
	}

	// sort a copy so we aren't modifying the original - keeps original data in time order
	sorted := make([]float64, length)
	copy(sorted, data)
	sort.Float64s(sorted)

	lowest := sorted[0]
	highest := sorted[length-1]

	sum := 0.0
	for _, value := range sorted {
		sum += value
	}
	avg := sum / float64(length)

	if length < 99 {
		log.Println("Note: less than 99 values given, p99 value will be equal the max value")
	}
	// Nearest-rank percentile: rank = ceil(0.99 * length), done in integer arithmetic so
	// there is no float rounding. The rank is always in [1, length], so the index below
	// can never be negative (the previous int(length*.99)-1 was -1 for a single value).
	p99Rank := (length*99 + 99) / 100
	p99Val := sorted[p99Rank-1]

	squaredDiffSum := 0.0
	for _, value := range sorted {
		diff := avg - value
		squaredDiffSum += diff * diff
	}
	stdDev := math.Sqrt(squaredDiffSum / float64(length))

	return Stats{
		Average: avg,
		Max:     highest,
		Min:     lowest,
		P99:     p99Val,
		Std:     stdDev,
		// seconds between data points, assuming they evenly cover METRIC_PERIOD
		Period: int(METRIC_PERIOD / float64(length)),
		Data:   data,
	}
}
Loading

0 comments on commit 06b61c9

Please sign in to comment.