Skip to content

Commit

Permalink
Limit number of goroutines for GetMetricData calls
Browse files Browse the repository at this point in the history
The CloudWatch client already limits the number of concurrent requests
to the GetMetricData API via the `-cloudwatch-concurrency` flag. However,
in discovery jobs there is no limit on how many goroutines can be spawned
at the same time, only to block while waiting for the client to become available.

With this change, we use an errgroup with a concurrency limit, so that a new
goroutine for a GetMetricData partition is only started once a slot is free.
  • Loading branch information
cristiangreco committed Aug 2, 2023
1 parent ba183dd commit 96a0117
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
54 changes: 35 additions & 19 deletions pkg/job/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"sync"

"golang.org/x/sync/errgroup"

"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/cloudwatch"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/clients/tagging"
"github.com/nerdswords/yet-another-cloudwatch-exporter/pkg/config"
Expand All @@ -32,6 +34,7 @@ func runDiscoveryJob(
clientTag tagging.Client,
clientCloudwatch cloudwatch.Client,
metricsPerQuery int,
concurrencyLimit int,
) ([]*model.TaggedResource, []*model.CloudwatchData) {
logger.Debug("Get tagged resources")

Expand Down Expand Up @@ -61,33 +64,46 @@ func runDiscoveryJob(

maxMetricCount := metricsPerQuery
length := getMetricDataInputLength(job.Metrics)
partition := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
logger.Debug("GetMetricData partitions", "total", partition)
partitionSize := int(math.Ceil(float64(metricDataLength) / float64(maxMetricCount)))
logger.Debug("GetMetricData partitions", "size", partitionSize)

var wg sync.WaitGroup
wg.Add(partition)
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(concurrencyLimit)

getMetricDataOutput := make([][]*cloudwatch.MetricDataResult, partition)
count := 0
mu := sync.Mutex{}
getMetricDataOutput := make([][]*cloudwatch.MetricDataResult, 0, partitionSize)

count := 0
for i := 0; i < metricDataLength; i += maxMetricCount {
go func(i, n int) {
defer wg.Done()
end := i + maxMetricCount
if end > metricDataLength {
end = metricDataLength
}
input := getMetricDatas[i:end]
data := clientCloudwatch.GetMetricData(ctx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod)
start := i
end := i + maxMetricCount
if end > metricDataLength {
end = metricDataLength
}
partitionNum := count
count++

g.Go(func() error {
logger.Debug("GetMetricData partition", "start", start, "end", end, "partitionNum", partitionNum)

input := getMetricDatas[start:end]
data := clientCloudwatch.GetMetricData(gCtx, logger, input, svc.Namespace, length, job.Delay, job.RoundingPeriod)
if data != nil {
getMetricDataOutput[n] = data
mu.Lock()
getMetricDataOutput = append(getMetricDataOutput, data)
mu.Unlock()
} else {
logger.Warn("GetMetricData partition empty result", "partition", n, "start", i, "end", end)
logger.Warn("GetMetricData partition empty result", "start", start, "end", end, "partitionNum", partitionNum)
}
}(i, count)
count++

return nil
})
}

if err = g.Wait(); err != nil {
logger.Error(err, "GetMetricData work group error")
return nil, nil
}
wg.Wait()

// Update getMetricDatas slice with values and timestamps from API response.
// We iterate through the response MetricDataResults and match the result ID
Expand Down
2 changes: 1 addition & 1 deletion pkg/job/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func ScrapeAwsData(
}
jobLogger = jobLogger.With("account", accountID)

resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, accountID, cfg.Discovery.ExportedTagsOnMetrics, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery)
resources, metrics := runDiscoveryJob(ctx, jobLogger, discoveryJob, region, accountID, cfg.Discovery.ExportedTagsOnMetrics, factory.GetTaggingClient(region, role, taggingAPIConcurrency), factory.GetCloudwatchClient(region, role, cloudWatchAPIConcurrency), metricsPerQuery, cloudWatchAPIConcurrency)
if len(metrics) != 0 {
mux.Lock()
awsInfoData = append(awsInfoData, resources...)
Expand Down

0 comments on commit 96a0117

Please sign in to comment.