Skip to content

Commit

Permalink
Improve timely shutdown of directory partitions when snapshot transfe…
Browse files Browse the repository at this point in the history
…r has been abandoned (#9197)
  • Loading branch information
ReubenBond authored Oct 22, 2024
1 parent f8cdb45 commit ae5515a
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using Microsoft.Extensions.DependencyInjection;
using Orleans.Configuration;
using Orleans.TestingHost;

namespace ChaoticCluster.Silo;
Expand All @@ -8,9 +6,9 @@ class SiloBuilderConfigurator : ISiloConfigurator
{
public void Configure(ISiloBuilder siloBuilder)
{
#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
siloBuilder.AddDistributedGrainDirectory();
#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
}
}

Expand Down
29 changes: 27 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ private async Task<TResult> InvokeAsync<TState, TResult>(
}
}

public async ValueTask<Immutable<List<GrainAddress>>> RecoverRegisteredActivations(MembershipVersion membershipVersion, RingRange range, SiloAddress siloAddress, int partitionIndex)
{
foreach (var partition in _partitions)
{
partition.OnRecoveringPartition(membershipVersion, range, siloAddress, partitionIndex).Ignore();
}

return await GetRegisteredActivations(membershipVersion, range, false);
}

public async ValueTask<Immutable<List<GrainAddress>>> GetRegisteredActivations(MembershipVersion membershipVersion, RingRange range, bool isValidation)
{
if (!isValidation && _logger.IsEnabled(LogLevel.Debug))
Expand All @@ -181,7 +191,7 @@ public async ValueTask<Immutable<List<GrainAddress>>> GetRegisteredActivations(M
foreach (var (grainId, activation) in localActivations)
{
var directory = GetGrainDirectory(activation, grainDirectoryResolver);
if (directory is not null && directory == this)
if (directory == this)
{
var address = activation.Address;
if (!range.Contains(address.GrainId))
Expand Down Expand Up @@ -296,6 +306,7 @@ async Task OnShuttingDown(CancellationToken token)
{
tasks.Add(partition.OnShuttingDown(token));
}

await Task.WhenAll(tasks).SuppressThrowing();
}
}
Expand All @@ -309,6 +320,8 @@ private async Task ProcessMembershipUpdates()
{
try
{
DirectoryMembershipSnapshot previous = _membershipService.CurrentView;
var previousRanges = RingRangeCollection.Empty;
await foreach (var update in _membershipService.ViewUpdates.WithCancellation(_stoppedCts.Token))
{
tasks.RemoveAll(t => t.IsCompleted);
Expand All @@ -326,6 +339,7 @@ private async Task ProcessMembershipUpdates()
}

var current = update;
var currentRanges = current.GetMemberRanges(Silo);

foreach (var partition in _partitions)
{
Expand All @@ -334,10 +348,21 @@ private async Task ProcessMembershipUpdates()

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Updated view from '{PreviousVersion}' to '{Version}'.", previousUpdate.Version, update.Version);
var deltaSize = currentRanges.SizePercent - previousRanges.SizePercent;
var meanSizePercent = current.Members.Length > 0 ? 100.0 / current.Members.Length : 0f;
var deviationFromMean = Math.Abs(meanSizePercent - currentRanges.SizePercent);
_logger.LogDebug(
"Updated view from '{PreviousVersion}' to '{Version}'. Now responsible for {Range:0.00}% (Δ {DeltaPercent:0.00}%). {DeviationFromMean:0.00}% from ideal share.",
previous.Version,
current.Version,
currentRanges.SizePercent,
deltaSize,
deviationFromMean);
}

previousUpdate = update.ClusterMembershipSnapshot;
previous = current;
previousRanges = currentRanges;
}
}
catch (Exception exception)
Expand Down
89 changes: 65 additions & 24 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryReplica.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,25 @@ public async ValueTask<DirectoryMembershipSnapshot> RefreshViewAsync(MembershipV

ValueTask<bool> IGrainDirectoryPartition.AcknowledgeSnapshotTransferAsync(SiloAddress silo, int partitionIndex, MembershipVersion rangeVersion)
{
RemoveSnapshotTransferPartner((silo, partitionIndex), rangeVersion);
RemoveSnapshotTransferPartner(
(silo, partitionIndex, rangeVersion),
snapshotFilter: (state, snapshot) => snapshot.DirectoryMembershipVersion == state.rangeVersion,
partnerFilter: (state, silo, partitionIndex) => silo.Equals(state.silo) && partitionIndex == state.partitionIndex);
return new(true);
}

private void RemoveSnapshotTransferPartner((SiloAddress Silo, int PartitionIndex) owner, MembershipVersion? rangeVersion)
private void RemoveSnapshotTransferPartner<TState>(TState state, Func<TState, PartitionSnapshotState, bool> snapshotFilter, Func<TState, SiloAddress, int, bool> partnerFilter)
{
for (var i = 0; i < _partitionSnapshots.Count; ++i)
{
var partitionSnapshot = _partitionSnapshots[i];
if (rangeVersion.HasValue && partitionSnapshot.DirectoryMembershipVersion != rangeVersion.Value)
if (!snapshotFilter(state, partitionSnapshot))
{
continue;
}

var partners = partitionSnapshot.TransferPartners;
partners.RemoveWhere(p => p.SiloAddress.Equals(owner.Silo) && (owner.PartitionIndex < 0 || p.PartitionIndex == owner.PartitionIndex));
partners.RemoveWhere(p => partnerFilter(state, p.SiloAddress, p.PartitionIndex));
if (partners.Count == 0)
{
_partitionSnapshots.RemoveAt(i);
Expand Down Expand Up @@ -276,9 +279,32 @@ private void OnSiloRemovedFromCluster(ClusterMember change)
}
}

RemoveSnapshotTransferPartner((change.SiloAddress, -1), rangeVersion: null);
RemoveSnapshotTransferPartner(
change.SiloAddress,
snapshotFilter: (state, snapshot) => true,
partnerFilter: (state, silo, partitionIndex) => silo.Equals(state));
}

internal Task OnRecoveringPartition(MembershipVersion version, RingRange range, SiloAddress siloAddress, int partitionIndex) =>
this.QueueTask(
async () =>
{
try
{
await WaitForRange(range, version);
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Error waiting for range to unlock.");
}
// Remove all snapshots that are associated with the given replica prior or equal to the specified version.
RemoveSnapshotTransferPartner(
(Version: version, SiloAddress: siloAddress, PartitionIndex: partitionIndex),
snapshotFilter: (state, snapshot) => snapshot.DirectoryMembershipVersion <= state.Version,
partnerFilter: (state, silo, partitionIndex) => partitionIndex == state.PartitionIndex && silo.Equals(state.SiloAddress));
});

internal Task ProcessMembershipUpdateAsync(DirectoryMembershipSnapshot current) =>
this.QueueAction(
static state => state.Self.ProcessMembershipUpdate(state.Current),
Expand All @@ -302,15 +328,6 @@ private void ProcessMembershipUpdate(DirectoryMembershipSnapshot current)
var previousRange = previous.GetRange(_id, _partitionIndex);
_currentRange = current.GetRange(_id, _partitionIndex);

// It is important that this method is synchronous, to ensure that updates are atomic.
var deltaSize = _currentRange.SizePercent - previousRange.SizePercent;
var meanSizePercent = current.Members.Length > 0 ? 100.0 / current.Members.Length : 0f;
var deviationFromMean = Math.Abs(meanSizePercent - _currentRange.SizePercent);
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Updating view from '{PreviousVersion}' to '{Version}'. Now responsible for '{Range}' (Δ {DeltaPercent:0.00}%. {DeviationFromMean:0.00}% from ideal share).", previous.Version, current.Version, _currentRange, deltaSize, deviationFromMean);
}

var removedRange = previousRange.Difference(_currentRange).SingleOrDefault();
var addedRange = _currentRange.Difference(previousRange).SingleOrDefault();

Expand Down Expand Up @@ -347,7 +364,7 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc
var (tcs, sw) = LockRange(removedRange, current.Version);
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Relinquishing ownership of range '{Range}'.", removedRange);
_logger.LogDebug("Relinquishing ownership of range '{Range}' at version '{Version}'.", removedRange, current.Version);
}

try
Expand All @@ -361,10 +378,6 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc
await WaitForRange(removedRange, previous.Version);

GrainRuntime.CheckRuntimeContext(this);
if (_logger.IsEnabled(LogLevel.Trace))
{
_logger.LogTrace("Relinquishing ownership of range '{Range}'.", removedRange);
}

foreach (var (range, ownerIndex, partitionIndex) in current.RangeOwners)
{
Expand Down Expand Up @@ -396,14 +409,28 @@ private async Task ReleaseRangeAsync(DirectoryMembershipSnapshot previous, Direc
_directory.Remove(address.GrainId);
}

if (transferPartners.Count > 0)
var isContiguous = current.Version.Value == previous.Version.Value + 1;
if (!isContiguous)
{
_partitionSnapshots.Add(new PartitionSnapshotState(previous.Version, removedAddresses, transferPartners));
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Encountered non-contiguous update from '{Previous}' to '{Current}' while releasing range '{Range}'. Dropping snapshot.", previous.Version, current.Version, removedRange);
}

return;
}
else

if (transferPartners.Count == 0)
{
_logger.LogDebug("Dropping snapshot since there are no transfer partners.");
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("No transfer partners for snapshot of range '{Range}' at version '{Version}'. Dropping snapshot.", removedRange, current.Version);
}

return;
}

_partitionSnapshots.Add(new PartitionSnapshotState(previous.Version, removedAddresses, transferPartners));
}
finally
{
Expand Down Expand Up @@ -641,7 +668,21 @@ async Task<List<GrainAddress>> GetRegisteredActivationsFromClusterMember(Members
var client = _grainFactory.GetSystemTarget<IGrainDirectoryClient>(Constants.GrainDirectory, siloAddress);
var result = await InvokeOnClusterMember(
siloAddress,
async () => await client.GetRegisteredActivations(version, range, isValidation),
async () =>
{
var innerSw = ValueStopwatch.StartNew();
Immutable<List<GrainAddress>> result = default;
if (isValidation)
{
result = await client.GetRegisteredActivations(version, range, isValidation: true);
}
else
{
result = await client.RecoverRegisteredActivations(version, range, _id, _partitionIndex);
}
return result;
},
new Immutable<List<GrainAddress>>([]),
nameof(GetRegisteredActivations));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ internal interface IGrainDirectoryClient : ISystemTarget
{
[Alias("GetRegisteredActivations")]
ValueTask<Immutable<List<GrainAddress>>> GetRegisteredActivations(MembershipVersion membershipVersion, RingRange range, bool isValidation);

[Alias("RecoverRegisteredActivations")]
ValueTask<Immutable<List<GrainAddress>>> RecoverRegisteredActivations(MembershipVersion membershipVersion, RingRange range, SiloAddress siloAddress, int partitionId);
}

[Alias("IGrainDirectoryReplicaTestHooks")]
Expand Down
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Hosting/CoreHostingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private static void ConfigurePrimarySiloEndpoint(OptionsBuilder<DevelopmentClust
/// <param name="siloBuilder">The silo builder to register the directory implementation with.</param>
/// <param name="name">The name of the directory to register, or null to register the directory as the default.</param>
/// <returns>The provided silo builder.</returns>
[Experimental("ORLEANSEXP002")]
[Experimental("ORLEANSEXP003")]
public static ISiloBuilder AddDistributedGrainDirectory(this ISiloBuilder siloBuilder, string? name = null)
{
var services = siloBuilder.Services;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.TestingHost/ConfigureDistributedGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Orleans.TestingHost;

internal class ConfigureDistributedGrainDirectory : ISiloConfigurator
{
#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
public void Configure(ISiloBuilder siloBuilder) => siloBuilder.AddDistributedGrainDirectory();
#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ private class SiloBuilderConfigurator : ISiloConfigurator
public void Configure(ISiloBuilder siloBuilder)
{
siloBuilder.Configure<SiloMessagingOptions>(o => o.ResponseTimeout = o.SystemResponseTimeout = TimeSpan.FromMinutes(2));
#pragma warning disable ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
#pragma warning disable ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
siloBuilder.AddDistributedGrainDirectory();
#pragma warning restore ORLEANSEXP002 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
#pragma warning restore ORLEANSEXP003 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
}
}
}
Expand Down

0 comments on commit ae5515a

Please sign in to comment.