Skip to content

Commit

Permalink
Merge pull request #1791 from rabbitmq/rabbitmq-server-13387
Browse files Browse the repository at this point in the history
Demonstrate handling of `Channel.Close`
  • Loading branch information
lukebakken authored Feb 24, 2025
2 parents eb64b79 + 00e227a commit 1d5f574
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 1 deletion.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/Impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ await _session0.HandleFrameAsync(frame, cancellationToken)
// frames for non-zero channels (and any inbound
// commands on channel zero that aren't
// Connection.CloseOk) must be discarded.
if (_closeReason is null)
if (CloseReason is null)
{
// No close reason, not quiescing the
// connection. Handle the frame. (Of course, the
Expand Down
5 changes: 5 additions & 0 deletions projects/RabbitMQ.Client/Impl/SessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ public ISession Lookup(int number)
{
lock (_sessionMap)
{
/*
* Note: rabbitmq/rabbitmq-server#13337
* When investigating the above issue, a couple KeyNotFoundExceptions
* were thrown here during test shutdown. No reliable reproducer.
*/
return _sessionMap[number];
}
}
Expand Down
164 changes: 164 additions & 0 deletions projects/Test/Integration/GH/TestGitHubIssues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
Expand Down Expand Up @@ -165,5 +167,167 @@ await Assert.ThrowsAnyAsync<BrokerUnreachableException>(
async () => await _connFactory.CreateConnectionAsync());
Assert.IsAssignableFrom<PossibleAuthenticationFailureException>(ex.InnerException);
}

[Fact]
public async Task SendInvalidPublishMaybeClosesConnection_GH13387()
{
const int messageCount = 200;

_connFactory = new ConnectionFactory();
_conn = await _connFactory.CreateConnectionAsync();

var opts = new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true);
_channel = await _conn.CreateChannelAsync(opts);

await _channel.BasicQosAsync(0, 10, false);

string queueName = GenerateQueueName();
QueueDeclareOk q = await _channel.QueueDeclareAsync(queueName);
Assert.Equal(queueName, q.QueueName);

byte[] body = Encoding.ASCII.GetBytes("incoming message");
var publishTasks = new List<ValueTask>();
for (int i = 0; i < messageCount; i++)
{
ValueTask pt = _channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: queueName,
body: body);
publishTasks.Add(pt);
if (i % 20 == 0)
{
foreach (ValueTask t in publishTasks)
{
await t;
}
publishTasks.Clear();
}
}

foreach (ValueTask t in publishTasks)
{
await t;
}
publishTasks.Clear();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

_conn.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("_conn.CallbackExceptionAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
return Task.CompletedTask;
};

_conn.ConnectionShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (args.Exception is not null)
{
if (IsVerbose)
{
_output.WriteLine("_conn.ConnectionShutdownAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
}
else
{
if (IsVerbose)
{
_output.WriteLine("_conn.ConnectionShutdownAsync");
}
tcs.TrySetResult(false);
}
return Task.CompletedTask;
};

_channel.CallbackExceptionAsync += (object sender, CallbackExceptionEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("_channel.CallbackExceptionAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
return Task.CompletedTask;
};

_channel.ChannelShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (args.Exception is not null)
{
if (IsVerbose)
{
_output.WriteLine("_channel.ChannelShutdownAsync: {0}", args.Exception);
}
tcs.TrySetException(args.Exception);
}
else
{
if (IsVerbose)
{
_output.WriteLine("_channel.ChannelShutdownAsync");
}
tcs.TrySetResult(false);
}
return Task.CompletedTask;
};

var ackExceptions = new List<Exception>();
var publishExceptions = new List<Exception>();
var props = new BasicProperties { Expiration = "-1" };
int receivedCounter = 0;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (object sender, BasicDeliverEventArgs args) =>
{
var c = (AsyncEventingBasicConsumer)sender;
IChannel ch = c.Channel;
try
{
await ch.BasicAckAsync(args.DeliveryTag, false);
}
catch (Exception ex)
{
ackExceptions.Add(ex);
}

try
{
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,
mandatory: true, basicProperties: props, body: body);
}
catch (Exception ex)
{
publishExceptions.Add(ex);
}

if (Interlocked.Increment(ref receivedCounter) >= messageCount)
{
tcs.SetResult(true);
}
};

consumer.ShutdownAsync += (object sender, ShutdownEventArgs args) =>
{
if (IsVerbose)
{
_output.WriteLine("consumer.ShutdownAsync");
}
return Task.CompletedTask;
};

await _channel.BasicConsumeAsync(queueName, false, consumer);

await tcs.Task;

if (IsVerbose)
{
_output.WriteLine("saw {0} ackExceptions", ackExceptions.Count);
_output.WriteLine("saw {0} publishExceptions", publishExceptions.Count);
}
}
}
}

0 comments on commit 1d5f574

Please sign in to comment.