Fix stateless worker race condition causing activation directory leak
Explanation of issue:

`StatelessWorkerGrainContext` listens to destruction events of its internal worker activations using `OnDestroyActivation`. It intends to unregister itself from the catalog once its last worker has been destroyed (collected).

It did this by enqueueing a work item that removes the worker context from the `_workers` list. It then checked if `_workers` was empty.

Work items are processed in a background loop, triggered by a work signal. The code relied on the work signal's `RunContinuationsAsynchronously` property being set to `false`, apparently on the assumption that this guarantees the work item is processed synchronously on the thread that enqueues and signals it, before `OnDestroyActivation` goes on to check `_workers`.

However, `RunContinuationsAsynchronously` does _not_ guarantee that the continuation runs synchronously when set to `false`; it only guarantees that the continuation runs asynchronously when set to `true`. I couldn't reproduce this behaviour in a unit test, but apparently it happens in production.

Since `StatelessWorkerGrainContext` was the only occurrence of `RunContinuationsAsynchronously = false` in the codebase and it is set to `true` everywhere else, I changed it to `true` here as well. With that change alone, the race condition triggers reliably and the newly added test fails.

Then as a proper fix, I moved the check that unregisters the stateless worker context from the catalog when the last worker is removed to the work item itself to make sure it always runs synchronously after the worker list update.
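
The difference between the two placements of the check can be sketched with a simplified model. This is a minimal illustration, not the Orleans code: `WorkerContextSketch`, `RegisteredInCatalog`, `DrainWorkItems` and the `checkInsideWorkItem` switch are hypothetical names, workers are plain strings, and the background loop is replaced by an explicit drain call, on the assumption that the loop only runs after `OnDestroyActivation` has returned.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical, simplified model of the worker bookkeeping (not the Orleans implementation).
public sealed class WorkerContextSketch
{
    private readonly List<string> _workers = new();
    private readonly ConcurrentQueue<string> _workItems = new();
    private readonly bool _checkInsideWorkItem;

    // Stands in for the context's entry in the catalog.
    public bool RegisteredInCatalog { get; private set; } = true;

    public WorkerContextSketch(bool checkInsideWorkItem, params string[] workers)
    {
        _checkInsideWorkItem = checkInsideWorkItem;
        _workers.AddRange(workers);
    }

    public void OnDestroyActivation(string worker)
    {
        _workItems.Enqueue(worker);

        // Old placement: the list is inspected right after enqueueing, which is only
        // correct if the work item has already been processed at this point.
        if (!_checkInsideWorkItem && _workers.Count == 0)
        {
            RegisteredInCatalog = false;
        }
    }

    // Stands in for one pass of the background loop, assumed to run later.
    public void DrainWorkItems()
    {
        while (_workItems.TryDequeue(out var worker))
        {
            _workers.Remove(worker);

            // New placement: the check runs in the same step as the removal,
            // so it always observes the updated list.
            if (_checkInsideWorkItem && _workers.Count == 0)
            {
                RegisteredInCatalog = false;
            }
        }
    }
}
```

In this model, a context created with `checkInsideWorkItem: false` and a single worker stays registered after `OnDestroyActivation` followed by `DrainWorkItems`, because the list still held one entry when it was checked. The same sequence with `checkInsideWorkItem: true` ends with the context unregistered.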

Background:

We noticed that our "total activations" metrics did not match the "per grain type activations" metrics and kept growing in production. This seemed to correlate with long GC pauses after a few days of runtime, which caused silo restarts.

A production memory dump revealed that, indeed, millions of activations were tracked in the `ActivationDirectory` that shouldn't be there, and they were all stateless worker contexts.
EdeMeijer committed Oct 18, 2024
1 parent 8848cc1 commit e9b5620
Showing 2 changed files with 33 additions and 7 deletions.
12 changes: 7 additions & 5 deletions src/Orleans.Runtime/Catalog/StatelessWorkerGrainContext.cs
@@ -17,7 +17,7 @@ internal class StatelessWorkerGrainContext : IGrainContext, IAsyncDisposable, IA
private readonly int _maxWorkers;
private readonly List<ActivationData> _workers = new();
private readonly ConcurrentQueue<(WorkItemType Type, object State)> _workItems = new();
- private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = false };
+ private readonly SingleWaiterAutoResetEvent _workSignal = new() { RunContinuationsAsynchronously = true };

/// <summary>
/// The <see cref="Task"/> representing the <see cref="RunMessageLoop"/> invocation.
@@ -153,6 +153,12 @@ private async Task RunMessageLoop()
{
var grainContext = (ActivationData)workItem.State;
_workers.Remove(grainContext);
+ if (_workers.Count == 0)
+ {
+ // When the last worker is destroyed, we can consider the stateless worker grain
+ // activation to be destroyed as well
+ _shared.InternalRuntime.Catalog.UnregisterMessageTarget(this);
+ }
break;
}
default: throw new NotSupportedException($"Work item of type {workItem.Type} is not supported");
@@ -310,10 +316,6 @@ public void OnCreateActivation(IGrainContext grainContext)
public void OnDestroyActivation(IGrainContext grainContext)
{
EnqueueWorkItem(WorkItemType.OnDestroyActivation, grainContext);
- if (_workers.Count == 0)
- {
- _shared.InternalRuntime.Catalog.UnregisterMessageTarget(this);
- }
}

public void Rehydrate(IRehydrationContext context)
28 changes: 26 additions & 2 deletions test/Tester/StatelessWorkerActivationTests.cs
@@ -95,9 +95,33 @@ public async Task MultipleWorkerInvocationUnderLoad()
await Task.WhenAll(waiters);
}

- private static async Task Until(Func<Task<bool>> condition)
+ [Fact, TestCategory("BVT"), TestCategory("StatelessWorker")]
+ public async Task CatalogCleanupOnDeactivation()
+ {
+ var workerGrain = _fixture.GrainFactory.GetGrain<IStatelessWorkerGrain>(0);
+ var mgmt = _fixture.GrainFactory.GetGrain<IManagementGrain>(0);
+
+ var numActivations = await mgmt.GetGrainActivationCount((GrainReference)workerGrain);
+ Assert.Equal(0, numActivations);
+
+ // Activate grain
+ await workerGrain.DummyCall();
+
+ numActivations = await mgmt.GetGrainActivationCount((GrainReference)workerGrain);
+ Assert.Equal(1, numActivations);
+
+ // Deactivate grain by forcing activation collection
+ await mgmt.ForceActivationCollection(TimeSpan.Zero);
+
+ // The activation count for the stateless worker grain should become 0 again
+ await Until(
+ async () => await mgmt.GetGrainActivationCount((GrainReference)workerGrain) == 0,
+ 5_000
+ );
+ }
+
+ private static async Task Until(Func<Task<bool>> condition, int maxTimeout = 40_000)
{
- var maxTimeout = 40_000;
while (!await condition() && (maxTimeout -= 10) > 0) await Task.Delay(10);
Assert.True(maxTimeout > 0);
}