From cb3050bd52cb238c13f98e5ac0488515ae6dbd06 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Mon, 18 Mar 2024 18:19:52 +0100 Subject: [PATCH 1/2] Added support for propagating OpenTelemetry baggage --- .../OpenTelemetryDecorators.cs | 12 ++- .../OpenTelemetryTracingTests.cs | 78 ++++++++++++++++++- 2 files changed, 85 insertions(+), 5 deletions(-) diff --git a/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs b/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs index 6bb6c844ff..a651d3a54b 100644 --- a/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs +++ b/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs @@ -3,6 +3,7 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using OpenTelemetry; using OpenTelemetry.Trace; using Proto.Extensions; using Proto.Mailbox; @@ -109,7 +110,7 @@ public override Task Receive(MessageEnvelope envelope) => OpenTelemetryMethodsDecorators.Receive(Source, envelope, _receiveActivitySetup, () => base.Receive(envelope)); - public override void Respond(object message)=> + public override void Respond(object message) => OpenTelemetryMethodsDecorators.Respond(message, () => base.Respond(message)); @@ -275,7 +276,7 @@ internal static void Forward(string source, PID target, object message, Activity throw; } } - + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static void Respond(object message, Action respond) @@ -312,6 +313,11 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti var propagationContext = envelope.Header.ExtractPropagationContext(); + if (propagationContext.Baggage.Count > 0) + { + Baggage.Current = propagationContext.Baggage; + } + using var activity = OpenTelemetryHelpers.BuildStartedActivity(propagationContext.ActivityContext, source, nameof(Receive), message, receiveActivitySetup); @@ -339,4 +345,4 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti throw; } } -} +} \ No newline at end of file diff --git a/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs b/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs index 5876d09f49..4a8bf51729 100644 --- a/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs +++ b/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using FluentAssertions; +using OpenTelemetry; using OpenTelemetry.Trace; using Proto.Future; using Xunit; @@ -12,6 +13,11 @@ namespace Proto.OpenTelemetry.Tests; public class OpenTelemetryTracingTests : IClassFixture { + private static readonly Baggage TestBaggage = Baggage.Create(new Dictionary + { + {"baggageKey", "baggageValue"} + }); + private static readonly Props ProxyTraceActorProps = Props.FromProducer(() => new TraceTestActor()).WithTracing(); private static readonly Props InnerTraceActorProps = Props.FromFunc(context => @@ -22,7 +28,7 @@ public class OpenTelemetryTracingTests : IClassFixture if (context.Sender is not null) { - context.Respond(new TraceResponse()); + context.Respond(GetTraceResponse() ); } } @@ -31,6 +37,13 @@ public class OpenTelemetryTracingTests : IClassFixture ) .WithTracing(); + static TraceResponse GetTraceResponse() + { + return Baggage.Current.Count > 0 + ? new TraceResponse(Baggage.Current) + : new TraceResponse(); + } + private static readonly ActivitySource TestSource = new("Proto.Actor.Tests"); private readonly ActivityFixture _fixture; @@ -122,6 +135,67 @@ private void TracesPropagateCorrectly(ActivitySpanId outerSpanId, ActivityTraceI inner.Should().NotBeNull(); } + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestAsync() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + var response = await rootContext.RequestAsync(target, new TraceMe(SendAs.RequestAsync)); + response.Should().BeEquivalentTo(new TraceResponse(TestBaggage)); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequest() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + rootContext.Request(target, new TraceMe(SendAs.Request)); + await Task.Delay(100); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestWithForward() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + var response = await rootContext.RequestAsync(target, new TraceMe(SendAs.Forward)); + response.Should().BeEquivalentTo(new TraceResponse(TestBaggage)); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestWithSender() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + var future = new FutureProcess(rootContext.System); + rootContext.Request(target, new TraceMe(SendAs.Request), future.Pid); + var response = (MessageEnvelope)await future.Task; + response.Message.Should().Be(new TraceResponse(TestBaggage)); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestWithSenderWithAdditionalMiddleware() => + await VerifyTrace(async (tracedRoot, target) => + { + var middleContext = tracedRoot.WithSenderMiddleware(next => async (context, _, envelope) => + { + var updatedEnvelope = envelope.WithHeader("test", "value"); + await next(context, target, updatedEnvelope); + }); + var future = new FutureProcess(middleContext.System); + middleContext.Request(target, new TraceMe(SendAs.Request), future.Pid); + var response = (MessageEnvelope)await future.Task; + response.Message.Should().Be(new TraceResponse()); + } + ); + + // End + private async Task VerifyTrace(Func action) { var tracedRoot = new ActorSystem().Root.WithTracing(); @@ -175,7 +249,7 @@ private enum SendAs private record TraceMe(SendAs Method); - private record TraceResponse; + private record TraceResponse(Baggage? Baggage = null); public class TraceTestActor : IActor { From 199c66e67135ca6ddc0e24fc5c9b5cc8fdeaa4c5 Mon Sep 17 00:00:00 2001 From: Magne Helleborg Date: Mon, 18 Mar 2024 19:20:02 +0100 Subject: [PATCH 2/2] Updated tests, only set baggage if it's in use --- src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs | 3 ++- tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs b/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs index a651d3a54b..da479ab5ab 100644 --- a/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs +++ b/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs @@ -313,7 +313,8 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti var propagationContext = envelope.Header.ExtractPropagationContext(); - if (propagationContext.Baggage.Count > 0) + bool hasBaggage = propagationContext.Baggage.Count > 0 || Baggage.Current.Count > 0; + if (hasBaggage) { Baggage.Current = propagationContext.Baggage; } diff --git a/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs b/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs index 4a8bf51729..b6ac8063fd 100644 --- a/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs +++ b/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs @@ -188,9 +188,10 @@ await VerifyTrace(async (tracedRoot, target) => await next(context, target, updatedEnvelope); }); var future = new FutureProcess(middleContext.System); + Baggage.Current = TestBaggage; middleContext.Request(target, new TraceMe(SendAs.Request), future.Pid); var response = (MessageEnvelope)await future.Task; - response.Message.Should().Be(new TraceResponse()); + response.Message.Should().Be(new TraceResponse(TestBaggage)); } );