diff --git a/README.md b/README.md index 59b7a2de..846e7193 100644 --- a/README.md +++ b/README.md @@ -186,7 +186,7 @@ Now, follow this [Microsoft documentation](https://learn.microsoft.com/en-us/azu ### Subscribe to domain events with pull delivery -Install the package [Workleap.DomainEventPropagation.Subscription.PullDelivery](fixme) in your ASP.NET Core project that wants to receive events from Event Grid topics. +Install the package [Workleap.DomainEventPropagation.Subscription.PullDelivery](https://www.nuget.org/packages/Workleap.DomainEventPropagation.Subscription.PullDelivery) in your ASP.NET Core project that wants to receive events from Event Grid topics. First, you will need to use one of the following methods to register the required services. ```csharp @@ -202,6 +202,7 @@ services.AddPullDeliverySubscription() "TopicName": "" "SubscriptionName": "", "MaxDegreeOfParallelism": 10, + "MaxRetries": 3, "TopicAccessKey": "", // Can be omitted to use Azure Identity (RBAC) } } @@ -220,6 +221,7 @@ services.AddPullDeliverySubscription() "TopicName": "" "SubscriptionName": "", "MaxDegreeOfParallelism": 10, + "MaxRetries": 3, "TopicAccessKey": "", // Can be omitted to use Azure Identity (RBAC) }, "TopicSub2": { @@ -227,6 +229,7 @@ services.AddPullDeliverySubscription() "TopicName": "" "SubscriptionName": "", "MaxDegreeOfParallelism": 10, + "MaxRetries": 10, "TopicAccessKey": "", // Can be omitted to use Azure Identity (RBAC) } } @@ -246,6 +249,9 @@ services.AddPullDeliverySubscription() // Maximum degree of parallelism for processing events options.MaxDegreeOfParallelism = 10; + + // Client side max number of retries before being sent to the Dead Letter Queue + options.MaxRetries = 10; // Using an access key options.TopicAccessKey = ""; @@ -283,6 +289,12 @@ public class ExampleDomainEventHandler : IDomainEventHandler } ``` +#### Client Side Max Retries Count + +To ensure that messages end up in the Dead Letter Queue, a client-side retry count is required. Otherwise, when a message is requeued (released), Event Grid ignores the subscription retry count, and if the event exceeds its time-to-live, it is silently dropped. + +It can be configured by setting the `MaxRetries` property in the `EventGridSubscriptionClientOptions`. The default value is 3. + ## Configure the underlying Event Grid clients options You can use the [named options pattern](https://learn.microsoft.com/en-us/dotnet/core/extensions/options#named-options-support-using-iconfigurenamedoptions) to configure the behavior of the underlying Event Grid clients. For instance: diff --git a/src/Shared/LoggingExtensions.cs b/src/Shared/LoggingExtensions.cs index 2c6a9d52..40fc483a 100644 --- a/src/Shared/LoggingExtensions.cs +++ b/src/Shared/LoggingExtensions.cs @@ -23,4 +23,7 @@ internal static partial class LoggingExtensions [LoggerMessage(6, LogLevel.Warning, "Failed to handle CloudEvents from the Event Grid topic {topicName} on subscription {subscription}")] public static partial void CloudEventCouldNotBeHandled(this ILogger logger, string topicName, string subscription, Exception ex); + + [LoggerMessage(7, LogLevel.Information, "The event with {eventId} {EventName} will be rejected since it exceed the max retries count.")] + public static partial void EventWillBeRejectedDueToMaxRetries(this ILogger logger, string eventId, string eventName, Exception ex); } \ No newline at end of file diff --git a/src/Workleap.DomainEventPropagation.Publishing/Workleap.DomainEventPropagation.Publishing.csproj b/src/Workleap.DomainEventPropagation.Publishing/Workleap.DomainEventPropagation.Publishing.csproj index 827fd133..5528a600 100644 --- a/src/Workleap.DomainEventPropagation.Publishing/Workleap.DomainEventPropagation.Publishing.csproj +++ b/src/Workleap.DomainEventPropagation.Publishing/Workleap.DomainEventPropagation.Publishing.csproj @@ -24,7 +24,7 @@ - + diff --git a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPropagationSubscriptionOptionsValidatorTests.cs b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPropagationSubscriptionOptionsValidatorTests.cs index 1aecff2e..afc02ea0 100644 --- a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPropagationSubscriptionOptionsValidatorTests.cs +++ b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPropagationSubscriptionOptionsValidatorTests.cs @@ -9,40 +9,50 @@ public class EventPropagationSubscriptionOptionsValidatorTests [Theory] // Valid options - [InlineData("accessKey", false, "http://topicurl.com", "topicName", "subName", true)] + [InlineData("accessKey", false, "http://topicurl.com", "topicName", "subName", null, true)] // Access key - [InlineData(" ", false, "http://topicurl.com", "topicName", "subName", false)] - [InlineData(null, false, "http://topicurl.com", "topicName", "subName", false)] + [InlineData(" ", false, "http://topicurl.com", "topicName", "subName", null, false)] + [InlineData(null, false, "http://topicurl.com", "topicName", "subName", null, false)] // Token credential - [InlineData("accessKey", true, "http://topicurl.com", "topicName", "subName", true)] + [InlineData("accessKey", true, "http://topicurl.com", "topicName", "subName", null, true)] // Topic endpoint - [InlineData("accessKey", false, "invalid-url", "topicName", "subName", false)] - [InlineData("accessKey", false, null, "topicName", "subName", false)] - [InlineData("accessKey", false, " ", "topicName", "subName", false)] + [InlineData("accessKey", false, "invalid-url", "topicName", "subName", null, false)] + [InlineData("accessKey", false, null, "topicName", "subName", null, false)] + [InlineData("accessKey", false, " ", "topicName", "subName", null, false)] // Topic name - [InlineData("accessKey", false, "http://topicurl.com", " ", "subName", false)] - [InlineData("accessKey", false, "http://topicurl.com", null, "subName", false)] + [InlineData("accessKey", false, "http://topicurl.com", " ", "subName", null, false)] + [InlineData("accessKey", false, "http://topicurl.com", null, "subName", null, false)] // Subscription name - [InlineData("accessKey", false, "http://topicurl.com", "topicName", "", false)] - [InlineData("accessKey", false, "http://topicurl.com", "topicName", null, false)] - public void GivenNamedConfiguration_WhenValidate_ThenOptionsAreValidated(string topicAccessKey, bool useTokenCredential, string topicEndpoint, string topicName, string subName, bool validationSucceeded) - { - var validator = new EventPropagationSubscriptionOptionsValidator(); + [InlineData("accessKey", false, "http://topicurl.com", "topicName", "", null, false)] + [InlineData("accessKey", false, "http://topicurl.com", "topicName", null, null, false)] - var result = validator.Validate("namedOptions", new EventPropagationSubscriptionOptions + // Max retries count + [InlineData("accessKey", false, "http://topicurl.com", "topicName", "", -1, false)] + [InlineData("accessKey", false, "http://topicurl.com", "topicName", null, 11, false)] + public void GivenNamedConfiguration_WhenValidate_ThenOptionsAreValidated(string topicAccessKey, bool useTokenCredential, string topicEndpoint, string topicName, string subName, int? maxRetriesCount, bool validationSucceeded) + { + var option = new EventPropagationSubscriptionOptions { TokenCredential = useTokenCredential ? new DefaultAzureCredential() : default, TopicEndpoint = topicEndpoint, TopicAccessKey = topicAccessKey, TopicName = topicName, - SubscriptionName = subName, - }); + SubscriptionName = subName + }; + + if (maxRetriesCount.HasValue) + { + option.MaxRetries = maxRetriesCount.Value; + } + + var validator = new EventPropagationSubscriptionOptionsValidator(); + var result = validator.Validate(name: "namedOptions", options: option); - Assert.Equal(validationSucceeded, result.Succeeded); + Assert.Equal(expected: validationSucceeded, actual: result.Succeeded); } } \ No newline at end of file diff --git a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPullerServiceTests.cs b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPullerServiceTests.cs index 6caeb120..74f7f112 100644 --- a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPullerServiceTests.cs +++ b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery.Tests/EventPullerServiceTests.cs @@ -80,8 +80,11 @@ public async Task GivenTwoEventsReceived_WhenHandleSuccessfully_ThenEventsAreAck public async Task GivenTwoEventsReceived_WhenHandleThrowUnhandledException_ThenEventsAreReleased() { // Given - var client = this.GivenClient(); - var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: 3)); + var maxRetries = 3; + var client = this.GivenClient(options: GenerateOptions( + maxRetries: maxRetries + )); + var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: maxRetries)); this.GivenClientFailsHandlingEvents(client); // When @@ -95,7 +98,10 @@ public async Task GivenTwoEventsReceived_WhenHandleThrowUnhandledException_ThenE public async Task GivenMultipleEventsReceivedWithCustomRetryDelays_WhenHandleThrowUnhandledException_ThenEventsAreReleasedWithDelay() { // Given - var client = this.GivenClient(options: GenerateOptions(retryDelays: [TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(75)])); + var client = this.GivenClient(options: GenerateOptions( + maxRetries: 10, + retryDelays: [TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(75)]) + ); var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: 3), GenerateEvent(deliveryCount: 4), GenerateEvent(deliveryCount: 5)); this.GivenClientFailsHandlingEvents(client); @@ -122,6 +128,24 @@ public async Task GivenTwoEventsReceived_WhenHandleThrowRejectingException_ThenE this.ThenClientRejectedEvents(client, events); } + [Fact] + public async Task GivenEventsReceivedWithDeliveryCountHigherThanMaxRetries_WhenHandleThrowException_ThenEventsAreRejected() + { + // Given + var maxRetries = 3; + var client = this.GivenClient(options: GenerateOptions( + maxRetries: maxRetries + )); + var events = this.GivenEventsForClient(client, GenerateEvent(deliveryCount: maxRetries + 1)); + this.GivenClientFailsHandlingEvents(client, new Exception()); + + // When + await this.WhenRunningPullerService(); + + // Then + this.ThenClientRejectedEvents(client, events); + } + public static IEnumerable RejectingExceptions() { yield return [new DomainEventTypeNotRegisteredException("event")]; @@ -261,13 +285,14 @@ private void ThenClientRejectedEvents(EventPullerClient client, params EventBund Assert.True(events.All(x => client.EventHandlingResult.RejectedEvents.Contains(x.LockToken))); } - private static EventPropagationSubscriptionOptions GenerateOptions(string id = "id", TimeSpan[]? retryDelays = null) + private static EventPropagationSubscriptionOptions GenerateOptions(string id = "id", TimeSpan[]? retryDelays = null, int maxRetries = 3) { return new EventPropagationSubscriptionOptions { TopicName = $"topic-{id}", SubscriptionName = $"subscription-{id}", MaxDegreeOfParallelism = 10, + MaxRetries = maxRetries, RetryDelays = retryDelays, }; } diff --git a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptions.cs b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptions.cs index 0d5c3d0b..9d426bd5 100644 --- a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptions.cs +++ b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptions.cs @@ -18,5 +18,10 @@ public class EventPropagationSubscriptionOptions public int MaxDegreeOfParallelism { get; set; } = 1; + /// + /// Client side maximum retry count before sending the message to the dead-letter queue. + /// + public int MaxRetries { get; set; } = 3; + public IReadOnlyCollection? RetryDelays { get; set; } } \ No newline at end of file diff --git a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptionsValidator.cs b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptionsValidator.cs index 4153ac3c..7fdc31da 100644 --- a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptionsValidator.cs +++ b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPropagationSubscriptionOptionsValidator.cs @@ -31,6 +31,11 @@ public ValidateOptionsResult Validate(string name, EventPropagationSubscriptionO return ValidateOptionsResult.Fail("A topic endpoint is required"); } + if (options.MaxRetries is < 0 or > 10) + { + return ValidateOptionsResult.Fail("MaxRetries must be between 0 and 10. The upper limit ensures the event's time-to-live does not expire."); + } + return ValidateOptionsResult.Success; } } \ No newline at end of file diff --git a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPullerService.cs b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPullerService.cs index 383d5907..2944ecb4 100644 --- a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPullerService.cs +++ b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/EventPullerService.cs @@ -27,6 +27,7 @@ public EventPullerService( optionsMonitor.Get(descriptor.Name).TopicName, optionsMonitor.Get(descriptor.Name).SubscriptionName, optionsMonitor.Get(descriptor.Name).MaxDegreeOfParallelism, + optionsMonitor.Get(descriptor.Name).MaxRetries, optionsMonitor.Get(descriptor.Name).RetryDelays?.ToList(), eventGridClientWrapperFactory.CreateClient(descriptor.Name)), serviceScopeFactory, @@ -153,8 +154,12 @@ private async Task HandleBundleAsync(ICloudEventHandler cloudEventHandler, Event this._logger.EventWillBeRejected(eventBundle.Event.Id, eventBundle.Event.Type, ex); await this._rejectEventChannel.Writer.WriteAsync(eventBundle, cancellationToken).ConfigureAwait(false); break; + case not null when eventBundle.DeliveryCount > this._eventGridTopicSubscription.MaxRetriesCount: + this._logger.EventWillBeRejectedDueToMaxRetries(eventBundle.Event.Id, eventBundle.Event.Type, ex); + await this._rejectEventChannel.Writer.WriteAsync(eventBundle, cancellationToken).ConfigureAwait(false); + break; default: - this._logger.EventWillBeReleased(eventBundle.Event.Id, eventBundle.Event.Type, ex); + this._logger.EventWillBeReleased(eventBundle.Event.Id, eventBundle.Event.Type, ex!); await this._releaseEventChannel.Writer.WriteAsync(eventBundle, cancellationToken).ConfigureAwait(false); break; } @@ -251,5 +256,5 @@ private static IEnumerable ReadCurrentContent(Channel } } - private record EventGridTopicSubscription(string TopicName, string SubscriptionName, int MaxHandlerDop, List? RetryDelays, IEventGridClientAdapter Client); + private record EventGridTopicSubscription(string TopicName, string SubscriptionName, int MaxHandlerDop, int MaxRetriesCount, List? RetryDelays, IEventGridClientAdapter Client); } \ No newline at end of file diff --git a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/PublicAPI.Shipped.txt b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/PublicAPI.Shipped.txt index 57f3e1c2..0dca8771 100644 --- a/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/PublicAPI.Shipped.txt +++ b/src/Workleap.DomainEventPropagation.Subscription.PullDelivery/PublicAPI.Shipped.txt @@ -16,6 +16,8 @@ Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxDegreeOfP Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxDegreeOfParallelism.set -> void Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.get -> System.Collections.Generic.IReadOnlyCollection? Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.set -> void +Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxRetries.get -> int +Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxRetries.set -> void Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator.EventPropagationSubscriptionOptionsValidator() -> void Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator.Validate(string! name, Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions! options) -> Microsoft.Extensions.Options.ValidateOptionsResult!