From bb6697f9fd0f7d81bd0691655d36866b6229aec7 Mon Sep 17 00:00:00 2001 From: JT Date: Wed, 9 Oct 2024 19:55:49 +0800 Subject: [PATCH] Upgrade to Rabbit.NET v7 RC12 --- .../Internal/ConnectionMonitor.cs | 23 +++++++++++-------- .../Internal/RabbitMqChannelAgent.cs | 14 +++++++---- .../Internal/WorkerQueueMessageConsumer.cs | 12 ++++------ .../Wolverine.RabbitMQ.csproj | 2 +- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/ConnectionMonitor.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/ConnectionMonitor.cs index da778b0bb..7895b2312 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/ConnectionMonitor.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/ConnectionMonitor.cs @@ -37,10 +37,10 @@ public async Task ConnectAsync() { _connection = await _transport.CreateConnectionAsync(); - _connection.ConnectionShutdown += connectionOnConnectionShutdown; - _connection.ConnectionUnblocked += connectionOnConnectionUnblocked; - _connection.ConnectionBlocked += connectionOnConnectionBlocked; - _connection.CallbackException += connectionOnCallbackException; + _connection.ConnectionShutdownAsync += connectionOnConnectionShutdownAsync; + _connection.ConnectionUnblockedAsync += connectionOnConnectionUnblockedAsync; + _connection.ConnectionBlockedAsync += connectionOnConnectionBlockedAsync; + _connection.CallbackExceptionAsync += connectionOnCallbackExceptionAsync; } public Task CreateChannelAsync() @@ -73,32 +73,37 @@ public void Track(RabbitMqChannelAgent agent) _agents.Add(agent); } - private void connectionOnCallbackException(object? sender, CallbackExceptionEventArgs e) + private Task connectionOnCallbackExceptionAsync(object? sender, CallbackExceptionEventArgs e) { if (e.Exception != null) { _logger.LogError(e.Exception, "Rabbit MQ connection error on callback"); } + + return Task.CompletedTask; } - private void connectionOnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e) + private Task connectionOnConnectionBlockedAsync(object? sender, ConnectionBlockedEventArgs e) { _logger.LogInformation("Rabbit MQ connection is blocked because of {Reason}", e.Reason); + return Task.CompletedTask; } - private void connectionOnConnectionUnblocked(object? sender, EventArgs e) + private Task connectionOnConnectionUnblockedAsync(object? sender, AsyncEventArgs e) { _logger.LogInformation("Rabbit MQ connection unblocked"); + return Task.CompletedTask; } - private void connectionOnConnectionShutdown(object? sender, ShutdownEventArgs e) + private Task connectionOnConnectionShutdownAsync(object? sender, ShutdownEventArgs e) { - if (e.Initiator == ShutdownInitiator.Application) return; + if (e.Initiator == ShutdownInitiator.Application) return Task.CompletedTask; if (e.Exception != null) { _logger.LogError(e.Exception, "Unexpected Rabbit MQ connection shutdown"); } + return Task.CompletedTask; } public void Remove(RabbitMqChannelAgent agent) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelAgent.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelAgent.cs index 15475fa1b..36fbeefbd 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelAgent.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelAgent.cs @@ -1,5 +1,6 @@ using Microsoft.Extensions.Logging; using RabbitMQ.Client; +using RabbitMQ.Client.Events; namespace Wolverine.RabbitMQ.Internal; @@ -64,19 +65,20 @@ protected async Task startNewChannel() { Channel = await _monitor.CreateChannelAsync(); - Channel.CallbackException += (sender, args) => + Channel.CallbackExceptionAsync += (sender, args) => { Logger.LogError(args.Exception, "Callback error in Rabbit Mq agent"); + return Task.CompletedTask; }; - Channel.ChannelShutdown += ChannelOnModelShutdown; + Channel.ChannelShutdownAsync += ChannelOnModelShutdown; Logger.LogInformation("Opened a new channel for Wolverine endpoint {Endpoint}", this); } - private void ChannelOnModelShutdown(object? sender, ShutdownEventArgs e) + private Task ChannelOnModelShutdown(object? sender, ShutdownEventArgs e) { - if (e.Initiator == ShutdownInitiator.Application) return; + if (e.Initiator == ShutdownInitiator.Application) return Task.CompletedTask; if (e.Exception != null) { @@ -85,13 +87,15 @@ private void ChannelOnModelShutdown(object? sender, ShutdownEventArgs e) } _ = EnsureConnected(); + + return Task.CompletedTask; } protected async Task teardownChannel() { if (Channel != null) { - Channel.ChannelShutdown -= ChannelOnModelShutdown; + Channel.ChannelShutdownAsync -= ChannelOnModelShutdown; await Channel.CloseAsync(); await Channel.AbortAsync(); Channel.Dispose(); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs index 521cd711c..3d1af99e5 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs @@ -31,14 +31,10 @@ public void Dispose() _latched = true; } - public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, - string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) - { - return HandleBasicDeliverImpl(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body); - } - - public async Task HandleBasicDeliverImpl(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, - IReadOnlyBasicProperties properties, ReadOnlyMemory body) + //TODO do something with the token passed in here + public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, + string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, + CancellationToken cancellationToken = new()) { if (_latched || _cancellation.IsCancellationRequested) { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj index 5d70f7471..eb71cbf92 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Wolverine.RabbitMQ.csproj @@ -12,7 +12,7 @@ - +