diff --git a/plugins/outputs/cloudwatch/aggregator.go b/plugins/outputs/cloudwatch/aggregator.go index b21edfd0d2..24ee8c4ce6 100644 --- a/plugins/outputs/cloudwatch/aggregator.go +++ b/plugins/outputs/cloudwatch/aggregator.go @@ -119,12 +119,16 @@ func newDurationAggregator(durationInSeconds time.Duration, func (durationAgg *durationAggregator) aggregating() { durationAgg.wg.Add(1) - // sleep for some time until next round duration from now. + // Sleep to align the interval to the wall clock. + // This initial sleep is not interrupted if the aggregator gets shutdown. now := time.Now() time.Sleep(now.Truncate(durationAgg.aggregationDuration).Add(durationAgg.aggregationDuration).Sub(now)) durationAgg.ticker = time.NewTicker(durationAgg.aggregationDuration) defer durationAgg.ticker.Stop() for { + // There is no priority to select{}. + // If there is a new metric AND the shutdownChan is closed when this + // loop begins, then the behavior is random. select { case m := <-durationAgg.aggregationChan: // https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html diff --git a/plugins/outputs/cloudwatch/aggregator_test.go b/plugins/outputs/cloudwatch/aggregator_test.go index c826420955..33b0a8a7b1 100644 --- a/plugins/outputs/cloudwatch/aggregator_test.go +++ b/plugins/outputs/cloudwatch/aggregator_test.go @@ -136,19 +136,30 @@ func TestAggregator_ShutdownBehavior(t *testing.T) { // verify the remaining metrics can be read after shutdown // the metrics should be available immediately after the shutdown even before aggregation period aggregationInterval := 2 * time.Second - tags := map[string]string{"d1key": "d1value", "d2key": "d2value", aggregationIntervalTagKey: aggregationInterval.String()} + tags := map[string]string{ + "d1key": "d1value", + "d2key": "d2value", + aggregationIntervalTagKey: aggregationInterval.String()} fields := map[string]interface{}{"value": 1} timestamp := time.Now() m := metric.New(metricName, tags, fields, timestamp) aggregator.AddMetric(m) - - //give some time to aggregation to do the work - time.Sleep(time.Second * 2) - + // The Aggregator creates a new durationAggregator for each metric. + // And there is a delay when each new durationAggregator begins. + // So submit a metric and wait for the first aggregation to occur. + assertMetricContent(t, metricChan, 3*aggregationInterval, m, expectedFieldContent{ + "value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}}) + assertNoMetricsInChan(t, metricChan) + // Now submit the same metric and it should be routed to the existing + // durationAggregator without delay. + timestamp = time.Now() + m = metric.New(metricName, tags, fields, timestamp) + aggregator.AddMetric(m) + // Shutdown before the 2nd aggregationInterval completes. close(shutdownChan) wg.Wait() - - assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{"value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}}) + assertMetricContent(t, metricChan, 1*time.Second, m, expectedFieldContent{ + "value", 1, 1, 1, 1, "", []float64{1.0488088481701516}, []float64{1}}) assertNoMetricsInChan(t, metricChan) }