Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Add support for flat configuration style #37444

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .chloggen/add-config-flat-statements-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for flat configuration style.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29017]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The flat configuration style allows users to configure statements by providing a list of statements instead of a
  structured configuration map. The statement's context is expressed by adding the context's name prefix to path names,
  which are used to infer and to select the appropriate context for the statement.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
75 changes: 75 additions & 0 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"errors"
"fmt"
"reflect"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -25,6 +28,7 @@ var (
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"),
)
errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled")
contextStatementsFields = []string{"trace_statements", "metric_statements", "log_statements"}
)

// Config defines the configuration for the processor.
Expand All @@ -44,6 +48,77 @@ type Config struct {
logger *zap.Logger
}

// Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config),
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
// adding support to structured and flat configuration styles.
// When the flat configuration style is used, each statement becomes a new common.ContextStatements
// object, with empty [common.ContextStatements.Context] value.
// On the other hand, structured configurations are parsed following the mapstructure Config format.
// Mixed configuration styles are also supported.
//
// Example of flat configuration:
//
// log_statements:
// - set(attributes["service.new_name"], attributes["service.name"])
// - delete_key(attributes, "service.name")
//
// Example of structured configuration:
//
// log_statements:
// - context: "span"
// statements:
// - set(attributes["service.new_name"], attributes["service.name"])
// - delete_key(attributes, "service.name")
func (c *Config) Unmarshal(conf *confmap.Conf) error {
if conf == nil {
return nil
}

contextStatementsPatch := map[string]any{}
for _, fieldName := range contextStatementsFields {
if !conf.IsSet(fieldName) {
continue
}
rawVal := conf.Get(fieldName)
values, ok := rawVal.([]any)
if !ok {
return fmt.Errorf("invalid %s type, expected: array, got: %t", fieldName, rawVal)
}
if len(values) == 0 {
continue
}

stmts := make([]any, 0, len(values))
for i, value := range values {
// Array of strings means it's a flat configuration style
if reflect.TypeOf(value).Kind() == reflect.String {
stmts = append(stmts, map[string]any{
"statements": []any{value},
"shared_cache": true,
})
} else {
configuredKeys, ok := value.(map[string]any)
if ok {
_, hasShareCacheKey := configuredKeys["shared_cache"]
if hasShareCacheKey {
return fmt.Errorf("%s[%d] has invalid keys: %s", fieldName, i, "shared_cache")
}
}
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
stmts = append(stmts, value)
}
}
contextStatementsPatch[fieldName] = stmts
}

if len(contextStatementsPatch) > 0 {
err := conf.Merge(confmap.NewFromStringMap(contextStatementsPatch))
if err != nil {
return err
}
}

return conf.Unmarshal(c)
}

var _ component.Config = (*Config)(nil)

func (c *Config) Validate() error {
Expand Down
118 changes: 118 additions & 0 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,108 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(metadata.Type, "flat_configuration"),
expected: &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
},
{
SharedCache: true,
Statements: []string{`set(resource.attributes["name"], "bear")`},
},
},
MetricStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
},
{
SharedCache: true,
Statements: []string{`set(resource.attributes["name"], "bear")`},
},
},
LogStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
},
{
SharedCache: true,
Statements: []string{`set(resource.attributes["name"], "bear")`},
},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "mixed_configuration_styles"),
expected: &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
},
{
Context: "span",
Statements: []string{
`set(attributes["name"], "bear")`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Statements: []string{`set(span.attributes["name"], "lion")`},
},
{
SharedCache: true,
Statements: []string{`set(span.name, "lion") where span.attributes["http.path"] == "/animal"`},
},
},
MetricStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Statements: []string{`set(metric.name, "lion")`},
},
{
SharedCache: true,
Statements: []string{`set(metric.name, "lion") where resource.attributes["http.path"] == "/animal"`},
},
},
LogStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Statements: []string{`set(log.attributes["name"], "lion")`},
},
{
SharedCache: true,
Statements: []string{`set(log.body, "lion") where log.attributes["http.path"] == "/animal"`},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.id.Name(), func(t *testing.T) {
Expand Down Expand Up @@ -257,3 +359,19 @@ func Test_UnknownErrorMode(t *testing.T) {
assert.NoError(t, err)
assert.Error(t, sub.Unmarshal(cfg))
}

func Test_SharedCacheKeyError(t *testing.T) {
id := component.NewIDWithName(metadata.Type, "with_shared_cache_key")

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
assert.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(id.String())
assert.NoError(t, err)

err = sub.Unmarshal(cfg)
assert.ErrorContains(t, err, "metric_statements[0] has invalid keys: shared_cache")
}
20 changes: 20 additions & 0 deletions processor/transformprocessor/internal/common/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

// LoadContextCache returns the cache map registered in contextCache for the given
// context ID. When no entry exists yet, a new empty map is created, stored under
// that context ID and returned, so later calls with the same ID get the same pointer.
func LoadContextCache(contextCache map[ContextID]*pcommon.Map, context ContextID) *pcommon.Map {
	cache, found := contextCache[context]
	if !found {
		fresh := pcommon.NewMap()
		cache = &fresh
		contextCache[context] = cache
	}
	return cache
}
5 changes: 5 additions & 0 deletions processor/transformprocessor/internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type ContextStatements struct {
Context ContextID `mapstructure:"context"`
Conditions []string `mapstructure:"conditions"`
Statements []string `mapstructure:"statements"`
// SharedCache is experimental and subject to change or removal in the future.
// Although it's configurable via `mapstructure`, users won't be able to set it on their
// configurations, as it's currently meant for internal use only, and it's validated by
// the transformprocessor Config unmarshaller function.
SharedCache bool `mapstructure:"shared_cache"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's currently being set programmatically, and does not allow users to configure it on their configurations (https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/37444/files#diff-1e527186a992bb04852a9e8cd6fe43ef611d0e071360c4e40a1432a30efc1d38R89).

That's a conservative approach to keep the behavior the same, but there's no technical reason to not allow it.
if you folks also think it might be useful, we could make this setting available, so users would be able to control which statement's groups are using the shared cache.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets be opinionated and hide it for now. Config support can be added later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we unexport it and/or remove the mapstructure tags? That would mean the unmarshal function doesn't have to worry about users trying to set it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH, I couldn't find a clean solution for this field, so I ended up with this approach, considering there's a possibility of making this setting available to users in the future.

Given we're still relying on mapstructure to unmarshal the configuration, unexporting this field would require both, a custom unmarshalling function for common.ContextStatements to set the field value, and some mechanism to pass this information down from the transformprocessor.Config Unmarshal function (which is the one who knows its value). Unexported fields are ignored by mapstructure as it's not possible to set their values using reflection.

I was able to unexport it and make it work by passing the extra shared_cache key here (as it's currently doing), and an extra confmap.WithIgnoreUnused() option here (otherwise mapstructure returns an error), then with that key in the conf map, we just need to read it and set the field value on the common.ContextStatements unmarshaller function. The problem with this approach is that invalid keys are not validated anymore, and we would need to validate them manually, which IMO, is not ideal.

Finally, another option would be removing the mapstructure tag and keep it exported, so we wouldn't need to worry about users trying to set it on their configurations. To set it internally, we would need to use reflection, as I initially implemented on the draft (see 498f9b1).

Do you have any thoughts or ideas on how to work it around?

Copy link
Member

@TylerHelmuth TylerHelmuth Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping that, since we're using a custom unmarshaller, it could be the definitive source of whether that value should be true or false. In my head we'd be able to identify if the user is using the flat style and then set c.sharedCache ourselves in the Unmarshal func.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the implementation again, really any time we're doing map[string]any manipulation in the Unmarshall function it would be great to work directly on the c *Config if we can.

Copy link
Contributor Author

@edmocosta edmocosta Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping that, since we're using a custom unmarshaller, it could be the definitive source of whether that value should be true or false. In my head we'd be able to identify if the user is using the flat style and then set c.sharedCache ourselves in the Unmarshal func.

We're still relying on mapstructure to unmarshall the configuration, the current logic is only normalizing the flat configuration style yaml map so it can be properly unmarshalled as it was configured using the structured configuration style. That's why we're manipulating map[string]any values instead of the Config struct.

Here is an example of the otlpreceiver doing something similar: https://github.com/open-telemetry/opentelemetry-collector/blob/2447a81885fc580860860bd6a8768422a70c99f8/receiver/otlpreceiver/config.go#L63-L90

In that case it has a 1:1 relation: the yaml config map is compatible with the target structure, so it can call conf.Unmarshal at the very beginning, as it's doing. That does not apply to us, as the flat configuration style is not compatible with the Config struct.

Looking at the implementation again, really any time we're doing map[string]any manipulation in the Unmarshall function it would be great to work directly on the c *Config if we can.

If we move some code around, we can unexport the field and have a hybrid approach without using reflection. After calling conf.Unmarshal, we can iterate over the context statements setting the sharedCache value. For that, we would need to put both transformprocessor.Config and common.ContextStatements into the same package, which I guess wouldn't be an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we move some code around, we can unexport the field and have a hybrid approach without using reflection. After calling conf.Unmarshal, we can iterate over the context statements setting the sharedCache value. For that, we would need to put both transformprocessor.Config and common.ContextStatements into the same package, which I guess wouldn't be an issue.

ya something like this sounds like a good idea to try.

}

func (c ContextStatements) GetStatements() []string {
Expand Down
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption {
func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{
withCommonContextParsers[LogsConsumer](),
ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true),
}

for _, option := range options {
Expand Down
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption
func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{
withCommonContextParsers[MetricsConsumer](),
ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true),
}

for _, option := range options {
Expand Down
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/common/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption {
func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{
withCommonContextParsers[TracesConsumer](),
ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true),
}

for _, option := range options {
Expand Down
20 changes: 16 additions & 4 deletions processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -16,8 +17,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

// parsedContextStatements pairs a parsed group of log statements with the
// SharedCache setting of the common.ContextStatements it was parsed from.
type parsedContextStatements struct {
// LogsConsumer is the consumer produced by parsing the statements group.
common.LogsConsumer
// sharedCache selects the cache handed to ConsumeLogs: when true, ProcessLogs passes
// the map obtained from common.LoadContextCache for the consumer's context; when
// false, it passes nil.
sharedCache bool
}

// Processor runs the parsed log statement groups, in order, against log data.
// NOTE(review): the two `contexts` lines below look like the removed and added sides
// of a diff rendered without +/- markers; only one of them can exist in the real file — confirm.
type Processor struct {
contexts []common.LogsConsumer
contexts []parsedContextStatements
// logger is used to report failures while processing logs.
logger *zap.Logger
// flatMode presumably toggles flattening the logs before processing and regrouping
// them by resource afterwards; the guarding condition is not visible here — TODO confirm.
flatMode bool
}
Expand All @@ -28,14 +34,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E
return nil, err
}

contexts := make([]common.LogsConsumer, len(contextStatements))
contexts := make([]parsedContextStatements, len(contextStatements))
var errors error
for i, cs := range contextStatements {
context, err := pc.ParseContextStatements(cs)
if err != nil {
errors = multierr.Append(errors, err)
}
contexts[i] = context
contexts[i] = parsedContextStatements{context, cs.SharedCache}
}

if errors != nil {
Expand All @@ -54,8 +60,14 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e
pdatautil.FlattenLogs(ld.ResourceLogs())
defer pdatautil.GroupByResourceLogs(ld.ResourceLogs())
}

sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts))
for _, c := range p.contexts {
err := c.ConsumeLogs(ctx, ld, nil)
var cache *pcommon.Map
if c.sharedCache {
cache = common.LoadContextCache(sharedContextCache, c.Context())
}
err := c.ConsumeLogs(ctx, ld, cache)
if err != nil {
p.logger.Error("failed processing logs", zap.Error(err))
return ld, err
Expand Down
Loading
Loading