diff --git a/src/DurableTask.Netherite/OrchestrationService/Client.cs b/src/DurableTask.Netherite/OrchestrationService/Client.cs
index dbf7ccd3..1761faf1 100644
--- a/src/DurableTask.Netherite/OrchestrationService/Client.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/Client.cs
@@ -15,6 +15,7 @@ namespace DurableTask.Netherite
     using DurableTask.Core;
     using DurableTask.Core.Exceptions;
     using DurableTask.Core.History;
+    using DurableTask.Netherite.Scaling;
     using Microsoft.Azure.Storage;
     using Newtonsoft.Json;
     using Newtonsoft.Json.Linq;
@@ -30,6 +31,7 @@ class Client : TransportAbstraction.IClient
         readonly WorkItemTraceHelper workItemTraceHelper;
         readonly Stopwatch workItemStopwatch;
         readonly CancellationTokenSource cts;
+        readonly ILoadPublisherService partitionLoadPublisher;
 
         static readonly TimeSpan DefaultTimeout = TimeSpan.FromMinutes(5);
 
@@ -53,11 +55,13 @@ public Client(
             Guid taskHubGuid,
             TransportAbstraction.ISender batchSender,
             WorkItemTraceHelper workItemTraceHelper,
-            CancellationToken shutdownToken)
+            CancellationToken shutdownToken,
+            ILoadPublisherService loadPublisher)
         {
             this.host = host;
             this.ClientId = clientId;
             this.taskHubGuid = taskHubGuid;
+            this.partitionLoadPublisher = loadPublisher;
             this.traceHelper = new ClientTraceHelper(host.LoggerFactory, host.Settings.ClientLogLevelLimit, host.StorageAccountName, host.Settings.HubName, this.ClientId);
             this.workItemTraceHelper = workItemTraceHelper;
             this.account = host.StorageAccountName;
@@ -419,12 +423,45 @@ void TransportAbstraction.IDurabilityOrExceptionListener.ReportException(Event e
 
             public void TryTimeout()
             {
+                // Obtain the client task for this client request ID
                 this.client.traceHelper.TraceTimerProgress($"firing ({this.timeoutKey.due:o},{this.timeoutKey.id})");
                 if (this.client.ResponseWaiters.TryRemove(this.requestId, out var pendingRequest))
                 {
                     this.client.traceHelper.TraceRequestTimeout(pendingRequest.partitionEventId, pendingRequest.partitionId);
-                    this.continuation.TrySetException(timeoutException);
+
+                    // A timeout may represent either a slow partition or an unresponsive one.
+                    // We check the 'partitions load provider' (either blob or Azure Table) to determine what error to throw.
+                    // TODO: should we introduce a Query API that only searches for a given partitionID?
+                    this.client.partitionLoadPublisher.QueryAsync(CancellationToken.None).ContinueWith((task) =>
+                    {
+                        // Default to the generic timeout. The continuation is completed exactly once at the end of
+                        // this callback, on every path, so that the caller is never left waiting.
+                        Exception exceptionToReport = timeoutException;
+
+                        // TODO: consider logging when the load query faulted or was canceled
+                        if (!task.IsFaulted && !task.IsCanceled
+                            && task.Result != null
+                            && task.Result.TryGetValue(pendingRequest.partitionId, out PartitionLoadInfo info)
+                            && info.Timestamp.HasValue)
+                        {
+                            // The time elapsed since the partition last updated its entry
+                            // serves as the 'heartbeat' of the partition.
+                            DateTimeOffset lastHeartbeatTime = info.Timestamp.Value;
+                            TimeSpan timeSinceLastHeartbeat = DateTimeOffset.UtcNow - lastHeartbeatTime;
+                            if (timeSinceLastHeartbeat > TimeSpan.FromMinutes(10))
+                            {
+                                // It's been over 10 minutes, the partition may be unavailable
+                                exceptionToReport = new TimeoutException(
+                                    $"Client request timed out. " +
+                                    $"The target partition's ({pendingRequest.partitionId}) last heartbeat occurred at {lastHeartbeatTime:o}. " +
+                                    $"It's possible the partition is offline.");
+                            }
+                        }
+
+                        this.continuation.TrySetException(exceptionToReport);
+                    });
                 }
+                // TODO: should we log if the request was not found?
             }
 
             public void TryCancel(OperationCanceledException exception)
diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs
index 5b60c876..9ea4f8ad 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationService.cs
@@ -531,8 +531,7 @@ public uint GetPartitionId(string instanceId)
         TransportAbstraction.IClient TransportAbstraction.IHost.AddClient(Guid clientId, Guid taskHubGuid, TransportAbstraction.ISender batchSender)
         {
             System.Diagnostics.Debug.Assert(this.client == null, "Backend should create only 1 client");
-
-            this.client = new Client(this, clientId, taskHubGuid, batchSender, this.workItemTraceHelper, this.serviceShutdownSource.Token);
+            this.client = new Client(this, clientId, taskHubGuid, batchSender, this.workItemTraceHelper, this.serviceShutdownSource.Token, this.storage.LoadPublisher);
             return this.client;
         }
 
diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
index 46a14f84..b18da371 100644
--- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
+++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs
@@ -47,7 +47,7 @@ public class NetheriteOrchestrationServiceSettings
 
         /// <summary>
         /// Optionally, a name for an Azure Table to use for publishing load information. If set to null or empty,
-        /// then Azure blobs are used instead. The use of Azure blobs is currently not supported on consumption plans, or on elastic premium plans without runtime scaling.
+        /// then Azure blobs are used instead.
         /// </summary>
         public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions";
 
diff --git a/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs b/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs
index 033dcf90..86136ec5 100644
--- a/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs
+++ b/src/DurableTask.Netherite/Scaling/AzureTableLoadPublisher.cs
@@ -94,6 +94,7 @@ public async Task> QueryAsync(CancellationTo
                     MissRate = double.Parse(e.MissRate.Substring(0, e.MissRate.Length - 1)) / 100,
                     CachePct = int.Parse(e.CachePct.Substring(0, e.CachePct.Length - 1)),
                     CacheMB = e.CacheMB,
+                    Timestamp = e.Timestamp,
                 });
             }
             return result;
diff --git a/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs b/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
index 05e81553..8907bc25 100644
--- a/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
+++ b/src/DurableTask.Netherite/Scaling/PartitionLoadInfo.cs
@@ -133,6 +133,11 @@ public string IsBusy()
         [DataMember]
         public double CacheMB { get; set; }
 
+        /// <summary>
+        /// The timestamp of the last write to this row. May be used to flag that partition as possibly unavailable.
+        /// </summary>
+        public DateTimeOffset? Timestamp { get; set; } // TODO: should this be a DataMember? Are we actually serializing this somewhere?
+
         /// <summary>
         /// The character representing idle load.
         /// </summary>