diff --git a/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs b/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs index 6bb6c844ff..da479ab5ab 100644 --- a/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs +++ b/src/Proto.OpenTelemetry/OpenTelemetryDecorators.cs @@ -3,6 +3,7 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using OpenTelemetry; using OpenTelemetry.Trace; using Proto.Extensions; using Proto.Mailbox; @@ -109,7 +110,7 @@ public override Task Receive(MessageEnvelope envelope) => OpenTelemetryMethodsDecorators.Receive(Source, envelope, _receiveActivitySetup, () => base.Receive(envelope)); - public override void Respond(object message)=> + public override void Respond(object message) => OpenTelemetryMethodsDecorators.Respond(message, () => base.Respond(message)); @@ -275,7 +276,7 @@ internal static void Forward(string source, PID target, object message, Activity throw; } } - + [MethodImpl(MethodImplOptions.AggressiveInlining)] internal static void Respond(object message, Action respond) @@ -312,6 +313,12 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti var propagationContext = envelope.Header.ExtractPropagationContext(); + bool hasBaggage = propagationContext.Baggage.Count > 0 || Baggage.Current.Count > 0; + if (hasBaggage) + { + Baggage.Current = propagationContext.Baggage; + } + using var activity = OpenTelemetryHelpers.BuildStartedActivity(propagationContext.ActivityContext, source, nameof(Receive), message, receiveActivitySetup); @@ -339,4 +346,4 @@ internal static async Task Receive(string source, MessageEnvelope envelope, Acti throw; } } -} +} \ No newline at end of file diff --git a/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs b/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs index 5876d09f49..b6ac8063fd 100644 --- a/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs +++ b/tests/Proto.OpenTelemetry.Tests/OpenTelemetryTracingTests.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using FluentAssertions; +using OpenTelemetry; using OpenTelemetry.Trace; using Proto.Future; using Xunit; @@ -12,6 +13,11 @@ namespace Proto.OpenTelemetry.Tests; public class OpenTelemetryTracingTests : IClassFixture { + private static readonly Baggage TestBaggage = Baggage.Create(new Dictionary + { + {"baggageKey", "baggageValue"} + }); + private static readonly Props ProxyTraceActorProps = Props.FromProducer(() => new TraceTestActor()).WithTracing(); private static readonly Props InnerTraceActorProps = Props.FromFunc(context => @@ -22,7 +28,7 @@ public class OpenTelemetryTracingTests : IClassFixture if (context.Sender is not null) { - context.Respond(new TraceResponse()); + context.Respond(GetTraceResponse() ); } } @@ -31,6 +37,13 @@ public class OpenTelemetryTracingTests : IClassFixture ) .WithTracing(); + static TraceResponse GetTraceResponse() + { + return Baggage.Current.Count > 0 + ? new TraceResponse(Baggage.Current) + : new TraceResponse(); + } + private static readonly ActivitySource TestSource = new("Proto.Actor.Tests"); private readonly ActivityFixture _fixture; @@ -122,6 +135,68 @@ private void TracesPropagateCorrectly(ActivitySpanId outerSpanId, ActivityTraceI inner.Should().NotBeNull(); } + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestAsync() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + var response = await rootContext.RequestAsync(target, new TraceMe(SendAs.RequestAsync)); + response.Should().BeEquivalentTo(new TraceResponse(TestBaggage)); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequest() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + rootContext.Request(target, new TraceMe(SendAs.Request)); + await Task.Delay(100); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestWithForward() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + var response = await rootContext.RequestAsync(target, new TraceMe(SendAs.Forward)); + response.Should().BeEquivalentTo(new TraceResponse(TestBaggage)); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestWithSender() => + await VerifyTrace(async (rootContext, target) => + { + Baggage.Current = TestBaggage; + var future = new FutureProcess(rootContext.System); + rootContext.Request(target, new TraceMe(SendAs.Request), future.Pid); + var response = (MessageEnvelope)await future.Task; + response.Message.Should().Be(new TraceResponse(TestBaggage)); + } + ); + + [Fact] + public async Task TracesPropagateCorrectlyWithBaggageForRequestWithSenderWithAdditionalMiddleware() => + await VerifyTrace(async (tracedRoot, target) => + { + var middleContext = tracedRoot.WithSenderMiddleware(next => async (context, _, envelope) => + { + var updatedEnvelope = envelope.WithHeader("test", "value"); + await next(context, target, updatedEnvelope); + }); + var future = new FutureProcess(middleContext.System); + Baggage.Current = TestBaggage; + middleContext.Request(target, new TraceMe(SendAs.Request), future.Pid); + var response = (MessageEnvelope)await future.Task; + response.Message.Should().Be(new TraceResponse(TestBaggage)); + } + ); + + // End + private async Task VerifyTrace(Func action) { var tracedRoot = new ActorSystem().Root.WithTracing(); @@ -175,7 +250,7 @@ private enum SendAs private record TraceMe(SendAs Method); - private record TraceResponse; + private record TraceResponse(Baggage? Baggage = null); public class TraceTestActor : IActor {