From a5ae83a0c19d8f8cb5912ed3c24ced57cb717a62 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Fri, 16 Aug 2024 07:34:53 -0500 Subject: [PATCH] Improvements to the Rabbit MQ tests, upgraded to latest Rabbit MQ client --- .../ConventionalRoutingContext.cs | 18 ++++++++- ...sage.cs => ConventionallyRoutedMessage.cs} | 2 +- .../ConventionalRouting/HeadersMessage.cs | 5 ++- .../RoutedMessageHandler.cs | 2 +- .../conventional_listener_discovery.cs | 11 +++-- .../end_to_end_with_conventional_routing.cs | 4 +- ...th_conventional_routing_custom_exchange.cs | 40 ++++++++++--------- ...d_with_conventional_routing_with_prefix.cs | 4 +- ..._discovering_a_sender_with_all_defaults.cs | 7 ++-- .../Wolverine.RabbitMQ.Tests/SpecialMapper.cs | 2 +- .../Internal/RabbitMqEndpoint.NServiceBus.cs | 2 +- .../Internal/RabbitMqEnvelopeMapper.cs | 10 ++--- .../Internal/RabbitMqTransport.cs | 1 - .../Internal/WorkerQueueMessageConsumer.cs | 10 ++--- .../Wolverine.RabbitMQ.csproj | 2 +- 15 files changed, 72 insertions(+), 48 deletions(-) rename src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/{RoutedMessage.cs => ConventionallyRoutedMessage.cs} (73%) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionalRoutingContext.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionalRoutingContext.cs index 3d3337263..d31f272a8 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionalRoutingContext.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionalRoutingContext.cs @@ -10,7 +10,7 @@ namespace Wolverine.RabbitMQ.Tests.ConventionalRouting; public static class ConventionalRoutingTestDefaults { - public static bool RoutingMessageOnly(Type type) => type == typeof(RoutedMessage); + public static bool RoutingMessageOnly(Type type) => type == typeof(ConventionallyRoutedMessage); } @@ -18,6 +18,8 @@ public abstract class ConventionalRoutingContext : IDisposable { private IHost _host; + internal bool DisableListenerDiscovery { get; set; } + internal IWolverineRuntime theRuntime { get @@ -25,7 +27,14 @@ internal IWolverineRuntime theRuntime if (_host == null) { _host = WolverineHost.For(opts => - opts.UseRabbitMq().UseConventionalRouting().AutoProvision().AutoPurgeOnStartup()); + { + opts.UseRabbitMq().UseConventionalRouting().AutoProvision().AutoPurgeOnStartup(); + + if (DisableListenerDiscovery) + { + opts.Discovery.DisableConventionalDiscovery(); + } + }); } return _host.Services.GetRequiredService(); @@ -56,6 +65,11 @@ internal void ConfigureConventions(Action conf { _host = WolverineHost.For(opts => { + if (DisableListenerDiscovery) + { + opts.Discovery.DisableConventionalDiscovery(); + } + opts.UseRabbitMq().UseConventionalRouting(configure).AutoProvision().AutoPurgeOnStartup(); }); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessage.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionallyRoutedMessage.cs similarity index 73% rename from src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessage.cs rename to src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionallyRoutedMessage.cs index 59c49f631..2a115b446 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessage.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/ConventionallyRoutedMessage.cs @@ -3,4 +3,4 @@ namespace Wolverine.RabbitMQ.Tests.ConventionalRouting; [MessageIdentity("routed")] -public class RoutedMessage; \ No newline at end of file +public class ConventionallyRoutedMessage; \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/HeadersMessage.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/HeadersMessage.cs index 45cf90336..fa777bc0a 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/HeadersMessage.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/HeadersMessage.cs @@ -1,3 +1,6 @@ -namespace Wolverine.RabbitMQ.Tests.ConventionalRouting; +using Wolverine.Attributes; +namespace Wolverine.RabbitMQ.Tests.ConventionalRouting; + +[WolverineIgnore] public record HeadersMessage; \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessageHandler.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessageHandler.cs index f52045e26..37bee8438 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessageHandler.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/RoutedMessageHandler.cs @@ -2,7 +2,7 @@ namespace Wolverine.RabbitMQ.Tests.ConventionalRouting; public class RoutedMessageHandler { - public void Handle(RoutedMessage message) + public void Handle(ConventionallyRoutedMessage message) { } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/conventional_listener_discovery.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/conventional_listener_discovery.cs index c6361fa9f..aedde147b 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/conventional_listener_discovery.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/conventional_listener_discovery.cs @@ -35,7 +35,10 @@ public void disable_sender_with_lambda() [Fact] public void exclude_types() { - ConfigureConventions(c => { c.ExcludeTypes(t => t == typeof(PublishedMessage)); }); + ConfigureConventions(c => + { + c.ExcludeTypes(t => t == typeof(PublishedMessage) || t == typeof(HeadersMessage)); + }); AssertNoRoutes(); @@ -85,10 +88,10 @@ public void disable_listener_by_lambda() { ConfigureConventions(c => { - c.IncludeTypes(t => t == typeof(RoutedMessage)); + c.IncludeTypes(t => t == typeof(ConventionallyRoutedMessage)); c.QueueNameForListener(t => { - if (t == typeof(RoutedMessage)) + if (t == typeof(ConventionallyRoutedMessage)) { return null; // should not be routed } @@ -110,7 +113,7 @@ public void configure_listener() { ConfigureConventions(c => { - c.IncludeTypes(t => t == typeof(RoutedMessage)); + c.IncludeTypes(t => t == typeof(ConventionallyRoutedMessage)); c.ConfigureListeners((x, _) => { x.ListenerCount(6); }); }); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs index ec38ed0d9..f15b3c95e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing.cs @@ -39,11 +39,11 @@ public async Task send_from_one_node_to_another_all_with_conventional_routing() var session = await _sender.TrackActivity() .AlsoTrack(_receiver) .IncludeExternalTransports() - .SendMessageAndWaitAsync(new RoutedMessage()); + .SendMessageAndWaitAsync(new ConventionallyRoutedMessage()); var received = session .AllRecordsInOrder() - .Where(x => x.Envelope.Message?.GetType() == typeof(RoutedMessage)) + .Where(x => x.Envelope.Message?.GetType() == typeof(ConventionallyRoutedMessage)) .Single(x => x.MessageEventType == MessageEventType.Received); received diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_custom_exchange.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_custom_exchange.cs index 9604e562d..c9ac5bd55 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_custom_exchange.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_custom_exchange.cs @@ -14,43 +14,47 @@ public class end_to_end_with_conventional_routing_custom_exchange : IDisposable public end_to_end_with_conventional_routing_custom_exchange() { - _sender = WolverineHost.For(opts => + _receiver = WolverineHost.For(opts => { opts.UseRabbitMq().UseConventionalRouting(conventions => { + conventions.ExchangeNameForSending(type => type.Name + "_headers"); conventions.IncludeTypes(x => x == typeof(HeadersMessage)); - conventions.ConfigureSending((x, c) => + conventions.ConfigureListeners((x, c) => { if (c.MessageType == typeof(HeadersMessage)) { - x.ExchangeType(ExchangeType.Headers); + x.BindToExchange(ExchangeType.Headers, arguments: new Dictionary() + { + {"tenant-id", "tenant-id"} + }); } }); - }) + }) .AutoProvision().AutoPurgeOnStartup(); - opts.DisableConventionalDiscovery(); - opts.ServiceName = "Sender"; + opts.ServiceName = "Receiver"; }); - - _receiver = WolverineHost.For(opts => + + _sender = WolverineHost.For(opts => { opts.UseRabbitMq().UseConventionalRouting(conventions => - { - conventions.IncludeTypes(x => x == typeof(HeadersMessage)); - conventions.ConfigureListeners((x, c) => { - if (c.MessageType == typeof(HeadersMessage)) + conventions.ExchangeNameForSending(type => type.Name + "_headers"); + conventions.IncludeTypes(x => x == typeof(HeadersMessage)); + conventions.ConfigureSending((x, c) => { - x.BindToExchange(ExchangeType.Headers, arguments: new Dictionary() + if (c.MessageType == typeof(HeadersMessage)) { - {"tenant-id", "tenant-id"} - }); - } - }); + x.ExchangeType(ExchangeType.Headers); + } + }); }) .AutoProvision().AutoPurgeOnStartup(); - opts.ServiceName = "Receiver"; + opts.DisableConventionalDiscovery(); + opts.ServiceName = "Sender"; }); + + } public void Dispose() diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_with_prefix.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_with_prefix.cs index 34cc70891..1c26d2ab7 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_with_prefix.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/end_to_end_with_conventional_routing_with_prefix.cs @@ -51,11 +51,11 @@ public async Task send_from_one_node_to_another_all_with_conventional_routing() .AlsoTrack(_receiver) .IncludeExternalTransports() .Timeout(30.Seconds()) - .SendMessageAndWaitAsync(new RoutedMessage()); + .SendMessageAndWaitAsync(new ConventionallyRoutedMessage()); var received = session .AllRecordsInOrder() - .Where(x => x.Envelope.Message?.GetType() == typeof(RoutedMessage)) + .Where(x => x.Envelope.Message?.GetType() == typeof(ConventionallyRoutedMessage)) .Single(x => x.MessageEventType == MessageEventType.Received); received diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/when_discovering_a_sender_with_all_defaults.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/when_discovering_a_sender_with_all_defaults.cs index bd3fcee7d..9a62f3e4e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/when_discovering_a_sender_with_all_defaults.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/ConventionalRouting/when_discovering_a_sender_with_all_defaults.cs @@ -14,8 +14,9 @@ public class when_discovering_a_sender_with_all_defaults : ConventionalRoutingCo private readonly MessageRoute theRoute; public when_discovering_a_sender_with_all_defaults() { + DisableListenerDiscovery = true; ConfigureConventions(x=> x.IncludeTypes(ConventionalRoutingTestDefaults.RoutingMessageOnly)); - theRoute = PublishingRoutesFor().Single() as MessageRoute; + theRoute = PublishingRoutesFor().Single() as MessageRoute; } [Fact] @@ -28,7 +29,7 @@ public void should_have_exactly_one_route() public void routed_to_rabbit_mq_exchange() { var endpoint = theRoute.Sender.Endpoint.ShouldBeOfType(); - endpoint.ExchangeName.ShouldBe(typeof(PublishedMessage).ToMessageTypeName()); + endpoint.ExchangeName.ShouldBe(typeof(ConventionallyRoutedMessage).ToMessageTypeName()); } [Fact] @@ -42,7 +43,7 @@ public void endpoint_mode_is_inline_by_default() public async Task has_declared_exchange() { // The rabbit object construction is lazy, so force it to happen - await new MessageBus(theRuntime).SendAsync(new PublishedMessage()); + await new MessageBus(theRuntime).SendAsync(new ConventionallyRoutedMessage()); var endpoint = theRoute.Sender.Endpoint.ShouldBeOfType(); theTransport.Exchanges.Contains(endpoint.ExchangeName).ShouldBeTrue(); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs index 56caf378f..1601a7ce8 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/SpecialMapper.cs @@ -29,7 +29,7 @@ public void MapEnvelopeToOutgoing(Envelope envelope, IBasicProperties outgoing) } } - public void MapIncomingToEnvelope(Envelope envelope, ReadOnlyBasicProperties incoming) + public void MapIncomingToEnvelope(Envelope envelope, IReadOnlyBasicProperties incoming) { envelope.CorrelationId = incoming.CorrelationId; envelope.ContentType = "application/json"; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs index a59db1c2d..20e1a9b70 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEndpoint.NServiceBus.cs @@ -27,7 +27,7 @@ void WriteReplyToAddress(Envelope e, IBasicProperties props) props.Headers["NServiceBus.ReplyToAddress"] = replyAddress.Value; } - void ReadReplyUri(Envelope e, ReadOnlyBasicProperties props) + void ReadReplyUri(Envelope e, IReadOnlyBasicProperties props) { if (props.Headers.TryGetValue("NServiceBus.ReplyToAddress", out var raw)) { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEnvelopeMapper.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEnvelopeMapper.cs index 8bf69d0ff..41b4d697e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEnvelopeMapper.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqEnvelopeMapper.cs @@ -12,9 +12,9 @@ namespace Wolverine.RabbitMQ.Internal; /// Rabbit MQ IBasicProperties object. Custom implementations of this can be used /// to create interoperability with non-Wolverine applications through Rabbit MQ /// -public interface IRabbitMqEnvelopeMapper : IEnvelopeMapper; +public interface IRabbitMqEnvelopeMapper : IEnvelopeMapper; -internal class RabbitMqEnvelopeMapper : EnvelopeMapper, IRabbitMqEnvelopeMapper +internal class RabbitMqEnvelopeMapper : EnvelopeMapper, IRabbitMqEnvelopeMapper { public RabbitMqEnvelopeMapper(Endpoint endpoint, IWolverineRuntime runtime) : base(endpoint) { @@ -23,7 +23,7 @@ public RabbitMqEnvelopeMapper(Endpoint endpoint, IWolverineRuntime runtime) : ba MapProperty(x => x.ContentType!, (e, p) => e.ContentType = p.ContentType, (e, p) => p.ContentType = e.ContentType); - Action readId = (e, props) => + Action readId = (e, props) => { if (Guid.TryParse(props.MessageId, out var id)) { @@ -58,7 +58,7 @@ protected override void writeOutgoingHeader(IBasicProperties outgoing, string ke } // TODO -- this needs to be open for customizations. See the NServiceBus interop - protected override bool tryReadIncomingHeader(ReadOnlyBasicProperties incoming, string key, out string? value) + protected override bool tryReadIncomingHeader(IReadOnlyBasicProperties incoming, string key, out string? value) { if (incoming.Headers == null) { @@ -76,7 +76,7 @@ protected override bool tryReadIncomingHeader(ReadOnlyBasicProperties incoming, return false; } - protected override void writeIncomingHeaders(ReadOnlyBasicProperties incoming, Envelope envelope) + protected override void writeIncomingHeaders(IReadOnlyBasicProperties incoming, Envelope envelope) { if (incoming.Headers == null) return; foreach (var pair in incoming.Headers) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs index 7d1b2fa51..7f1774972 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqTransport.cs @@ -48,7 +48,6 @@ public RabbitMqTransport() : this(ProtocolName) private void configureDefaults(ConnectionFactory factory) { factory.AutomaticRecoveryEnabled = true; - factory.DispatchConsumersAsync = true; factory.ClientProvidedName ??= "Wolverine"; } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs index 30f816975..521cd711c 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs @@ -10,13 +10,13 @@ internal class WorkerQueueMessageConsumer : AsyncDefaultBasicConsumer, IDisposab private readonly CancellationToken _cancellation; private readonly RabbitMqListener _listener; private readonly ILogger _logger; - private readonly IEnvelopeMapper _mapper; + private readonly IRabbitMqEnvelopeMapper _mapper; private readonly IReceiver _workerQueue; private bool _latched; public WorkerQueueMessageConsumer(IChannel channel, IReceiver workerQueue, ILogger logger, RabbitMqListener listener, - IEnvelopeMapper mapper, Uri address, CancellationToken cancellation) : base(channel) + IRabbitMqEnvelopeMapper mapper, Uri address, CancellationToken cancellation) : base(channel) { _workerQueue = workerQueue; _logger = logger; @@ -31,14 +31,14 @@ public void Dispose() _latched = true; } - public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - ReadOnlyBasicProperties properties, ReadOnlyMemory body) + public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, + string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { return HandleBasicDeliverImpl(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); } public async Task HandleBasicDeliverImpl(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - ReadOnlyBasicProperties properties, ReadOnlyMemory body) + IReadOnlyBasicProperties properties, ReadOnlyMemory body) { if (_latched || _cancellation.IsCancellationRequested) { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj index 9454935d5..b8814a9b3 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj @@ -12,7 +12,7 @@ - +