From d7b35c19cdc1856ffd7753f77d8713f031650b7c Mon Sep 17 00:00:00 2001 From: sara pellegrini Date: Mon, 15 Jul 2024 11:47:46 +0200 Subject: [PATCH] Introduce processing time metric. (#7097) * Introduce processing time metric. * Move the record of the metrics for a message successfully processed after the outbox transaction commit. * Remove unnecessary comment. * Move the critical time --------- Co-authored-by: SzymonPobiega --- .../When_message_is_processed_successfully.cs | 41 +++++++++++++++++-- .../Metrics/When_message_processing_fails.cs | 23 ++++++++++- .../MeterTests.Verify_MeterAPI.approved.txt | 4 +- .../Helpers/TestingMetricListener.cs | 2 + .../OpenTelemetry/MeterTests.cs | 3 +- .../Incoming/IncomingPipelineMetrics.cs | 36 +++++++++++++--- ...nsportReceiveToPhysicalMessageConnector.cs | 12 ++++-- .../Pipeline/MainPipelineExecutor.cs | 12 ++++-- 8 files changed, 114 insertions(+), 19 deletions(-) diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs index 5b67e5dd4ce..03418a84f8c 100644 --- a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_is_processed_successfully.cs @@ -30,20 +30,50 @@ public async Task Should_report_successful_message_metric() metricsListener.AssertMetric("nservicebus.messaging.successes", 5); metricsListener.AssertMetric("nservicebus.messaging.fetches", 5); metricsListener.AssertMetric("nservicebus.messaging.failures", 0); + metricsListener.AssertMetric("nservicebus.messaging.critical_time", 5); + metricsListener.AssertMetric("nservicebus.messaging.processing_time", 5); + metricsListener.AssertMetric("nservicebus.messaging.handler_time", 5); metricsListener.AssertTags("nservicebus.messaging.fetches", new Dictionary { ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(OutgoingMessage).FullName }); metricsListener.AssertTags("nservicebus.messaging.successes", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(OutgoingMessage).FullName + }); + + metricsListener.AssertTags("nservicebus.messaging.critical_time", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(OutgoingMessage).FullName + }); + + metricsListener.AssertTags("nservicebus.messaging.processing_time", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), + ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(OutgoingMessage).FullName + }); + + metricsListener.AssertTags("nservicebus.messaging.handler_time", new Dictionary { ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(EndpointWithMetrics)), ["nservicebus.discriminator"] = "disc", ["nservicebus.message_type"] = typeof(OutgoingMessage).FullName, + ["nservicebus.message_handler_type"] = typeof(EndpointWithMetrics.MessageHandler).FullName, + ["execution.result"] = "success" }); } @@ -68,9 +98,12 @@ public async Task Should_only_tag_most_concrete_type_on_metric() metricsListener.AssertMetric("nservicebus.messaging.fetches", 5); metricsListener.AssertMetric("nservicebus.messaging.failures", 0); - var successEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue"); - var successType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type"); - var successHandlerType = metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_handler_types"); + var successEndpoint = + metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.queue"); + var successType = + metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_type"); + var successHandlerType = + metricsListener.AssertTagKeyExists("nservicebus.messaging.successes", "nservicebus.message_handler_types"); var fetchedEndpoint = metricsListener.AssertTagKeyExists("nservicebus.messaging.fetches", "nservicebus.queue"); @@ -90,7 +123,7 @@ class EndpointWithMetrics : EndpointConfigurationBuilder { public EndpointWithMetrics() => EndpointSetup(); - class MessageHandler : IHandleMessages + public class MessageHandler : IHandleMessages { readonly Context testContext; diff --git a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs index 7179c24e2d1..215d520b3dc 100644 --- a/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs +++ b/src/NServiceBus.AcceptanceTests/Core/OpenTelemetry/Metrics/When_message_processing_fails.cs @@ -23,12 +23,34 @@ public async Task Should_report_failing_message_metrics() metricsListener.AssertMetric("nservicebus.messaging.fetches", 1); metricsListener.AssertMetric("nservicebus.messaging.failures", 1); metricsListener.AssertMetric("nservicebus.messaging.successes", 0); + metricsListener.AssertMetric("nservicebus.messaging.critical_time", 0); + metricsListener.AssertMetric("nservicebus.messaging.processing_time", 0); + metricsListener.AssertMetric("nservicebus.messaging.handler_time", 1); + + metricsListener.AssertTags("nservicebus.messaging.fetches", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)), + ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(FailingMessage).FullName + }); metricsListener.AssertTags("nservicebus.messaging.failures", new Dictionary { ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)), ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(FailingMessage).FullName, + ["error.type"] = typeof(SimulatedException).FullName, + }); + + metricsListener.AssertTags("nservicebus.messaging.handler_time", + new Dictionary + { + ["nservicebus.queue"] = Conventions.EndpointNamingConvention(typeof(FailingEndpoint)), + ["nservicebus.discriminator"] = "disc", + ["nservicebus.message_type"] = typeof(FailingMessage).FullName, + ["execution.result"] = "failure", ["error.type"] = typeof(SimulatedException).FullName, }); } @@ -55,7 +77,6 @@ public Task Handle(FailingMessage message, IMessageHandlerContext context) } const string ErrorMessage = "oh no!"; - } } diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt index 331b25092e3..a78c375d8c7 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/MeterTests.Verify_MeterAPI.approved.txt @@ -1,6 +1,7 @@ { "Note": "Changes to metrics API should result in an update to NServiceBusMeter version.", - "ActivitySourceVersion": "0.2.0", + "MetricsSourceName": "NServiceBus.Core.Pipeline.Incoming", + "MetricsSourceVersion": "0.2.0", "Tags": [ "error.type", "execution.result", @@ -15,6 +16,7 @@ "nservicebus.messaging.failures => Counter", "nservicebus.messaging.fetches => Counter", "nservicebus.messaging.handler_time => Histogram, Unit: s", + "nservicebus.messaging.processing_time => Histogram, Unit: s", "nservicebus.messaging.successes => Counter", "nservicebus.recoverability.delayed => Counter", "nservicebus.recoverability.error => Counter", diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs index 08d3fafca80..a03c8ff6cd1 100644 --- a/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs +++ b/src/NServiceBus.Core.Tests/OpenTelemetry/Helpers/TestingMetricListener.cs @@ -12,6 +12,7 @@ class TestingMetricListener : IDisposable readonly MeterListener meterListener; public List metrics = []; public string version = ""; + public string metricsSourceName = ""; public TestingMetricListener(string sourceName) { @@ -25,6 +26,7 @@ public TestingMetricListener(string sourceName) listener.EnableMeasurementEvents(instrument); metrics.Add(instrument); version = instrument.Meter.Version; + metricsSourceName = instrument.Meter.Name; } } }; diff --git a/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs b/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs index 13ab01938e2..ab141b15478 100644 --- a/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs +++ b/src/NServiceBus.Core.Tests/OpenTelemetry/MeterTests.cs @@ -30,7 +30,8 @@ public void Verify_MeterAPI() Approver.Verify(new { Note = "Changes to metrics API should result in an update to NServiceBusMeter version.", - ActivitySourceVersion = metricsListener.version, + MetricsSourceName = metricsListener.metricsSourceName, + MetricsSourceVersion = metricsListener.version, Tags = meterTags, Metrics = metrics }); diff --git a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs index 5f676e2ada4..2cabb67851f 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/IncomingPipelineMetrics.cs @@ -13,6 +13,7 @@ class IncomingPipelineMetrics const string TotalFailures = "nservicebus.messaging.failures"; const string MessageHandlerTime = "nservicebus.messaging.handler_time"; const string CriticalTime = "nservicebus.messaging.critical_time"; + const string ProcessingTime = "nservicebus.messaging.processing_time"; const string RecoverabilityImmediate = "nservicebus.recoverability.immediate"; const string RecoverabilityDelayed = "nservicebus.recoverability.delayed"; const string RecoverabilityError = "nservicebus.recoverability.error"; @@ -30,6 +31,8 @@ public IncomingPipelineMetrics(IMeterFactory meterFactory, string queueName, str "The time in seconds for the execution of the business code."); criticalTime = meter.CreateHistogram(CriticalTime, "s", "The time in seconds between when the message was sent until processed by the endpoint."); + processingTime = meter.CreateHistogram(ProcessingTime, "s", + "The time in seconds between when the message was fetched from the input queue until successfully processed by the endpoint."); totalImmediateRetries = meter.CreateCounter(RecoverabilityImmediate, description: "Total number of immediate retries requested."); totalDelayedRetries = meter.CreateCounter(RecoverabilityDelayed, @@ -47,13 +50,35 @@ public void AddDefaultIncomingPipelineMetricTags(IncomingPipelineMetricTags inco incomingPipelineMetricsTags.Add(MeterTags.EndpointDiscriminator, endpointDiscriminator ?? ""); } - public void RecordMessageSuccessfullyProcessed(ITransportReceiveContext context, IncomingPipelineMetricTags incomingPipelineMetricTags) + public void RecordProcessingTime(ITransportReceiveContext context, TimeSpan elapsed) + { + if (!processingTime.Enabled) + { + return; + } + + var incomingPipelineMetricTags = context.Extensions.Get(); + + TagList tags; + tags.Add(new(MeterTags.ExecutionResult, "success")); + incomingPipelineMetricTags.ApplyTags(ref tags, [ + MeterTags.QueueName, + MeterTags.EndpointDiscriminator, + MeterTags.MessageType, + MeterTags.MessageHandlerTypes]); + + processingTime.Record(elapsed.TotalSeconds, tags); + } + + public void RecordCriticalTimeAndTotalProcessed(ITransportReceiveContext context) { if (!totalProcessedSuccessfully.Enabled && !criticalTime.Enabled) { return; } + var incomingPipelineMetricTags = context.Extensions.Get(); + TagList tags; tags.Add(new(MeterTags.ExecutionResult, "success")); incomingPipelineMetricTags.ApplyTags(ref tags, [ @@ -66,10 +91,9 @@ public void RecordMessageSuccessfullyProcessed(ITransportReceiveContext context, { totalProcessedSuccessfully.Add(1, tags); } + var completedAt = DateTimeOffset.UtcNow; if (criticalTime.Enabled) { - var completedAt = DateTimeOffset.UtcNow; - if (context.Message.Headers.TryGetDeliverAt(out var startTime) || context.Message.Headers.TryGetTimeSent(out startTime)) { @@ -96,7 +120,7 @@ public void RecordMessageProcessingFailure(IncomingPipelineMetricTags incomingPi MeterTags.MessageHandlerTypes]); totalFailures.Add(1, tags); - // the critical time is intentionally not recorded in case of failure + // the processing and critical time are intentionally not recorded in case of failure } public void RecordFetchedMessage(IncomingPipelineMetricTags incomingPipelineMetricTags) @@ -109,7 +133,8 @@ public void RecordFetchedMessage(IncomingPipelineMetricTags incomingPipelineMetr TagList tags; incomingPipelineMetricTags.ApplyTags(ref tags, [ MeterTags.EndpointDiscriminator, - MeterTags.QueueName]); + MeterTags.QueueName, + MeterTags.MessageType]); totalFetched.Add(1, tags); } @@ -217,6 +242,7 @@ public void RecordSendToErrorQueue(IRecoverabilityContext recoverabilityContext) readonly Counter totalFailures; readonly Histogram messageHandlerTime; readonly Histogram criticalTime; + readonly Histogram processingTime; readonly Counter totalImmediateRetries; readonly Counter totalDelayedRetries; readonly Counter totalSentToErrorQueue; diff --git a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs index 6f0818a3bc0..2ab39e0168e 100644 --- a/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Incoming/TransportReceiveToPhysicalMessageConnector.cs @@ -20,6 +20,7 @@ public TransportReceiveToPhysicalMessageConnector(IOutboxStorage outboxStorage, public async Task Invoke(ITransportReceiveContext context, Func next) { + var processingStartedAt = DateTimeOffset.UtcNow; var messageId = context.Message.MessageId; var physicalMessageContext = this.CreateIncomingPhysicalMessageContext(context.Message, context); @@ -34,14 +35,14 @@ public async Task Invoke(ITransportReceiveContext context, Func(out IncomingPipelineMetricTags incomingPipelineMetricsTags); - incomingPipelineMetrics.RecordMessageSuccessfullyProcessed(context, incomingPipelineMetricsTags); - var outboxMessage = new OutboxMessage(messageId, ConvertToOutboxOperations(pendingTransportOperations.Operations)); await outboxStorage.Store(outboxMessage, outboxTransaction, context.Extensions, context.CancellationToken).ConfigureAwait(false); context.Extensions.Remove(); await outboxTransaction.Commit(context.CancellationToken).ConfigureAwait(false); + + var processingCompletedAt = DateTimeOffset.UtcNow; + incomingPipelineMetrics.RecordProcessingTime(context, processingCompletedAt - processingStartedAt); } physicalMessageContext.Extensions.Remove(); @@ -68,6 +69,11 @@ public async Task Invoke(ITransportReceiveContext context, Func(); incomingPipelineMetrics.AddDefaultIncomingPipelineMetricTags(incomingPipelineMetricsTags); - incomingPipelineMetrics.RecordFetchedMessage(incomingPipelineMetricsTags); var childScope = rootBuilder.CreateAsyncScope(); await using (childScope.ConfigureAwait(false)) @@ -64,10 +62,16 @@ public async Task Invoke(MessageContext messageContext, CancellationToken cancel ex.Data["Pipeline canceled"] = transportReceiveContext.CancellationToken.IsCancellationRequested; - incomingPipelineMetrics.RecordMessageProcessingFailure(incomingPipelineMetricsTags, ex); - + if (!ex.IsCausedBy(transportReceiveContext.CancellationToken)) + { + incomingPipelineMetrics.RecordMessageProcessingFailure(incomingPipelineMetricsTags, ex); + } throw; } + finally + { + incomingPipelineMetrics.RecordFetchedMessage(incomingPipelineMetricsTags); + } var completedAt = DateTimeOffset.UtcNow; await receivePipelineNotification.Raise(new ReceivePipelineCompleted(message, pipelineStartedAt, completedAt), cancellationToken).ConfigureAwait(false);