Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/tailsampling] Replace invert sampled with drop policy #37760

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/tailsamplingprocessor-new-drop-policy-type.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/tailsampling

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Replaced the invert sampled and invert not sampled decisions with a new policy type named drop.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37760]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 9 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
Composite PolicyType = "composite"
// And allows defining an And policy, combining the other policies in one
And PolicyType = "and"
// Drop allows defining a Drop policy, combining one or more policies to drop traces.
Drop PolicyType = "drop"
// SpanCount sample traces that have more spans per Trace than a given threshold.
SpanCount PolicyType = "span_count"
// TraceState sample traces with specified values by the given key
Expand Down Expand Up @@ -100,6 +102,11 @@ type AndCfg struct {
SubPolicyCfg []AndSubPolicyCfg `mapstructure:"and_sub_policy"`
}

// DropCfg holds the configuration of a drop policy: the list of sub-policies
// that are combined to decide whether a trace is dropped.
type DropCfg struct {
// SubPolicyCfg lists the sub-policies of the drop policy, read from the
// "drop_sub_policy" key. The element type is the same one the and policy uses.
SubPolicyCfg []AndSubPolicyCfg `mapstructure:"drop_sub_policy"`
}

// CompositeCfg holds the configurable settings to create a composite
// sampling policy evaluator.
type CompositeCfg struct {
Expand All @@ -123,6 +130,8 @@ type PolicyCfg struct {
CompositeCfg CompositeCfg `mapstructure:"composite"`
// Configs for defining and policy
AndCfg AndCfg `mapstructure:"and"`
// Configs for defining drop policy
DropCfg DropCfg `mapstructure:"drop"`
}

// LatencyCfg holds the configurable settings to create a latency filter sampling policy
Expand Down
28 changes: 28 additions & 0 deletions processor/tailsamplingprocessor/drop_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor"

import (
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

// getNewDropPolicy builds the drop policy evaluator described by config.
//
// Every entry of config.SubPolicyCfg is converted, in order, into its own
// evaluator and the resulting list is handed to sampling.NewDrop together with
// the logger from settings. The first sub-policy that cannot be built aborts
// construction; its error is returned unchanged and the evaluator is nil.
func getNewDropPolicy(settings component.TelemetrySettings, config *DropCfg) (sampling.PolicyEvaluator, error) {
	evaluators := make([]sampling.PolicyEvaluator, 0, len(config.SubPolicyCfg))
	for idx := range config.SubPolicyCfg {
		// Take the address of the slice element rather than of a loop copy.
		evaluator, err := getDropSubPolicyEvaluator(settings, &config.SubPolicyCfg[idx])
		if err != nil {
			return nil, err
		}
		evaluators = append(evaluators, evaluator)
	}
	return sampling.NewDrop(settings.Logger, evaluators), nil
}

// getDropSubPolicyEvaluator returns the evaluator for a single drop sub-policy.
// It delegates to getSharedPolicyEvaluator, passing the sharedPolicyCfg part of
// cfg, and returns that call's result and error unchanged.
func getDropSubPolicyEvaluator(settings component.TelemetrySettings, cfg *AndSubPolicyCfg) (sampling.PolicyEvaluator, error) {
return getSharedPolicyEvaluator(settings, &cfg.sharedPolicyCfg)
}
51 changes: 51 additions & 0 deletions processor/tailsamplingprocessor/drop_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tailsamplingprocessor

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

func TestDropHelper(t *testing.T) {
t.Run("valid", func(t *testing.T) {
actual, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{
SubPolicyCfg: []AndSubPolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-and-policy-1",
Type: Latency,
LatencyCfg: LatencyCfg{ThresholdMs: 100},
},
},
},
})
require.NoError(t, err)

expected := sampling.NewDrop(zap.NewNop(), []sampling.PolicyEvaluator{
sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0),
})
assert.Equal(t, expected, actual)
})

t.Run("unsupported sampling policy type", func(t *testing.T) {
_, err := getNewDropPolicy(componenttest.NewNopTelemetrySettings(), &DropCfg{
SubPolicyCfg: []AndSubPolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Name: "test-and-policy-2",
Type: Drop, // nested drop is not allowed
},
},
},
})
require.EqualError(t, err, "unknown sampling policy type drop")
})
}
9 changes: 1 addition & 8 deletions processor/tailsamplingprocessor/internal/sampling/and.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,14 @@ func NewAnd(
// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (c *And) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) {
// The policy iterates over all sub-policies and returns Sampled if all sub-policies returned a Sampled Decision.
// If any subpolicy returns NotSampled or InvertNotSampled, it returns NotSampled Decision.
for _, sub := range c.subpolicies {
decision, err := sub.Evaluate(ctx, traceID, trace)
if err != nil {
return Unspecified, err
}
if decision == NotSampled || decision == InvertNotSampled {
if decision == NotSampled {
return NotSampled, nil
}
}
return Sampled, nil
}

// OnDroppedSpans is called when the trace needs to be dropped, due to memory
// pressure, before the decision_wait time has been reached.
func (c *And) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) {
return Sampled, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestAndEvaluatorSampled(t *testing.T) {
assert.Equal(t, Sampled, decision)
}

func TestAndEvaluatorStringInvertSampled(t *testing.T) {
func TestAndEvaluatorStringInvertMatch(t *testing.T) {
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"no_match"}, false, 0, true)
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
require.NoError(t, err)
Expand All @@ -88,7 +88,7 @@ func TestAndEvaluatorStringInvertSampled(t *testing.T) {
assert.Equal(t, Sampled, decision)
}

func TestAndEvaluatorStringInvertNotSampled(t *testing.T) {
func TestAndEvaluatorStringInvertNotMatch(t *testing.T) {
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, true)
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,17 @@ func TestBooleanTagFilterInverted(t *testing.T) {
{
Desc: "non-matching span attribute",
Trace: newTraceBoolAttrs(empty, "non_matching", true),
Decision: InvertSampled,
Decision: Sampled,
},
{
Desc: "span attribute with non matching boolean value",
Trace: newTraceBoolAttrs(empty, "example", false),
Decision: InvertSampled,
Decision: Sampled,
},
{
Desc: "span attribute with matching boolean value",
Trace: newTraceBoolAttrs(empty, "example", true),
Decision: InvertNotSampled,
Decision: NotSampled,
},
}

Expand Down
20 changes: 1 addition & 19 deletions processor/tailsamplingprocessor/internal/sampling/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace
return Unspecified, err
}

if decision == Sampled || decision == InvertSampled {
if decision == Sampled {
// The subpolicy made a decision to Sample. Now we need to make our decision.

// Calculate resulting SPS counter if we decide to sample this trace
Expand All @@ -123,21 +123,3 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace

return NotSampled, nil
}

// OnDroppedSpans is called when the trace needs to be dropped, due to memory
// pressure, before the decision_wait time has been reached.
func (c *Composite) OnDroppedSpans(pcommon.TraceID, *TraceData) (Decision, error) {
// Here we have a number of possible solutions:
// 1. Random sample traces based on maxTotalSPS.
// 2. Perform full composite sampling logic by calling Composite.Evaluate(), essentially
// using partial trace data for sampling.
// 3. Sample everything.
//
// It seems that #2 may be the best choice from end user perspective, but
// it is not certain and it is also additional performance penalty when we are
// already under a memory (and possibly CPU) pressure situation.
//
// For now we are playing safe and go with #3. Investigating alternate options
// should be a future task.
return Sampled, nil
}
44 changes: 44 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/drop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sampling // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"

import (
"context"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.uber.org/zap"
)

// Drop is a policy evaluator that combines several sub-policies to decide
// whether a trace is dropped.
type Drop struct {
// the subpolicy evaluators
subpolicies []PolicyEvaluator
// logger is the logger handed to NewDrop.
logger *zap.Logger
}

// NewDrop returns a PolicyEvaluator backed by a Drop that holds the given
// logger and sub-policy evaluators. The subpolicies slice is stored as is, not
// copied.
func NewDrop(
	logger *zap.Logger,
	subpolicies []PolicyEvaluator,
) PolicyEvaluator {
	evaluator := new(Drop)
	evaluator.logger = logger
	evaluator.subpolicies = subpolicies
	return evaluator
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
//
// Sub-policies are evaluated in order. The first one that returns an error
// stops the evaluation: Unspecified is returned together with that error. The
// first one that returns NotSampled also stops the evaluation, and NotSampled
// is returned. If no sub-policy returned NotSampled, the result is Dropped.
//
// A Drop without any sub-policy never matches and returns NotSampled.
func (c *Drop) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace *TraceData) (Decision, error) {
	// With no sub-policies the loop below would be skipped and every trace
	// would be reported as Dropped. An empty policy has nothing a trace could
	// match, so treat it as "no match" instead of discarding all data.
	if len(c.subpolicies) == 0 {
		return NotSampled, nil
	}

	for _, sub := range c.subpolicies {
		decision, err := sub.Evaluate(ctx, traceID, trace)
		if err != nil {
			return Unspecified, err
		}
		if decision == NotSampled {
			return NotSampled, nil
		}
	}
	return Dropped, nil
}
114 changes: 114 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/drop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sampling

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

func TestDropEvaluatorNotSampled(t *testing.T) {
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "name", []string{"value"}, false, 0, false)
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
require.NoError(t, err)

and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})

traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.ScopeSpans().AppendEmpty()

span := ils.Spans().AppendEmpty()
span.Status().SetCode(ptrace.StatusCodeError)
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})

trace := &TraceData{
ReceivedBatches: traces,
}
decision, err := and.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
assert.Equal(t, NotSampled, decision)
}

func TestDropEvaluatorSampled(t *testing.T) {
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, false)
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
require.NoError(t, err)

and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})

traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.ScopeSpans().AppendEmpty()

span := ils.Spans().AppendEmpty()
span.Attributes().PutStr("attribute_name", "attribute_value")
span.Status().SetCode(ptrace.StatusCodeError)
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})

trace := &TraceData{
ReceivedBatches: traces,
}
decision, err := and.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
assert.Equal(t, Dropped, decision)
}

func TestDropEvaluatorStringInvertMatch(t *testing.T) {
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"no_match"}, false, 0, true)
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
require.NoError(t, err)

and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})

traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.ScopeSpans().AppendEmpty()

span := ils.Spans().AppendEmpty()
span.Attributes().PutStr("attribute_name", "attribute_value")
span.Status().SetCode(ptrace.StatusCodeError)
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})

trace := &TraceData{
ReceivedBatches: traces,
}
decision, err := and.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
assert.Equal(t, Dropped, decision)
}

func TestDropEvaluatorStringInvertNotMatch(t *testing.T) {
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "attribute_name", []string{"attribute_value"}, false, 0, true)
n2, err := NewStatusCodeFilter(componenttest.NewNopTelemetrySettings(), []string{"ERROR"})
require.NoError(t, err)

and := NewDrop(zap.NewNop(), []PolicyEvaluator{n1, n2})

traces := ptrace.NewTraces()
rs := traces.ResourceSpans().AppendEmpty()
ils := rs.ScopeSpans().AppendEmpty()

span := ils.Spans().AppendEmpty()
span.Attributes().PutStr("attribute_name", "attribute_value")
span.Status().SetCode(ptrace.StatusCodeError)
span.SetTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})
span.SetSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})

trace := &TraceData{
ReceivedBatches: traces,
}
decision, err := and.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate and policy: %v", err)
assert.Equal(t, NotSampled, decision)
}
Loading
Loading