Skip to content

Commit

Permalink
Scrape auto scaling group attribute in awsentity processor
Browse files Browse the repository at this point in the history
  • Loading branch information
zhihonl committed Jan 24, 2025
1 parent 09f8b09 commit 6ef2a2a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 16 deletions.
68 changes: 67 additions & 1 deletion plugins/processors/awsentity/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/extension/entitystore"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity/entityattributes"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/awsentity/internal/k8sattributescraper"
"github.com/aws/amazon-cloudwatch-agent/plugins/processors/ec2tagger"
"github.com/aws/amazon-cloudwatch-agent/translator/config"
)

Expand Down Expand Up @@ -131,6 +132,12 @@ func (p *awsEntityProcessor) processMetrics(_ context.Context, md pmetric.Metric
switch p.config.EntityType {
case entityattributes.Resource:
if p.config.Platform == config.ModeEC2 {
// ec2tagger processor may have picked up the ASG name from an ec2:DescribeTags call
if getAutoScalingGroupFromEntityStore() == EMPTY && p.config.ScrapeDatapointAttribute {
if autoScalingGroup := p.scrapeResourceEntityAttribute(rm.At(i).ScopeMetrics()); autoScalingGroup != EMPTY {
setAutoScalingGroup(autoScalingGroup)
}
}
ec2Info = getEC2InfoFromEntityStore()
if ec2Info.GetInstanceID() != EMPTY {
resourceAttrs.PutStr(entityattributes.AttributeEntityType, entityattributes.AttributeEntityAWSResource)
Expand Down Expand Up @@ -285,7 +292,8 @@ func (p *awsEntityProcessor) processMetrics(_ context.Context, md pmetric.Metric

// scrapeServiceAttribute expands the datapoint attributes and search for
// service name and environment attributes. This is only used for components
// that only emit attributes on datapoint level.
// that only emit attributes on datapoint level. This code block contains a lot
// of repeated code because OTEL metrics type do not have a common interface.
func (p *awsEntityProcessor) scrapeServiceAttribute(scopeMetric pmetric.ScopeMetricsSlice) (string, string, string) {
entityServiceName := EMPTY
entityServiceNameSource := EMPTY
Expand Down Expand Up @@ -408,6 +416,64 @@ func (p *awsEntityProcessor) scrapeServiceAttribute(scopeMetric pmetric.ScopeMet
return entityServiceName, entityEnvironmentName, entityServiceNameSource
}

// scrapeResourceEntityAttribute expands the datapoint attributes and search for
// resource entity related attributes. This is only used for components
// that only emit attributes on datapoint level. This code block contains a lot
// of repeated code because OTEL metrics type do not have a common interface.
func (p *awsEntityProcessor) scrapeResourceEntityAttribute(scopeMetric pmetric.ScopeMetricsSlice) string {
autoScalingGroup := EMPTY
for j := 0; j < scopeMetric.Len(); j++ {
metric := scopeMetric.At(j).Metrics()
for k := 0; k < metric.Len(); k++ {
if autoScalingGroup != EMPTY {
return autoScalingGroup
}
m := metric.At(k)
switch m.Type() {
case pmetric.MetricTypeGauge:
dps := m.Gauge().DataPoints()
for l := 0; l < dps.Len(); l++ {
if dpAutoScalingGroup, ok := dps.At(l).Attributes().Get(ec2tagger.CWDimensionASG); ok {
autoScalingGroup = dpAutoScalingGroup.Str()
}
}
case pmetric.MetricTypeSum:
dps := m.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
if dpAutoScalingGroup, ok := dps.At(l).Attributes().Get(ec2tagger.CWDimensionASG); ok {
autoScalingGroup = dpAutoScalingGroup.Str()
}
}
case pmetric.MetricTypeHistogram:
dps := m.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
if dpAutoScalingGroup, ok := dps.At(l).Attributes().Get(ec2tagger.CWDimensionASG); ok {
autoScalingGroup = dpAutoScalingGroup.Str()
}
}
case pmetric.MetricTypeExponentialHistogram:
dps := m.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
if dpAutoScalingGroup, ok := dps.At(l).Attributes().Get(ec2tagger.CWDimensionASG); ok {
autoScalingGroup = dpAutoScalingGroup.Str()
}
}
case pmetric.MetricTypeSummary:
dps := m.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
if dpAutoScalingGroup, ok := dps.At(l).Attributes().Get(ec2tagger.CWDimensionASG); ok {
autoScalingGroup = dpAutoScalingGroup.Str()
}
}
default:
p.logger.Debug("Ignore unknown metric type", zap.String("type", m.Type().String()))
}

}
}
return autoScalingGroup
}

// getServiceAttributes prioritize service name retrieval based on
// following attribute priority
// 1. service.name
Expand Down
2 changes: 1 addition & 1 deletion plugins/processors/ec2tagger/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const sampleConfig = `

const (
Ec2InstanceTagKeyASG = "aws:autoscaling:groupName"
cwDimensionASG = "AutoScalingGroupName"
CWDimensionASG = "AutoScalingGroupName"
mdKeyInstanceId = "InstanceId"
mdKeyImageId = "ImageId"
mdKeyInstanceType = "InstanceType"
Expand Down
6 changes: 3 additions & 3 deletions plugins/processors/ec2tagger/ec2tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (t *Tagger) updateTags() error {
key := *tag.Key
if Ec2InstanceTagKeyASG == key {
// rename to match CW dimension as applied by AutoScaling service, not the EC2 tag
key = cwDimensionASG
key = CWDimensionASG
}
tags[key] = *tag.Value
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func (t *Tagger) ec2TagsRetrieved() bool {
if t.ec2TagCache != nil {
for _, key := range t.EC2InstanceTagKeys {
if key == Ec2InstanceTagKeyASG {
key = cwDimensionASG
key = CWDimensionASG
}
if key == "*" {
continue
Expand Down Expand Up @@ -303,7 +303,7 @@ func (t *Tagger) Start(ctx context.Context, host component.Host) error {
if !useAllTags && len(t.EC2InstanceTagKeys) > 0 {
// if the customer said 'AutoScalingGroupName' (the CW dimension), do what they mean not what they said
for i, key := range t.EC2InstanceTagKeys {
if cwDimensionASG == key {
if CWDimensionASG == key {
t.EC2InstanceTagKeys[i] = Ec2InstanceTagKeyASG
}
}
Expand Down
28 changes: 17 additions & 11 deletions translator/translate/otel/pipeline/host/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,14 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
return nil, fmt.Errorf("no receivers configured in pipeline %s", t.name)
}
var entityProcessor common.Translator[component.Config]
if strings.HasPrefix(t.name, common.PipelineNameHostOtlpMetrics) {
entityProcessor = nil
} else if strings.HasPrefix(t.name, common.PipelineNameHostCustomMetrics) {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, "telegraf", true)
} else if strings.HasPrefix(t.name, common.PipelineNameHost) || strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics) {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Resource, "", false)
}
var ec2TaggerEnabled bool

translators := common.ComponentTranslators{
Receivers: t.receivers,
Processors: common.NewTranslatorMap[component.Config](),
Exporters: common.NewTranslatorMap[component.Config](),
Extensions: common.NewTranslatorMap[component.Config](),
}
currentContext := context.CurrentContext()
if entityProcessor != nil && currentContext.Mode() == config.ModeEC2 && !currentContext.RunInContainer() && (t.Destination() == common.CloudWatchKey || t.Destination() == common.DefaultDestination) {
translators.Processors.Set(entityProcessor)
}

if strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics) || strings.HasPrefix(t.name, common.PipelineNameHostOtlpMetrics) {
log.Printf("D! delta processor required because metrics with diskio or net are set")
Expand All @@ -91,6 +81,7 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
if conf.IsSet(common.ConfigKey(common.MetricsKey, common.AppendDimensionsKey)) {
log.Printf("D! ec2tagger processor required because append_dimensions is set")
translators.Processors.Set(ec2taggerprocessor.NewTranslator())
ec2TaggerEnabled = true
}

mdt := metricsdecorator.NewTranslator(metricsdecorator.WithIgnorePlugins(common.JmxKey))
Expand All @@ -100,6 +91,21 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
}
}

if strings.HasPrefix(t.name, common.PipelineNameHostOtlpMetrics) {
entityProcessor = nil
} else if strings.HasPrefix(t.name, common.PipelineNameHostCustomMetrics) {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, "telegraf", true)
} else if (strings.HasPrefix(t.name, common.PipelineNameHost) || strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics)) && ec2TaggerEnabled {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Resource, "", true)
} else if strings.HasPrefix(t.name, common.PipelineNameHost) || strings.HasPrefix(t.name, common.PipelineNameHostDeltaMetrics) {
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Resource, "", false)
}

currentContext := context.CurrentContext()
if entityProcessor != nil && currentContext.Mode() == config.ModeEC2 && !currentContext.RunInContainer() && (t.Destination() == common.CloudWatchKey || t.Destination() == common.DefaultDestination) {
translators.Processors.Set(entityProcessor)
}

switch t.Destination() {
case common.DefaultDestination, common.CloudWatchKey:
translators.Exporters.Set(awscloudwatch.NewTranslator())
Expand Down

0 comments on commit 6ef2a2a

Please sign in to comment.