Skip to content

Commit

Permalink
Cleanup unused code and fixed issue where confirmation was send when …
Browse files Browse the repository at this point in the history
…`IsFaulted` is set
  • Loading branch information
ramonsmits committed Oct 7, 2024
1 parent 0f79f10 commit c11bdc8
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 47 deletions.
69 changes: 35 additions & 34 deletions src/Helper/RetryAcknowledgementFilter.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace ServiceControl;

using System;
using System.Globalization;
using MassTransit;

record Dummy
{
}
Expand All @@ -17,57 +17,54 @@ class RetryAcknowledgementFilter : IFilter<ConsumeContext>
public const string ControlMessageHeader = "NServiceBus.ControlMessage";

static readonly Dummy markMessagesConsumed = new();
static readonly UriCreationOptions addressUriCreationOptions = new();

async Task IFilter<ConsumeContext>.Send(ConsumeContext context, IPipe<ConsumeContext> next)
{
var proxy = new RetryAcknowledgementProxy(context);

var useRetryAcknowledgement = IsRetriedMessage(context, out var id, out var acknowledgementQueue);
var useRetryAcknowledgement = IsRetriedMessage(context, out var id, out var acknowledgementQueueAddress);

await next.Send(proxy).ConfigureAwait(false);

if (proxy.IsFaulted)
{
// not typically going to happen, exception should be thrown, but you can check other things as well
return;
}

if (proxy.Consumed.Any())
if (useRetryAcknowledgement)
{
if (useRetryAcknowledgement)
{
await ConfirmSuccessfulRetry().ConfigureAwait(false);
}
await ConfirmSuccessfulRetry().ConfigureAwait(false);
}

async Task ConfirmSuccessfulRetry()
{
// TODO: Hardcoded to `exchange:`, likely this needs to go through the connector too and detect that its a ACK and
var address = new Uri("exchange:" + acknowledgementQueue);
var endpoint = await context.GetSendEndpoint(address).ConfigureAwait(false);

//cfg.Send<Empty>(m => m.UseSerializer("application/json"));
await endpoint.Send(
markMessagesConsumed,
c =>
{
//NServiceBUs non ISO1806 compliant format
var timestamp = DateTimeOffset.UtcNow.ToString("yyyy-MM-dd HH:mm:ss:ffffff Z", CultureInfo.InvariantCulture);

var h = c.Headers;
h.Set("ServiceControl.Retry.Successful", timestamp);
h.Set(RetryUniqueMessageIdHeaderKey, id);
h.Set(ControlMessageHeader, bool.TrueString);
return Task.CompletedTask;
}
).ConfigureAwait(false);
}
async Task ConfirmSuccessfulRetry()
{
var endpoint = await context.GetSendEndpoint(acknowledgementQueueAddress).ConfigureAwait(false);

//cfg.Send<Empty>(m => m.UseSerializer("application/json"));
await endpoint.Send(
markMessagesConsumed,
c =>
{
//NServiceBus non ISO1806 compliant format
var timestamp = DateTimeOffset.UtcNow.ToString("yyyy-MM-dd HH:mm:ss:ffffff Z", CultureInfo.InvariantCulture);

var h = c.Headers;
h.Set("ServiceControl.Retry.Successful", timestamp);
h.Set(RetryUniqueMessageIdHeaderKey, id);
h.Set(ControlMessageHeader, bool.TrueString);
return Task.CompletedTask;
}
).ConfigureAwait(false);
}
}

public void Probe(ProbeContext context)
void IProbeSite.Probe(ProbeContext context)
{
}

static bool IsRetriedMessage(ConsumeContext context, out string retryUniqueMessageId, out string retryAcknowledgementQueue)
static bool IsRetriedMessage(ConsumeContext context, out string retryUniqueMessageId, out Uri retryAcknowledgementAddress)
{
var h = context.ReceiveContext.TransportHeaders;

Expand All @@ -77,12 +74,16 @@ static bool IsRetriedMessage(ConsumeContext context, out string retryUniqueMessa
h.TryGetHeader(RetryConfirmationQueueHeaderKey, out var acknowledgementQueue))
{
retryUniqueMessageId = (string)uniqueMessageId;
retryAcknowledgementQueue = (string)acknowledgementQueue;
var retryAcknowledgementQueue = (string)acknowledgementQueue;
if (!Uri.TryCreate(retryAcknowledgementQueue, addressUriCreationOptions, out retryAcknowledgementAddress))
{
throw new InvalidOperationException($"Header '{RetryConfirmationQueueHeaderKey}' value contains a non addressable value");
}
return true;
}

retryUniqueMessageId = null;
retryAcknowledgementQueue = null;
retryAcknowledgementAddress = null;
return false;
}
}
}
13 changes: 0 additions & 13 deletions src/Helper/RetryAcknowledgementProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,6 @@ class RetryAcknowledgementProxy(ConsumeContext context) : ConsumeContextProxy(co
{
public bool IsFaulted { get; private set; }

readonly List<Guid> _consumed = [];
public IReadOnlyList<Guid> Consumed => _consumed;

public override Task NotifyConsumed<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType)
{
if (context.MessageId.HasValue)
{
_consumed.Add(context.MessageId.Value);
}

return base.NotifyConsumed(context, duration, consumerType);
}

public override Task NotifyFaulted<T>(ConsumeContext<T> context, TimeSpan duration, string consumerType, Exception exception)
{
IsFaulted = true;
Expand Down

0 comments on commit c11bdc8

Please sign in to comment.