Socolin.RabbitMQ.Client

A simple wrapper around RabbitMQ.Client to make it easier to use.

Example

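// Framework and JSON namespaces used by this example.
// The library's own namespaces must be imported as well (omitted here).
using System;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

var rabbitMqConnectionManager = new RabbitMqConnectionManager(new Uri("amqp://localhost"), "test", TimeSpan.FromSeconds(30));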
const string queueName = "some-queue-name";
var options = new RabbitMqServiceOptionsBuilder()
    .WithRetry(TimeSpan.FromSeconds(15), null, TimeSpan.FromSeconds(1))
    .WithDeliveryMode(DeliveryMode.Persistent)
    .WithConnectionManager(rabbitMqConnectionManager)
    .WithSerializer(message => Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), "application/json")
    .Build();
var serviceClient = new RabbitMqServiceClient(options);

// Create a queue
await serviceClient.CreateQueueAsync(queueName + "-Error", true);

// Create a queue with options builder
var createQueueOption = new CreateQueueOptionsBuilder(QueueType.Classic)
    .Durable()
    .WithDeadLetterExchange(RabbitMqConstants.DefaultExchangeName)
    .WithDeadLetterRoutingKey(queueName + "-Error")
    .Build();
await serviceClient.CreateQueueAsync(queueName, createQueueOption);

// Listen to the queue (automatic reconnection is enabled)
var consumerOptions = new ConsumerOptionsBuilder<string>()
    .WithDefaultDeSerializer(message => JsonConvert.DeserializeObject<string>(Encoding.UTF8.GetString(message.Span)))
    .WithSimpleMessageAck()
    .WithCustomPipe(async (context, next) =>
    {
        Console.WriteLine("Some logging message before processing");
        await next();
        Console.WriteLine("Some logging message after processing");
    })
    .Build();
var activeConsumer = await serviceClient.StartListeningQueueAsync(queueName, consumerOptions, (message, items, ct) =>
{
    Console.WriteLine(message);
    return Task.CompletedTask;
});

// Enqueue a message
await serviceClient.EnqueueMessageAsync(queueName, "some-message");
// Give the consumer a moment to receive and process the message
await Task.Delay(100);

// Enqueue using EnqueueQueueClient
var queueClient = serviceClient.CreateQueueClient(queueName);
await queueClient.EnqueueMessageAsync("some-other-message");

// Cancel listening
activeConsumer.Cancel();

// Purge the queue
await serviceClient.PurgeQueueAsync(queueName);

// Delete a queue
await serviceClient.DeleteQueueAsync(queueName, false, false);
await serviceClient.DeleteQueueAsync(queueName + "-Error", false, false);

Architecture

Internally, the library uses the pipeline pattern.

  • Publishing a message goes through the Message Pipeline.
  • Executing an action (creating, deleting, or purging a queue, or starting to consume it) goes through the Action Pipeline.
  • Processing a message received from a queue goes through the Consumer Pipeline.

The Message Pipeline and the Action Pipeline may share pipe elements, because steps such as retry and connection handling work the same way in both.
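To make the pattern concrete, here is a minimal, self-contained sketch of how a chain of pipes can be composed so that each pipe decides when to call the rest of the pipeline. SketchContext and SketchPipeline are hypothetical names used only for illustration; they are not the library's types, and the library's actual implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical context type for illustration; not the library's IPipeContext.
public sealed class SketchContext
{
    public Dictionary<string, object> Items { get; } = new Dictionary<string, object>();
    public List<string> Trace { get; } = new List<string>();
}

public static class SketchPipeline
{
    // Chains the pipes so that each one receives a "next" delegate
    // that runs the remainder of the pipeline.
    public static Func<SketchContext, Task> Build(
        IReadOnlyList<Func<SketchContext, Func<Task>, Task>> pipes)
    {
        Func<SketchContext, Task> terminal = _ => Task.CompletedTask;

        // Walk the list backwards so the first pipe ends up outermost.
        for (var i = pipes.Count - 1; i >= 0; i--)
        {
            var pipe = pipes[i];
            var next = terminal;
            terminal = context => pipe(context, () => next(context));
        }

        return terminal;
    }
}
```

A pipe that logs before and after calling next() wraps everything registered after it, which is the same shape as the WithCustomPipe() lambda in the example above.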

Message Pipeline

Retry Pipe

This pipe handles exceptions and executes the pipeline again.

The default retry logic waits for the configured delay between attempts and keeps retrying until either:

  • the maximum number of retries has been reached, or
  • the maximum duration has been reached.

The default logic only handles RabbitMQ connection errors.

You can change this logic by passing another implementation of IGenericPipe to RabbitMqServiceOptionsBuilder.WithRetry().
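The default behaviour described above can be sketched roughly as follows. This is an illustration under assumptions, not the library's code: RetrySketch, its parameter names, and the isRetryable predicate are hypothetical, and the real pipe may evaluate the limits differently.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the library.
public static class RetrySketch
{
    // Runs "action" again after each failure accepted by "isRetryable",
    // waiting "delay" between attempts, until the retry count (when set)
    // or the maximum duration would be exceeded.
    public static async Task ExecuteAsync(
        Func<Task> action,
        Func<Exception, bool> isRetryable,
        TimeSpan delay,
        int? maxRetryCount,
        TimeSpan maxDuration)
    {
        var stopwatch = Stopwatch.StartNew();
        var retryCount = 0;

        while (true)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception ex) when (isRetryable(ex))
            {
                var countExhausted = maxRetryCount.HasValue && retryCount >= maxRetryCount.Value;
                var durationExhausted = stopwatch.Elapsed + delay > maxDuration;
                if (countExhausted || durationExhausted)
                    throw; // Give up and surface the last error to the caller.

                retryCount++;
            }

            await Task.Delay(delay);
        }
    }
}
```

Exceptions rejected by the predicate are not caught at all, which mirrors the idea that only connection-related errors are retried by default.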

Connection Pipe

The connection pipe's job is to set IPipeContext.Channel, which the following pipes then use.

Serializer Pipe

The serializer pipe transforms the message object into a byte[], which the Publish Pipe then sends to the queue.

Per-Message TTL Pipe

This pipe is optional. It sets an expiration on messages.

The expiration can be defined when building the options with WithPerMessageTtl(), or it can be specified when publishing a message by setting the key MessageTtlClientPipe.ContextItemExpirationKey in the contextItems parameter. If no expiration is defined through either WithPerMessageTtl() or MessageTtlClientPipe.ContextItemExpirationKey, the message has no expiration.

Custom Pipes

Custom pipes can be inserted into the pipelines using .WithCustomPipe() or by adding them directly to RabbitMqServiceOptions.Customs. Only pipes implementing IGenericPipe or IMessagePipe are inserted into the Message Pipeline.

Publish Pipe

This pipe publishes the serialized message.

Action Pipeline

Retry Pipe

See Message Pipeline

Connection Pipe

See Message Pipeline

Custom Pipes

Custom pipes can be inserted into the pipelines using .WithCustomPipe() or by adding them directly to RabbitMqServiceOptions.Customs. Only pipes implementing IGenericPipe or IActionPipe are inserted into the Action Pipeline.

Execute Pipe

This pipe executes the function PipeContextAction.Action.

Consumer Pipeline

Canceller pipe

This pipe is used internally by IActiveConsumer.CancelAsync() and IActiveConsumer.CancelAfterCurrentTaskCompletedAsync().

Message Ack Pipe

This pipe is responsible for acknowledging successfully processed messages and for handling failures (reject or retry logic).

  • SimpleMessageAcknowledgementPipe: acks a message when processing succeeds and rejects it when an exception is thrown.
  • FastRetryMessageAcknowledgementPipe: acks a message when processing succeeds. When an exception occurs, it requeues the message with a RetryCount header. If processing fails and the message already has a RetryCount header equal to the maximum retry count, the message is rejected. Rejecting a message deletes it, unless a dead letter queue is configured on the queue. See Dead Letter Exchanges for more details.
  • DelayedRetryMessageAcknowledgementPipe: works like FastRetryMessageAcknowledgementPipe, but delays failed messages instead of retrying them immediately. To achieve this, the pipe enqueues the failed message with a Per-Message TTL (an expiration) in another queue. That queue must have the first queue configured as its dead letter queue (the helper DelayedRetryMessageAcknowledgementPipe.CreateDelayQueueAsync can set this up), so that when messages expire, the RabbitMQ server enqueues them back into the first queue.

All of these pipes set the Context.Items key FinalAttempt to true on the last attempt. The ExceptionConsumerPipe uses this to know whether it is the final attempt. This is useful if you want to log exceptions as warnings while the message will still be retried, and as errors when it is the last attempt and the message will be rejected.
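The fast-retry decision described above can be summarised in a small sketch. AckDecision and FastRetrySketch are hypothetical names introduced for illustration, and treating a missing RetryCount header as zero is an assumption; only the RetryCount header name comes from the description above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; not part of the library.
public enum AckDecision
{
    Ack,
    RequeueWithIncrementedRetryCount,
    Reject
}

public static class FastRetrySketch
{
    public const string RetryCountHeader = "RetryCount";

    // Reads the retry count from the headers; a missing header is assumed to mean 0.
    public static int GetRetryCount(IDictionary<string, object> headers)
    {
        return headers.TryGetValue(RetryCountHeader, out var raw) ? Convert.ToInt32(raw) : 0;
    }

    // True when a failure of this attempt would lead to a reject.
    public static bool IsFinalAttempt(IDictionary<string, object> headers, int maxRetryCount)
    {
        return GetRetryCount(headers) >= maxRetryCount;
    }

    public static AckDecision Decide(
        bool processingSucceeded,
        IDictionary<string, object> headers,
        int maxRetryCount)
    {
        if (processingSucceeded)
            return AckDecision.Ack;

        return IsFinalAttempt(headers, maxRetryCount)
            ? AckDecision.Reject
            : AckDecision.RequeueWithIncrementedRetryCount;
    }
}
```

In this sketch the delayed variant would make the same decision and differ only in where the requeued message is sent.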

DelayedRetryMessageAcknowledgementPipe logic

LogException Pipe

This pipe can be used to log exceptions thrown during processing. When an exception is thrown from the processor (or from any custom pipe), it calls LogExceptionDelegate with the exception and a boolean that indicates whether this is the last attempt to process the message. See Message Ack Pipe.

Deserialize Pipe

This pipe is responsible for deserializing the message using the given deserializer.

You can also provide multiple deserializers, so that the appropriate one is used depending on the Content-Type of the message.
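Selecting a deserializer by Content-Type can be pictured with the following sketch. ContentTypeDeserializerSketch is a hypothetical class, not the library's API, and falling back to a default deserializer for unknown or missing content types is an assumption made for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical class for illustration; not part of the library.
public sealed class ContentTypeDeserializerSketch<T>
{
    // Content types are compared case-insensitively.
    private readonly Dictionary<string, Func<ReadOnlyMemory<byte>, T>> _byContentType =
        new Dictionary<string, Func<ReadOnlyMemory<byte>, T>>(StringComparer.OrdinalIgnoreCase);

    private readonly Func<ReadOnlyMemory<byte>, T> _fallback;

    public ContentTypeDeserializerSketch(Func<ReadOnlyMemory<byte>, T> fallback)
    {
        _fallback = fallback ?? throw new ArgumentNullException(nameof(fallback));
    }

    // Registers (or replaces) the deserializer used for one content type.
    public ContentTypeDeserializerSketch<T> Register(
        string contentType,
        Func<ReadOnlyMemory<byte>, T> deserializer)
    {
        _byContentType[contentType] = deserializer;
        return this;
    }

    // Uses the registered deserializer when one matches, otherwise the fallback.
    public T Deserialize(string contentType, ReadOnlyMemory<byte> body)
    {
        if (contentType != null && _byContentType.TryGetValue(contentType, out var deserializer))
            return deserializer(body);

        return _fallback(body);
    }
}
```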

Custom Pipes

You can add any logic you want here.

Using the builder, you can either provide an implementation of IConsumerPipeBuilder<T> or write a function inline:

.WithCustomPipe(async (context, next) =>
{
    Console.WriteLine("Some logging message before processing");
    await next();
    Console.WriteLine("Some logging message after processing");
})

Processor Pipe

This pipe calls the function passed to StartListeningQueueAsync().

Customization

Message Pipeline and Action Pipeline

You can use the field Context.Items to share values between pipes and with the caller outside the pipeline.

Consumer Pipe

Custom pipes can be inserted into the pipeline by adding them to ConsumerOptions.Customs or by using the builder method ConsumerOptionsBuilder.WithCustomPipe().

You can use the field Context.Items to share values between pipes and with the processor function.
