From e7e44e792603493b7570bfe4a2e560c895646595 Mon Sep 17 00:00:00 2001 From: edmocosta <11836452+edmocosta@users.noreply.github.com> Date: Thu, 23 Jan 2025 15:05:34 +0100 Subject: [PATCH 1/4] Add support for flat configuration style --- .../add-config-flat-statements-support.yaml | 30 ++++ processor/transformprocessor/config.go | 61 +++++++ processor/transformprocessor/config_test.go | 118 +++++++++++++ .../internal/common/cache.go | 24 +++ .../internal/common/config.go | 5 + .../internal/common/logs.go | 1 + .../internal/common/metrics.go | 1 + .../internal/common/traces.go | 1 + .../internal/logs/processor.go | 17 +- .../internal/logs/processor_test.go | 131 ++++++++++++++ .../internal/metrics/processor.go | 16 +- .../internal/metrics/processor_test.go | 163 ++++++++++++++++++ .../internal/traces/processor.go | 16 +- .../internal/traces/processor_test.go | 160 +++++++++++++++++ .../transformprocessor/testdata/config.yaml | 49 ++++++ 15 files changed, 781 insertions(+), 12 deletions(-) create mode 100644 .chloggen/add-config-flat-statements-support.yaml create mode 100644 processor/transformprocessor/internal/common/cache.go diff --git a/.chloggen/add-config-flat-statements-support.yaml b/.chloggen/add-config-flat-statements-support.yaml new file mode 100644 index 000000000000..4af43774499d --- /dev/null +++ b/.chloggen/add-config-flat-statements-support.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for flat configuration style. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29017] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The flat configuration style allows users to configure statements by providing a list of statements instead of a + structured configuration map. The statement's context is expressed by adding the context's name prefix to path names, + which are used to infer and to select the appropriate context for the statement. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 8c5b7f35e7d4..26d3576046c3 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -5,8 +5,11 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co import ( "errors" + "fmt" + "reflect" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/featuregate" "go.uber.org/multierr" "go.uber.org/zap" @@ -25,6 +28,7 @@ var ( featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"), ) errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled") + contextStatementsFields = []string{"trace_statements", "metric_statements", "log_statements"} ) // Config defines the configuration for the processor. @@ -44,6 +48,63 @@ type Config struct { logger *zap.Logger } +// Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config), +// adding support to structured and flat configuration styles (array of statements strings). +// When the flat configuration style is used, each statement becomes a new common.ContextStatements +// object, with empty [common.ContextStatements.Context] value. +// On the other hand, structured configurations are parsed following the mapstructure Config format. +// Mixed configuration styles are also supported. +func (c *Config) Unmarshal(component *confmap.Conf) error { + if component == nil { + return nil + } + + contextStatementsPatch := map[string]any{} + for _, fieldName := range contextStatementsFields { + if !component.IsSet(fieldName) { + continue + } + rawVal := component.Get(fieldName) + values, ok := rawVal.([]any) + if !ok { + return fmt.Errorf("invalid %s type, expected: array, got: %t", fieldName, rawVal) + } + if len(values) == 0 { + continue + } + + stmts := make([]any, 0, len(values)) + for i, value := range values { + // Array of strings means it's a flat configuration style + if reflect.TypeOf(value).Kind() == reflect.String { + stmts = append(stmts, map[string]any{ + "statements": []any{value}, + "shared_cache": true, + }) + } else { + configuredKeys, ok := value.(map[string]any) + if ok { + _, hasShareCacheKey := configuredKeys["shared_cache"] + if hasShareCacheKey { + return fmt.Errorf("%s[%d] has invalid keys: %s", fieldName, i, "shared_cache") + } + } + stmts = append(stmts, value) + } + } + contextStatementsPatch[fieldName] = stmts + } + + if len(contextStatementsPatch) > 0 { + err := component.Merge(confmap.NewFromStringMap(contextStatementsPatch)) + if err != nil { + return err + } + } + + return component.Unmarshal(c) +} + var _ component.Config = (*Config)(nil) func (c *Config) Validate() error { diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index e736707cd706..1a56973b4d1e 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -201,6 +201,108 @@ func TestLoadConfig(t *testing.T) { }, }, }, + { + id: component.NewIDWithName(metadata.Type, "flat_configuration"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, + }, + { + SharedCache: true, + Statements: []string{`set(resource.attributes["name"], "bear")`}, + }, + }, + MetricStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, + }, + { + SharedCache: true, + Statements: []string{`set(resource.attributes["name"], "bear")`}, + }, + }, + LogStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, + }, + { + SharedCache: true, + Statements: []string{`set(resource.attributes["name"], "bear")`}, + }, + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "mixed_configuration_styles"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, + }, + { + Context: "span", + Statements: []string{ + `set(attributes["name"], "bear")`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + { + Statements: []string{`set(span.attributes["name"], "lion")`}, + }, + { + SharedCache: true, + Statements: []string{`set(span.name, "lion") where span.attributes["http.path"] == "/animal"`}, + }, + }, + MetricStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, + }, + { + Context: "resource", + Statements: []string{ + `set(attributes["name"], "bear")`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + { + Statements: []string{`set(metric.name, "lion")`}, + }, + { + SharedCache: true, + Statements: []string{`set(metric.name, "lion") where resource.attributes["http.path"] == "/animal"`}, + }, + }, + LogStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, + }, + { + Context: "resource", + Statements: []string{ + `set(attributes["name"], "bear")`, + `keep_keys(attributes, ["http.method", "http.path"])`, + }, + }, + { + Statements: []string{`set(log.attributes["name"], "lion")`}, + }, + { + SharedCache: true, + Statements: []string{`set(log.body, "lion") where log.attributes["http.path"] == "/animal"`}, + }, + }, + }, + }, } for _, tt := range tests { t.Run(tt.id.Name(), func(t *testing.T) { @@ -257,3 +359,19 @@ func Test_UnknownErrorMode(t *testing.T) { assert.NoError(t, err) assert.Error(t, sub.Unmarshal(cfg)) } + +func Test_SharedCacheKeyError(t *testing.T) { + id := component.NewIDWithName(metadata.Type, "with_shared_cache_key") + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + assert.NoError(t, err) + + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + + sub, err := cm.Sub(id.String()) + assert.NoError(t, err) + + err = sub.Unmarshal(cfg) + assert.ErrorContains(t, err, "metric_statements[0] has invalid keys: shared_cache") +} diff --git a/processor/transformprocessor/internal/common/cache.go b/processor/transformprocessor/internal/common/cache.go new file mode 100644 index 000000000000..a88681717f10 --- /dev/null +++ b/processor/transformprocessor/internal/common/cache.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// LoadContextCache retrieves or creates a context cache for the given context ID. +// If `sharedCache` is true, it returns the cached context map if it exists, +// or creates and stores a new one if it does not. If `sharedCache` is false, it returns nil. +func LoadContextCache(cache map[ContextID]*pcommon.Map, context ContextID, sharedCache bool) *pcommon.Map { + if !sharedCache { + return nil + } + v, ok := cache[context] + if ok { + return v + } + m := pcommon.NewMap() + cache[context] = &m + return &m +} diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go index 79087389d644..8e36a0a9619d 100644 --- a/processor/transformprocessor/internal/common/config.go +++ b/processor/transformprocessor/internal/common/config.go @@ -39,6 +39,11 @@ type ContextStatements struct { Context ContextID `mapstructure:"context"` Conditions []string `mapstructure:"conditions"` Statements []string `mapstructure:"statements"` + // SharedCache is experimental and subject to change or removal in the future. + // Although it's configurable via `mapstructure`, users won't be able to set it on their + // configurations, as it's currently meant for internal use only, and it's validated by + // the transformprocessor Config unmarshaller function. + SharedCache bool `mapstructure:"shared_cache"` } func (c ContextStatements) GetStatements() []string { diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 711cbc418396..60fd8be5ac75 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -75,6 +75,7 @@ func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption { func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{ withCommonContextParsers[LogsConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true), } for _, option := range options { diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 82f5434d18e4..fdc9f455f99c 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -195,6 +195,7 @@ func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{ withCommonContextParsers[MetricsConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true), } for _, option := range options { diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 4b3dd117b1f0..84cf139a8309 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -123,6 +123,7 @@ func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption { func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{ withCommonContextParsers[TracesConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true), } for _, option := range options { diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index 23037fe847ba..488fdaf4391d 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -7,6 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" "go.uber.org/zap" @@ -16,8 +17,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type parsedContextStatements struct { + common.LogsConsumer + sharedCache bool +} + type Processor struct { - contexts []common.LogsConsumer + contexts []parsedContextStatements logger *zap.Logger flatMode bool } @@ -28,14 +34,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]common.LogsConsumer, len(contextStatements)) + contexts := make([]parsedContextStatements, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) if err != nil { errors = multierr.Append(errors, err) } - contexts[i] = context + contexts[i] = parsedContextStatements{context, cs.SharedCache} } if errors != nil { @@ -54,8 +60,11 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e pdatautil.FlattenLogs(ld.ResourceLogs()) defer pdatautil.GroupByResourceLogs(ld.ResourceLogs()) } + + sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - err := c.ConsumeLogs(ctx, ld, nil) + cache := common.LoadContextCache(sharedContextCache, c.Context(), c.sharedCache) + err := c.ConsumeLogs(ctx, ld, cache) if err != nil { p.logger.Error("failed processing logs", zap.Error(err)) return ld, err diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index f3aee564b923..9510960aff92 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -935,6 +935,137 @@ func Test_ProcessLogs_ErrorMode(t *testing.T) { } } +func Test_ProcessLogs_CacheAccess(t *testing.T) { + tests := []struct { + name string + statements []common.ContextStatements + want func(td plog.Logs) + }{ + { + name: "resource:resource.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}, SharedCache: true}, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "resource:cache", + statements: []common.ContextStatements{ + { + Context: common.Resource, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope:scope.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}, SharedCache: true}, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope:cache", + statements: []common.ContextStatements{{ + Context: common.Scope, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }}, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "log:log.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(log.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(log.attributes["test"], log.cache["test"])`}, SharedCache: true}, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "log:cache", + statements: []common.ContextStatements{{ + Context: common.Log, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }}, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "cache isolation", + statements: []common.ContextStatements{ + { + Statements: []string{`set(log.cache["shared"], "fail")`}, + SharedCache: true, + }, + { + Statements: []string{ + `set(log.cache["test"], "pass")`, + `set(log.attributes["test"], log.cache["test"])`, + `set(log.attributes["test"], log.cache["shared"])`, + }, + }, + { + Context: common.Log, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + `set(attributes["test"], cache["shared"])`, + `set(attributes["test"], log.cache["shared"])`, + }, + }, + { + Statements: []string{`set(log.attributes["test"], "pass") where log.cache["shared"] == "fail"`}, + SharedCache: true, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor(tt.statements, ottl.IgnoreError, false, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.NoError(t, err) + + exTd := constructLogs() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func constructLogs() plog.Logs { td := plog.NewLogs() rs0 := td.ResourceLogs().AppendEmpty() diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index cc134d3d138f..e91370453506 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -7,6 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" "go.uber.org/zap" @@ -15,8 +16,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type parsedContextStatements struct { + common.MetricsConsumer + sharedCache bool +} + type Processor struct { - contexts []common.MetricsConsumer + contexts []parsedContextStatements logger *zap.Logger } @@ -26,14 +32,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]common.MetricsConsumer, len(contextStatements)) + contexts := make([]parsedContextStatements, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) if err != nil { errors = multierr.Append(errors, err) } - contexts[i] = context + contexts[i] = parsedContextStatements{context, cs.SharedCache} } if errors != nil { @@ -47,8 +53,10 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E } func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { + sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - err := c.ConsumeMetrics(ctx, md, nil) + cache := common.LoadContextCache(sharedContextCache, c.Context(), c.sharedCache) + err := c.ConsumeMetrics(ctx, md, cache) if err != nil { p.logger.Error("failed processing metrics", zap.Error(err)) return md, err diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 10132713998b..0c92487a9942 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -1607,6 +1607,169 @@ func Test_ProcessMetrics_ErrorMode(t *testing.T) { } } +func Test_ProcessMetrics_CacheAccess(t *testing.T) { + tests := []struct { + name string + statements []common.ContextStatements + want func(td pmetric.Metrics) + }{ + { + name: "resource:resource.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}, SharedCache: true}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "resource:cache", + statements: []common.ContextStatements{ + { + Context: common.Resource, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope:scope.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}, SharedCache: true}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope:cache", + statements: []common.ContextStatements{{ + Context: common.Scope, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "metric:metric.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(metric.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(metric.name, metric.cache["test"]) where metric.name == "operationB"`}, SharedCache: true}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetName("pass") + }, + }, + { + name: "metric:cache", + statements: []common.ContextStatements{{ + Context: common.Metric, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(name, cache["test"]) where name == "operationB"`, + }, + }}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetName("pass") + }, + }, + { + name: "datapoint:datapoint.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(datapoint.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(datapoint.attributes["test"], datapoint.cache["test"]) where metric.name == "operationA"`}, SharedCache: true}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "datapoint:cache", + statements: []common.ContextStatements{{ + Context: common.DataPoint, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"]) where metric.name == "operationA"`, + }, + }}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "cache isolation", + statements: []common.ContextStatements{ + { + Statements: []string{`set(datapoint.cache["shared"], "fail")`}, + SharedCache: true, + }, + { + Statements: []string{ + `set(datapoint.cache["test"], "pass")`, + `set(datapoint.attributes["test"], datapoint.cache["test"])`, + `set(datapoint.attributes["test"], datapoint.cache["shared"])`, + }, + Conditions: []string{ + `metric.name == "operationA"`, + }, + }, + { + Context: common.DataPoint, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + `set(attributes["test"], cache["shared"])`, + `set(attributes["test"], datapoint.cache["shared"])`, + }, + Conditions: []string{ + `metric.name == "operationA"`, + }, + }, + { + Statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.cache["shared"] == "fail"`}, + SharedCache: true, + Conditions: []string{ + `metric.name == "operationA"`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor(tt.statements, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.NoError(t, err) + + exTd := constructMetrics() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func constructMetrics() pmetric.Metrics { td := pmetric.NewMetrics() rm0 := td.ResourceMetrics().AppendEmpty() diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 6af07a4a942e..3b2309e0c265 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -7,6 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" "go.uber.org/zap" @@ -15,8 +16,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type parsedContextStatements struct { + common.TracesConsumer + sharedCache bool +} + type Processor struct { - contexts []common.TracesConsumer + contexts []parsedContextStatements logger *zap.Logger } @@ -26,14 +32,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]common.TracesConsumer, len(contextStatements)) + contexts := make([]parsedContextStatements, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) if err != nil { errors = multierr.Append(errors, err) } - contexts[i] = context + contexts[i] = parsedContextStatements{context, cs.SharedCache} } if errors != nil { @@ -47,8 +53,10 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E } func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { + sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - err := c.ConsumeTraces(ctx, td, nil) + cache := common.LoadContextCache(sharedContextCache, c.Context(), c.sharedCache) + err := c.ConsumeTraces(ctx, td, cache) if err != nil { p.logger.Error("failed processing traces", zap.Error(err)) return td, err diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 9ac79c8a2e4b..9fab74454c0b 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -1001,6 +1001,166 @@ func Test_ProcessTraces_ErrorMode(t *testing.T) { } } +func Test_ProcessTraces_CacheAccess(t *testing.T) { + tests := []struct { + name string + statements []common.ContextStatements + want func(td ptrace.Traces) + }{ + { + name: "resource:resource.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}, SharedCache: true}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "resource:cache", + statements: []common.ContextStatements{ + { + Context: common.Resource, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope:scope.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}, SharedCache: true}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope:cache", + statements: []common.ContextStatements{{ + Context: common.Scope, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + }, + }}, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "span:span.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(span.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(span.attributes["test"], span.cache["test"]) where span.name == "operationA"`}, SharedCache: true}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "span:cache", + statements: []common.ContextStatements{{ + Context: common.Span, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"]) where name == "operationA"`, + }, + }}, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "spanevent:spanevent.cache", + statements: []common.ContextStatements{ + {Statements: []string{`set(spanevent.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(spanevent.attributes["test"], spanevent.cache["test"]) where spanevent.name == "eventA"`}, SharedCache: true}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "spanevent:cache", + statements: []common.ContextStatements{{ + Context: common.SpanEvent, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"]) where name == "eventA"`, + }, + }}, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "cache isolation", + statements: []common.ContextStatements{ + { + Statements: []string{`set(span.cache["shared"], "fail")`}, + SharedCache: true, + }, + { + Statements: []string{ + `set(span.cache["test"], "pass")`, + `set(span.attributes["test"], span.cache["test"])`, + `set(span.attributes["test"], span.cache["shared"])`, + }, + Conditions: []string{ + `name == "operationA"`, + }, + }, + { + Context: common.Span, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + `set(attributes["test"], cache["shared"])`, + `set(attributes["test"], span.cache["shared"])`, + }, + Conditions: []string{ + `name == "operationA"`, + }, + }, + { + Statements: []string{`set(span.attributes["test"], "pass") where span.cache["shared"] == "fail"`}, + SharedCache: true, + Conditions: []string{ + `name == "operationA"`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor(tt.statements, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.NoError(t, err) + + exTd := constructTraces() + tt.want(exTd) + + assert.Equal(t, exTd, td) + }) + } +} + func BenchmarkTwoSpans(b *testing.B) { tests := []struct { name string diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index c327eeb3a2ef..178f93293d83 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -146,3 +146,52 @@ transform/structured_configuration_with_inferred_context: - statements: - set(log.body, "bear") where log.attributes["http.path"] == "/animal" - set(resource.attributes["name"], "bear") + +transform/flat_configuration: + trace_statements: + - set(span.name, "bear") where span.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + metric_statements: + - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + log_statements: + - set(log.body, "bear") where log.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + +transform/mixed_configuration_styles: + trace_statements: + - set(span.name, "bear") where span.attributes["http.path"] == "/animal" + - context: span + statements: + - set(attributes["name"], "bear") + - keep_keys(attributes, ["http.method", "http.path"]) + - statements: + - set(span.attributes["name"], "lion") + - set(span.name, "lion") where span.attributes["http.path"] == "/animal" + metric_statements: + - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" + - context: resource + statements: + - set(attributes["name"], "bear") + - keep_keys(attributes, ["http.method", "http.path"]) + - statements: + - set(metric.name, "lion") + - set(metric.name, "lion") where resource.attributes["http.path"] == "/animal" + log_statements: + - set(log.body, "bear") where log.attributes["http.path"] == "/animal" + - context: resource + statements: + - set(attributes["name"], "bear") + - keep_keys(attributes, ["http.method", "http.path"]) + - statements: + - set(log.attributes["name"], "lion") + - set(log.body, "lion") where log.attributes["http.path"] == "/animal" + +transform/with_shared_cache_key: + trace_statements: + - statements: + - set(resource.attributes["name"], "propagate") + metric_statements: + - statements: + - set(resource.attributes["name"], "silent") + shared_cache: true \ No newline at end of file From 68f2056f08879710d2997028bc783443bddb3bef Mon Sep 17 00:00:00 2001 From: edmocosta <11836452+edmocosta@users.noreply.github.com> Date: Fri, 24 Jan 2025 10:51:53 +0100 Subject: [PATCH 2/4] Apply code review suggestions --- processor/transformprocessor/config.go | 28 ++++++++++++++----- .../internal/common/cache.go | 14 ++++------ .../internal/logs/processor.go | 5 +++- .../internal/metrics/processor.go | 5 +++- .../internal/traces/processor.go | 5 +++- 5 files changed, 38 insertions(+), 19 deletions(-) diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 26d3576046c3..cde26a00aea3 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -49,22 +49,36 @@ type Config struct { } // Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config), -// adding support to structured and flat configuration styles (array of statements strings). +// adding support to structured and flat configuration styles. // When the flat configuration style is used, each statement becomes a new common.ContextStatements // object, with empty [common.ContextStatements.Context] value. // On the other hand, structured configurations are parsed following the mapstructure Config format. // Mixed configuration styles are also supported. -func (c *Config) Unmarshal(component *confmap.Conf) error { - if component == nil { +// +// Example of flat configuration: +// +// log_statements: +// - set(attributes["service.new_name"], attributes["service.name"]) +// - delete_key(attributes, "service.name") +// +// Example of structured configuration: +// +// log_statements: +// - context: "span" +// statements: +// - set(attributes["service.new_name"], attributes["service.name"]) +// - delete_key(attributes, "service.name") +func (c *Config) Unmarshal(conf *confmap.Conf) error { + if conf == nil { return nil } contextStatementsPatch := map[string]any{} for _, fieldName := range contextStatementsFields { - if !component.IsSet(fieldName) { + if !conf.IsSet(fieldName) { continue } - rawVal := component.Get(fieldName) + rawVal := conf.Get(fieldName) values, ok := rawVal.([]any) if !ok { return fmt.Errorf("invalid %s type, expected: array, got: %t", fieldName, rawVal) @@ -96,13 +110,13 @@ func (c *Config) Unmarshal(component *confmap.Conf) error { } if len(contextStatementsPatch) > 0 { - err := component.Merge(confmap.NewFromStringMap(contextStatementsPatch)) + err := conf.Merge(confmap.NewFromStringMap(contextStatementsPatch)) if err != nil { return err } } - return component.Unmarshal(c) + return conf.Unmarshal(c) } var _ component.Config = (*Config)(nil) diff --git a/processor/transformprocessor/internal/common/cache.go b/processor/transformprocessor/internal/common/cache.go index a88681717f10..c36b4ab49fcf 100644 --- a/processor/transformprocessor/internal/common/cache.go +++ b/processor/transformprocessor/internal/common/cache.go @@ -7,18 +7,14 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" ) -// LoadContextCache retrieves or creates a context cache for the given context ID. -// If `sharedCache` is true, it returns the cached context map if it exists, -// or creates and stores a new one if it does not. If `sharedCache` is false, it returns nil. -func LoadContextCache(cache map[ContextID]*pcommon.Map, context ContextID, sharedCache bool) *pcommon.Map { - if !sharedCache { - return nil - } - v, ok := cache[context] +// LoadContextCache retrieves or creates a context cache map for the given context ID. +// If the cache is not found, a new map is created and stored in the contextCache map. +func LoadContextCache(contextCache map[ContextID]*pcommon.Map, context ContextID) *pcommon.Map { + v, ok := contextCache[context] if ok { return v } m := pcommon.NewMap() - cache[context] = &m + contextCache[context] = &m return &m } diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index 488fdaf4391d..e1133ddb20d1 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -63,7 +63,10 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - cache := common.LoadContextCache(sharedContextCache, c.Context(), c.sharedCache) + var cache *pcommon.Map + if c.sharedCache { + cache = common.LoadContextCache(sharedContextCache, c.Context()) + } err := c.ConsumeLogs(ctx, ld, cache) if err != nil { p.logger.Error("failed processing logs", zap.Error(err)) diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index e91370453506..4845afded7b7 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -55,7 +55,10 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - cache := common.LoadContextCache(sharedContextCache, c.Context(), c.sharedCache) + var cache *pcommon.Map + if c.sharedCache { + cache = common.LoadContextCache(sharedContextCache, c.Context()) + } err := c.ConsumeMetrics(ctx, md, cache) if err != nil { p.logger.Error("failed processing metrics", zap.Error(err)) diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 3b2309e0c265..68952abbaa21 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -55,7 +55,10 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - cache := common.LoadContextCache(sharedContextCache, c.Context(), c.sharedCache) + var cache *pcommon.Map + if c.sharedCache { + cache = common.LoadContextCache(sharedContextCache, c.Context()) + } err := c.ConsumeTraces(ctx, td, cache) if err != nil { p.logger.Error("failed processing traces", zap.Error(err)) From 2d4f954b13eb9e83aa0beb132cab851be2ec09d7 Mon Sep 17 00:00:00 2001 From: edmocosta <11836452+edmocosta@users.noreply.github.com> Date: Tue, 28 Jan 2025 17:37:46 +0100 Subject: [PATCH 3/4] Change mapstructure tags to ignore the SharedCache field --- processor/transformprocessor/config.go | 36 +++++++++++-------- processor/transformprocessor/config_test.go | 16 --------- .../internal/common/config.go | 10 +++--- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index cde26a00aea3..c252d9d6b814 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -28,7 +28,6 @@ var ( featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"), ) errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled") - contextStatementsFields = []string{"trace_statements", "metric_statements", "log_statements"} ) // Config defines the configuration for the processor. @@ -73,8 +72,15 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { return nil } + contextStatementsFields := map[string]*[]common.ContextStatements{ + "trace_statements": &c.TraceStatements, + "metric_statements": &c.MetricStatements, + "log_statements": &c.LogStatements, + } + + flatContextStatements := map[string][]int{} contextStatementsPatch := map[string]any{} - for _, fieldName := range contextStatementsFields { + for fieldName := range contextStatementsFields { if !conf.IsSet(fieldName) { continue } @@ -91,18 +97,9 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { for i, value := range values { // Array of strings means it's a flat configuration style if reflect.TypeOf(value).Kind() == reflect.String { - stmts = append(stmts, map[string]any{ - "statements": []any{value}, - "shared_cache": true, - }) + stmts = append(stmts, map[string]any{"statements": []any{value}}) + flatContextStatements[fieldName] = append(flatContextStatements[fieldName], i) } else { - configuredKeys, ok := value.(map[string]any) - if ok { - _, hasShareCacheKey := configuredKeys["shared_cache"] - if hasShareCacheKey { - return fmt.Errorf("%s[%d] has invalid keys: %s", fieldName, i, "shared_cache") - } - } stmts = append(stmts, value) } } @@ -116,7 +113,18 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { } } - return conf.Unmarshal(c) + err := conf.Unmarshal(c) + if err != nil { + return err + } + + for fieldName, indexes := range flatContextStatements { + for _, i := range indexes { + (*contextStatementsFields[fieldName])[i].SharedCache = true + } + } + + return err } var _ component.Config = (*Config)(nil) diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 1a56973b4d1e..b43d20ea954b 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -359,19 +359,3 @@ func Test_UnknownErrorMode(t *testing.T) { assert.NoError(t, err) assert.Error(t, sub.Unmarshal(cfg)) } - -func Test_SharedCacheKeyError(t *testing.T) { - id := component.NewIDWithName(metadata.Type, "with_shared_cache_key") - - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - assert.NoError(t, err) - - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - - sub, err := cm.Sub(id.String()) - assert.NoError(t, err) - - err = sub.Unmarshal(cfg) - assert.ErrorContains(t, err, "metric_statements[0] has invalid keys: shared_cache") -} diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go index 8e36a0a9619d..5855275b5105 100644 --- a/processor/transformprocessor/internal/common/config.go +++ b/processor/transformprocessor/internal/common/config.go @@ -39,11 +39,11 @@ type ContextStatements struct { Context ContextID `mapstructure:"context"` Conditions []string `mapstructure:"conditions"` Statements []string `mapstructure:"statements"` - // SharedCache is experimental and subject to change or removal in the future. - // Although it's configurable via `mapstructure`, users won't be able to set it on their - // configurations, as it's currently meant for internal use only, and it's validated by - // the transformprocessor Config unmarshaller function. - SharedCache bool `mapstructure:"shared_cache"` + + // `SharedCache` is an experimental feature that may change or be removed in the future. + // When enabled, it allows the statements cache to be shared across all other groups that share the cache. + // This feature is not configurable via `mapstructure` and cannot be set in configuration files. + SharedCache bool `mapstructure:"-"` } func (c ContextStatements) GetStatements() []string { From 6b7b56aa53cc22e19e75d8d92dddfac31c6b683e Mon Sep 17 00:00:00 2001 From: Edmo Vamerlatti Costa <11836452+edmocosta@users.noreply.github.com> Date: Mon, 3 Feb 2025 22:42:08 +0100 Subject: [PATCH 4/4] Apply suggestions from code review Co-authored-by: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> --- .../internal/metrics/processor_test.go | 14 +++++++------- .../internal/traces/processor_test.go | 16 ++++++++-------- .../transformprocessor/testdata/config.yaml | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 0c92487a9942..88a6b216ed7b 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -1713,14 +1713,14 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { name: "cache isolation", statements: []common.ContextStatements{ { - Statements: []string{`set(datapoint.cache["shared"], "fail")`}, + Statements: []string{`set(datapoint.cache["shared"], "pass")`}, SharedCache: true, }, { Statements: []string{ - `set(datapoint.cache["test"], "pass")`, + `set(datapoint.cache["test"], "fail")`, `set(datapoint.attributes["test"], datapoint.cache["test"])`, - `set(datapoint.attributes["test"], datapoint.cache["shared"])`, + `set(datapoint.cache["shared"], "fail")`, }, Conditions: []string{ `metric.name == "operationA"`, @@ -1729,17 +1729,17 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { Context: common.DataPoint, Statements: []string{ - `set(cache["test"], "pass")`, + `set(attributes["extra"], cache["test"]) where cache["test"] != nil`, + `set(cache["test"], "fail")`, `set(attributes["test"], cache["test"])`, - `set(attributes["test"], cache["shared"])`, - `set(attributes["test"], datapoint.cache["shared"])`, + `set(cache["shared"], "fail")`, }, Conditions: []string{ `metric.name == "operationA"`, }, }, { - Statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.cache["shared"] == "fail"`}, + Statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.cache["shared"] == "pass"`}, SharedCache: true, Conditions: []string{ `metric.name == "operationA"`, diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 9fab74454c0b..be8e409267c5 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -1105,14 +1105,14 @@ func Test_ProcessTraces_CacheAccess(t *testing.T) { name: "cache isolation", statements: []common.ContextStatements{ { - Statements: []string{`set(span.cache["shared"], "fail")`}, + Statements: []string{`set(span.cache["shared"], "pass")`}, SharedCache: true, }, { Statements: []string{ - `set(span.cache["test"], "pass")`, + `set(span.cache["test"], "fail")`, `set(span.attributes["test"], span.cache["test"])`, - `set(span.attributes["test"], span.cache["shared"])`, + `set(span.cache["shared"], "fail")`, }, Conditions: []string{ `name == "operationA"`, @@ -1121,17 +1121,17 @@ func Test_ProcessTraces_CacheAccess(t *testing.T) { { Context: common.Span, Statements: []string{ - `set(cache["test"], "pass")`, - `set(attributes["test"], cache["test"])`, - `set(attributes["test"], cache["shared"])`, - `set(attributes["test"], span.cache["shared"])`, + `set(cache["shared"], "fail")`, + `set(attributes["extra"], cache["test"]) where cache["test"] != nil`, + `set(cache["test"], "fail")`, + `set(attributes["test"], span.cache["test"])`, }, Conditions: []string{ `name == "operationA"`, }, }, { - Statements: []string{`set(span.attributes["test"], "pass") where span.cache["shared"] == "fail"`}, + Statements: []string{`set(span.attributes["test"], "pass") where span.cache["shared"] == "pass"`}, SharedCache: true, Conditions: []string{ `name == "operationA"`, diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index 178f93293d83..cdd478c3d25d 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -194,4 +194,4 @@ transform/with_shared_cache_key: metric_statements: - statements: - set(resource.attributes["name"], "silent") - shared_cache: true \ No newline at end of file + shared_cache: true