diff --git a/.chloggen/tailsamplingprocessor-new-drop-policy-type.yaml b/.chloggen/tailsamplingprocessor-new-drop-policy-type.yaml new file mode 100644 index 000000000000..e63b4298d0d2 --- /dev/null +++ b/.chloggen/tailsamplingprocessor-new-drop-policy-type.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Replaced invert sampled and invert not sampled decision with a new policy type named drop. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37760] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 9bda384d64b2..2811abb56bd8 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -33,6 +33,8 @@ const ( Composite PolicyType = "composite" // And allows defining a And policy, combining the other policies in one And PolicyType = "and" + // Drop allows defining a Drop policy, combining one or more policies to drop traces. + Drop PolicyType = "drop" // SpanCount sample traces that are have more spans per Trace than a given threshold. SpanCount PolicyType = "span_count" // TraceState sample traces with specified values by the given key @@ -100,6 +102,11 @@ type AndCfg struct { SubPolicyCfg []AndSubPolicyCfg `mapstructure:"and_sub_policy"` } +// DropCfg holds the common configuration to all policies under drop policy. +type DropCfg struct { + SubPolicyCfg []AndSubPolicyCfg `mapstructure:"drop_sub_policy"` +} + // CompositeCfg holds the configurable settings to create a composite // sampling policy evaluator. type CompositeCfg struct { @@ -123,6 +130,8 @@ type PolicyCfg struct { CompositeCfg CompositeCfg `mapstructure:"composite"` // Configs for defining and policy AndCfg AndCfg `mapstructure:"and"` + // Configs for defining drop policy + DropCfg DropCfg `mapstructure:"drop"` } // LatencyCfg holds the configurable settings to create a latency filter sampling policy diff --git a/processor/tailsamplingprocessor/drop_helper.go b/processor/tailsamplingprocessor/drop_helper.go new file mode 100644 index 000000000000..5fbd0f7f00b7 --- /dev/null +++ b/processor/tailsamplingprocessor/drop_helper.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" + +import ( + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" +) + +func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (sampling.PolicyEvaluator, error) { + subPolicyEvaluators := make([]sampling.PolicyEvaluator, len(config.SubPolicyCfg)) + for i := range config.SubPolicyCfg { + policyCfg := &config.SubPolicyCfg[i] + policy, err := getDropSubPolicyEvaluator(settings, policyCfg) + if err != nil { + return nil, err + } + subPolicyEvaluators[i] = policy + } + return sampling.NewDrop(settings.Logger, subPolicyEvaluators), nil +} + +// Return instance of and sub-policy +func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (sampling.PolicyEvaluator, error) { + return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg) +} diff --git a/processor/tailsamplingprocessor/drop_helper_test.go b/processor/tailsamplingprocessor/drop_helper_test.go new file mode 100644 index 000000000000..898824dbcee1 --- /dev/null +++ b/processor/tailsamplingprocessor/drop_helper_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package tailsamplingprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" +) + +func TestDropHelper(t *testing.T) { + t.Run("valid", func(t *testing.T) { + actual, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{ + SubPolicyCfg: []AndSubPolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "test-and-policy-1", + Type: Latency, + LatencyCfg: LatencyCfg{ThresholdMs: 100}, + }, + }, + }, + }) + require.NoError(t, err) + + expected := sampling.NewDrop(zap.NewNop(), []sampling.PolicyEvaluator{ + sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), + }) + assert.Equal(t, expected, actual) + }) + + t.Run("unsupported sampling policy type", func(t *testing.T) { + _, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{ + SubPolicyCfg: []AndSubPolicyCfg{ + { + sharedPolicyCfg: sharedPolicyCfg{ + Name: "test-and-policy-2", + Type: Drop, // nested drop is not allowed + }, + }, + }, + }) + require.EqualError(t, err, "unknown sampling policy type drop") + }) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/and.go b/processor/tailsamplingprocessor/internal/sampling/and.go index 0be2a52e60f7..1f6b2cacc8c3 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and.go +++ b/processor/tailsamplingprocessor/internal/sampling/and.go @@ -29,21 +29,14 @@ func NewAnd( // Evaluate looks at the trace data and returns a corresponding SamplingDecision. func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) { // The policy iterates over all sub-policies and returns Sampled if all sub-policies returned a Sampled Decision. - // If any subpolicy returns NotSampled or InvertNotSampled, it returns NotSampled Decision. for _, sub := range c.subpolicies { decision, err := sub.Evaluate(ctx, traceID, trace) if err != nil { return Unspecified, err } - if decision == NotSampled || decision == InvertNotSampled { + if decision == NotSampled { return NotSampled, nil } } return Sampled, nil } - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (c *And) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) { - return Sampled, nil -} diff --git a/processor/tailsamplingprocessor/internal/sampling/and_test.go b/processor/tailsamplingprocessor/internal/sampling/and_test.go index 4fe8a081cba3..f641e2569cf6 100644 --- a/processor/tailsamplingprocessor/internal/sampling/and_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/and_test.go @@ -63,7 +63,7 @@ func TestAndEvaluatorSampled(t *testing.T) { assert.Equal(t, Sampled, decision) } -func TestAndEvaluatorStringInvertSampled(t *testing.T) { +func TestAndEvaluatorStringInvertMatch(t *testing.T) { n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"no_match"}, false, 0, true) n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) require.NoError(t, err) @@ -88,7 +88,7 @@ func TestAndEvaluatorStringInvertSampled(t *testing.T) { assert.Equal(t, Sampled, decision) } -func TestAndEvaluatorStringInvertNotSampled(t *testing.T) { +func TestAndEvaluatorStringInvertNotMatch(t *testing.T) { n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, true) n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) require.NoError(t, err) diff --git a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go index 25a1643759e9..466b27dc6384 100644 --- a/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/boolean_tag_filter_test.go @@ -68,17 +68,17 @@ func TestBooleanTagFilterInverted(t *testing.T) { { Desc: "non-matching span attribute", Trace: newTraceBoolAttrs(empty, "non_matching", true), - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "span attribute with non matching boolean value", Trace: newTraceBoolAttrs(empty, "example", false), - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "span attribute with matching boolean value", Trace: newTraceBoolAttrs(empty, "example", true), - Decision: InvertNotSampled, + Decision: NotSampled, }, } diff --git a/processor/tailsamplingprocessor/internal/sampling/composite.go b/processor/tailsamplingprocessor/internal/sampling/composite.go index 0c98c5a1f195..9e89a3b7bc12 100644 --- a/processor/tailsamplingprocessor/internal/sampling/composite.go +++ b/processor/tailsamplingprocessor/internal/sampling/composite.go @@ -99,7 +99,7 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return Unspecified, err } - if decision == Sampled || decision == InvertSampled { + if decision == Sampled { // The subpolicy made a decision to Sample. Now we need to make our decision. // Calculate resulting SPS counter if we decide to sample this trace @@ -123,21 +123,3 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace return NotSampled, nil } - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (c *Composite) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) { - // Here we have a number of possible solutions: - // 1. Random sample traces based on maxTotalSPS. - // 2. Perform full composite sampling logic by calling Composite.Evaluate(), essentially - // using partial trace data for sampling. - // 3. Sample everything. - // - // It seems that #2 may be the best choice from end user perspective, but - // it is not certain and it is also additional performance penalty when we are - // already under a memory (and possibly CPU) pressure situation. - // - // For now we are playing safe and go with #3. Investigating alternate options - // should be a future task. - return Sampled, nil -} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop.go b/processor/tailsamplingprocessor/internal/sampling/drop.go new file mode 100644 index 000000000000..be55493b88c0 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/drop.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" + +import ( + "context" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.uber.org/zap" +) + +type Drop struct { + // the subpolicy evaluators + subpolicies []PolicyEvaluator + logger *zap.Logger +} + +func NewDrop( + logger *zap.Logger, + subpolicies []PolicyEvaluator, +) PolicyEvaluator { + return &Drop{ + subpolicies: subpolicies, + logger: logger, + } +} + +// Evaluate looks at the trace data and returns a corresponding SamplingDecision. +func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) { + // The policy iterates over all sub-policies and returns Dropped if all + // sub-policies returned a Sampled Decision. If any subpolicy returns + // NotSampled, it returns NotSampled Decision. + for _, sub := range c.subpolicies { + decision, err := sub.Evaluate(ctx, traceID, trace) + if err != nil { + return Unspecified, err + } + if decision == NotSampled { + return NotSampled, nil + } + } + return Dropped, nil +} diff --git a/processor/tailsamplingprocessor/internal/sampling/drop_test.go b/processor/tailsamplingprocessor/internal/sampling/drop_test.go new file mode 100644 index 000000000000..39e50f8f6ee7 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/sampling/drop_test.go @@ -0,0 +1,114 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sampling + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +func TestDropEvaluatorNotSampled(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "name", []string{"value"}, false, 0, false) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, NotSampled, decision) +} + +func TestDropEvaluatorSampled(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, false) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Attributes().PutStr("attribute_name", "attribute_value") + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, Dropped, decision) +} + +func TestDropEvaluatorStringInvertMatch(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"no_match"}, false, 0, true) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Attributes().PutStr("attribute_name", "attribute_value") + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, Dropped, decision) +} + +func TestDropEvaluatorStringInvertNotMatch(t *testing.T) { + n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, true) + n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"}) + require.NoError(t, err) + + and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2}) + + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ils := rs.ScopeSpans().AppendEmpty() + + span := ils.Spans().AppendEmpty() + span.Attributes().PutStr("attribute_name", "attribute_value") + span.Status().SetCode(ptrace.StatusCodeError) + span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + + trace := &TraceData{ + ReceivedBatches: traces, + } + decision, err := and.Evaluate(context.Background(), traceID, trace) + require.NoError(t, err, "Failed to evaluate and policy: %v", err) + assert.Equal(t, NotSampled, decision) +} diff --git a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go index 5864d4c7065c..9c92d2091f58 100644 --- a/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/numeric_tag_filter_test.go @@ -99,47 +99,47 @@ func TestNumericTagFilterInverted(t *testing.T) { { Desc: "nonmatching span attribute", Trace: newTraceIntAttrs(empty, "non_matching", math.MinInt32), - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "span attribute at the lower limit", Trace: newTraceIntAttrs(empty, "example", math.MinInt32), - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "resource attribute at the lower limit", Trace: newTraceIntAttrs(map[string]any{"example": math.MinInt32}, "non_matching", math.MinInt32), - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "span attribute at the upper limit", Trace: newTraceIntAttrs(empty, "example", math.MaxInt32), - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "resource attribute at the upper limit", Trace: newTraceIntAttrs(map[string]any{"example": math.MaxInt32}, "non_matching", math.MaxInt32), - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "span attribute below min limit", Trace: newTraceIntAttrs(empty, "example", math.MinInt32-1), - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "resource attribute below min limit", Trace: newTraceIntAttrs(map[string]any{"example": math.MinInt32 - 1}, "non_matching", math.MinInt32), - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "span attribute above max limit", Trace: newTraceIntAttrs(empty, "example", math.MaxInt32+1), - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "resource attribute above max limit", Trace: newTraceIntAttrs(map[string]any{"example": math.MaxInt32 + 1}, "non_matching", math.MaxInt32+1), - Decision: InvertSampled, + Decision: Sampled, }, } diff --git a/processor/tailsamplingprocessor/internal/sampling/policy.go b/processor/tailsamplingprocessor/internal/sampling/policy.go index 9441e8cfb6f0..958acb67e4b5 100644 --- a/processor/tailsamplingprocessor/internal/sampling/policy.go +++ b/processor/tailsamplingprocessor/internal/sampling/policy.go @@ -42,17 +42,11 @@ const ( // NotSampled is used to indicate that the decision was already taken // to not sample the data. NotSampled - // Dropped is used when data needs to be purged before the sampling policy - // had a chance to evaluate it. + // Dropped is used to indicate that a trace should be dropped regardless of + // all other decisions. Dropped // Error is used to indicate that policy evaluation was not succeeded. Error - // InvertSampled is used on the invert match flow and indicates to sample - // the data. - InvertSampled - // InvertNotSampled is used on the invert match flow and indicates to not - // sample the data. - InvertNotSampled ) // PolicyEvaluator implements a tail-based sampling policy evaluator, diff --git a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go index e9ee3da86773..cb12643b58b9 100644 --- a/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/string_tag_filter_test.go @@ -99,103 +99,103 @@ func TestStringTagFilter(t *testing.T) { Desc: "invert nonmatching node attribute key", Trace: newTraceStringAttrs(map[string]any{"non_matching": "value"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert nonmatching node attribute value", Trace: newTraceStringAttrs(map[string]any{"example": "non_matching"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert nonmatching node attribute list", Trace: newTraceStringAttrs(map[string]any{"example": "non_matching"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"first_value", "value", "last_value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert matching node attribute", Trace: newTraceStringAttrs(map[string]any{"example": "value"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert matching node attribute list", Trace: newTraceStringAttrs(map[string]any{"example": "value"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"first_value", "value", "last_value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert nonmatching span attribute key", Trace: newTraceStringAttrs(nil, "nonmatching", "value"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert nonmatching span attribute value", Trace: newTraceStringAttrs(nil, "example", "nonmatching"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert nonmatching span attribute list", Trace: newTraceStringAttrs(nil, "example", "nonmatching"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"first_value", "value", "last_value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert matching span attribute", Trace: newTraceStringAttrs(nil, "example", "value"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert matching span attribute list", Trace: newTraceStringAttrs(nil, "example", "value"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"first_value", "value", "last_value"}, EnabledRegexMatching: false, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert matching span attribute with regex", Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"v[0-9]+.HealthCheck$"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert matching span attribute with regex list", Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"^http", "v[0-9]+.HealthCheck$", "metrics$"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert nonmatching span attribute with regex", Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"v[a-z]+.HealthCheck$"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert nonmatching span attribute with regex list", Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"^http", "v[a-z]+.HealthCheck$", "metrics$"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, { Desc: "invert matching plain text node attribute in regex", Trace: newTraceStringAttrs(map[string]any{"example": "value"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"value"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert matching plain text node attribute in regex list", Trace: newTraceStringAttrs(map[string]any{"example": "value"}, "", ""), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{"first_value", "value", "last_value"}, EnabledRegexMatching: true, CacheMaxSize: defaultCacheSize, InvertMatch: true}, - Decision: InvertNotSampled, + Decision: NotSampled, }, { Desc: "invert nonmatching span attribute on empty filter list", Trace: newTraceStringAttrs(nil, "example", "grpc.health.v1.HealthCheck"), filterCfg: &TestStringAttributeCfg{Key: "example", Values: []string{}, EnabledRegexMatching: true, InvertMatch: true}, - Decision: InvertSampled, + Decision: Sampled, }, } diff --git a/processor/tailsamplingprocessor/internal/sampling/util.go b/processor/tailsamplingprocessor/internal/sampling/util.go index 6b896e526d04..9a6e47d401a7 100644 --- a/processor/tailsamplingprocessor/internal/sampling/util.go +++ b/processor/tailsamplingprocessor/internal/sampling/util.go @@ -23,7 +23,7 @@ func hasResourceOrSpanWithCondition( return Sampled } - if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan) { + if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan, false) { return Sampled } } @@ -42,14 +42,14 @@ func invertHasResourceOrSpanWithCondition( resource := rs.Resource() if !shouldSampleResource(resource) { - return InvertNotSampled + return NotSampled } - if !invertHasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan) { - return InvertNotSampled + if !hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSampleSpan, true) { + return NotSampled } } - return InvertSampled + return Sampled } // hasSpanWithCondition iterates through all the instrumentation library spans until any callback returns true. @@ -57,39 +57,24 @@ func hasSpanWithCondition(td ptrace.Traces, shouldSample func(span ptrace.Span) for i := 0; i < td.ResourceSpans().Len(); i++ { rs := td.ResourceSpans().At(i) - if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSample) { + if hasInstrumentationLibrarySpanWithCondition(rs.ScopeSpans(), shouldSample, false) { return Sampled } } return NotSampled } -func hasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlice, check func(span ptrace.Span) bool) bool { +func hasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlice, check func(span ptrace.Span) bool, invert bool) bool { for i := 0; i < ilss.Len(); i++ { ils := ilss.At(i) for j := 0; j < ils.Spans().Len(); j++ { span := ils.Spans().At(j) - if check(span) { - return true + if r := check(span); r != invert { + return r } } } - return false -} - -func invertHasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlice, check func(span ptrace.Span) bool) bool { - for i := 0; i < ilss.Len(); i++ { - ils := ilss.At(i) - - for j := 0; j < ils.Spans().Len(); j++ { - span := ils.Spans().At(j) - - if !check(span) { - return false - } - } - } - return true + return invert } diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 400474450efb..0c3985db5409 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -60,6 +60,7 @@ type tailSamplingSpanProcessor struct { nonSampledIDCache cache.Cache[bool] deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + usingDropPolicy bool setPolicyMux sync.Mutex pendingPolicy []PolicyCfg @@ -77,10 +78,9 @@ var ( attrSampledTrue = metric.WithAttributes(attribute.String("sampled", "true")) attrSampledFalse = metric.WithAttributes(attribute.String("sampled", "false")) decisionToAttribute = map[sampling.Decision]metric.MeasurementOption{ - sampling.Sampled: attrSampledTrue, - sampling.NotSampled: attrSampledFalse, - sampling.InvertNotSampled: attrSampledFalse, - sampling.InvertSampled: attrSampledTrue, + sampling.Sampled: attrSampledTrue, + sampling.NotSampled: attrSampledFalse, + sampling.Dropped: attrSampledFalse, } ) @@ -194,6 +194,8 @@ func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (s return getNewCompositePolicy(settings, &cfg.CompositeCfg) case And: return getNewAndPolicy(settings, &cfg.AndCfg) + case Drop: + return getNewDropPolicy(settings, &cfg.DropCfg) default: return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg) } @@ -252,6 +254,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error cLen := len(cfgs) policies := make([]*policy, 0, cLen) policyNames := make(map[string]struct{}, cLen) + usingDropPolicy := false for _, cfg := range cfgs { if cfg.Name == "" { @@ -263,6 +266,10 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error } policyNames[cfg.Name] = struct{}{} + if cfg.Type == Drop { + usingDropPolicy = true + } + eval, err := getPolicyEvaluator(telemetrySettings, &cfg) if err != nil { return fmt.Errorf("failed to create policy evaluator for %q: %w", cfg.Name, err) @@ -280,6 +287,7 @@ func (tsp *tailSamplingSpanProcessor) loadSamplingPolicy(cfgs []PolicyCfg) error }) } + tsp.usingDropPolicy = usingDropPolicy tsp.policies = policies tsp.logger.Debug("Loaded sampling policy", zap.Int("policies.len", len(policies))) @@ -375,11 +383,11 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { } func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision { - var decisions [8]bool + var decisions [6]bool ctx := context.Background() startTime := time.Now() - // Check all policies before making a final decision. + // Evaluate each policy. for _, p := range tsp.policies { decision, err := p.evaluator.Evaluate(ctx, id, trace) latency := time.Since(startTime) @@ -399,24 +407,24 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa } decisions[decision] = true + + // Break early if dropped or on the first sampled decision when not + // using drop policy. This reduces tick/decision latency. + if decision == sampling.Dropped || (!tsp.usingDropPolicy && decision == sampling.Sampled) { + break + } } var finalDecision sampling.Decision switch { - case decisions[sampling.InvertNotSampled]: // InvertNotSampled takes precedence + case decisions[sampling.Dropped]: // Dropped takes precedence finalDecision = sampling.NotSampled case decisions[sampling.Sampled]: - finalDecision = sampling.Sampled - case decisions[sampling.InvertSampled] && !decisions[sampling.NotSampled]: + metrics.decisionSampled++ finalDecision = sampling.Sampled default: - finalDecision = sampling.NotSampled - } - - if finalDecision == sampling.Sampled { - metrics.decisionSampled++ - } else { metrics.decisionNotSampled++ + finalDecision = sampling.NotSampled } return finalDecision diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 260c724a38c1..80d1de69ffc6 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -62,49 +62,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { require.EqualValues(t, 1, nextConsumer.SpanCount()) } -func TestSamplingPolicyInvertSampled(t *testing.T) { - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: defaultNumTraces, - } - nextConsumer := new(consumertest.TracesSink) - idb := newSyncIDBatcher() - - mpe1 := &mockPolicyEvaluator{} - - policies := []*policy{ - {name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))}, - } - - p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies)) - require.NoError(t, err) - - require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost())) - defer func() { - require.NoError(t, p.Shutdown(context.Background())) - }() - - mpe1.NextDecision = sampling.InvertSampled - - // Generate and deliver first span - require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) - - tsp := p.(*tailSamplingSpanProcessor) - - // The first tick won't do anything - tsp.policyTicker.OnTick() - require.EqualValues(t, 0, mpe1.EvaluationCount) - - // This will cause policy evaluations on the first span - tsp.policyTicker.OnTick() - - // Both policies should have been evaluated once - require.EqualValues(t, 1, mpe1.EvaluationCount) - - // The final decision SHOULD be Sampled. - require.EqualValues(t, 1, nextConsumer.SpanCount()) -} - func TestSamplingMultiplePolicies(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait, @@ -129,8 +86,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { require.NoError(t, p.Shutdown(context.Background())) }() - // InvertNotSampled takes precedence - mpe1.NextDecision = sampling.Sampled + mpe1.NextDecision = sampling.NotSampled mpe2.NextDecision = sampling.Sampled // Generate and deliver first span @@ -176,7 +132,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { require.NoError(t, p.Shutdown(context.Background())) }() - // InvertNotSampled takes precedence mpe1.NextDecision = sampling.NotSampled // Generate and deliver first span @@ -198,7 +153,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { require.EqualValues(t, 0, nextConsumer.SpanCount()) } -func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { +func TestSamplingPolicyDecisionDropped(t *testing.T) { cfg := Config{ DecisionWait: defaultTestDecisionWait, NumTraces: defaultNumTraces, @@ -222,14 +177,15 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { require.NoError(t, p.Shutdown(context.Background())) }() - // InvertNotSampled takes precedence - mpe1.NextDecision = sampling.InvertNotSampled - mpe2.NextDecision = sampling.Sampled + // Dropped takes precedence + mpe1.NextDecision = sampling.Sampled + mpe2.NextDecision = sampling.Dropped // Generate and deliver first span require.NoError(t, p.ConsumeTraces(context.Background(), simpleTraces())) tsp := p.(*tailSamplingSpanProcessor) + tsp.usingDropPolicy = true // The first tick won't do anything tsp.policyTicker.OnTick() @@ -275,7 +231,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { traceID := uInt64ToTraceID(1) // The combined decision from the policies is NotSampled - mpe1.NextDecision = sampling.InvertSampled + mpe1.NextDecision = sampling.NotSampled mpe2.NextDecision = sampling.NotSampled // A function that return a ptrace.Traces containing a single span for the single trace we are using. @@ -307,6 +263,8 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { // The final decision SHOULD be NotSampled. require.EqualValues(t, 0, nextConsumer.SpanCount()) + mpe2.NextDecision = sampling.Sampled + // Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span. // The policies should NOT be evaluated again. require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2)))