Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] On a timeout, check for partition 'heartbeat' and report possibly unavailable partitions #387

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 45 additions & 3 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace DurableTask.Netherite
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
using DurableTask.Netherite.Scaling;
using Microsoft.Azure.Storage;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand All @@ -30,6 +31,7 @@ class Client : TransportAbstraction.IClient
readonly WorkItemTraceHelper workItemTraceHelper;
readonly Stopwatch workItemStopwatch;
readonly CancellationTokenSource cts;
readonly ILoadPublisherService partitionLoadPublisher;

static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5);

Expand All @@ -53,11 +55,13 @@ public Client(
Guid taskHubGuid,
TransportAbstraction.ISender batchSender,
WorkItemTraceHelper workItemTraceHelper,
CancellationToken shutdownToken)
CancellationToken shutdownToken,
ILoadPublisherService loadPublisher)
{
this.host = host;
this.ClientId = clientId;
this.taskHubGuid = taskHubGuid;
this.partitionLoadPublisher = loadPublisher;
this.traceHelper = new ClientTraceHelper(host.LoggerFactory, host.Settings.ClientLogLevelLimit, host.StorageAccountName, host.Settings.HubName, this.ClientId);
this.workItemTraceHelper = workItemTraceHelper;
this.account = host.StorageAccountName;
Expand Down Expand Up @@ -299,7 +303,7 @@ async Task<ClientEvent> PerformRequestWithTimeoutAndCancellation(IClientRequestE
}

static readonly TimeoutException timeoutException = new TimeoutException("Client request timed out.");
static readonly OperationCanceledException shutdownException = new OperationCanceledException("Client request was cancelled because host is shutting down.");
readonly OperationCanceledException shutdownException = new OperationCanceledException("Client request was cancelled because host is shutting down.");
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved

internal class PendingRequest : TransportAbstraction.IDurabilityOrExceptionListener
{
Expand Down Expand Up @@ -419,12 +423,50 @@ void TransportAbstraction.IDurabilityOrExceptionListener.ReportException(Event e

public void TryTimeout()
{
// Obtain the client task for this client request ID
this.client.traceHelper.TraceTimerProgress($"firing ({this.timeoutKey.due:o},{this.timeoutKey.id})");
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var pendingRequest))
{
this.client.traceHelper.TraceRequestTimeout(pendingRequest.partitionEventId, pendingRequest.partitionId);
this.continuation.TrySetException(timeoutException);

// a timeout may represent either a slow partition, or an unresponsive one.
// we check the 'partitions load provider' (either blob or Azure Table) to determine what error to throw.
// TODO: should we introduce a Query API that only searches for a given partitionID?
this.client.partitionLoadPublisher.QueryAsync(CancellationToken.None).ContinueWith((task) =>
davidmrdavid marked this conversation as resolved.
Show resolved Hide resolved
{
if (task.IsFaulted)
{
// TODO: consider logging that something went wrong in the partition table query
this.continuation.TrySetException(timeoutException);
}
else
{
PartitionLoadInfo info = task.Result[pendingRequest.partitionId];
if (info.Timestamp.HasValue)
{
// see how much time has gone by between the partition updating it's entry
// and the timeout firing. We'll call this the 'heartbeat' of the partition.
DateTime lastHeartBeatTime = info.Timestamp.Value.UtcDateTime;
TimeSpan timeSinceLastHeartbeat = DateTimeOffset.UtcNow.Subtract(lastHeartBeatTime);
if (timeSinceLastHeartbeat > TimeSpan.FromMinutes(10))
{
// It's been over 10 minutes, the partition may be unavailable
// TODO: should this be made static?
TimeoutException exception = new TimeoutException($"Client request timed out. " +
$"The target partition's ({this.partitionId}) last heartbeat ocurred at {lastHeartBeatTime}." +
$"It's possible the partition is offline.");
this.continuation.TrySetException(new TimeoutException("Partition is unresponsive"));
}
else
{
// report timeout, the partition may simply be slow
this.continuation.TrySetException(timeoutException);
}
}
}
});
}
// TODO: should we log if the request was not found?
}

public void TryCancel(OperationCanceledException exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ public uint GetPartitionId(string instanceId)
TransportAbstraction.IClient TransportAbstraction.IHost.AddClient(Guid clientId, Guid taskHubGuid, TransportAbstraction.ISender batchSender)
{
System.Diagnostics.Debug.Assert(this.client == null, "Backend should create only 1 client");

this.client = new Client(this, clientId, taskHubGuid, batchSender, this.workItemTraceHelper, this.serviceShutdownSource.Token);
this.client = new Client(this, clientId, taskHubGuid, batchSender, this.workItemTraceHelper, this.serviceShutdownSource.Token, this.storage.LoadPublisher);
return this.client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class NetheriteOrchestrationServiceSettings

/// <summary>
/// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty,
/// then Azure blobs are used instead. The use of Azure blobs is currently not supported on consumption plans, or on elastic premium plans without runtime scaling.
/// then Azure blobs are used instead.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still accurate?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I don't quite remember, this was a limitation of the scale controller (couldn't read anything other than Azure Tables). Not sure if we changed that since then.

/// </summary>
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public async Task<Dictionary<uint, PartitionLoadInfo>> QueryAsync(CancellationTo
MissRate = double.Parse(e.MissRate.Substring(0, e.MissRate.Length - 1)) / 100,
CachePct = int.Parse(e.CachePct.Substring(0, e.CachePct.Length - 1)),
CacheMB = e.CacheMB,
Timestamp = e.Timestamp,
});
}
return result;
Expand Down
5 changes: 5 additions & 0 deletions src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public string IsBusy()
[DataMember]
public double CacheMB { get; set; }

/// <summary>
/// The timestamp of the last write to this row. May be used to flag that partition as possibly unavailable.
/// </summary>
public DateTimeOffset? Timestamp { get; set; } // TODO: should this be a DataMember? Are we actually serializing this somewhere?

/// <summary>
/// The character representing idle load.
/// </summary>
Expand Down