Skip to content

Commit

Permalink
fix consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikus1993 committed Jun 12, 2024
1 parent 3d194d8 commit 0ee2384
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 43 deletions.
9 changes: 8 additions & 1 deletion src/BuildingBlocks/Messaging/Logging/SubscriberLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ internal static partial class SubscriberLogger
Level = LogLevel.Error,
Message = "Can't process message {Exchange} -> {RoutingKey} -> {Queue}")]
public static partial void LogCantProcessMessage(
this ILogger logger, Exception exception, string exchange, string routingKey, string queue, [LogProperties(SkipNullProperties = true)]MessageProperties properties);
this ILogger logger, Exception exception, string exchange, string routingKey, string queue, [LogProperties(SkipNullProperties = true)]MessageProperties properties, [LogProperties(SkipNullProperties = true)]MessageReceivedInfo info);


[LoggerMessage(
Expand All @@ -21,6 +21,13 @@ public static partial void LogCantProcessMessage(
public static partial void LogCantDeserializeMessage(
this ILogger logger, string exchange, string routingKey, string queue);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Warning,
Message = "Message Processor Not Found {Exchange} -> {RoutingKey} -> {Queue}")]
public static partial void LogMessageProcessorNotFound(
this ILogger logger, string exchange, string routingKey, string queue);

[LoggerMessage(
EventId = 1,
Level = LogLevel.Warning,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,82 @@

namespace Messaging.RabbitMq.Consumer;

internal interface IMessageProcessor
{
Task<AckStrategy> Process(ReadOnlyMemory<byte> body, MessageProperties properties,
MessageReceivedInfo info, CancellationToken ct);
}
internal sealed class RabbitMqMessageProcessor<T> : IMessageProcessor where T : IMessage
{
private static readonly Type _type = typeof(T);
private readonly IServiceProvider _serviceProvider;

public RabbitMqMessageProcessor(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}

public async Task<AckStrategy> Process(ReadOnlyMemory<byte> body, MessageProperties properties,
MessageReceivedInfo info, CancellationToken ct)
{
using var activity = RabbitMqTelemetry.RabbitMqActivitySource.Start("rabbitmq.consume", ActivityKind.Consumer, RabbitMqTelemetry.GetHeaderFromProps(properties).ActivityContext);
await using var serviceScope = _serviceProvider.CreateAsyncScope();
var logger = serviceScope.ServiceProvider.GetRequiredService<ILogger<MultiMessageRabbitMqMessageConsumer>>();

try
{
if (activity is not null)
{
activity.SetTag("messaging.rabbitmq.routing_key", info.RoutingKey);
activity.SetTag("messaging.exchange", info.Exchange);
activity.SetTag("messaging.destination", info.Queue);
activity.SetTag("messaging.timestamp", properties.Timestamp);
activity.SetTag("messaging.message_id", properties.MessageId);
activity.SetTag("messaging.message_type", properties.Type);
activity.SetTag("messaging.system", "rabbitmq");
activity.SetTag("messaging.destination_kind", "queue");
activity.SetTag("messaging.protocol", "AMQP");
activity.SetTag("messaging.protocol_version", "0.9.1");
activity.SetTag("messaging.message_name", _type.Name);
}


var serializer = serviceScope.ServiceProvider.GetRequiredService<ISerializer>();
if (serializer.BytesToMessage(_type, body) is not T msg)
{
logger.LogCantDeserializeMessage(info.Exchange, info.RoutingKey, info.Queue);
activity?.AddEvent(new ActivityEvent("Message is null or can't be deserialized"));
return AckStrategies.NackWithRequeue;
}

var subscriber = serviceScope.ServiceProvider.GetRequiredService<IMessageSubscriber<T>>();

var result = await subscriber.Handle(msg, ct);
if (!result.IsSuccess)
{
logger.LogCantProcessMessage(result.ErrorValue, info.Exchange, info.RoutingKey, info.Queue, properties, info);
activity?.RecordException(result.ErrorValue);
return AckStrategies.NackWithRequeue;
}

return AckStrategies.Ack;
}
catch (Exception exc)
{
logger.LogCantProcessMessage(exc, info.Exchange, info.RoutingKey, info.Queue, properties, info);
activity?.RecordException(exc);
return AckStrategies.NackWithRequeue;
}
}
}

public sealed class MultiMessageRabbitMqMessageConsumer : BackgroundService
{
private readonly IAdvancedBus _advancedBus;
private readonly IServiceProvider _serviceProvider;
private readonly MultiMessageRabbitMqSubscriptionConfiguration _subscriptionConfiguration;
private IDisposable? _disposable;
private ConcurrentDictionary<Type, MethodInfo> _methodInfos = new();
private ConcurrentDictionary<Type, Type> _processorTypes = new();

public MultiMessageRabbitMqMessageConsumer(IAdvancedBus advancedBus, IServiceProvider serviceProvider, MultiMessageRabbitMqSubscriptionConfiguration subscriptionConfiguration)
{
Expand Down Expand Up @@ -62,61 +130,39 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)

_disposable = _advancedBus.Consume(queue, async (body, properties, info, ct) =>
{
using var activity = RabbitMqTelemetry.RabbitMqActivitySource.Start("rabbitmq.consume", ActivityKind.Consumer, RabbitMqTelemetry.GetHeaderFromProps(properties).ActivityContext);
await using var serviceScope = _serviceProvider.CreateAsyncScope();
var logger = serviceScope.ServiceProvider.GetRequiredService<ILogger<MultiMessageRabbitMqMessageConsumer>>();

try
{
if (activity is not null)
{
activity.SetTag("messaging.rabbitmq.routing_key", info.RoutingKey);
activity.SetTag("messaging.exchange", info.Exchange);
activity.SetTag("messaging.destination", info.Queue);
activity.SetTag("messaging.timestamp", properties.Timestamp);
activity.SetTag("messaging.message_id", properties.MessageId);
activity.SetTag("messaging.message_type", properties.Type);
activity.SetTag("messaging.system", "rabbitmq");
activity.SetTag("messaging.destination_kind", "queue");
activity.SetTag("messaging.protocol", "AMQP");
activity.SetTag("messaging.protocol_version", "0.9.1");
}

if (!_subscriptionConfiguration.Subscriptions.TryGetValue(info.RoutingKey, out var type))
{
var logger = serviceScope.ServiceProvider.GetRequiredService<ILogger<MultiMessageRabbitMqMessageConsumer>>();
logger.LogSubscriberNotFound(info.Exchange, info.RoutingKey, info.Queue);
return AckStrategies.NackWithRequeue;
}

activity?.SetTag("messaging.message_name", type.Message.Name);

var serializer = serviceScope.ServiceProvider.GetRequiredService<ISerializer>();
if (serializer.BytesToMessage(type.Message, body) is not {} msg)
var processorType = _processorTypes.GetOrAdd(type.Message, t =>
{
logger.LogCantDeserializeMessage(info.Exchange, info.RoutingKey, info.Queue);
activity?.AddEvent(new ActivityEvent("Message is null or can't be deserialized"));
return AckStrategies.NackWithRequeue;
}

var subscriber = serviceScope.ServiceProvider.GetRequiredService(type.Handler);
var handleMethodInfo = _methodInfos.GetOrAdd(subscriber.GetType(),
(subscriberType) =>
subscriberType.GetMethod(nameof(IMessageSubscriber<IMessage>.Handle))!);
var processorType = typeof(RabbitMqMessageProcessor<>).MakeGenericType(t);
return processorType;
});

var result = await (Task<Result<Unit>>)handleMethodInfo.Invoke(subscriber, [msg, ct])!;
if (!result.IsSuccess)
var processor = serviceScope.ServiceProvider.GetRequiredService(processorType);

if (processor is not IMessageProcessor messageProcessor)
{
logger.LogCantProcessMessage(result.ErrorValue, info.Exchange, info.RoutingKey, info.Queue, properties);
activity?.RecordException(result.ErrorValue);
return _subscriptionConfiguration.AckStrategy;
var logger = serviceScope.ServiceProvider.GetRequiredService<ILogger<MultiMessageRabbitMqMessageConsumer>>();
logger.LogMessageProcessorNotFound(info.Exchange, info.RoutingKey, info.Queue);
return AckStrategies.NackWithoutRequeue;
}

return AckStrategies.Ack;
return await messageProcessor.Process(body, properties, info, ct);

}
catch (Exception exc)
{
logger.LogCantProcessMessage(exc, info.Exchange, info.RoutingKey, info.Queue, properties);
activity?.RecordException(exc);
var logger = serviceScope.ServiceProvider.GetRequiredService<ILogger<MultiMessageRabbitMqMessageConsumer>>();
logger.LogCantProcessMessage(exc, info.Exchange, info.RoutingKey, info.Queue, properties, info);
return _subscriptionConfiguration.AckStrategy;
}
}, configuration =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var result = await subscriber.Handle(message, ct);
if (!result.IsSuccess)
{
logger.LogCantProcessMessage(result.ErrorValue, info.Exchange, info.RoutingKey, info.Queue, properties);
logger.LogCantProcessMessage(result.ErrorValue, info.Exchange, info.RoutingKey, info.Queue, properties, info);
activity?.RecordException(result.ErrorValue);
return _subscriptionConfiguration.AckStrategy;
}
Expand All @@ -103,7 +103,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}
catch (Exception exc)
{
logger.LogCantProcessMessage(exc, info.Exchange, info.RoutingKey, info.Queue, properties);
logger.LogCantProcessMessage(exc, info.Exchange, info.RoutingKey, info.Queue, properties, info);
activity?.RecordException(exc);
return _subscriptionConfiguration.AckStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ public static async Task<RabbitMqTestConsumer<T>> CreateAsync<T>(IAdvancedBus bu
var channel2 = Channel.CreateBounded<T2>(new BoundedChannelOptions(message2SubscriberTake) { FullMode = BoundedChannelFullMode.Wait});
var testSubscriber2 = new TestSubscriber<T2>(channel2, message2SubscriberTake);
var services = new ServiceCollection();
services.AddSingleton<TestSubscriber<T1>>(testSubscriber);
services.AddSingleton<TestSubscriber<T2>>(testSubscriber2);
services.AddSingleton<RabbitMqMessageProcessor<T1>>();
services.AddSingleton<RabbitMqMessageProcessor<T2>>();
services.AddSingleton<IMessageSubscriber<T1>, TestSubscriber<T1>>(_ => testSubscriber);
services.AddSingleton<IMessageSubscriber<T2>, TestSubscriber<T2>>(_ => testSubscriber2);
services.AddSingleton<ISerializer>(new SystemTextJsonSerializer());
services.AddSingleton(NullLoggerFactory.Instance.CreateLogger<MultiMessageRabbitMqMessageConsumer>());
var consumer = new MultiMessageRabbitMqMessageConsumer(bus, services.BuildServiceProvider(), configuration);
Expand Down

0 comments on commit 0ee2384

Please sign in to comment.