Skip to content

Commit

Permalink
Limit concurrency during mass deactivation
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenBond committed Oct 15, 2024
1 parent d868ed0 commit e990caa
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 43 deletions.
72 changes: 36 additions & 36 deletions src/Orleans.Runtime/Catalog/ActivationCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ namespace Orleans.Runtime
/// <summary>
/// Identifies activations that have been idle long enough to be deactivated.
/// </summary>
internal class ActivationCollector : IActivationWorkingSetObserver, ILifecycleParticipant<ISiloLifecycle>
internal class ActivationCollector : IActivationWorkingSetObserver, ILifecycleParticipant<ISiloLifecycle>, IDisposable
{
private readonly TimeSpan quantum;
private readonly TimeSpan shortestAgeLimit;
private readonly ConcurrentDictionary<DateTime, Bucket> buckets = new();
private readonly CancellationTokenSource _shutdownCts = new();
private DateTime nextTicket;
private static readonly List<ICollectibleGrainContext> nothing = new(0);
private readonly ILogger logger;
Expand Down Expand Up @@ -73,7 +74,7 @@ public int GetNumRecentlyUsed(TimeSpan recencyPeriod)
/// </summary>
/// <param name="ageLimit">The age limit.</param>
/// <returns>A <see cref="Task"/> representing the work performed.</returns>
public Task CollectActivations(TimeSpan ageLimit) => CollectActivationsImpl(false, ageLimit);
public Task CollectActivations(TimeSpan ageLimit, CancellationToken cancellationToken) => CollectActivationsImpl(false, ageLimit, cancellationToken);

/// <summary>
/// Schedules the provided grain context for collection if it becomes idle for the specified duration.
Expand Down Expand Up @@ -212,7 +213,6 @@ public List<ICollectibleGrainContext> ScanStale()
{
var now = DateTime.UtcNow;
List<ICollectibleGrainContext> condemned = null;
var reason = GetDeactivationReason();
while (DequeueQuantum(out var activations, now))
{
// At this point, all tickets associated with activations are cancelled and any attempts to reschedule will fail silently.
Expand Down Expand Up @@ -241,8 +241,8 @@ public List<ICollectibleGrainContext> ScanStale()
else
{
// Atomically set Deactivating state, to disallow any new requests or new timer ticks to be dispatched on this activation.
activation.Deactivate(reason, cancellationToken: default);
AddActivationToList(activation, ref condemned);
condemned ??= [];
condemned.Add(activation);
}
}
}
Expand All @@ -260,7 +260,6 @@ public List<ICollectibleGrainContext> ScanAll(TimeSpan ageLimit)
{
List<ICollectibleGrainContext> condemned = null;
var now = DateTime.UtcNow;
var reason = GetDeactivationReason();
foreach (var kv in buckets)
{
var bucket = kv.Value;
Expand All @@ -287,10 +286,10 @@ public List<ICollectibleGrainContext> ScanAll(TimeSpan ageLimit)
{
if (bucket.TryRemove(activation))
{
// we removed the activation from the collector. it's our responsibility to deactivate it.
activation.Deactivate(reason, cancellationToken: default);
AddActivationToList(activation, ref condemned);
condemned ??= [];
condemned.Add(activation);
}

// someone else has already deactivated the activation, so there's nothing to do.
}
else
Expand All @@ -312,12 +311,6 @@ private static DeactivationReason GetDeactivationReason()
return reason;
}

private void AddActivationToList(ICollectibleGrainContext activation, ref List<ICollectibleGrainContext> condemned)
{
condemned ??= [];
condemned.Add(activation);
}

private void ThrowIfTicketIsInvalid(DateTime ticket)
{
if (ticket.Ticks == 0) throw new ArgumentException("Empty ticket is not allowed in this context.");
Expand Down Expand Up @@ -418,6 +411,7 @@ private Task Start(CancellationToken cancellationToken)

private async Task Stop(CancellationToken cancellationToken)
{
using var registration = cancellationToken.Register(() => _shutdownCts.Cancel());
_collectionTimer.Dispose();

if (_collectionLoopTask is Task task)
Expand All @@ -431,18 +425,19 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
lifecycle.Subscribe(
nameof(ActivationCollector),
ServiceLifecycleStage.RuntimeServices,
async cancellation => await Start(cancellation),
async cancellation => await Stop(cancellation));
Start,
Stop);
}

private async Task RunActivationCollectionLoop()
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding);
var cancellationToken = _shutdownCts.Token;
while (await _collectionTimer.WaitForNextTickAsync())
{
try
{
await this.CollectActivationsImpl(true);
await this.CollectActivationsImpl(true, ageLimit: default, cancellationToken);
}
catch (Exception exception)
{
Expand All @@ -451,7 +446,7 @@ private async Task RunActivationCollectionLoop()
}
}

private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = default)
private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit, CancellationToken cancellationToken)
{
var watch = ValueStopwatch.StartNew();
var number = Interlocked.Increment(ref collectionNumber);
Expand All @@ -470,12 +465,10 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de

List<ICollectibleGrainContext> list = scanStale ? ScanStale() : ScanAll(ageLimit);
CatalogInstruments.ActivationCollections.Add(1);
var count = 0;
if (list != null && list.Count > 0)
if (list is { Count: > 0 })
{
count = list.Count;
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("CollectActivations {Activations}", list.ToStrings(d => d.GrainId.ToString() + d.ActivationId));
await DeactivateActivationsFromCollector(list);
await DeactivateActivationsFromCollector(list, cancellationToken);
}

long memAfter = GC.GetTotalMemory(false) / (1024 * 1024);
Expand All @@ -489,31 +482,38 @@ private async Task CollectActivationsImpl(bool scanStale, TimeSpan ageLimit = de
number,
memAfter,
_activationCount,
count,
list?.Count ?? 0,
ToString(),
watch.Elapsed);
}
}

private async Task DeactivateActivationsFromCollector(List<ICollectibleGrainContext> list)
private async Task DeactivateActivationsFromCollector(List<ICollectibleGrainContext> list, CancellationToken cancellationToken)
{
var mtcs = new MultiTaskCompletionSource(list.Count);

logger.LogInformation((int)ErrorCode.Catalog_ShutdownActivations_1, "DeactivateActivationsFromCollector: total {Count} to promptly Destroy.", list.Count);
logger.LogInformation((int)ErrorCode.Catalog_ShutdownActivations_1, "Deactivating '{Count}' idle activations.", list.Count);
CatalogInstruments.ActivationShutdownViaCollection();

Action signalCompletion = mtcs.SetOneResult;
var reason = GetDeactivationReason();
for (var i = 0; i < list.Count; i++)

var options = new ParallelOptions
{
var activationData = list[i];
// Avoid passing the cancellation token, since we want all of these activations to be deactivated, even if cancellation is triggered.
CancellationToken = CancellationToken.None,
MaxDegreeOfParallelism = Environment.ProcessorCount * 512
};

await Parallel.ForEachAsync(list, options, async (activationData, token) =>
{
// Continue deactivation when ready.
activationData.Deactivate(reason);
activationData.Deactivated.GetAwaiter().OnCompleted(signalCompletion);
}
activationData.Deactivate(reason, cancellationToken);
await activationData.Deactivated.ConfigureAwait(false);
}).WaitAsync(cancellationToken);
}

await mtcs.Task;
public void Dispose()
{
_collectionTimer.Dispose();
_shutdownCts.Dispose();
}

private class Bucket
Expand Down Expand Up @@ -560,7 +560,7 @@ public List<ICollectibleGrainContext> CancelAll()
item.CollectionTicket = default;
}

result ??= new List<ICollectibleGrainContext>();
result ??= [];
result.Add(pair.Value);
}

Expand Down
20 changes: 15 additions & 5 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,19 @@ internal async Task DeactivateActivations(DeactivationReason reason, List<IGrain
if (list == null || list.Count == 0) return;

if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("DeactivateActivations: {Count} activations.", list.Count);
await Parallel.ForEachAsync(list, cancellationToken, (activation, ct) =>
var options = new ParallelOptions
{
CancellationToken = CancellationToken.None,
MaxDegreeOfParallelism = Environment.ProcessorCount * 512
};
await Parallel.ForEachAsync(list, options, (activation, _) =>
{
if (activation.GrainId.Type.IsSystemTarget())
{
return ValueTask.CompletedTask;
}
activation.Deactivate(reason, ct);
activation.Deactivate(reason, cancellationToken);
return new (activation.Deactivated);
}).WaitAsync(cancellationToken);
}
Expand All @@ -282,16 +287,21 @@ public async Task DeactivateAllActivations(CancellationToken cancellationToken)

if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("DeactivateActivations: {Count} activations.", activations.Count);
var reason = new DeactivationReason(DeactivationReasonCode.ShuttingDown, "This process is terminating.");
await Parallel.ForEachAsync(activations, cancellationToken, (kv, ct) =>
var options = new ParallelOptions
{
CancellationToken = CancellationToken.None,
MaxDegreeOfParallelism = Environment.ProcessorCount * 512
};
await Parallel.ForEachAsync(activations, options, (kv, _) =>
{
if (kv.Key.IsSystemTarget())
{
return ValueTask.CompletedTask;
}
var activation = kv.Value;
activation.Deactivate(reason, ct);
return new (activation.Deactivated.WaitAsync(ct));
activation.Deactivate(reason, cancellationToken);
return new (activation.Deactivated);
}).WaitAsync(cancellationToken);
}

Expand Down
3 changes: 2 additions & 1 deletion src/Orleans.Runtime/Silo/SiloControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -98,7 +99,7 @@ public Task ForceGarbageCollection()
public Task ForceActivationCollection(TimeSpan ageLimit)
{
logger.LogInformation("ForceActivationCollection");
return _activationCollector.CollectActivations(ageLimit);
return _activationCollector.CollectActivations(ageLimit, CancellationToken.None);
}

public Task ForceRuntimeStatisticsCollection()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task ElasticChaos()
log.LogInformation("ServiceId: '{ServiceId}'", testCluster.Options.ServiceId);
log.LogInformation("ClusterId: '{ClusterId}'.", testCluster.Options.ClusterId);

var cts = new CancellationTokenSource(TimeSpan.FromMinutes(15));
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
var reconfigurationTimer = CoarseStopwatch.StartNew();
var upperLimit = 10;
var lowerLimit = 1; // Membership is kept on the primary, so we can't go below 1
Expand Down

0 comments on commit e990caa

Please sign in to comment.