Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for case where a TopicActor can continue sending messages to a member that no longer exists #2147

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/Proto.Cluster/PubSub/BatchingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,10 @@ private async Task PublishBatch(PubSubBatchWithReceipts batchWrapper)
{
retries++;

var response = await _publisher.PublishBatch(_topic, batchWrapper.Batch,
CancellationTokens.FromSeconds(_config.PublishTimeoutInSeconds)).ConfigureAwait(false);
// timeout immediately if the producer is stopped
using var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(
_cts.Token, CancellationTokens.FromSeconds(_config.PublishTimeoutInSeconds));
var response = await _publisher.PublishBatch(_topic, batchWrapper.Batch, tokenSource.Token).ConfigureAwait(false);

if (response == null)
{
Expand All @@ -244,6 +246,12 @@ private async Task PublishBatch(PubSubBatchWithReceipts batchWrapper)
}
catch (Exception e)
{
if (_cts.Token.IsCancellationRequested)
{
// we are stopping
break;
}

var decision = await _config.OnPublishingError(retries, e, batchWrapper.Batch).ConfigureAwait(false);

if (decision == PublishingErrorDecision.FailBatchAndStop)
Expand Down
19 changes: 18 additions & 1 deletion src/Proto.Cluster/PubSub/TopicActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public Task ReceiveAsync(IContext context) =>
PubSubBatch batch => OnPubSubBatch(context, batch),
NotifyAboutFailingSubscribersRequest msg => OnNotifyAboutFailingSubscribers(context, msg),
ClusterTopology msg => OnClusterTopologyChanged(context, msg),
DeadLetterResponse msg => OnDeadLetterResponse(msg),
_ => Task.CompletedTask
};

Expand Down Expand Up @@ -113,7 +114,12 @@ from member in members

foreach (var md in memberDeliveries)
{
context.Send(md.Pid, md.Message);
// Use Request instead of Send. The delivery actor doesn't respond, but we'll get a DeadLetterResponse if we can't reach it.
// A Proto.Actor client can subscribe and then shut down without unsubscribing, and without us ever learning that it left,
// because as a client it was never part of the topology. This lets us stop sending to a subscriber that no longer exists.
// In theory, a subscriber that is still alive can be unsubscribed this way if it is unreachable for a short time,
// but application-level logic is always required to keep a subscription alive anyway.
context.Request(md.Pid, md.Message);
}

context.Respond(new PublishResponse());
Expand Down Expand Up @@ -298,4 +304,15 @@ private async Task OnSubscribe(IContext context, SubscribeRequest sub)
await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
context.Respond(new SubscribeResponse());
}

/// <summary>
/// Handles a <see cref="DeadLetterResponse"/> received after a delivery request could not be
/// routed to its target, and unsubscribes the PID subscribers hosted at the target's address.
/// </summary>
/// <remarks>
/// Matching is done on the address only, so every PID subscriber at the unreachable address is
/// removed in one pass instead of one subscriber per dead letter. The updated subscriber set is
/// persisted once, and only when at least one subscriber was actually removed.
/// </remarks>
/// <param name="msg">The dead letter response; a response without a target is ignored.</param>
private async Task OnDeadLetterResponse(DeadLetterResponse msg)
{
    var deadAddress = msg.Target?.Address;
    if (deadAddress is null)
    {
        return;
    }

    // Null-conditional access: a subscriber without a PID must not throw here, it simply never matches.
    // NOTE(review): presumably cluster-identity subscribers have no PID set - confirm against SubscriberIdentity.
    var unreachable = _subscribers.Where(s => s.Pid?.Address == deadAddress).ToList();
    if (unreachable.Count == 0)
    {
        return;
    }

    foreach (var subscriber in unreachable)
    {
        _subscribers = _subscribers.Remove(subscriber);
        Logger.LogDebug("Topic {Topic} - {Subscriber} unsubscribed due to dead letter", _topic, subscriber);
    }

    await SaveSubscriptions(_topic, new Subscribers { Subscribers_ = { _subscribers } }).ConfigureAwait(false);
}
}
86 changes: 86 additions & 0 deletions tests/Proto.Cluster.PubSub.Tests/PubSubClientTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// -----------------------------------------------------------------------
// <copyright file = "PubSubClientTests.cs" company = "Asynkron AB">
// Copyright (C) 2015-2024 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------

using FluentAssertions;
using Xunit;
using static Proto.Cluster.PubSub.Tests.WaitHelper;

namespace Proto.Cluster.PubSub.Tests;

[Collection("PubSub")] // The CI is just too slow to run cluster fixture based tests in parallel
public class PubSubClientTests : IAsyncLifetime
{
    private const string LeavingClientTopic = "leaving-client";

    private readonly PubSubClusterFixture _fixture = new();

    /// <summary>Starts one cluster member and one client before each test.</summary>
    public async Task InitializeAsync()
    {
        await _fixture.SpawnMember();
        await _fixture.SpawnClient();
    }

    /// <summary>Tears the fixture down after each test.</summary>
    public Task DisposeAsync() => _fixture.DisposeAsync();

    /// <summary>
    /// A client subscribes a PID, receives one message, and then shuts down without unsubscribing.
    /// The subscription is expected to survive the shutdown itself and to be dropped only once
    /// a later publish results in a dead letter.
    /// </summary>
    [Fact]
    public async Task When_client_leaves_PID_subscribers_get_removed_due_to_dead_letter()
    {
        // PID subscriber that records every DataPublished message it receives
        var recordingProps = Props.FromFunc(ctx =>
            {
                if (ctx.Message is not DataPublished published)
                {
                    return Task.CompletedTask;
                }

                _fixture.Deliveries.Add(new Delivery(ctx.Self.ToDiagnosticString(), published.Data));

                return Task.CompletedTask;
            }
        );

        var leavingClient = _fixture.Clients.First();
        var subscriberPid = leavingClient.System.Root.Spawn(recordingProps);
        await leavingClient.Subscribe(LeavingClientTopic, subscriberPid);

        await AssertSubscribedOnly(subscriberPid);

        // the first message reaches the subscriber
        await _fixture.PublishData(LeavingClientTopic, 1);

        await WaitUntil(() => _fixture.Deliveries.Count == 1);
        AssertOnlyFirstMessageDelivered();

        // shut the client down
        await _fixture.RemoveNode(leavingClient, graceful: true);

        // the subscription remains: the client never unsubscribed and the topology did not change
        await AssertSubscribedOnly(subscriberPid);

        // publish again after the client is gone; nothing further should be delivered
        await _fixture.PublishData(LeavingClientTopic, 2);
        await Task.Delay(3000);
        await _fixture.PublishData(LeavingClientTopic, 2);

        // the dead letter should come back, so the subscription gets removed
        await WaitUntil(async () =>
        {
            var remaining = await _fixture.GetSubscribersForTopic(LeavingClientTopic);

            return remaining.Subscribers_.Count == 0;
        });

        AssertOnlyFirstMessageDelivered();
    }

    /// <summary>Asserts that the topic has exactly one subscriber and that it is <paramref name="expectedPid"/>.</summary>
    private async Task AssertSubscribedOnly(PID expectedPid)
    {
        var current = await _fixture.GetSubscribersForTopic(LeavingClientTopic);
        current.Subscribers_.Count.Should().Be(1);
        current.Subscribers_.Should().Contain(s => s.Pid.Equals(expectedPid));
    }

    /// <summary>Asserts that the only recorded delivery is the message carrying data 1.</summary>
    private void AssertOnlyFirstMessageDelivered()
    {
        _fixture.Deliveries.Count.Should().Be(1);
        _fixture.Deliveries.Should().ContainSingle(d => d.Data == 1);
    }
}
Loading