From 73490e17312fa3bdc79f43a299f511632a9be3d8 Mon Sep 17 00:00:00 2001 From: dmitrynovik Date: Mon, 30 Dec 2024 15:48:31 +1100 Subject: [PATCH 1/5] feat: consistent hash exchange --- .../Config/RabbitMqOptionsBuilder.cs | 39 +++++++- Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs | 90 +++++++++++++++---- 2 files changed, 110 insertions(+), 19 deletions(-) diff --git a/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs b/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs index 7493083..ec1c678 100644 --- a/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs +++ b/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs @@ -72,6 +72,37 @@ public RabbitMqOptionsBuilder ExchangeNames( return this; } + public bool IsConsistentHashExchangeUsed() => !string.IsNullOrWhiteSpace(ConsistentHashExchangeName); + + public void UseConsistentHashExchange(int numberOfQueues, string exchangeName = "RebusConsistentHash") + { + if (numberOfQueues < 2) + { + throw new ArgumentException("Number of queues for consistent hash exchange should be at least 2", nameof(numberOfQueues)); + } + + if (string.IsNullOrWhiteSpace(exchangeName)) + { + ConsistentHashExchangeName = null; + NumberOfConsistentHashQueues = 0; + return; + } + + NumberOfConsistentHashQueues = numberOfQueues; + + if (exchangeName == DirectExchangeName) + { + throw new ArgumentException($"Exchange names for DIRECT and X-CONSISTENT-HASH are both set to '{DirectExchangeName}' - they must be different!"); + } + + if (exchangeName == TopicExchangeName) + { + throw new ArgumentException($"Exchange names for TOPIC and X-CONSISTENT-HASH are both set to '{TopicExchangeName}' - they must be different!"); + } + + ConsistentHashExchangeName = exchangeName; + } + /// /// Adds the given custom properties to be added to the RabbitMQ client connection when it is established /// @@ -157,7 +188,7 @@ public RabbitMqOptionsBuilder Ssl(SslSettings sslSettings) internal string DirectExchangeName { get; private set; } internal string TopicExchangeName { get; private set; } - + public string ConsistentHashExchangeName { get; private set; } internal int? MaxNumberOfMessagesToPrefetch { get; private set; } internal SslSettings SslSettings { get; private set; } @@ -165,6 +196,7 @@ public RabbitMqOptionsBuilder Ssl(SslSettings sslSettings) internal RabbitMqCallbackOptionsBuilder CallbackOptionsBuilder { get; } = new RabbitMqCallbackOptionsBuilder(); internal RabbitMqQueueOptionsBuilder QueueOptions { get; } = new RabbitMqQueueOptionsBuilder(); + public int NumberOfConsistentHashQueues { get; private set; } = 0; internal void Configure(RabbitMqTransport transport) { @@ -185,6 +217,11 @@ internal void Configure(RabbitMqTransport transport) transport.SetDeclareInputQueue(DeclareInputQueue.Value); } + if (NumberOfConsistentHashQueues > 1) + { + transport.SetNumberOfConsistentHashQueues(NumberOfConsistentHashQueues); + } + if (BindInputQueue.HasValue) { transport.SetBindInputQueue(BindInputQueue.Value); diff --git a/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs b/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs index 869e289..358d186 100644 --- a/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs +++ b/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs @@ -52,6 +52,8 @@ public class RabbitMqTransport : ITransport, IDisposable, IInitializable, ISubsc RabbitMqCallbackOptionsBuilder _callbackOptions = new RabbitMqCallbackOptionsBuilder(); RabbitMqQueueOptionsBuilder _inputQueueOptions = new RabbitMqQueueOptionsBuilder(); + private int _numberOfConsistentHashQueues = 0; + private string _consistentHashExchangeName; /// /// Constructs the RabbitMQ transport with multiple connection endpoints. They will be tryed in random order until working one is found @@ -123,6 +125,8 @@ public void SetDeclareInputQueue(bool value) _declareInputQueue = value; } + internal void SetNumberOfConsistentHashQueues(int value) => _numberOfConsistentHashQueues = value; + /// /// Sets whether a binding for the input queue should be declared /// @@ -147,6 +151,14 @@ public void SetTopicExchangeName(string topicExchangeName) _topicExchangeName = topicExchangeName; } + /// + /// Sets the name of the exchange used to send parallel messages + /// + public void SetConsistentHashExchangeName(string consistentHashExchangeName) + { + _consistentHashExchangeName = consistentHashExchangeName; + } + /// /// Configures how many messages to prefetch /// @@ -175,6 +187,8 @@ public void SetInputQueueOptions(RabbitMqQueueOptionsBuilder inputQueueOptions) _inputQueueOptions = inputQueueOptions; } + public bool IsConsistentHashExchangeUsed() => _numberOfConsistentHashQueues > 1; + /// /// Initializes the transport by creating the input queue /// @@ -182,7 +196,34 @@ public void Initialize() { if (Address == null) { return; } - CreateQueue(Address); + if (!IsConsistentHashExchangeUsed()) + { + CreateQueue(Address); + } + else + { + CreateConsistentHashExchangeAndMultipleQueues(); + } + } + + private void CreateConsistentHashExchangeAndMultipleQueues() + { + var connection = _connectionManager.GetConnection(); + try + { + using (var model = connection.CreateModel()) + { + CreateExchanges(model); + for (int i = 0; i < _numberOfConsistentHashQueues; ++i) + { + CreateQueue($"{Address}_{i}"); + } + } + } + catch (Exception exception) + { + throw new RebusApplicationException(exception, $"RabbitMQ Transport initialization using consistent hash exchange failed"); + } } /// @@ -196,23 +237,8 @@ public void CreateQueue(string address) { using (var model = connection.CreateModel()) { - const bool durable = true; - - if (_declareExchanges) - { - model.ExchangeDeclare(_directExchangeName, ExchangeType.Direct, durable); - model.ExchangeDeclare(_topicExchangeName, ExchangeType.Topic, durable); - } - - if (_declareInputQueue) - { - DeclareQueue(address, model); - } - - if (_bindInputQueue) - { - BindInputQueue(address, model); - } + CreateExchanges(model); + CreateQueue(address, model); } } catch (Exception exception) @@ -221,6 +247,33 @@ public void CreateQueue(string address) } } + private void CreateExchanges(IModel model) + { + if (_declareExchanges) + { + model.ExchangeDeclare(_directExchangeName, ExchangeType.Direct, true); + model.ExchangeDeclare(_topicExchangeName, ExchangeType.Topic, true); + } + + if (IsConsistentHashExchangeUsed()) + { + model.ExchangeDeclare(_consistentHashExchangeName, "x-consistent-hash", true); + } + } + + private void CreateQueue(string address, IModel model) + { + if (_declareInputQueue) + { + DeclareQueue(address, model); + } + + if (_bindInputQueue) + { + BindInputQueue(address, model); + } + } + void BindInputQueue(string address, IModel model) { model.QueueBind(address, _directExchangeName, address); @@ -231,6 +284,7 @@ void DeclareQueue(string address, IModel model) if (Address != null && Address.Equals(address)) { // This is the input queue => we use the queue setting to create the queue + model.QueueDeclare(address, exclusive: _inputQueueOptions.Exclusive, durable: _inputQueueOptions.Durable, From 542f4d12f6215335c3d46ef922a9e2efe9572c6a Mon Sep 17 00:00:00 2001 From: dmitrynovik Date: Mon, 30 Dec 2024 17:16:32 +1100 Subject: [PATCH 2/5] doc: consistent hash exchange --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index e66d3cc..b4265d8 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,11 @@ Build and run your RabbitMQ with management console, with the following shell co docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management ``` +In the docker desktop, exec the following shell command: +``` +rabbitmq-plugins enable rabbitmq_consistent_hash_exchange +``` + From your web brower, navigate to the RabbitMQ management console, ``` @@ -31,4 +36,9 @@ In Visual Studio 2017, run All Tests form the menus: Tests -> Run -> All Tests. ![](https://github.com/jonmat/Rebus.RabbitMq/blob/master/rabbit-mgmt-console.png) +## Supported out of the box RabbitMQ exchange types +* Direct +* Topic +* [Consistent Hash](https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_consistent_hash_exchange/README.md): requires enabling the corresponding plugin by running the command `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange` + From e1a5300eca63668b30403a0aa603900669e32ddf Mon Sep 17 00:00:00 2001 From: dmitrynovik Date: Mon, 30 Dec 2024 18:49:54 +1100 Subject: [PATCH 3/5] fix: applying SetConsistentHashExchangeName --- Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs b/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs index ec1c678..a4abea3 100644 --- a/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs +++ b/Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs @@ -247,6 +247,12 @@ internal void Configure(RabbitMqTransport transport) transport.SetCallbackOptions(CallbackOptionsBuilder); } + if (ConsistentHashExchangeName != null) + { + transport.SetConsistentHashExchangeName(ConsistentHashExchangeName); + transport.SetNumberOfConsistentHashQueues(NumberOfConsistentHashQueues); + } + transport.SetInputQueueOptions(QueueOptions); } } From e71d646efcfbc41e295b8ba39376c75fa160cb58 Mon Sep 17 00:00:00 2001 From: dmitrynovik Date: Mon, 30 Dec 2024 19:56:16 +1100 Subject: [PATCH 4/5] fix: consistent hash exchange binding --- Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs b/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs index 358d186..80f77a1 100644 --- a/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs +++ b/Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs @@ -214,9 +214,18 @@ private void CreateConsistentHashExchangeAndMultipleQueues() using (var model = connection.CreateModel()) { CreateExchanges(model); - for (int i = 0; i < _numberOfConsistentHashQueues; ++i) + for (int i = 1; i <= _numberOfConsistentHashQueues; ++i) { - CreateQueue($"{Address}_{i}"); + var address = $"{Address}_{i}"; + if (_declareInputQueue) + { + DeclareQueue(address, model); + } + + if (_bindInputQueue) + { + model.QueueBind(address, _consistentHashExchangeName, i.ToString()); + } } } } From 2aa910300bfd58e3440307cc2da26f05ba6d095b Mon Sep 17 00:00:00 2001 From: dmitrynovik Date: Mon, 30 Dec 2024 20:19:42 +1100 Subject: [PATCH 5/5] test: unit test for consistent hash exchange --- .../RabbitMqConsistentHashExchangeTest.cs | 52 +++++++++++++++++++ .../RabbitMqTransportFactory.cs | 19 +++++++ 2 files changed, 71 insertions(+) create mode 100644 Rebus.RabbitMq.Tests/RabbitMqConsistentHashExchangeTest.cs diff --git a/Rebus.RabbitMq.Tests/RabbitMqConsistentHashExchangeTest.cs b/Rebus.RabbitMq.Tests/RabbitMqConsistentHashExchangeTest.cs new file mode 100644 index 0000000..6a53663 --- /dev/null +++ b/Rebus.RabbitMq.Tests/RabbitMqConsistentHashExchangeTest.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; +using Rebus.Activation; +using Rebus.Config; +using Rebus.Tests.Contracts; +using Rebus.Tests.Contracts.Extensions; +// ReSharper disable ArgumentsStyleNamedExpression + +#pragma warning disable 1998 + +namespace Rebus.RabbitMq.Tests +{ + [TestFixture] + public class RabbitMqConsistentHashExchangeTest : FixtureBase + { + [Test] + public async Task WorksWithConsistentHashExchange() + { + const string connectionString = RabbitMqTransportFactory.ConnectionString; + + const string exchangeName = "RebusTestConsistentHashExchange"; + const string queueNamePattern = "consistent-hash-xchange-bound-queue"; + + using (var activator = new BuiltinHandlerActivator()) + { + Configure.With(activator) + .Transport(t => + { + t + .UseRabbitMq(connectionString, queueNamePattern) + // Create 2 Queues & bind to the same consistent hash exchange: + .UseConsistentHashExchange(2, exchangeName); + }) + .Start(); + + // Send N messages to spread semi-evenly among queues + for (int i = 1; i <= 10; i++) + { + await activator.Bus.Advanced.Routing.Send($"{i}@{exchangeName}", $"test_{i}", new Dictionary()); + } + } + + // Assert the exchange and all queues were created: + Assert.That(RabbitMqTransportFactory.ExchangeExists(exchangeName), Is.True); + Assert.That(RabbitMqTransportFactory.QueueExists(queueNamePattern + "_1"), Is.True); + Assert.That(RabbitMqTransportFactory.QueueExists(queueNamePattern + "_2"), Is.True); + } + } +} \ No newline at end of file diff --git a/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs b/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs index 33fdb0b..eb8bd9c 100644 --- a/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs +++ b/Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs @@ -109,5 +109,24 @@ public static bool ExchangeExists(string exchangeName) } } } + + public static bool QueueExists(string name) + { + var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) }; + + using (var connection = connectionFactory.CreateConnection()) + using (var model = connection.CreateModel()) + { + try + { + model.QueueDeclarePassive(name); + return true; + } + catch + { + return false; + } + } + } } } \ No newline at end of file