Skip to content

Commit

Permalink
Limit number of goroutines for GetMetricData calls
Browse files Browse the repository at this point in the history
The CloudWatch client already limits the number of concurrent requests
to GetMetricData API via the `-cloudwatch-concurrency` flag, however
in discovery jobs there's no limit to how many goroutines can be spawned
at the same time and get blocked waiting for the client to become available.

With this change, we use an errgroup to block adding goroutines to
handle GetMetricData partitions until the client is actually available.
  • Loading branch information
cristiangreco committed Aug 2, 2023
1 parent ba183dd commit 2f55b60
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions pkg/job/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/job/maxdimassociator"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/logging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/model"
"golang.org/x/sync/errgroup"
)

type resourceAssociator interface {
Expand All @@ -32,6 +33,7 @@ func runDiscoveryJob(
clientTag tagging.Client,
clientCloudwatch cloudwatch.Client,
metricsPerQuery int,
concurrencyLimit int,
) ([]*model.TaggedResource, []*model.CloudwatchData) {
logger.Debug("Get tagged resources")

Expand Down Expand Up @@ -61,33 +63,46 @@ func runDiscoveryJob(

maxMetricCount := metricsPerQuery
length := getMetricDataInputLength(job.Metrics)
partition := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
logger.Debug("GetMetricData partitions", "total", partition)
partitionSize := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
logger.Debug("GetMetricData partitions", "size", partitionSize)

var wg sync.WaitGroup
wg.Add(partition)
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(concurrencyLimit)

getMetricDataOutput := make([][]*cloudwatch.MetricDataResult, partition)
count := 0
mu := sync.Mutex{}
getMetricDataOutput := make([][]*cloudwatch.MetricDataResult, 0, partitionSize)

count := 0
for i := 0; i < metricDataLength; i += maxMetricCount {
go func(i, n int) {
defer wg.Done()
end := i + maxMetricCount
if end > metricDataLength {
end = metricDataLength
}
input := getMetricDatas[i:end]
data := clientCloudwatch.GetMetricData(ctx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod)
start := i
end := i + maxMetricCount
if end > metricDataLength {
end = metricDataLength
}
partitionNum := count
count++

g.Go(func() error {
logger.Debug("GetMetricData partition", "start", start, "end", end, "partitionNum", partitionNum)

input := getMetricDatas[start:end]
data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod)
if data != nil {
getMetricDataOutput[n] = data
mu.Lock()
getMetricDataOutput = append(getMetricDataOutput, data)
mu.Unlock()
} else {
logger.Warn("GetMetricData partition empty result", "partition", n, "start", i, "end", end)
logger.Warn("GetMetricData partition empty result", "start", start, "end", end, "partitionNum", partitionNum)
}
}(i, count)
count++

return nil
})
}

if err = g.Wait(); err != nil {
logger.Error(err, "GetMetricData work group error")
return nil, nil
}
wg.Wait()

// Update getMetricDatas slice with values and timestamps from API response.
// We iterate through the response MetricDataResults and match the result ID
Expand Down

0 comments on commit 2f55b60

Please sign in to comment.