Skip to content

Commit

Permalink
Demonstrate handling of Channel.Close
Browse files Browse the repository at this point in the history
Related to rabbitmq/rabbitmq-server#13387

During the delivery callback for a message, immediately publish with an invalid value for message expiration. This causes an immediate channel closure.

* Add logging, use `Try...` methods for `tcs`.
  • Loading branch information
lukebakken committed Feb 21, 2025
1 parent eb64b79 commit 54c1675
Showing 1 changed file with 151 additions and 0 deletions.
151 changes: 151 additions & 0 deletions projects/Test/Integration/GH/TestGitHubIssues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
Expand Down Expand Up @@ -165,5 +167,154 @@ await Assert.ThrowsAnyAsync<BrokerUnreachableException>(
async () => await _connFactory.CreateConnectionAsync());
Assert.IsAssignableFrom<PossibleAuthenticationFailureException>(ex.InnerException);
}

[Fact]
public async Task SendInvalidPublishMaybeClosesConnection_GH13387()
{
const int messageCount = 200;

_connFactory = new ConnectionFactory();
_conn = await _connFactory.CreateConnectionAsync();

var opts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
_channel = await _conn.CreateChannelAsync(opts);

await _channel.BasicQosAsync(0, 10, false);

string queueName = GenerateQueueName();
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
Assert.Equal(queueName, q.QueueName);

byte[] body = Encoding.ASCII.GetBytes("incoming message");
var publishTasks = new List<ValueTask>();
for (int i = 0; i < messageCount; i++)
{
ValueTask pt = _channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
body: body);
publishTasks.Add(pt);
if (i % 20 == 0)
{
foreach (ValueTask t in publishTasks)
{
await t;
}
publishTasks.Clear();
}
}

foreach (ValueTask t in publishTasks)
{
await t;
}
publishTasks.Clear();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

_conn.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("_conn.CallbackExceptionAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
return Task.CompletedTask;
};

_conn.ConnectionShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (args.Exception is not null)
{
if (IsVerbose)
{
_output.WriteLine("_conn.ConnectionShutdownAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
}
else
{
if (IsVerbose)
{
_output.WriteLine("_conn.ConnectionShutdownAsync");
}
tcs.TrySetResult(false);
}
return Task.CompletedTask;
};

_channel.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("_channel.CallbackExceptionAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
return Task.CompletedTask;
};

_channel.ChannelShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (args.Exception is not null)
{
if (IsVerbose)
{
_output.WriteLine("_channel.ChannelShutdownAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
}
else
{
if (IsVerbose)
{
_output.WriteLine("_channel.ChannelShutdownAsync");
}
tcs.TrySetResult(false);
}
return Task.CompletedTask;
};

Exception? publishException = null;
var props = new BasicProperties { Expiration = "-1" };
int receivedCounter = 0;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) =>
{
var c = (AsyncEventingBasicConsumer)sender;
IChannel ch = c.Channel;
await ch.BasicAckAsync(args.DeliveryTag, false);

try
{
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
mandatory: true, basicProperties: props, body: body);
}
catch (Exception ex)
{
publishException = ex;
}

if (Interlocked.Increment(ref receivedCounter) >= messageCount)
{
tcs.SetResult(true);
}
};

consumer.ShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
return Task.CompletedTask;
};

await _channel.BasicConsumeAsync(queueName, false, consumer);

await tcs.Task;

if (IsVerbose && publishException is not null)
{
_output.WriteLine("publishException: {0}", publishException);
}
}
}
}

0 comments on commit 54c1675

Please sign in to comment.