Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: adding support for RabbitMQ consistent hash exchange type #127

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ Build and run your RabbitMQ with management console, with the following shell co
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management
```

In the docker desktop, exec the following shell command:
```
rabbitmq-plugins enable rabbitmq_consistent_hash_exchange
```

From your web brower, navigate to the RabbitMQ management console,

```
Expand All @@ -31,4 +36,9 @@ In Visual Studio 2017, run All Tests form the menus: Tests -> Run -> All Tests.

![](https://github.com/jonmat/Rebus.RabbitMq/blob/master/rabbit-mgmt-console.png)

## Supported out of the box RabbitMQ exchange types
* Direct
* Topic
* [Consistent Hash](https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_consistent_hash_exchange/README.md): requires enabling the corresponding plugin by running the command `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`


52 changes: 52 additions & 0 deletions Rebus.RabbitMq.Tests/RabbitMqConsistentHashExchangeTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using Rebus.Activation;
using Rebus.Config;
using Rebus.Tests.Contracts;
using Rebus.Tests.Contracts.Extensions;
// ReSharper disable ArgumentsStyleNamedExpression

#pragma warning disable 1998

namespace Rebus.RabbitMq.Tests
{
[TestFixture]
public class RabbitMqConsistentHashExchangeTest : FixtureBase
{
[Test]
public async Task WorksWithConsistentHashExchange()
{
const string connectionString = RabbitMqTransportFactory.ConnectionString;

const string exchangeName = "RebusTestConsistentHashExchange";
const string queueNamePattern = "consistent-hash-xchange-bound-queue";

using (var activator = new BuiltinHandlerActivator())
{
Configure.With(activator)
.Transport(t =>
{
t
.UseRabbitMq(connectionString, queueNamePattern)
// Create 2 Queues & bind to the same consistent hash exchange:
.UseConsistentHashExchange(2, exchangeName);
})
.Start();

// Send N messages to spread semi-evenly among queues
for (int i = 1; i <= 10; i++)
{
await activator.Bus.Advanced.Routing.Send($"{i}@{exchangeName}", $"test_{i}", new Dictionary<string, string>());
}
}

// Assert the exchange and all queues were created:
Assert.That(RabbitMqTransportFactory.ExchangeExists(exchangeName), Is.True);
Assert.That(RabbitMqTransportFactory.QueueExists(queueNamePattern + "_1"), Is.True);
Assert.That(RabbitMqTransportFactory.QueueExists(queueNamePattern + "_2"), Is.True);
}
}
}
19 changes: 19 additions & 0 deletions Rebus.RabbitMq.Tests/RabbitMqTransportFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,24 @@ public static bool ExchangeExists(string exchangeName)
}
}
}

public static bool QueueExists(string name)
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(ConnectionString) };

using (var connection = connectionFactory.CreateConnection())
using (var model = connection.CreateModel())
{
try
{
model.QueueDeclarePassive(name);
return true;
}
catch
{
return false;
}
}
}
}
}
45 changes: 44 additions & 1 deletion Rebus.RabbitMq/Config/RabbitMqOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,37 @@ public RabbitMqOptionsBuilder ExchangeNames(
return this;
}

public bool IsConsistentHashExchangeUsed() => !string.IsNullOrWhiteSpace(ConsistentHashExchangeName);

public void UseConsistentHashExchange(int numberOfQueues, string exchangeName = "RebusConsistentHash")
{
if (numberOfQueues < 2)
{
throw new ArgumentException("Number of queues for consistent hash exchange should be at least 2", nameof(numberOfQueues));
}

if (string.IsNullOrWhiteSpace(exchangeName))
{
ConsistentHashExchangeName = null;
NumberOfConsistentHashQueues = 0;
return;
}

NumberOfConsistentHashQueues = numberOfQueues;

if (exchangeName == DirectExchangeName)
{
throw new ArgumentException($"Exchange names for DIRECT and X-CONSISTENT-HASH are both set to '{DirectExchangeName}' - they must be different!");
}

if (exchangeName == TopicExchangeName)
{
throw new ArgumentException($"Exchange names for TOPIC and X-CONSISTENT-HASH are both set to '{TopicExchangeName}' - they must be different!");
}

ConsistentHashExchangeName = exchangeName;
}

/// <summary>
/// Adds the given custom properties to be added to the RabbitMQ client connection when it is established
/// </summary>
Expand Down Expand Up @@ -157,14 +188,15 @@ public RabbitMqOptionsBuilder Ssl(SslSettings sslSettings)

internal string DirectExchangeName { get; private set; }
internal string TopicExchangeName { get; private set; }

public string ConsistentHashExchangeName { get; private set; }
internal int? MaxNumberOfMessagesToPrefetch { get; private set; }

internal SslSettings SslSettings { get; private set; }

internal RabbitMqCallbackOptionsBuilder CallbackOptionsBuilder { get; } = new RabbitMqCallbackOptionsBuilder();

internal RabbitMqQueueOptionsBuilder QueueOptions { get; } = new RabbitMqQueueOptionsBuilder();
public int NumberOfConsistentHashQueues { get; private set; } = 0;

internal void Configure(RabbitMqTransport transport)
{
Expand All @@ -185,6 +217,11 @@ internal void Configure(RabbitMqTransport transport)
transport.SetDeclareInputQueue(DeclareInputQueue.Value);
}

if (NumberOfConsistentHashQueues > 1)
{
transport.SetNumberOfConsistentHashQueues(NumberOfConsistentHashQueues);
}

if (BindInputQueue.HasValue)
{
transport.SetBindInputQueue(BindInputQueue.Value);
Expand All @@ -210,6 +247,12 @@ internal void Configure(RabbitMqTransport transport)
transport.SetCallbackOptions(CallbackOptionsBuilder);
}

if (ConsistentHashExchangeName != null)
{
transport.SetConsistentHashExchangeName(ConsistentHashExchangeName);
transport.SetNumberOfConsistentHashQueues(NumberOfConsistentHashQueues);
}

transport.SetInputQueueOptions(QueueOptions);
}
}
Expand Down
99 changes: 81 additions & 18 deletions Rebus.RabbitMq/RabbitMq/RabbitMqTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class RabbitMqTransport : ITransport, IDisposable, IInitializable, ISubsc

RabbitMqCallbackOptionsBuilder _callbackOptions = new RabbitMqCallbackOptionsBuilder();
RabbitMqQueueOptionsBuilder _inputQueueOptions = new RabbitMqQueueOptionsBuilder();
private int _numberOfConsistentHashQueues = 0;
private string _consistentHashExchangeName;

/// <summary>
/// Constructs the RabbitMQ transport with multiple connection endpoints. They will be tryed in random order until working one is found
Expand Down Expand Up @@ -123,6 +125,8 @@ public void SetDeclareInputQueue(bool value)
_declareInputQueue = value;
}

internal void SetNumberOfConsistentHashQueues(int value) => _numberOfConsistentHashQueues = value;

/// <summary>
/// Sets whether a binding for the input queue should be declared
/// </summary>
Expand All @@ -147,6 +151,14 @@ public void SetTopicExchangeName(string topicExchangeName)
_topicExchangeName = topicExchangeName;
}

/// <summary>
/// Sets the name of the exchange used to send parallel messages
/// </summary>
public void SetConsistentHashExchangeName(string consistentHashExchangeName)
{
_consistentHashExchangeName = consistentHashExchangeName;
}

/// <summary>
/// Configures how many messages to prefetch
/// </summary>
Expand Down Expand Up @@ -175,14 +187,52 @@ public void SetInputQueueOptions(RabbitMqQueueOptionsBuilder inputQueueOptions)
_inputQueueOptions = inputQueueOptions;
}

public bool IsConsistentHashExchangeUsed() => _numberOfConsistentHashQueues > 1;

/// <summary>
/// Initializes the transport by creating the input queue
/// </summary>
public void Initialize()
{
if (Address == null) { return; }

CreateQueue(Address);
if (!IsConsistentHashExchangeUsed())
{
CreateQueue(Address);
}
else
{
CreateConsistentHashExchangeAndMultipleQueues();
}
}

private void CreateConsistentHashExchangeAndMultipleQueues()
{
var connection = _connectionManager.GetConnection();
try
{
using (var model = connection.CreateModel())
{
CreateExchanges(model);
for (int i = 1; i <= _numberOfConsistentHashQueues; ++i)
{
var address = $"{Address}_{i}";
if (_declareInputQueue)
{
DeclareQueue(address, model);
}

if (_bindInputQueue)
{
model.QueueBind(address, _consistentHashExchangeName, i.ToString());
}
}
}
}
catch (Exception exception)
{
throw new RebusApplicationException(exception, $"RabbitMQ Transport initialization using consistent hash exchange failed");
}
}

/// <summary>
Expand All @@ -196,23 +246,8 @@ public void CreateQueue(string address)
{
using (var model = connection.CreateModel())
{
const bool durable = true;

if (_declareExchanges)
{
model.ExchangeDeclare(_directExchangeName, ExchangeType.Direct, durable);
model.ExchangeDeclare(_topicExchangeName, ExchangeType.Topic, durable);
}

if (_declareInputQueue)
{
DeclareQueue(address, model);
}

if (_bindInputQueue)
{
BindInputQueue(address, model);
}
CreateExchanges(model);
CreateQueue(address, model);
}
}
catch (Exception exception)
Expand All @@ -221,6 +256,33 @@ public void CreateQueue(string address)
}
}

private void CreateExchanges(IModel model)
{
if (_declareExchanges)
{
model.ExchangeDeclare(_directExchangeName, ExchangeType.Direct, true);
model.ExchangeDeclare(_topicExchangeName, ExchangeType.Topic, true);
}

if (IsConsistentHashExchangeUsed())
{
model.ExchangeDeclare(_consistentHashExchangeName, "x-consistent-hash", true);
}
}

private void CreateQueue(string address, IModel model)
{
if (_declareInputQueue)
{
DeclareQueue(address, model);
}

if (_bindInputQueue)
{
BindInputQueue(address, model);
}
}

void BindInputQueue(string address, IModel model)
{
model.QueueBind(address, _directExchangeName, address);
Expand All @@ -231,6 +293,7 @@ void DeclareQueue(string address, IModel model)
if (Address != null && Address.Equals(address))
{
// This is the input queue => we use the queue setting to create the queue

model.QueueDeclare(address,
exclusive: _inputQueueOptions.Exclusive,
durable: _inputQueueOptions.Durable,
Expand Down