diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md index eecaa3e389fd..6831f6522707 100644 --- a/exporter/loadbalancingexporter/README.md +++ b/exporter/loadbalancingexporter/README.md @@ -111,11 +111,13 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th * This resolver currently returns a maximum of 100 hosts. * `TODO`: Feature request [29771](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29771) aims to cover the pagination for this scenario * The `routing_key` property is used to specify how to route values (spans or metrics) to exporters based on different parameters. This functionality is currently enabled only for `trace` and `metric` pipeline types. It supports one of the following values: - * `service`: Routes values based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate. + * `service`: Routes values based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate. In addition to resource / span attributes, `span.kind`, `span.name` (the top level properties of a span) are also supported. + * `attributes`: Routes based on values in the attributes of the traces. This is similar to service, but useful for situations in which a single service overwhelms any given instance of the collector, and should be split over multiple collectors. * `traceID`: Routes spans based on their `traceID`. Invalid for metrics. * `metric`: Routes metrics based on their metric name. Invalid for spans. * `streamID`: Routes metrics based on their datapoint streamID. That's the unique hash of all it's attributes, plus the attributes and identifying information of its resource, scope, and metric data * loadbalancing exporter supports set of standard [queuing, retry and timeout settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md), but they are disable by default to maintain compatibility +* The `routing_attributes` property is used to list the attributes that should be used if the `routing_key` is `attributes`. Simple example diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index b9682df16892..d997ed1e9db8 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -20,6 +20,7 @@ const ( metricNameRouting resourceRouting streamIDRouting + attrRouting ) const ( @@ -28,6 +29,7 @@ const ( metricNameRoutingStr = "metric" resourceRoutingStr = "resource" streamIDRoutingStr = "streamID" + attrRoutingStr = "attributes" ) // Config defines configuration for the exporter. @@ -36,9 +38,17 @@ type Config struct { configretry.BackOffConfig `mapstructure:"retry_on_failure"` QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"` - Protocol Protocol `mapstructure:"protocol"` - Resolver ResolverSettings `mapstructure:"resolver"` - RoutingKey string `mapstructure:"routing_key"` + Protocol Protocol `mapstructure:"protocol"` + Resolver ResolverSettings `mapstructure:"resolver"` + + // RoutingKey is a single routing key value + RoutingKey string `mapstructure:"routing_key"` + + // RoutingAttributes creates a composite routing key, based on several resource attributes of the application. + // + // Supports all attributes available (both resource and span), as well as the pseudo attributes "span.kind" and + // "span.name". + RoutingAttributes []string `mapstructure:"routing_attributes"` } // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. diff --git a/exporter/loadbalancingexporter/trace_exporter.go b/exporter/loadbalancingexporter/trace_exporter.go index e6fb9647d977..692144b3c182 100644 --- a/exporter/loadbalancingexporter/trace_exporter.go +++ b/exporter/loadbalancingexporter/trace_exporter.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -22,6 +23,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal" ) +const ( + pseudoAttrSpanName = "span.name" + pseudoAttrSpanKind = "span.kind" +) + var _ exporter.Traces = (*traceExporterImp)(nil) type exporterTraces map[*wrappedExporter]ptrace.Traces @@ -29,6 +35,7 @@ type exporterTraces map[*wrappedExporter]ptrace.Traces type traceExporterImp struct { loadBalancer *loadBalancer routingKey routingKey + routingAttrs []string stopped bool shutdownWg sync.WaitGroup @@ -64,6 +71,9 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx switch cfg.(*Config).RoutingKey { case svcRoutingStr: traceExporter.routingKey = svcRouting + case attrRoutingStr: + traceExporter.routingKey = attrRouting + traceExporter.routingAttrs = cfg.(*Config).RoutingAttributes case traceIDRoutingStr, "": default: return nil, fmt.Errorf("unsupported routing_key: %s", cfg.(*Config).RoutingKey) @@ -92,7 +102,7 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) exporterSegregatedTraces := make(exporterTraces) endpoints := make(map[*wrappedExporter]string) for _, batch := range batches { - routingID, err := routingIdentifiersFromTraces(batch, e.routingKey) + routingID, err := routingIdentifiersFromTraces(batch, e.routingKey, e.routingAttrs) if err != nil { return err } @@ -133,7 +143,15 @@ func (e *traceExporterImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) return errs } -func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string]bool, error) { +// routingIdentifiersFromTraces reads the traces and determines an identifier that can be used to define a position on the +// ring hash. It takes the routingKey, defining what type of routing should be used, and a series of attributes +// (optionally) used if the routingKey is attrRouting. +// +// only svcRouting and attrRouting are supported. For attrRouting, any attribute, as well the "pseudo" attributes span.name +// and span.kind are supported. +// +// In practice, makes the assumption that ptrace.Traces includes only one trace of each kind, in the "trace tree". +func routingIdentifiersFromTraces(td ptrace.Traces, rType routingKey, attrs []string) (map[string]bool, error) { ids := make(map[string]bool) rs := td.ResourceSpans() if rs.Len() == 0 { @@ -149,18 +167,72 @@ func routingIdentifiersFromTraces(td ptrace.Traces, key routingKey) (map[string] if spans.Len() == 0 { return nil, errors.New("empty spans") } + // Determine how the key should be populated. + switch rType { + case traceIDRouting: + // The simple case is the TraceID routing. In this case, we just use the string representation of the Trace ID. + tid := spans.At(0).TraceID() + ids[string(tid[:])] = true - if key == svcRouting { - for i := 0; i < rs.Len(); i++ { - svc, ok := rs.At(i).Resource().Attributes().Get("service.name") - if !ok { - return nil, errors.New("unable to get service name") + return ids, nil + case svcRouting: + // Service Name is still handled as an "attribute router", it's just expressed as a "pseudo attribute" + attrs = []string{"service.name"} + case attrRouting: + // By default, we'll just use the input provided. + break + default: + return nil, fmt.Errorf("unsupported routing_key: %d", rType) + } + + // Composite the attributes together as a key. + for i := 0; i < rs.Len(); i++ { + // rKey will never return an error. See + // 1. https://pkg.go.dev/bytes#Buffer.Write + // 2. https://stackoverflow.com/a/70388629 + var rKey strings.Builder + + for _, a := range attrs { + // resource spans + rAttr, ok := rs.At(i).Resource().Attributes().Get(a) + if ok { + rKey.WriteString(rAttr.Str()) + continue + } + + // ils or "instrumentation library spans" + ils := rs.At(0).ScopeSpans() + iAttr, ok := ils.At(0).Scope().Attributes().Get(a) + if ok { + rKey.WriteString(iAttr.Str()) + continue + } + + // the lowest level span (or what engineers regularly interact with) + spans := ils.At(0).Spans() + + if a == pseudoAttrSpanKind { + rKey.WriteString(spans.At(0).Kind().String()) + + continue + } + + if a == pseudoAttrSpanName { + rKey.WriteString(spans.At(0).Name()) + + continue + } + + sAttr, ok := spans.At(0).Attributes().Get(a) + if ok { + rKey.WriteString(sAttr.Str()) + continue } - ids[svc.Str()] = true } - return ids, nil + + // No matter what, there will be a key here (even if that key is ""). + ids[rKey.String()] = true } - tid := spans.At(0).TraceID() - ids[string(tid[:])] = true + return ids, nil } diff --git a/exporter/loadbalancingexporter/trace_exporter_test.go b/exporter/loadbalancingexporter/trace_exporter_test.go index 8751c83e8986..730c6f67e836 100644 --- a/exporter/loadbalancingexporter/trace_exporter_test.go +++ b/exporter/loadbalancingexporter/trace_exporter_test.go @@ -243,6 +243,166 @@ func TestConsumeTracesServiceBased(t *testing.T) { assert.NoError(t, res) } +func TestAttributeBasedRouting(t *testing.T) { + for _, tc := range []struct { + name string + attributes []string + batch ptrace.Traces + res map[string]bool + }{ + { + name: "service name", + attributes: []string{ + "service.name", + }, + batch: simpleTracesWithServiceName(), + + res: map[string]bool{ + "service-name-1": true, + "service-name-2": true, + "service-name-3": true, + }, + }, + { + name: "span name", + attributes: []string{ + "span.name", + }, + batch: func() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("/foo/bar/baz") + + return traces + }(), + res: map[string]bool{ + "/foo/bar/baz": true, + }, + }, + { + name: "span kind", + attributes: []string{ + "span.kind", + }, + batch: func() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetKind(ptrace.SpanKindClient) + + return traces + }(), + res: map[string]bool{ + "Client": true, + }, + }, + { + name: "composite; name & span kind", + attributes: []string{ + "service.name", + "span.kind", + }, + batch: func() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + res := traces.ResourceSpans().AppendEmpty() + res.Resource().Attributes().PutStr("service.name", "service-name-1") + + span := res.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetKind(ptrace.SpanKindClient) + + return traces + }(), + res: map[string]bool{ + "service-name-1Client": true, + }, + }, + { + name: "composite, but missing attr", + attributes: []string{ + "missing.attribute", + "span.kind", + }, + batch: func() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetKind(ptrace.SpanKindServer) + + return traces + }(), + res: map[string]bool{ + "Server": true, + }, + }, + { + name: "span attribute", + attributes: []string{ + "http.path", + }, + batch: func() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.Attributes().PutStr("http.path", "/foo/bar/baz") + + return traces + }(), + res: map[string]bool{ + "/foo/bar/baz": true, + }, + }, + { + name: "composite pseudo, resource and span attributes", + attributes: []string{ + "service.name", + "span.kind", + "http.path", + }, + batch: func() ptrace.Traces { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + res := traces.ResourceSpans().AppendEmpty() + res.Resource().Attributes().PutStr("service.name", "service-name-1") + + span := res.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetKind(ptrace.SpanKindClient) + span.Attributes().PutStr("http.path", "/foo/bar/baz") + + return traces + }(), + res: map[string]bool{ + "service-name-1Client/foo/bar/baz": true, + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + res, err := routingIdentifiersFromTraces(tc.batch, attrRouting, tc.attributes) + assert.NoError(t, err) + assert.Equal(t, res, tc.res) + }) + } +} + +func TestUnsupportedRoutingKeyInRouting(t *testing.T) { + traces := ptrace.NewTraces() + traces.ResourceSpans().EnsureCapacity(1) + + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetKind(ptrace.SpanKindServer) + + _, err := routingIdentifiersFromTraces(traces, 38, []string{}) + assert.Equal(t, "unsupported routing_key: 38", err.Error()) +} + func TestServiceBasedRoutingForSameTraceId(t *testing.T) { b := pcommon.TraceID([16]byte{1, 2, 3, 4}) for _, tt := range []struct { @@ -261,11 +421,12 @@ func TestServiceBasedRoutingForSameTraceId(t *testing.T) { "same trace id and different services - trace id routing", twoServicesWithSameTraceID(), traceIDRouting, + map[string]bool{string(b[:]): true}, }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, []string{}) assert.NoError(t, err) assert.Equal(t, res, tt.res) }) @@ -378,15 +539,17 @@ func TestBatchWithTwoTraces(t *testing.T) { func TestNoTracesInBatch(t *testing.T) { for _, tt := range []struct { - desc string - batch ptrace.Traces - routingKey routingKey - err error + desc string + batch ptrace.Traces + routingKey routingKey + routingAttrs []string + err error }{ { "no resource spans", ptrace.NewTraces(), traceIDRouting, + []string{}, errors.New("empty resource spans"), }, { @@ -397,6 +560,7 @@ func TestNoTracesInBatch(t *testing.T) { return batch }(), traceIDRouting, + []string{}, errors.New("empty scope spans"), }, { @@ -407,11 +571,12 @@ func TestNoTracesInBatch(t *testing.T) { return batch }(), svcRouting, + []string{}, errors.New("empty spans"), }, } { t.Run(tt.desc, func(t *testing.T) { - res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey) + res, err := routingIdentifiersFromTraces(tt.batch, tt.routingKey, tt.routingAttrs) assert.Equal(t, err, tt.err) assert.Equal(t, res, map[string]bool(nil)) })