Skip to content

Commit

Permalink
Removed timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Oct 11, 2021
1 parent bd75404 commit ebda0df
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 103 deletions.
19 changes: 6 additions & 13 deletions src/DotNext.IO/IO/Log/IAuditTrail.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,26 @@ public interface IAuditTrail
/// Waits for the commit.
/// </summary>
/// <param name="index">The index of the log record to be committed.</param>
/// <param name="timeout">The timeout used to wait for the commit.</param>
/// <param name="token">The token that can be used to cancel waiting.</param>
/// <returns><see langword="true"/> if log entry with the specified <paramref name="index"/> is committed; otherwise, <see langword="false"/>.</returns>
/// <returns>The task representing asynchronous result.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than 1.</exception>
/// <exception cref="OperationCanceledException">The operation has been cancelled.</exception>
async ValueTask<bool> WaitForCommitAsync(long index, TimeSpan timeout, CancellationToken token = default)
async ValueTask WaitForCommitAsync(long index, CancellationToken token = default)
{
if (index < 0L)
throw new ArgumentOutOfRangeException(nameof(index));

for (var timeoutMeasurement = new Timeout(timeout); LastCommittedEntryIndex < index; await WaitForCommitAsync(timeout, token).ConfigureAwait(false))
{
if (!timeoutMeasurement.RemainingTime.TryGetValue(out timeout))
return false;
}

return true;
while (LastCommittedEntryIndex < index)
await WaitForCommitAsync(token).ConfigureAwait(false);
}

/// <summary>
/// Waits for the commit.
/// </summary>
/// <param name="timeout">The timeout used to wait for the commit.</param>
/// <param name="token">The token that can be used to cancel waiting.</param>
/// <returns><see langword="true"/> if log entry is committed; otherwise, <see langword="false"/>.</returns>
/// <returns>The task representing asynchronous result.</returns>
/// <exception cref="OperationCanceledException">The operation has been cancelled.</exception>
ValueTask<bool> WaitForCommitAsync(TimeSpan timeout, CancellationToken token);
ValueTask WaitForCommitAsync(CancellationToken token = default);

/// <summary>
/// Commits log entries into the underlying storage and marks these entries as committed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static async Task Appending()
Equal(0, auditTrail.LastCommittedEntryIndex);
//commit all entries
Equal(2, await auditTrail.CommitAsync(CancellationToken.None));
True(await auditTrail.WaitForCommitAsync(2, TimeSpan.Zero));
await auditTrail.WaitForCommitAsync(2);
Equal(2, auditTrail.LastCommittedEntryIndex);
//check overlapping with committed entries
await ThrowsAsync<InvalidOperationException>(() => auditTrail.AppendAsync(new LogEntryList(entry1, entry2), 2).AsTask());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ protected override async Task ExecuteAsync(CancellationToken token)

while (!token.IsCancellationRequested)
{
await state.WaitForCommitAsync(Timeout.InfiniteTimeSpan, token).ConfigureAwait(false);
await state.WaitForCommitAsync(token).ConfigureAwait(false);
await compaction(token).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,12 @@ ValueTask IPersistentState.UpdateVotedForAsync(ClusterMemberId? id)
}

/// <inheritdoc/>
ValueTask<bool> IAuditTrail.WaitForCommitAsync(TimeSpan timeout, CancellationToken token)
=> commitEvent.WaitAsync(timeout, token);
ValueTask IAuditTrail.WaitForCommitAsync(CancellationToken token)
=> commitEvent.WaitAsync(token);

/// <inheritdoc/>
ValueTask<bool> IAuditTrail.WaitForCommitAsync(long index, TimeSpan timeout, CancellationToken token)
=> commitEvent.WaitForCommitAsync(IsCommittedPredicate, this, index, timeout, token);
ValueTask IAuditTrail.WaitForCommitAsync(long index, CancellationToken token)
=> commitEvent.WaitForCommitAsync(IsCommittedPredicate, this, index, token);

private async Task EnsureConsistency(TimeSpan timeout, CancellationToken token)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ private void DrainReplicationQueue()
private ValueTask<bool> WaitForReplicationAsync(TimeSpan period, CancellationToken token)
=> replicationEvent.WaitAsync(period, token);

internal Task ForceReplicationAsync(TimeSpan timeout, CancellationToken token)
internal Task ForceReplicationAsync(CancellationToken token)
{
var result = replicationQueue.Task;

// resume heartbeat loop to force replication
replicationEvent.Signal();

// enqueue a new task representing completion callback
return result.WaitAsync(timeout, token);
return result.WaitAsync(token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -807,28 +807,22 @@ ValueTask<long> IAuditTrail.DropAsync(long startIndex, CancellationToken token)
/// <summary>
/// Waits for the commit.
/// </summary>
/// <param name="timeout">The timeout used to wait for the commit.</param>
/// <param name="token">The token that can be used to cancel waiting.</param>
/// <returns>The task representing asynchronous result.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="TimeoutException">The operation has timed out.</exception>
public ValueTask<bool> WaitForCommitAsync(TimeSpan timeout, CancellationToken token)
=> commitEvent.WaitAsync(timeout, token);
/// <exception cref="OperationCanceledException">The operation has been cancelled.</exception>
public ValueTask WaitForCommitAsync(CancellationToken token = default)
=> commitEvent.WaitAsync(token);

/// <summary>
/// Waits for specific commit.
/// Waits for the commit.
/// </summary>
/// <param name="index">The index of the log record to be committed.</param>
/// <param name="timeout">The timeout used to wait for the commit.</param>
/// <param name="token">The token that can be used to cancel waiting.</param>
/// <returns>
/// <see langword="true"/> if log entry at given index has been committed;
/// <see langword="false"/> in case of timeout.
/// </returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="TimeoutException">The operation has timed out.</exception>
public ValueTask<bool> WaitForCommitAsync(long index, TimeSpan timeout, CancellationToken token)
=> commitEvent.WaitForCommitAsync(NodeState.IsCommittedPredicate, state, index, timeout, token);
/// <returns>The task representing asynchronous result.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than 1.</exception>
/// <exception cref="OperationCanceledException">The operation has been cancelled.</exception>
public ValueTask WaitForCommitAsync(long index, CancellationToken token = default)
=> commitEvent.WaitForCommitAsync(NodeState.IsCommittedPredicate, state, index, token);

// this operation doesn't require write lock
private async ValueTask BuildSnapshotAsync(int sessionId, long upperBoundIndex, SnapshotBuilder builder, CancellationToken token)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected async Task<bool> AddMemberAsync<TAddress>(TMember member, int rounds,
// proposes a new member
if (await configurationStorage.AddMemberAsync(member.Id, addressProvider(member), token).ConfigureAwait(false))
{
await ReplicateAsync(new EmptyLogEntry(Term), Timeout.Infinite, token).ConfigureAwait(false);
while (!await ReplicateAsync(new EmptyLogEntry(Term), token).ConfigureAwait(false));

// ensure that the newly added member has been committed
await configurationStorage.WaitForApplyAsync(token).ConfigureAwait(false);
Expand Down Expand Up @@ -253,7 +253,7 @@ public async Task<bool> RemoveMemberAsync<TAddress>(ClusterMemberId id, ICluster
// remove the existing member
if (await configurationStorage.RemoveMemberAsync(id, token).ConfigureAwait(false))
{
await ReplicateAsync(new EmptyLogEntry(Term), Timeout.Infinite, token).ConfigureAwait(false);
while (!await ReplicateAsync(new EmptyLogEntry(Term), token).ConfigureAwait(false));

// ensure that the removed member has been committed
await configurationStorage.WaitForApplyAsync(token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,47 +701,38 @@ async void IRaftStateMachine.MoveToLeaderState(IRaftClusterMember newLeader)
/// <summary>
/// Forces replication.
/// </summary>
/// <param name="timeout">The time to wait until replication ends.</param>
/// <param name="token">The token that can be used to cancel waiting.</param>
/// <returns>The task representing asynchronous result.</returns>
/// <exception cref="InvalidOperationException">The local cluster member is not a leader.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
public Task ForceReplicationAsync(TimeSpan timeout, CancellationToken token = default)
=> state is LeaderState leaderState ? leaderState.ForceReplicationAsync(timeout, token) : Task.FromException<bool>(new InvalidOperationException(ExceptionMessages.LocalNodeNotLeader));
public Task ForceReplicationAsync(CancellationToken token = default)
=> state is LeaderState leaderState ? leaderState.ForceReplicationAsync(token) : Task.FromException(new InvalidOperationException(ExceptionMessages.LocalNodeNotLeader));

private async Task<bool> ReplicateAsync<TEntry>(TEntry entry, Timeout timeout, CancellationToken token)
/// <summary>
/// Appends a new log entry and ensures that it is replicated and committed.
/// </summary>
/// <typeparam name="TEntry">The type of the log entry.</typeparam>
/// <param name="entry">The log entry to be added.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns><see langword="true"/> if the appended log entry has been committed by the majority of nodes; <see langword="false"/> if retry is required.</returns>
/// <exception cref="InvalidOperationException">The current node is not a leader.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
public async Task<bool> ReplicateAsync<TEntry>(TEntry entry, CancellationToken token)
where TEntry : notnull, IRaftLogEntry
{
using var tokenSource = token.LinkTo(LifecycleToken);

// 1 - append entry to the log
var index = await auditTrail.AppendAsync(entry, token).ConfigureAwait(false);
timeout.ThrowIfExpired(out var remaining);

// 2 - force replication
await ForceReplicationAsync(remaining, token).ConfigureAwait(false);
timeout.ThrowIfExpired(out remaining);
await ForceReplicationAsync(token).ConfigureAwait(false);

// 3 - wait for commit
return await auditTrail.WaitForCommitAsync(index, remaining, token).ConfigureAwait(false)
? auditTrail.Term == entry.Term
: throw new TimeoutException();
}
await auditTrail.WaitForCommitAsync(index, token).ConfigureAwait(false);

/// <summary>
/// Appends and replicates the log entry.
/// </summary>
/// <typeparam name="TEntry">The type of the log entry.</typeparam>
/// <param name="entry">The log entry to append.</param>
/// <param name="timeout">The timeout for the operation.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>
/// <see langword="true"/> if the has been replicated successfully;
/// otherwise, <see langword="false"/>.
/// </returns>
public Task<bool> ReplicateAsync<TEntry>(TEntry entry, TimeSpan timeout, CancellationToken token)
where TEntry : notnull, IRaftLogEntry
=> ReplicateAsync(entry, new(timeout), token);
return auditTrail.Term == entry.Term;
}

private TMember? TryGetPeer(EndPoint peer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,13 @@ namespace DotNext.Net.Cluster.Replication;

internal static class CommitEvent
{
internal static async ValueTask<bool> WaitForCommitAsync<T>(this AsyncManualResetEvent commitEvent, Func<T, long, bool> commitChecker, T arg, long index, TimeSpan timeout, CancellationToken token)
internal static async ValueTask WaitForCommitAsync<T>(this AsyncManualResetEvent commitEvent, Func<T, long, bool> commitChecker, T arg, long index, CancellationToken token)
where T : class
{
if (index < 0L)
throw new ArgumentOutOfRangeException(nameof(index));

for (var timeoutMeasurement = new Timeout(timeout); !commitChecker(arg, index); await commitEvent.WaitAsync(commitChecker, arg, index, timeout, token).ConfigureAwait(false))
{
if (!timeoutMeasurement.RemainingTime.TryGetValue(out timeout))
return false;
}

return true;
while (!commitChecker(arg, index))
await commitEvent.WaitAsync(commitChecker, arg, index, token).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ public interface IReplicationCluster : ICluster
/// <summary>
/// Forces replication.
/// </summary>
/// <param name="timeout">The time to wait until replication ends.</param>
/// <param name="token">The token that can be used to cancel waiting.</param>
/// <exception cref="InvalidOperationException">The local cluster member is not a leader.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="TimeoutException">The operation has timed out.</exception>
Task ForceReplicationAsync(TimeSpan timeout, CancellationToken token = default);
Task ForceReplicationAsync(CancellationToken token = default);

/// <summary>
/// Represents an event raised when the local node completes its replication with another
Expand Down Expand Up @@ -49,12 +47,10 @@ public interface IReplicationCluster<TEntry> : IReplicationCluster
/// </summary>
/// <typeparam name="TEntryImpl">The type of the log entry.</typeparam>
/// <param name="entry">The log entry to be added.</param>
/// <param name="timeout">The timeout of the operation.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns><see langword="true"/> if the appended log entry has been committed by the majority of nodes; <see langword="false"/> if retry is required.</returns>
/// <exception cref="InvalidOperationException">The current node is not a leader.</exception>
/// <exception cref="TimeoutException">The operation timed out.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
Task<bool> ReplicateAsync<TEntryImpl>(TEntryImpl entry, TimeSpan timeout, CancellationToken token = default)
Task<bool> ReplicateAsync<TEntryImpl>(TEntryImpl entry, CancellationToken token = default)
where TEntryImpl : notnull, TEntry;
}
12 changes: 6 additions & 6 deletions src/examples/RaftNode/DataModifier.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
using DotNext.Net.Cluster.Consensus.Raft;
using DotNext;
using DotNext.Net.Cluster.Consensus.Raft;
using DotNext.Threading;
using static System.Threading.Timeout;

namespace RaftNode;

internal sealed class DataModifier : BackgroundService
{
private readonly IRaftCluster cluster;
private readonly IValueProvider valueProvider;
private readonly ISupplier<long> valueProvider;

public DataModifier(IRaftCluster cluster, IValueProvider provider)
public DataModifier(IRaftCluster cluster, ISupplier<long> provider)
{
this.cluster = cluster;
valueProvider = provider;
Expand All @@ -24,14 +24,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var leadershipToken = cluster.LeadershipToken;
if (!leadershipToken.IsCancellationRequested)
{
var newValue = valueProvider.Value + 500L;
var newValue = valueProvider.Invoke() + 500L;
Console.WriteLine("Saving value {0} generated by the leader node", newValue);

var source = stoppingToken.LinkTo(leadershipToken);
try
{
var entry = new Int64LogEntry { Content = newValue, Term = cluster.Term };
await cluster.ReplicateAsync(entry, InfiniteTimeSpan, stoppingToken);
await cluster.ReplicateAsync(entry, stoppingToken);
}
catch (Exception e)
{
Expand Down
8 changes: 0 additions & 8 deletions src/examples/RaftNode/IValueProvider.cs

This file was deleted.

14 changes: 4 additions & 10 deletions src/examples/RaftNode/SimplePersistentState.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using DotNext.IO;
using DotNext;
using DotNext.IO;
using DotNext.Net.Cluster.Consensus.Raft;
using static DotNext.Threading.AtomicInt64;

namespace RaftNode;

internal sealed class SimplePersistentState : PersistentState, IValueProvider
internal sealed class SimplePersistentState : PersistentState, ISupplier<long>
{
internal const string LogLocation = "logLocation";

Expand Down Expand Up @@ -67,7 +68,7 @@ private static Options CreateOptions(AppEventSource source)
return result;
}

long IValueProvider.Value => content.VolatileRead();
long ISupplier<long>.Invoke() => content.VolatileRead();

private async ValueTask UpdateValue(LogEntry entry)
{
Expand All @@ -79,13 +80,6 @@ private async ValueTask UpdateValue(LogEntry entry)
protected override ValueTask ApplyAsync(LogEntry entry)
=> entry.Length == 0L ? new ValueTask() : UpdateValue(entry);

async Task IValueProvider.UpdateValueAsync(long value, TimeSpan timeout, CancellationToken token)
{
var commitIndex = LastCommittedEntryIndex;
await AppendAsync(new Int64LogEntry { Content = value, Term = Term }, token);
await WaitForCommitAsync(commitIndex + 1L, timeout, token);
}

protected override SnapshotBuilder CreateSnapshotBuilder(in SnapshotBuilderContext context)
{
Console.WriteLine("Building snapshot");
Expand Down
5 changes: 3 additions & 2 deletions src/examples/RaftNode/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using DotNext.Net.Cluster;
using DotNext;
using DotNext.Net.Cluster;
using DotNext.Net.Cluster.Consensus.Raft;
using DotNext.Net.Cluster.Consensus.Raft.Http;
using DotNext.Net.Http;
Expand Down Expand Up @@ -42,7 +43,7 @@ public void ConfigureServices(IServiceCollection services)
if (!string.IsNullOrWhiteSpace(path))
{
services.AddSingleton<AppEventSource>();
services.UsePersistenceEngine<IValueProvider, SimplePersistentState>()
services.UsePersistenceEngine<ISupplier<long>, SimplePersistentState>()
.AddSingleton<IHostedService, DataModifier>();
}
}
Expand Down

0 comments on commit ebda0df

Please sign in to comment.