diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs
index dbf7ccd3..1761faf1 100644
--- a/src/DurableTask.Netherite/OrchestrationService/Client.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs
@@ -15,6 +15,7 @@ namespace DurableTask.Netherite
using DurableTask.Core;
using DurableTask.Core.Exceptions;
using DurableTask.Core.History;
+ using DurableTask.Netherite.Scaling;
using Microsoft.Azure.Storage;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
@@ -30,6 +31,7 @@ class Client : TransportAbstraction.IClient
readonly WorkItemTraceHelper workItemTraceHelper;
readonly Stopwatch workItemStopwatch;
readonly CancellationTokenSource cts;
+ readonly ILoadPublisherService partitionLoadPublisher;
static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5);
@@ -53,11 +55,13 @@ public Client(
Guid taskHubGuid,
TransportAbstraction.ISender batchSender,
WorkItemTraceHelper workItemTraceHelper,
- CancellationToken shutdownToken)
+ CancellationToken shutdownToken,
+ ILoadPublisherService loadPublisher)
{
this.host = host;
this.ClientId = clientId;
this.taskHubGuid = taskHubGuid;
+ this.partitionLoadPublisher = loadPublisher;
this.traceHelper = new ClientTraceHelper(host.LoggerFactory, host.Settings.ClientLogLevelLimit, host.StorageAccountName, host.Settings.HubName, this.ClientId);
this.workItemTraceHelper = workItemTraceHelper;
this.account = host.StorageAccountName;
@@ -419,12 +423,50 @@ void TransportAbstraction.IDurabilityOrExceptionListener.ReportException(Event e
public void TryTimeout()
{
+ // Obtain the client task for this client request ID
this.client.traceHelper.TraceTimerProgress($"firing ({this.timeoutKey.due:o},{this.timeoutKey.id})");
if (this.client.ResponseWaiters.TryRemove(this.requestId, out var pendingRequest))
{
this.client.traceHelper.TraceRequestTimeout(pendingRequest.partitionEventId, pendingRequest.partitionId);
- this.continuation.TrySetException(timeoutException);
+
+ // a timeout may represent either a slow partition, or an unresponsive one.
+ // we check the 'partitions load provider' (either blob or Azure Table) to determine what error to throw.
+ // TODO: should we introduce a Query API that only searches for a given partitionID?
+ this.client.partitionLoadPublisher.QueryAsync(CancellationToken.None).ContinueWith((task) =>
+ {
+ if (task.IsFaulted)
+ {
+ // TODO: consider logging that something went wrong in the partition table query
+ this.continuation.TrySetException(timeoutException);
+ }
+ else
+ {
+ PartitionLoadInfo info = task.Result[pendingRequest.partitionId];
+ if (info.Timestamp.HasValue)
+ {
+ // see how much time has gone by between the partition updating it's entry
+ // and the timeout firing. We'll call this the 'heartbeat' of the partition.
+ DateTime lastHeartBeatTime = info.Timestamp.Value.UtcDateTime;
+ TimeSpan timeSinceLastHeartbeat = DateTimeOffset.UtcNow.Subtract(lastHeartBeatTime);
+ if (timeSinceLastHeartbeat > TimeSpan.FromMinutes(10))
+ {
+ // It's been over 10 minutes, the partition may be unavailable
+ // TODO: should this be made static?
+ TimeoutException exception = new TimeoutException($"Client request timed out. " +
+ $"The target partition's ({this.partitionId}) last heartbeat ocurred at {lastHeartBeatTime}." +
+ $"It's possible the partition is offline.");
+ this.continuation.TrySetException(new TimeoutException("Partition is unresponsive"));
+ }
+ else
+ {
+ // report timeout, the partition may simply be slow
+ this.continuation.TrySetException(timeoutException);
+ }
+ }
+ }
+ });
}
+ // TODO: should we log if the request was not found?
}
public void TryCancel(OperationCanceledException exception)
diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs
index 5b60c876..9ea4f8ad 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs
@@ -531,8 +531,7 @@ public uint GetPartitionId(string instanceId)
TransportAbstraction.IClient TransportAbstraction.IHost.AddClient(Guid clientId, Guid taskHubGuid, TransportAbstraction.ISender batchSender)
{
System.Diagnostics.Debug.Assert(this.client == null, "Backend should create only 1 client");
-
- this.client = new Client(this, clientId, taskHubGuid, batchSender, this.workItemTraceHelper, this.serviceShutdownSource.Token);
+ this.client = new Client(this, clientId, taskHubGuid, batchSender, this.workItemTraceHelper, this.serviceShutdownSource.Token, this.storage.LoadPublisher);
return this.client;
}
diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
index 46a14f84..b18da371 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
@@ -47,7 +47,7 @@ public class NetheriteOrchestrationServiceSettings
///
/// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty,
- /// then Azure blobs are used instead. The use of Azure blobs is currently not supported on consumption plans, or on elastic premium plans without runtime scaling.
+ /// then Azure blobs are used instead.
///
public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";
diff --git a/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs b/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs
index 033dcf90..86136ec5 100644
--- a/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs
+++ b/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs
@@ -94,6 +94,7 @@ public async Task> QueryAsync(CancellationTo
MissRate = double.Parse(e.MissRate.Substring(0, e.MissRate.Length - 1)) / 100,
CachePct = int.Parse(e.CachePct.Substring(0, e.CachePct.Length - 1)),
CacheMB = e.CacheMB,
+ Timestamp = e.Timestamp,
});
}
return result;
diff --git a/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs b/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
index 05e81553..8907bc25 100644
--- a/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
+++ b/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
@@ -133,6 +133,11 @@ public string IsBusy()
[DataMember]
public double CacheMB { get; set; }
+ ///
+ /// The timestamp of the last write to this row. May be used to flag that partition as possibly unavailable.
+ ///
+ public DateTimeOffset? Timestamp { get; set; } // TODO: should this be a DataMember? Are we actually serializing this somewhere?
+
///
/// The character representing idle load.
///