Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spike] sdk/log: Change FilterProcessor to Filterer #5825

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `WithFilterer` provider option and `Filterer` interface in `go.opentelemetry.io/otel/sdk/log` that accepts a newly introduced `FilterParameters` type. (#5825)

### Changed

- Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778)
Expand All @@ -19,6 +23,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819)
- Fix log records duplication in case of heterogeneous resource attributes by correctly mapping each log record to it's resource and scope. (#5803)

### Removed

- Remove `go.opentelemetry.io/otel/sdk/log/internal/x` package. Filtering is now done via `Filterer` interface in `go.opentelemetry.io/otel/sdk/log`. (#5825)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
55 changes: 14 additions & 41 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -51,19 +50,20 @@ func Example() {
// slog.SetDefault(otelslog.NewLogger("my/pkg/name", otelslog.WithLoggerProvider(provider)))
}

// Use a processor that filters out records based on the provided context.
func ExampleProcessor_filtering() {
// Use a filterer that filters out records based on the provided context.
func ExampleFilterer() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)
processor := log.NewBatchProcessor(nil)

// Wrap the processor so that it ignores processing log records
// Add a filterer so that the SDK ignores processing of log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{Processor: processor}
filterer := &ContextFilterer{}

// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
_ = log.NewLoggerProvider(
log.WithFilterer(filterer),
log.WithProcessor(processor),
)
}
Expand All @@ -72,54 +72,27 @@ type key struct{}

var ignoreLogsKey key

// WithIgnoreLogs returns a context which is used by [ContextFilterProcessor]
// WithIgnoreLogs returns a context which is used by [ContextFilterer]
// to filter out log records.
func WithIgnoreLogs(ctx context.Context) context.Context {
return context.WithValue(ctx, ignoreLogsKey, true)
}

// ContextFilterProcessor filters out logs when a context deriving from
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor
// ContextFilterer filters out logs when a context deriving from
// [WithIgnoreLogs] is passed to Logger's methods.
type ContextFilterer struct{}

lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
}

type filter interface {
Enabled(ctx context.Context, param logapi.EnabledParameters) bool
}

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
if ignoreLogs(ctx) {
return nil
}
return p.Processor.OnEmit(ctx, record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, param logapi.EnabledParameters) bool {
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, param))
}

func ignoreLogs(ctx context.Context) bool {
func (p *ContextFilterer) Filter(ctx context.Context, param log.FilterParameters) bool {
_, ok := ctx.Value(ignoreLogsKey).(bool)
return ok
}

// Use a processor which redacts sensitive data from some attributes.
func ExampleProcessor_redact() {
func ExampleProcessor() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)
processor := log.NewBatchProcessor(nil)

// Add a processor so that it redacts values from token attributes.
// Add a processor so that the SDK redacts values from token attributes.
redactProcessor := &RedactTokensProcessor{}

// The created processor can then be registered with
Expand Down
50 changes: 50 additions & 0 deletions sdk/log/filterer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"

"go.opentelemetry.io/otel/log"
)

// Filterer handles filtering of log records.
//
// Any of the Filterer's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Filterer to manage
// this concurrency.
type Filterer interface {
// Filter returns whether the SDK will process for the given context
// and param.
//
// The passed param may be a partial record (e.g a record with only the
// Severity set). If a Filterer needs more information than is provided, it
// is said to be in an indeterminate state. An implementation should
// return true for an indeterminate state.
//
// The returned value will be true when the SDK should process for the
// provided context and param, and will be false if the SDK should not
// process.
Filter(ctx context.Context, param FilterParameters) bool
}

// FilterParameters represent Filter parameters.
type FilterParameters struct {
severity log.Severity
severitySet bool

noCmp [0]func() //nolint: unused // This is indeed used.
}

// Severity returns the [Severity] level value, or [SeverityUndefined] if no value was set.
// The ok result indicates whether the value was set.
func (r *FilterParameters) Severity() (value log.Severity, ok bool) {
return r.severity, r.severitySet
}

// setSeverity sets the [Severity] level.
func (r *FilterParameters) setSeverity(level log.Severity) {
r.severity = level
r.severitySet = true
}
35 changes: 0 additions & 35 deletions sdk/log/internal/x/README.md

This file was deleted.

47 changes: 0 additions & 47 deletions sdk/log/internal/x/x.go

This file was deleted.

37 changes: 16 additions & 21 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)

Expand All @@ -35,6 +34,12 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
}

func (l *logger) Emit(ctx context.Context, r log.Record) {
param := log.EnabledParameters{}
param.SetSeverity(r.Severity())
if !l.Enabled(ctx, param) {
Comment on lines +38 to +39
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be possible to make it possible to filter out based on more parameters than passed via Logger.Enabled? E.g. log record body, attributes, etc.

This could be enhanced later without making any breaking change.

return
}
XSAM marked this conversation as resolved.
Show resolved Hide resolved

newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if err := p.OnEmit(ctx, &newRecord); err != nil {
Expand All @@ -43,30 +48,20 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}

// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process param for the provided context and param.
//
// If it is not possible to definitively determine the param will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
// Enabled returns false if at least one Filterer held by the LoggerProvider
// that created the logger will return false for the provided context and param.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, param, fltrs)
}
newParam := FilterParameters{}
if v, ok := param.Severity(); ok {
newParam.setSeverity(v)
}

func anyEnabled(ctx context.Context, param log.EnabledParameters, fltrs []x.FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
// At least one Processor will process the Record.
return true
for _, flt := range l.provider.filterers {
if !flt.Filter(ctx, newParam) {
return false
}
}
// No Processor will process the record
return false
return true
}

func (l *logger) newRecord(ctx context.Context, r log.Record) Record {
Expand Down
Loading
Loading