diff --git a/src/BuildingBlocks/Messaging/RabbitMq/Consumer/MultiMessageRabbitMqMessageConsumer.cs b/src/BuildingBlocks/Messaging/RabbitMq/Consumer/MultiMessageRabbitMqMessageConsumer.cs index ef5ccb9..dca9de5 100644 --- a/src/BuildingBlocks/Messaging/RabbitMq/Consumer/MultiMessageRabbitMqMessageConsumer.cs +++ b/src/BuildingBlocks/Messaging/RabbitMq/Consumer/MultiMessageRabbitMqMessageConsumer.cs @@ -99,11 +99,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } var subscriber = serviceScope.ServiceProvider.GetRequiredService(type.Handler); - var handle = _methodInfos.GetOrAdd(subscriber.GetType(), + var handleMethodInfo = _methodInfos.GetOrAdd(subscriber.GetType(), (subscriberType) => subscriberType.GetMethod(nameof(IMessageSubscriber.Handle))!); - var result = await (Task>)handle.Invoke(subscriber, [msg, ct])!; + var result = await (Task>)handleMethodInfo.Invoke(subscriber, [msg, ct])!; if (!result.IsSuccess) { logger.LogCantProcessMessage(result.ErrorValue, info.Exchange, info.RoutingKey, info.Queue, properties); diff --git a/src/BuildingBlocks/Messaging/RabbitMq/Extensions/RabbitMqClientExtensions.cs b/src/BuildingBlocks/Messaging/RabbitMq/Extensions/RabbitMqClientExtensions.cs index 325a922..82f1923 100644 --- a/src/BuildingBlocks/Messaging/RabbitMq/Extensions/RabbitMqClientExtensions.cs +++ b/src/BuildingBlocks/Messaging/RabbitMq/Extensions/RabbitMqClientExtensions.cs @@ -2,6 +2,7 @@ using EasyNetQ.Topology; using Messaging.RabbitMq.Configuration; +using Messaging.RabbitMq.Consumer; namespace Messaging.RabbitMq.Extensions; @@ -15,4 +16,13 @@ public static Task DeclareExchangeAsync(this IAdvancedBus bus, Rabb configuration.WithType(ExchangeType.Topic); }, cancellationToken); } + + public static Task DeclareExchangeAsync(this IAdvancedBus bus, MultiMessageRabbitMqSubscriptionConfiguration cfg, CancellationToken cancellationToken = default) + { + return bus.ExchangeDeclareAsync(cfg.Exchange, configuration => + { + configuration.AsDurable(true); + configuration.WithType(ExchangeType.Topic); + }, cancellationToken); + } } \ No newline at end of file diff --git a/tests/BuildingBlocks/Messaging.Tests/Extensions/RabbitMqTestConsumer.cs b/tests/BuildingBlocks/Messaging.Tests/Extensions/RabbitMqTestConsumer.cs index ff2f027..80faf78 100644 --- a/tests/BuildingBlocks/Messaging.Tests/Extensions/RabbitMqTestConsumer.cs +++ b/tests/BuildingBlocks/Messaging.Tests/Extensions/RabbitMqTestConsumer.cs @@ -16,7 +16,8 @@ namespace Messaging.Tests.Extensions; -file sealed class TestSubscriber : IMessageSubscriber where T : IMessage + +internal sealed class TestSubscriber : IMessageSubscriber where T : IMessage { private readonly ChannelWriter _writer; private readonly int _take; @@ -58,6 +59,24 @@ public static async Task> CreateAsync(IAdvancedBus bu await consumer.StartAsync(cancellationToken); return new RabbitMqTestConsumer(consumer, channel, cancellationToken); } + + public static async Task<(RabbitMqMultiMessageTestConsumer, RabbitMqMultiMessageTestConsumer)> CreateMultiMessageConsumerAsync(IAdvancedBus bus, MultiMessageRabbitMqSubscriptionConfiguration configuration, int message1SubscriberTake, int message2SubscriberTake, CancellationToken cancellationToken = default) + where T1 : IMessage + where T2 : IMessage + { + var channel = Channel.CreateBounded(new BoundedChannelOptions(message1SubscriberTake) { FullMode = BoundedChannelFullMode.Wait}); + var testSubscriber = new TestSubscriber(channel, message1SubscriberTake); + var channel2 = Channel.CreateBounded(new BoundedChannelOptions(message2SubscriberTake) { FullMode = BoundedChannelFullMode.Wait}); + var testSubscriber2 = new TestSubscriber(channel2, message2SubscriberTake); + var services = new ServiceCollection(); + services.AddSingleton>(testSubscriber); + services.AddSingleton>(testSubscriber2); + services.AddSingleton(new SystemTextJsonSerializer()); + services.AddSingleton(NullLoggerFactory.Instance.CreateLogger()); + var consumer = new MultiMessageRabbitMqMessageConsumer(bus, services.BuildServiceProvider(), configuration); + await consumer.StartAsync(cancellationToken); + return (new RabbitMqMultiMessageTestConsumer(consumer, channel, cancellationToken), new RabbitMqMultiMessageTestConsumer(consumer, channel2, cancellationToken)); + } } public sealed class RabbitMqTestConsumer: IAsyncDisposable where T : IMessage @@ -90,6 +109,45 @@ public async IAsyncEnumerable Consume() } + public async ValueTask DisposeAsync() + { + await _consumer.StopAsync(_cancellationTokenSource.Token); + _cancellationTokenSource.Dispose(); + _channel.Writer.TryComplete(); + _consumer.Dispose(); + } +} + +public sealed class RabbitMqMultiMessageTestConsumer: IAsyncDisposable where T : IMessage +{ + + private readonly MultiMessageRabbitMqMessageConsumer _consumer; + private readonly CancellationTokenSource _cancellationTokenSource; + private readonly Channel _channel; + + internal RabbitMqMultiMessageTestConsumer(MultiMessageRabbitMqMessageConsumer consumer, Channel channel, CancellationToken cancellationToken) + { + _channel = channel; + _consumer = consumer; + _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + } + + public async IAsyncEnumerable Consume() + { + await foreach (var message in _channel.Reader.ReadAllAsync(_cancellationTokenSource.Token)) + { + yield return message; + } + } + + public async Task ConsumeOne() + { + await _channel.Reader.WaitToReadAsync(_cancellationTokenSource.Token); + return await _channel.Reader.ReadAsync(_cancellationTokenSource.Token); + } + + public async ValueTask DisposeAsync() { await _consumer.StopAsync(_cancellationTokenSource.Token); diff --git a/tests/BuildingBlocks/Messaging.Tests/Subscriber/RabbitMqMultiMessageConsumerTests.cs b/tests/BuildingBlocks/Messaging.Tests/Subscriber/RabbitMqMultiMessageConsumerTests.cs new file mode 100644 index 0000000..d8fe56c --- /dev/null +++ b/tests/BuildingBlocks/Messaging.Tests/Subscriber/RabbitMqMultiMessageConsumerTests.cs @@ -0,0 +1,117 @@ +using AutoFixture.Xunit2; + +using EasyNetQ; +using EasyNetQ.Serialization.SystemTextJson; +using EasyNetQ.Topology; + +using Messaging.RabbitMq.Configuration; +using Messaging.RabbitMq.Consumer; +using Messaging.RabbitMq.Extensions; +using Messaging.RabbitMq.Publisher; +using Messaging.Tests.Extensions; +using Messaging.Tests.Fixtures; + +using Shouldly; + +using Xunit; + +using IMessage = Messaging.Abstraction.IMessage; + +namespace Messaging.Tests.Subscriber; + +public class Message1 : IMessage +{ + public string? Message { get; init; } + public Guid Id { get; set; } = Guid.NewGuid(); + public long Timestamp { get; set; } + public long ConsumedAtTimestamp { get; set; } +} + +public class Message2 : IMessage +{ + public string? Message { get; init; } + public Guid Id { get; set; } = Guid.NewGuid(); + public long Timestamp { get; set; } + + public long ConsumedAtTimestamp { get; set; } +} + +[Collection(nameof(RabbitMqFixtureCollectionTest))] +public class RabbitMqmultiMessageConsumerTests +{ + private readonly RabbitMqFixture _rabbitMqFixture; + + public RabbitMqmultiMessageConsumerTests(RabbitMqFixture rabbitMqFixture) + { + _rabbitMqFixture = rabbitMqFixture; + } + + [Theory] + [InlineAutoData()] + public async Task TestMessageSubscription(RabbitMqPublisherConfig config, Msg msg, string queueName) + { + // Arrange + using var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(30)); + var publisher = new RabbitMqMessagePublisher(_rabbitMqFixture.Bus.Advanced, config); + var exchange = await _rabbitMqFixture.Bus.Advanced.DeclareExchangeAsync(config, cancellationToken: cts.Token); + await using var consumer = await RabbitMqTestConsumer.CreateAsync(_rabbitMqFixture.Bus.Advanced, + new RabbitMqSubscriptionConfiguration() + { + Exchange = exchange.Name, Topic = config.Topic, Queue = queueName + }, 1, cts.Token); + // Act + + await publisher.Publish(msg, cancellationToken: cts.Token); + + + var subject = await consumer.ConsumeOne(); + + subject.ShouldNotBeNull(); + subject.Message.ShouldNotBeNullOrEmpty(); + subject.Message.ShouldBe(msg.Message); + } + + [Theory] + [InlineAutoData()] + public async Task TestMessagesSubscription(MultiMessageRabbitMqSubscriptionConfiguration config, Message1 message1, Message2 message2, string message1Topic, string message2Topic) + { + // Arrange + + using var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromSeconds(30)); + var message1Publisher = new RabbitMqMessagePublisher(_rabbitMqFixture.Bus.Advanced, new RabbitMqPublisherConfig(){Exchange = config.Exchange, Topic = message1Topic }); + var message2Publisher = new RabbitMqMessagePublisher(_rabbitMqFixture.Bus.Advanced, new RabbitMqPublisherConfig(){ Exchange = config.Exchange, Topic = message2Topic }); + var exchange = await _rabbitMqFixture.Bus.Advanced.DeclareExchangeAsync(config, cancellationToken: cts.Token); + var configuration = new MultiMessageRabbitMqSubscriptionConfiguration() + { + Exchange = config.Exchange, Queue = config.Queue, + }; + configuration.AddSubscription>(new MultiMessageRabbitMqSubscription(){RouteKey = message1Topic}); + configuration.AddSubscription>(new MultiMessageRabbitMqSubscription(){RouteKey = message2Topic}); + var consumers = await RabbitMqTestConsumer.CreateMultiMessageConsumerAsync(_rabbitMqFixture.Bus.Advanced, + configuration, 1, 1, cts.Token); + await using var consumer1 = consumers.Item1; + await using var consumer2 = consumers.Item2; + // Act + + await message1Publisher.Publish(message1, cancellationToken: cts.Token); + await message2Publisher.Publish(message2, cancellationToken: cts.Token); + + var subject1T = consumer1.ConsumeOne(); + var subject2T = consumer2.ConsumeOne(); + + await Task.WhenAll(subject1T, subject2T); + + var subject1 = await subject1T; + var subject2 = await subject2T; + + subject1.ShouldNotBeNull(); + subject1.Message.ShouldNotBeNullOrEmpty(); + subject1.Message.ShouldBe(message1.Message); + + subject2.ShouldNotBeNull(); + subject2.Message.ShouldNotBeNullOrEmpty(); + subject2.Message.ShouldBe(message2.Message); + } +} \ No newline at end of file