A simple wrapper around RabbitMQ.Client to make it easier to use.
var rabbitMqConnectionManager = new RabbitMqConnectionManager(new Uri("amqp://localhost"), "test", TimeSpan.FromSeconds(30));
const string queueName = "some-queue-name";
var options = new RabbitMqServiceOptionsBuilder()
.WithRetry(TimeSpan.FromSeconds(15), null, TimeSpan.FromSeconds(1))
.WithDeliveryMode(DeliveryMode.Persistent)
.WithConnectionManager(rabbitMqConnectionManager)
.WithSerializer(message => Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), "application/json")
.Build();
var serviceClient = new RabbitMqServiceClient(options);
// Create a queue
await serviceClient.CreateQueueAsync(queueName + "-Error", true);
// Create a queue with options builder
var createQueueOption = new CreateQueueOptionsBuilder(QueueType.Classic)
.Durable()
.WithDeadLetterExchange(RabbitMqConstants.DefaultExchangeName)
.WithDeadLetterRoutingKey(queueName + "-Error")
.Build();
await serviceClient.CreateQueueAsync(queueName, createQueueOption);
// Listen to queue (Auto reconnect is enabled)
var consumerOptions = new ConsumerOptionsBuilder<string>()
.WithDefaultDeSerializer(message => JsonConvert.DeserializeObject<string>(Encoding.UTF8.GetString(message.Span)))
.WithSimpleMessageAck()
.WithCustomPipe(async (context, next) =>
{
Console.WriteLine("Some logging message before processing");
await next();
Console.WriteLine("Some logging message after processing");
})
.Build();
var activeConsumer = await serviceClient.StartListeningQueueAsync(queueName, consumerOptions, (message, items, ct) =>
{
Console.WriteLine(message);
return Task.CompletedTask;
});
// Enqueue a message
await serviceClient.EnqueueMessageAsync(queueName, "some-message");
await Task.Delay(100);
// Enqueue using EnqueueQueueClient
var queueClient = serviceClient.CreateQueueClient(queueName);
await queueClient.EnqueueMessageAsync("some-other-message");
// Cancel listening
activeConsumer.Cancel();
// Purge the queue
await serviceClient.PurgeQueueAsync(queueName);
// Delete a queue
await serviceClient.DeleteQueueAsync(queueName, false, false);
await serviceClient.DeleteQueueAsync(queueName + "-Error", false, false);
Internally the library is using pipeline pattern.
- When publishing a message it's using Message Pipeline
- When executing an action (Create/Delete/Purge a queue or start consuming it) it's using the Action Pipeline
- When processing a message received from a queue it's using the Consumer Pipeline
The Message Pipeline and Action Pipeline may share same pipe elements since they work mostly the same way for the same thing.
This pipe will handle exceptions and execute the pipeline again.
The default retry logic will wait the defined time between retry and will retry until either:
- The maximum number of retry has been reach
- The max duration has been reach The default logic is only handling connection error related to RabbitMQ.
You can change this logic by providing another implementation of IGenericPipe
to the RabbitMqServiceOptionsBuilder.WithRetry()
.
The connection pipe mission is to fill the field IPipeContext.Channel
that will be used later in next pipes.
The serializer pipe will transform the message object
to a byte[]
to be store in the queue in the Publish Pipe.
This pipe is optional. It will specify expiration on messages.
The expiration can be defined when creating the pipe option with WithPerMessageTtl()
or it can be specified when publishing a message using the contextItems
parameters
by defining the key MessageTtlClientPipe.ContextItemExpirationKey
. If no expiration is define in either WithPerMessageTtl()
nor MessageTtlClientPipe.ContextItemExpirationKey
the message will not have any expiration set.
Custom pipes can be inserted in the pipelines using .WithCustomPipe()
or directly in RabbitMqServiceOptions.Customs
.
Only the pipes implementing IGenericPipe
or IMessagePipe
will be inserted in Message Pipeline.
This pipe will publish the serialized message.
See Message Pipeline
See Message Pipeline
Custom pipes can be inserted in the pipelines using .WithCustomPipe()
or directly in RabbitMqServiceOptions.Customs
.
Only the pipes implementing IGenericPipe
or IActionPipe
will be inserted in Action Pipeline.
This pipe will execute the function PipeContextAction.Action
This pipe is use internally for IActiveConsumer.CancelAsync()
and IActiveConsumer.CancelAfterCurrentTaskCompletedAsync()
This pipe is responsible to acknowledge successfully processed messages, and handle failure (reject or retry logic).
SimpleMessageAcknowledgementPipe
This pipe is going to Ack messages when processing worked and Reject when an exception is throw.FastRetryMessageAcknowledgementPipe
This pipe is going to Ack messages when processing worked. When an exception occurs it will requeue the message with a headerRetryCount
. If the processing failed, and the message already have a headerRetryCount
with a value equal to the maximum retry count, then the message will be rejected. Rejecting the message will delete it, except if you have configured a dead letter queue on the queue. See Dead Letter Exchanges for more details.DelayedRetryMessageAcknowledgementPipe
This pipe is working likeFastRetryMessageAcknowledgementPipe
but instead of immediately retry failed messages it delays them. To achieve this, the pipe enqueue failed message with a Per-Message TTL (an expiration) in another queue. This queue must have the first queue configured as dead letter queue (you can use the helperDelayedRetryMessageAcknowledgementPipe.CreateDelayQueueAsync
for that), so when messages expire they are enqueue by RabbitMQ server into the first queue.
All those pipes set the Context.Items
key FinalAttempt
to true
on the last attempt. This is used by the ExceptionConsumerPipe
to know if it's the final attempt. Useful if you want to log exceptions as warning when the message will be retried and as error when it's the last attempt for a message and that it will be rejected.
This pipe can be used to log exception thrown during processing.
When an exception is throw from the processor (or from any CustomPipe) it will call LogExceptionDelegate
with the exception, and a boolean that indicate when it's the last attempt to process this message. See Message Ack Pipe
This pipe is responsible to deserialize the message, using the given deserializer.
You can also provide multiple deserializer to use the appropriate one depending on the Content-Type of the message.
You can push whatever logic you want here.
Using the builder, you can either provide an implementation of IConsumerPipeBuilder<T>
or you can write a function inline:
.WithCustomPipe(async (context, next) =>
{
Console.WriteLine("Some logging message before processing");
await next();
Console.WriteLine("Some logging message after processing");
})
This one is going to call the function given to the method StartListeningQueueAsync()
.
You can use the field Context.Items
to share value between pipes, and with the caller outside the pipe.
Custom pipes can be inserted in the pipeline by adding them into ConsumerOptions.Customs
or by using the builder ConsumerOptionsBuilder.WithCustomPipe()
You can use the field Context.Items
to share value between pipes and with the processor function.