Skip to content

Commit

Permalink
[IDP-2334] Add client side max retries count for pull delivery (#186)
Browse files Browse the repository at this point in the history
* Add client side max retries count

---------

Co-authored-by: Gérald Barré <[email protected]>
  • Loading branch information
PrincessMadMath and meziantou authored Oct 9, 2024
1 parent b16df47 commit e94b255
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 26 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ Now, follow this [Microsoft documentation](https://learn.microsoft.com/en-us/azu

### Subscribe to domain events with pull delivery

Install the package [Workleap.DomainEventPropagation.Subscription.PullDelivery](fixme) in your ASP.NET Core project that wants to receive events from Event Grid topics.
Install the package [Workleap.DomainEventPropagation.Subscription.PullDelivery](https://www.nuget.org/packages/Workleap.DomainEventPropagation.Subscription.PullDelivery) in your ASP.NET Core project that wants to receive events from Event Grid topics.
First, you will need to use one of the following methods to register the required services.

```csharp
Expand All @@ -202,6 +202,7 @@ services.AddPullDeliverySubscription()
"TopicName": "<namespace_topic_to_listen_to>"
"SubscriptionName": "<subscription_name_under_specified_topic>",
"MaxDegreeOfParallelism": 10,
"MaxRetries": 3,
"TopicAccessKey": "<secret_value>", // Can be omitted to use Azure Identity (RBAC)
}
}
Expand All @@ -220,13 +221,15 @@ services.AddPullDeliverySubscription()
"TopicName": "<namespace_topic_to_listen_to>"
"SubscriptionName": "<subscription_name_under_specified_topic>",
"MaxDegreeOfParallelism": 10,
"MaxRetries": 3,
"TopicAccessKey": "<secret_value>", // Can be omitted to use Azure Identity (RBAC)
},
"TopicSub2": {
"TopicEndpoint": "<azure_topic_uri>",
"TopicName": "<namespace_topic_to_listen_to>"
"SubscriptionName": "<subscription_name_under_specified_topic>",
"MaxDegreeOfParallelism": 10,
"MaxRetries": 10,
"TopicAccessKey": "<secret_value>", // Can be omitted to use Azure Identity (RBAC)
}
}
Expand All @@ -246,6 +249,9 @@ services.AddPullDeliverySubscription()

// Maximum degree of parallelism for processing events
options.MaxDegreeOfParallelism = 10;

// Client side max number of retries before being sent to the Dead Letter Queue
options.MaxRetries = 10;

// Using an access key
options.TopicAccessKey = "<secret_value>";
Expand Down Expand Up @@ -283,6 +289,12 @@ public class ExampleDomainEventHandler : IDomainEventHandler<ExampleDomainEvent>
}
```

#### Client Side Max Retries Count

To ensure that messages end up in the Dead Letter Queue, a client-side retry count is required. Otherwise, when a message is requeued (released), Event Grid ignores the subscription retry count, and if the event exceeds its time-to-live, it is silently dropped.

It can be configured by setting the `MaxRetries` property in the `EventGridSubscriptionClientOptions`. The default value is 3.

## Configure the underlying Event Grid clients options

You can use the [named options pattern](https://learn.microsoft.com/en-us/dotnet/core/extensions/options#named-options-support-using-iconfigurenamedoptions) to configure the behavior of the underlying Event Grid clients. For instance:
Expand Down
3 changes: 3 additions & 0 deletions src/Shared/LoggingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,7 @@ internal static partial class LoggingExtensions

[LoggerMessage(6, LogLevel.Warning, "Failed to handle CloudEvents from the Event Grid topic {topicName} on subscription {subscription}")]
public static partial void CloudEventCouldNotBeHandled(this ILogger logger, string topicName, string subscription, Exception ex);

[LoggerMessage(7, LogLevel.Information, "The event with {eventId} {EventName} will be rejected since it exceed the max retries count.")]
public static partial void EventWillBeRejectedDueToMaxRetries(this ILogger logger, string eventId, string eventName, Exception ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="OpenTelemetry.Api" Version="1.9.0" />
<PackageReference Include="System.Text.Json" Version="8.0.4" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,50 @@ public class EventPropagationSubscriptionOptionsValidatorTests
[Theory]

// Valid options
[InlineData("accessKey", false, "http://topicurl.com", "topicName", "subName", true)]
[InlineData("accessKey", false, "http://topicurl.com", "topicName", "subName", null, true)]

// Access key
[InlineData(" ", false, "http://topicurl.com", "topicName", "subName", false)]
[InlineData(null, false, "http://topicurl.com", "topicName", "subName", false)]
[InlineData(" ", false, "http://topicurl.com", "topicName", "subName", null, false)]
[InlineData(null, false, "http://topicurl.com", "topicName", "subName", null, false)]

// Token credential
[InlineData("accessKey", true, "http://topicurl.com", "topicName", "subName", true)]
[InlineData("accessKey", true, "http://topicurl.com", "topicName", "subName", null, true)]

// Topic endpoint
[InlineData("accessKey", false, "invalid-url", "topicName", "subName", false)]
[InlineData("accessKey", false, null, "topicName", "subName", false)]
[InlineData("accessKey", false, " ", "topicName", "subName", false)]
[InlineData("accessKey", false, "invalid-url", "topicName", "subName", null, false)]
[InlineData("accessKey", false, null, "topicName", "subName", null, false)]
[InlineData("accessKey", false, " ", "topicName", "subName", null, false)]

// Topic name
[InlineData("accessKey", false, "http://topicurl.com", " ", "subName", false)]
[InlineData("accessKey", false, "http://topicurl.com", null, "subName", false)]
[InlineData("accessKey", false, "http://topicurl.com", " ", "subName", null, false)]
[InlineData("accessKey", false, "http://topicurl.com", null, "subName", null, false)]

// Subscription name
[InlineData("accessKey", false, "http://topicurl.com", "topicName", "", false)]
[InlineData("accessKey", false, "http://topicurl.com", "topicName", null, false)]
public void GivenNamedConfiguration_WhenValidate_ThenOptionsAreValidated(string topicAccessKey, bool useTokenCredential, string topicEndpoint, string topicName, string subName, bool validationSucceeded)
{
var validator = new EventPropagationSubscriptionOptionsValidator();
[InlineData("accessKey", false, "http://topicurl.com", "topicName", "", null, false)]
[InlineData("accessKey", false, "http://topicurl.com", "topicName", null, null, false)]

var result = validator.Validate("namedOptions", new EventPropagationSubscriptionOptions
// Max retries count
[InlineData("accessKey", false, "http://topicurl.com", "topicName", "", -1, false)]
[InlineData("accessKey", false, "http://topicurl.com", "topicName", null, 11, false)]
public void GivenNamedConfiguration_WhenValidate_ThenOptionsAreValidated(string topicAccessKey, bool useTokenCredential, string topicEndpoint, string topicName, string subName, int? maxRetriesCount, bool validationSucceeded)
{
var option = new EventPropagationSubscriptionOptions
{
TokenCredential = useTokenCredential ? new DefaultAzureCredential() : default,
TopicEndpoint = topicEndpoint,
TopicAccessKey = topicAccessKey,
TopicName = topicName,
SubscriptionName = subName,
});
SubscriptionName = subName
};

if (maxRetriesCount.HasValue)
{
option.MaxRetries = maxRetriesCount.Value;
}

var validator = new EventPropagationSubscriptionOptionsValidator();
var result = validator.Validate(name: "namedOptions", options: option);

Assert.Equal(validationSucceeded, result.Succeeded);
Assert.Equal(expected: validationSucceeded, actual: result.Succeeded);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public async Task GivenTwoEventsReceived_WhenHandleSuccessfully_ThenEventsAreAck
public async Task GivenTwoEventsReceived_WhenHandleThrowUnhandledException_ThenEventsAreReleased()
{
// Given
var client = this.GivenClient();
var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: 3));
var maxRetries = 3;
var client = this.GivenClient(options: GenerateOptions(
maxRetries: maxRetries
));
var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: maxRetries));
this.GivenClientFailsHandlingEvents(client);

// When
Expand All @@ -95,7 +98,10 @@ public async Task GivenTwoEventsReceived_WhenHandleThrowUnhandledException_ThenE
public async Task GivenMultipleEventsReceivedWithCustomRetryDelays_WhenHandleThrowUnhandledException_ThenEventsAreReleasedWithDelay()
{
// Given
var client = this.GivenClient(options: GenerateOptions(retryDelays: [TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(75)]));
var client = this.GivenClient(options: GenerateOptions(
maxRetries: 10,
retryDelays: [TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(75)])
);
var events = this.GivenEventsForClient(client, GenerateEvent(), GenerateEvent(deliveryCount: 3), GenerateEvent(deliveryCount: 4), GenerateEvent(deliveryCount: 5));
this.GivenClientFailsHandlingEvents(client);

Expand All @@ -122,6 +128,24 @@ public async Task GivenTwoEventsReceived_WhenHandleThrowRejectingException_ThenE
this.ThenClientRejectedEvents(client, events);
}

[Fact]
public async Task GivenEventsReceivedWithDeliveryCountHigherThanMaxRetries_WhenHandleThrowException_ThenEventsAreRejected()
{
// Given
var maxRetries = 3;
var client = this.GivenClient(options: GenerateOptions(
maxRetries: maxRetries
));
var events = this.GivenEventsForClient(client, GenerateEvent(deliveryCount: maxRetries + 1));
this.GivenClientFailsHandlingEvents(client, new Exception());

// When
await this.WhenRunningPullerService();

// Then
this.ThenClientRejectedEvents(client, events);
}

public static IEnumerable<object[]> RejectingExceptions()
{
yield return [new DomainEventTypeNotRegisteredException("event")];
Expand Down Expand Up @@ -261,13 +285,14 @@ private void ThenClientRejectedEvents(EventPullerClient client, params EventBund
Assert.True(events.All(x => client.EventHandlingResult.RejectedEvents.Contains(x.LockToken)));
}

private static EventPropagationSubscriptionOptions GenerateOptions(string id = "id", TimeSpan[]? retryDelays = null)
private static EventPropagationSubscriptionOptions GenerateOptions(string id = "id", TimeSpan[]? retryDelays = null, int maxRetries = 3)
{
return new EventPropagationSubscriptionOptions
{
TopicName = $"topic-{id}",
SubscriptionName = $"subscription-{id}",
MaxDegreeOfParallelism = 10,
MaxRetries = maxRetries,
RetryDelays = retryDelays,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ public class EventPropagationSubscriptionOptions

public int MaxDegreeOfParallelism { get; set; } = 1;

/// <summary>
/// Client side maximum retry count before sending the message to the dead-letter queue.
/// </summary>
public int MaxRetries { get; set; } = 3;

public IReadOnlyCollection<TimeSpan>? RetryDelays { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public ValidateOptionsResult Validate(string name, EventPropagationSubscriptionO
return ValidateOptionsResult.Fail("A topic endpoint is required");
}

if (options.MaxRetries is < 0 or > 10)
{
return ValidateOptionsResult.Fail("MaxRetries must be between 0 and 10. The upper limit ensures the event's time-to-live does not expire.");
}

return ValidateOptionsResult.Success;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public EventPullerService(
optionsMonitor.Get(descriptor.Name).TopicName,
optionsMonitor.Get(descriptor.Name).SubscriptionName,
optionsMonitor.Get(descriptor.Name).MaxDegreeOfParallelism,
optionsMonitor.Get(descriptor.Name).MaxRetries,
optionsMonitor.Get(descriptor.Name).RetryDelays?.ToList(),
eventGridClientWrapperFactory.CreateClient(descriptor.Name)),
serviceScopeFactory,
Expand Down Expand Up @@ -153,8 +154,12 @@ private async Task HandleBundleAsync(ICloudEventHandler cloudEventHandler, Event
this._logger.EventWillBeRejected(eventBundle.Event.Id, eventBundle.Event.Type, ex);
await this._rejectEventChannel.Writer.WriteAsync(eventBundle, cancellationToken).ConfigureAwait(false);
break;
case not null when eventBundle.DeliveryCount > this._eventGridTopicSubscription.MaxRetriesCount:
this._logger.EventWillBeRejectedDueToMaxRetries(eventBundle.Event.Id, eventBundle.Event.Type, ex);
await this._rejectEventChannel.Writer.WriteAsync(eventBundle, cancellationToken).ConfigureAwait(false);
break;
default:
this._logger.EventWillBeReleased(eventBundle.Event.Id, eventBundle.Event.Type, ex);
this._logger.EventWillBeReleased(eventBundle.Event.Id, eventBundle.Event.Type, ex!);
await this._releaseEventChannel.Writer.WriteAsync(eventBundle, cancellationToken).ConfigureAwait(false);
break;
}
Expand Down Expand Up @@ -251,5 +256,5 @@ private static IEnumerable<EventBundle> ReadCurrentContent(Channel<EventBundle>
}
}

private record EventGridTopicSubscription(string TopicName, string SubscriptionName, int MaxHandlerDop, List<TimeSpan>? RetryDelays, IEventGridClientAdapter Client);
private record EventGridTopicSubscription(string TopicName, string SubscriptionName, int MaxHandlerDop, int MaxRetriesCount, List<TimeSpan>? RetryDelays, IEventGridClientAdapter Client);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxDegreeOfP
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxDegreeOfParallelism.set -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.get -> System.Collections.Generic.IReadOnlyCollection<System.TimeSpan>?
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.RetryDelays.set -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxRetries.get -> int
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions.MaxRetries.set -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator.EventPropagationSubscriptionOptionsValidator() -> void
Workleap.DomainEventPropagation.EventPropagationSubscriptionOptionsValidator.Validate(string! name, Workleap.DomainEventPropagation.EventPropagationSubscriptionOptions! options) -> Microsoft.Extensions.Options.ValidateOptionsResult!
Expand Down

0 comments on commit e94b255

Please sign in to comment.