Skip to content

Commit

Permalink
[processor/transform] Add support for flat configuration style (#37444)
Browse files Browse the repository at this point in the history
Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
edmocosta and evan-bradley authored Feb 3, 2025
1 parent b71d233 commit 56866f4
Show file tree
Hide file tree
Showing 15 changed files with 792 additions and 12 deletions.
30 changes: 30 additions & 0 deletions .chloggen/add-config-flat-statements-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/transformprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for flat configuration style.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29017]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The flat configuration style allows users to configure statements by providing a list of statements instead of a
  structured configuration map. The statement's context is expressed by adding the context's name prefix to path names,
  which are used to infer and to select the appropriate context for the statement.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
83 changes: 83 additions & 0 deletions processor/transformprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co

import (
"errors"
"fmt"
"reflect"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/featuregate"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -44,6 +47,86 @@ type Config struct {
logger *zap.Logger
}

// Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config),
// adding support to structured and flat configuration styles.
// When the flat configuration style is used, each statement becomes a new common.ContextStatements
// object, with empty [common.ContextStatements.Context] value and
// [common.ContextStatements.SharedCache] set to true.
// On the other hand, structured configurations are parsed following the mapstructure Config format.
// Mixed configuration styles are also supported.
//
// It returns an error when one of the statements fields is set to something other than a list,
// or when the underlying confmap merge or unmarshal fails. A nil conf is a no-op.
//
// Example of flat configuration:
//
//	log_statements:
//	  - set(log.attributes["service.new_name"], log.attributes["service.name"])
//	  - delete_key(log.attributes, "service.name")
//
// Example of structured configuration:
//
//	log_statements:
//	  - context: "log"
//	    statements:
//	      - set(attributes["service.new_name"], attributes["service.name"])
//	      - delete_key(attributes, "service.name")
func (c *Config) Unmarshal(conf *confmap.Conf) error {
	if conf == nil {
		return nil
	}

	contextStatementsFields := map[string]*[]common.ContextStatements{
		"trace_statements":  &c.TraceStatements,
		"metric_statements": &c.MetricStatements,
		"log_statements":    &c.LogStatements,
	}

	// flatContextStatements remembers, per field, the positions of the entries that were written
	// as plain strings, so they can be flagged once the configuration has been unmarshalled.
	flatContextStatements := map[string][]int{}
	// contextStatementsPatch holds the rewritten lists that replace the original ones in conf.
	contextStatementsPatch := map[string]any{}
	for fieldName := range contextStatementsFields {
		if !conf.IsSet(fieldName) {
			continue
		}
		rawVal := conf.Get(fieldName)
		values, ok := rawVal.([]any)
		if !ok {
			// %T reports the dynamic type of the offending value.
			return fmt.Errorf("invalid %s type, expected: array, got: %T", fieldName, rawVal)
		}
		if len(values) == 0 {
			continue
		}

		stmts := make([]any, 0, len(values))
		for i, value := range values {
			// A string entry means the flat configuration style: wrap it into the structured form.
			// reflect.ValueOf is used instead of reflect.TypeOf because it tolerates nil entries
			// (Kind() is reflect.Invalid) rather than panicking on a nil reflect.Type.
			if reflect.ValueOf(value).Kind() == reflect.String {
				stmts = append(stmts, map[string]any{"statements": []any{value}})
				flatContextStatements[fieldName] = append(flatContextStatements[fieldName], i)
				continue
			}
			stmts = append(stmts, value)
		}
		contextStatementsPatch[fieldName] = stmts
	}

	if len(contextStatementsPatch) > 0 {
		if err := conf.Merge(confmap.NewFromStringMap(contextStatementsPatch)); err != nil {
			return err
		}
	}

	if err := conf.Unmarshal(c); err != nil {
		return err
	}

	// SharedCache is not settable through mapstructure, so it is applied here, after unmarshalling,
	// to every entry that originated from a flat statement.
	for fieldName, indexes := range flatContextStatements {
		for _, i := range indexes {
			(*contextStatementsFields[fieldName])[i].SharedCache = true
		}
	}

	return nil
}

var _ component.Config = (*Config)(nil)

func (c *Config) Validate() error {
Expand Down
102 changes: 102 additions & 0 deletions processor/transformprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,108 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(metadata.Type, "flat_configuration"),
expected: &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
},
{
SharedCache: true,
Statements: []string{`set(resource.attributes["name"], "bear")`},
},
},
MetricStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
},
{
SharedCache: true,
Statements: []string{`set(resource.attributes["name"], "bear")`},
},
},
LogStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
},
{
SharedCache: true,
Statements: []string{`set(resource.attributes["name"], "bear")`},
},
},
},
},
{
id: component.NewIDWithName(metadata.Type, "mixed_configuration_styles"),
expected: &Config{
ErrorMode: ottl.PropagateError,
TraceStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`},
},
{
Context: "span",
Statements: []string{
`set(attributes["name"], "bear")`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Statements: []string{`set(span.attributes["name"], "lion")`},
},
{
SharedCache: true,
Statements: []string{`set(span.name, "lion") where span.attributes["http.path"] == "/animal"`},
},
},
MetricStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Statements: []string{`set(metric.name, "lion")`},
},
{
SharedCache: true,
Statements: []string{`set(metric.name, "lion") where resource.attributes["http.path"] == "/animal"`},
},
},
LogStatements: []common.ContextStatements{
{
SharedCache: true,
Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`},
},
{
Context: "resource",
Statements: []string{
`set(attributes["name"], "bear")`,
`keep_keys(attributes, ["http.method", "http.path"])`,
},
},
{
Statements: []string{`set(log.attributes["name"], "lion")`},
},
{
SharedCache: true,
Statements: []string{`set(log.body, "lion") where log.attributes["http.path"] == "/animal"`},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.id.Name(), func(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions processor/transformprocessor/internal/common/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"

import (
"go.opentelemetry.io/collector/pdata/pcommon"
)

// LoadContextCache returns the cache map registered in contextCache for the given context ID.
// When no entry exists yet, a fresh pcommon.Map is created, stored under that ID, and returned,
// so later calls with the same ID receive the same map.
func LoadContextCache(contextCache map[ContextID]*pcommon.Map, context ContextID) *pcommon.Map {
	if cached, found := contextCache[context]; found {
		return cached
	}

	created := pcommon.NewMap()
	entry := &created
	contextCache[context] = entry
	return entry
}
5 changes: 5 additions & 0 deletions processor/transformprocessor/internal/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type ContextStatements struct {
Context ContextID `mapstructure:"context"`
Conditions []string `mapstructure:"conditions"`
Statements []string `mapstructure:"statements"`

// `SharedCache` is an experimental feature that may change or be removed in the future.
// When enabled, the cache used by this group's statements is shared with the other groups that also have it enabled.
// This feature is not configurable via `mapstructure` and cannot be set in configuration files.
SharedCache bool `mapstructure:"-"`
}

func (c ContextStatements) GetStatements() []string {
Expand Down
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/common/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption {
func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) {
pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{
withCommonContextParsers[LogsConsumer](),
ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true),
}

for _, option := range options {
Expand Down
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption
func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) {
pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{
withCommonContextParsers[MetricsConsumer](),
ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true),
}

for _, option := range options {
Expand Down
1 change: 1 addition & 0 deletions processor/transformprocessor/internal/common/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption {
func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) {
pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{
withCommonContextParsers[TracesConsumer](),
ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true),
}

for _, option := range options {
Expand Down
20 changes: 16 additions & 4 deletions processor/transformprocessor/internal/logs/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand All @@ -16,8 +17,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common"
)

// parsedContextStatements pairs a parsed group of statements (a common.LogsConsumer)
// with the flag that decides which cache the processor hands to it when consuming logs.
type parsedContextStatements struct {
// Embedded so the consumer's methods can be called directly on the wrapper.
common.LogsConsumer
// sharedCache is copied from common.ContextStatements.SharedCache when the processor is built.
// When true, ProcessLogs passes a cache looked up by context ID from a map it creates per call;
// when false, it passes a nil cache.
sharedCache bool
}

type Processor struct {
contexts []common.LogsConsumer
contexts []parsedContextStatements
logger *zap.Logger
flatMode bool
}
Expand All @@ -28,14 +34,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E
return nil, err
}

contexts := make([]common.LogsConsumer, len(contextStatements))
contexts := make([]parsedContextStatements, len(contextStatements))
var errors error
for i, cs := range contextStatements {
context, err := pc.ParseContextStatements(cs)
if err != nil {
errors = multierr.Append(errors, err)
}
contexts[i] = context
contexts[i] = parsedContextStatements{context, cs.SharedCache}
}

if errors != nil {
Expand All @@ -54,8 +60,14 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e
pdatautil.FlattenLogs(ld.ResourceLogs())
defer pdatautil.GroupByResourceLogs(ld.ResourceLogs())
}

sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts))
for _, c := range p.contexts {
err := c.ConsumeLogs(ctx, ld, nil)
var cache *pcommon.Map
if c.sharedCache {
cache = common.LoadContextCache(sharedContextCache, c.Context())
}
err := c.ConsumeLogs(ctx, ld, cache)
if err != nil {
p.logger.Error("failed processing logs", zap.Error(err))
return ld, err
Expand Down
Loading

0 comments on commit 56866f4

Please sign in to comment.