Skip to content

Commit

Permalink
rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikus1993 committed Jun 9, 2024
1 parent e2af07d commit 3d194d8
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
}

var subscriber = serviceScope.ServiceProvider.GetRequiredService(type.Handler);
var handle = _methodInfos.GetOrAdd(subscriber.GetType(),
var handleMethodInfo = _methodInfos.GetOrAdd(subscriber.GetType(),
(subscriberType) =>
subscriberType.GetMethod(nameof(IMessageSubscriber<IMessage>.Handle))!);

var result = await (Task<Result<Unit>>)handle.Invoke(subscriber, [msg, ct])!;
var result = await (Task<Result<Unit>>)handleMethodInfo.Invoke(subscriber, [msg, ct])!;
if (!result.IsSuccess)
{
logger.LogCantProcessMessage(result.ErrorValue, info.Exchange, info.RoutingKey, info.Queue, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using EasyNetQ.Topology;

using Messaging.RabbitMq.Configuration;
using Messaging.RabbitMq.Consumer;

namespace Messaging.RabbitMq.Extensions;

Expand All @@ -15,4 +16,13 @@ public static Task<Exchange> DeclareExchangeAsync<T>(this IAdvancedBus bus, Rabb
configuration.WithType(ExchangeType.Topic);
}, cancellationToken);
}

public static Task<Exchange> DeclareExchangeAsync(this IAdvancedBus bus, MultiMessageRabbitMqSubscriptionConfiguration cfg, CancellationToken cancellationToken = default)
{
return bus.ExchangeDeclareAsync(cfg.Exchange, configuration =>
{
configuration.AsDurable(true);
configuration.WithType(ExchangeType.Topic);
}, cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

namespace Messaging.Tests.Extensions;

file sealed class TestSubscriber<T> : IMessageSubscriber<T> where T : IMessage

internal sealed class TestSubscriber<T> : IMessageSubscriber<T> where T : IMessage
{
private readonly ChannelWriter<T> _writer;
private readonly int _take;
Expand Down Expand Up @@ -58,6 +59,24 @@ public static async Task<RabbitMqTestConsumer<T>> CreateAsync<T>(IAdvancedBus bu
await consumer.StartAsync(cancellationToken);
return new RabbitMqTestConsumer<T>(consumer, channel, cancellationToken);
}

public static async Task<(RabbitMqMultiMessageTestConsumer<T1>, RabbitMqMultiMessageTestConsumer<T2>)> CreateMultiMessageConsumerAsync<T1, T2>(IAdvancedBus bus, MultiMessageRabbitMqSubscriptionConfiguration configuration, int message1SubscriberTake, int message2SubscriberTake, CancellationToken cancellationToken = default)
where T1 : IMessage
where T2 : IMessage
{
var channel = Channel.CreateBounded<T1>(new BoundedChannelOptions(message1SubscriberTake) { FullMode = BoundedChannelFullMode.Wait});
var testSubscriber = new TestSubscriber<T1>(channel, message1SubscriberTake);
var channel2 = Channel.CreateBounded<T2>(new BoundedChannelOptions(message2SubscriberTake) { FullMode = BoundedChannelFullMode.Wait});
var testSubscriber2 = new TestSubscriber<T2>(channel2, message2SubscriberTake);
var services = new ServiceCollection();
services.AddSingleton<TestSubscriber<T1>>(testSubscriber);
services.AddSingleton<TestSubscriber<T2>>(testSubscriber2);
services.AddSingleton<ISerializer>(new SystemTextJsonSerializer());
services.AddSingleton(NullLoggerFactory.Instance.CreateLogger<MultiMessageRabbitMqMessageConsumer>());
var consumer = new MultiMessageRabbitMqMessageConsumer(bus, services.BuildServiceProvider(), configuration);
await consumer.StartAsync(cancellationToken);
return (new RabbitMqMultiMessageTestConsumer<T1>(consumer, channel, cancellationToken), new RabbitMqMultiMessageTestConsumer<T2>(consumer, channel2, cancellationToken));
}
}

public sealed class RabbitMqTestConsumer<T>: IAsyncDisposable where T : IMessage
Expand Down Expand Up @@ -90,6 +109,45 @@ public async IAsyncEnumerable<T> Consume()
}


public async ValueTask DisposeAsync()
{
await _consumer.StopAsync(_cancellationTokenSource.Token);
_cancellationTokenSource.Dispose();
_channel.Writer.TryComplete();
_consumer.Dispose();
}
}

public sealed class RabbitMqMultiMessageTestConsumer<T>: IAsyncDisposable where T : IMessage
{

private readonly MultiMessageRabbitMqMessageConsumer _consumer;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly Channel<T> _channel;

internal RabbitMqMultiMessageTestConsumer(MultiMessageRabbitMqMessageConsumer consumer, Channel<T> channel, CancellationToken cancellationToken)
{
_channel = channel;
_consumer = consumer;
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

}

public async IAsyncEnumerable<T> Consume()
{
await foreach (var message in _channel.Reader.ReadAllAsync(_cancellationTokenSource.Token))
{
yield return message;
}
}

public async Task<T?> ConsumeOne()
{
await _channel.Reader.WaitToReadAsync(_cancellationTokenSource.Token);
return await _channel.Reader.ReadAsync(_cancellationTokenSource.Token);
}


public async ValueTask DisposeAsync()
{
await _consumer.StopAsync(_cancellationTokenSource.Token);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using AutoFixture.Xunit2;

using EasyNetQ;
using EasyNetQ.Serialization.SystemTextJson;
using EasyNetQ.Topology;

using Messaging.RabbitMq.Configuration;
using Messaging.RabbitMq.Consumer;
using Messaging.RabbitMq.Extensions;
using Messaging.RabbitMq.Publisher;
using Messaging.Tests.Extensions;
using Messaging.Tests.Fixtures;

using Shouldly;

using Xunit;

using IMessage = Messaging.Abstraction.IMessage;

namespace Messaging.Tests.Subscriber;

public class Message1 : IMessage
{
public string? Message { get; init; }
public Guid Id { get; set; } = Guid.NewGuid();
public long Timestamp { get; set; }
public long ConsumedAtTimestamp { get; set; }
}

public class Message2 : IMessage
{
public string? Message { get; init; }
public Guid Id { get; set; } = Guid.NewGuid();
public long Timestamp { get; set; }

public long ConsumedAtTimestamp { get; set; }
}

[Collection(nameof(RabbitMqFixtureCollectionTest))]
public class RabbitMqmultiMessageConsumerTests
{
private readonly RabbitMqFixture _rabbitMqFixture;

public RabbitMqmultiMessageConsumerTests(RabbitMqFixture rabbitMqFixture)
{
_rabbitMqFixture = rabbitMqFixture;
}

[Theory]
[InlineAutoData()]
public async Task TestMessageSubscription(RabbitMqPublisherConfig<Msg> config, Msg msg, string queueName)
{
// Arrange
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(30));
var publisher = new RabbitMqMessagePublisher<Msg>(_rabbitMqFixture.Bus.Advanced, config);
var exchange = await _rabbitMqFixture.Bus.Advanced.DeclareExchangeAsync(config, cancellationToken: cts.Token);
await using var consumer = await RabbitMqTestConsumer.CreateAsync(_rabbitMqFixture.Bus.Advanced,
new RabbitMqSubscriptionConfiguration<Msg>()
{
Exchange = exchange.Name, Topic = config.Topic, Queue = queueName
}, 1, cts.Token);
// Act

await publisher.Publish(msg, cancellationToken: cts.Token);


var subject = await consumer.ConsumeOne();

subject.ShouldNotBeNull();
subject.Message.ShouldNotBeNullOrEmpty();
subject.Message.ShouldBe(msg.Message);
}

[Theory]
[InlineAutoData()]
public async Task TestMessagesSubscription(MultiMessageRabbitMqSubscriptionConfiguration config, Message1 message1, Message2 message2, string message1Topic, string message2Topic)
{
// Arrange

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(30));
var message1Publisher = new RabbitMqMessagePublisher<Message1>(_rabbitMqFixture.Bus.Advanced, new RabbitMqPublisherConfig<Message1>(){Exchange = config.Exchange, Topic = message1Topic });
var message2Publisher = new RabbitMqMessagePublisher<Message2>(_rabbitMqFixture.Bus.Advanced, new RabbitMqPublisherConfig<Message2>(){ Exchange = config.Exchange, Topic = message2Topic });
var exchange = await _rabbitMqFixture.Bus.Advanced.DeclareExchangeAsync(config, cancellationToken: cts.Token);
var configuration = new MultiMessageRabbitMqSubscriptionConfiguration()
{
Exchange = config.Exchange, Queue = config.Queue,
};
configuration.AddSubscription<Message1, TestSubscriber<Message1>>(new MultiMessageRabbitMqSubscription(){RouteKey = message1Topic});
configuration.AddSubscription<Message2, TestSubscriber<Message2>>(new MultiMessageRabbitMqSubscription(){RouteKey = message2Topic});
var consumers = await RabbitMqTestConsumer.CreateMultiMessageConsumerAsync<Message1, Message2>(_rabbitMqFixture.Bus.Advanced,
configuration, 1, 1, cts.Token);
await using var consumer1 = consumers.Item1;
await using var consumer2 = consumers.Item2;
// Act

await message1Publisher.Publish(message1, cancellationToken: cts.Token);
await message2Publisher.Publish(message2, cancellationToken: cts.Token);

var subject1T = consumer1.ConsumeOne();
var subject2T = consumer2.ConsumeOne();

await Task.WhenAll(subject1T, subject2T);

var subject1 = await subject1T;
var subject2 = await subject2T;

subject1.ShouldNotBeNull();
subject1.Message.ShouldNotBeNullOrEmpty();
subject1.Message.ShouldBe(message1.Message);

subject2.ShouldNotBeNull();
subject2.Message.ShouldNotBeNullOrEmpty();
subject2.Message.ShouldBe(message2.Message);
}
}

0 comments on commit 3d194d8

Please sign in to comment.