From 220dc2cda642f2adfa1898f1589db4cd5680d52c Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Thu, 6 Jul 2023 15:46:51 -0700 Subject: [PATCH 1/2] support specification of consumer group --- .../Connections/EventHubsUtil.cs | 45 +++++++++- .../NetheriteOrchestrationServiceSettings.cs | 8 ++ .../EventHubs/EventHubsConnections.cs | 35 ++++++++ .../EventHubs/EventHubsTransport.cs | 85 ++++++++++++++----- test/PerformanceTests/host.json | 4 + 5 files changed, 154 insertions(+), 23 deletions(-) diff --git a/src/DurableTask.Netherite/Connections/EventHubsUtil.cs b/src/DurableTask.Netherite/Connections/EventHubsUtil.cs index 7786676d..a22dc9f9 100644 --- a/src/DurableTask.Netherite/Connections/EventHubsUtil.cs +++ b/src/DurableTask.Netherite/Connections/EventHubsUtil.cs @@ -26,7 +26,7 @@ public static class EventHubsUtil /// true if the event hub was created. public static async Task EnsureEventHubExistsAsync(ConnectionInfo info, string eventHubName, int partitionCount, CancellationToken cancellationToken) { - var response = await SendHttpRequest(info, eventHubName, partitionCount, cancellationToken); + var response = await SendEventHubRequest(info, eventHubName, partitionCount, cancellationToken); if (response.StatusCode != System.Net.HttpStatusCode.Conflict) { response.EnsureSuccessStatusCode(); @@ -42,7 +42,7 @@ public static async Task EnsureEventHubExistsAsync(ConnectionInfo info, st /// true if the event hub was deleted. public static async Task DeleteEventHubIfExistsAsync(ConnectionInfo info, string eventHubName, CancellationToken cancellationToken) { - var response = await SendHttpRequest(info, eventHubName, null, cancellationToken); + var response = await SendEventHubRequest(info, eventHubName, null, cancellationToken); if (response.StatusCode != System.Net.HttpStatusCode.NotFound) { response.EnsureSuccessStatusCode(); @@ -50,12 +50,24 @@ public static async Task DeleteEventHubIfExistsAsync(ConnectionInfo info, return response.StatusCode == System.Net.HttpStatusCode.OK; } - static async Task SendHttpRequest(ConnectionInfo info, string eventHubName, int? partitionCount, CancellationToken cancellationToken) + public static async Task EnsureConsumerGroupExistsAsync(ConnectionInfo info, string eventHubName, string consumerGroup, CancellationToken cancellationToken) + { + var response = await SendConsumerGroupRequest(info, eventHubName, consumerGroup, delete: false, cancellationToken); + if (response.StatusCode != System.Net.HttpStatusCode.Conflict) + { + response.EnsureSuccessStatusCode(); + } + return response.StatusCode == System.Net.HttpStatusCode.Created; + } + + // for documentation of these APIs, see https://learn.microsoft.com/en-us/rest/api/eventhub/event-hubs-management-rest + + static async Task SendEventHubRequest(ConnectionInfo info, string eventHubPath, int? partitionCount, CancellationToken cancellationToken) { // send an http request to create or delete the eventhub HttpClient client = new HttpClient(); var request = new HttpRequestMessage(); - request.RequestUri = new Uri($"https://{info.HostName}/{eventHubName}?timeout=60&api-version=2014-01"); + request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}?timeout=60&api-version=2014-01"); request.Method = partitionCount.HasValue ? HttpMethod.Put : HttpMethod.Delete; if (partitionCount.HasValue) { @@ -78,5 +90,30 @@ static async Task SendHttpRequest(ConnectionInfo info, stri return await client.SendAsync(request); } + + static async Task SendConsumerGroupRequest(ConnectionInfo info, string eventHubPath, string consumerGroupName, bool delete, CancellationToken cancellationToken) + { + // send an http request to create or delete the eventhub + HttpClient client = new HttpClient(); + var request = new HttpRequestMessage(); + request.RequestUri = new Uri($"https://{info.HostName}/{eventHubPath}/consumerGroups/{consumerGroupName}?timeout=60&api-version=2014-01"); + request.Method = delete ? HttpMethod.Delete : HttpMethod.Put; + request.Content = new StringContent(@" + + + + + + ", + Encoding.UTF8, + "application/xml"); + + request.Headers.Add("Host", info.HostName); + + // add an authorization header to the request + await info.AuthorizeHttpRequestMessage(request, cancellationToken); + + return await client.SendAsync(request); + } } } diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 70e6e504..77ea04cc 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -51,6 +51,14 @@ public class NetheriteOrchestrationServiceSettings /// public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions"; + /// + /// The consumer group to use. By specifying different consumer groups, two task hubs can use + /// the same event hubs namespace at the same time. However, note that this can waste bandwidth since messages + /// are always delivered to all consumer groups (even if only meaningfully processed by one of them). Also, + /// for some event hub plan the number of consumer groups is limited. + /// + public string EventHubsConsumerGroup { get; set; } = "$Default"; + /// /// Tuning parameters for the FASTER logs /// diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs index 32598e5d..db16ec49 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs @@ -20,6 +20,7 @@ class EventHubsConnections readonly string partitionHub; readonly string loadMonitorHub; readonly CancellationToken shutdownToken; + readonly string consumerGroup; EventHubClient partitionClient; List clientClients; @@ -46,6 +47,7 @@ public EventHubsConnections( string partitionHub, string[] clientHubs, string loadMonitorHub, + string consumerGroup, CancellationToken shutdownToken) { this.connectionInfo = connectionInfo; @@ -53,8 +55,11 @@ public EventHubsConnections( this.clientHubs = clientHubs; this.loadMonitorHub = loadMonitorHub; this.shutdownToken = shutdownToken; + this.consumerGroup = consumerGroup; } + const string defaultConsumerGroup = "$Default"; + public string Fingerprint => $"{this.connectionInfo.HostName}{this.partitionHub}/{this.CreationTimestamp:o}"; public async Task StartAsync(TaskhubParameters parameters) @@ -63,6 +68,11 @@ await Task.WhenAll( this.EnsurePartitionsAsync(parameters.PartitionCount), this.EnsureClientsAsync(), this.EnsureLoadMonitorAsync()); + + if (this.consumerGroup != defaultConsumerGroup) + { + await this.EnsureConsumerGroupsExistAsync(); + } } public Task StopAsync() @@ -121,6 +131,31 @@ async Task EnsureEventHubExistsAsync(string eventHubName, int partitionCount) } } + public Task EnsureConsumerGroupsExistAsync() + { + return Task.WhenAll( + EnsureExistsAsync(this.partitionHub), + EnsureExistsAsync(this.loadMonitorHub), + Task.WhenAll(this.clientHubs.Select(clientHub => EnsureExistsAsync(clientHub)).ToList()) + ); + + async Task EnsureExistsAsync(string eventHubName) + { + this.TraceHelper.LogDebug("Creating ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup); + bool success = await EventHubsUtil.EnsureConsumerGroupExistsAsync(this.connectionInfo, eventHubName, this.consumerGroup, CancellationToken.None); + if (success) + { + this.TraceHelper.LogInformation("Created ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup, CancellationToken.None); + } + else + { + this.TraceHelper.LogDebug("Conflict on ConsumerGroup {eventHubName}|{name}", eventHubName, this.consumerGroup, CancellationToken.None); + await Task.Delay(TimeSpan.FromSeconds(5)); + } + } + } + + internal async Task DeletePartitions() { await EventHubsUtil.DeleteEventHubIfExistsAsync(this.connectionInfo, this.partitionHub, CancellationToken.None); diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index ffe1cc6f..36ba163c 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -32,7 +32,9 @@ class EventHubsTransport : readonly ILogger logger; readonly EventHubsTraceHelper traceHelper; readonly IStorageLayer storage; - readonly string shortClientId; + readonly string consumerGroup; + + string shortClientId; EventProcessorHost eventProcessorHost; EventProcessorHost loadMonitorHost; @@ -70,6 +72,7 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio string namespaceName = settings.EventHubsConnection.ResourceName; this.logger = EventHubsTraceHelper.CreateLogger(loggerFactory); this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName); + this.consumerGroup = settings.EventHubsConsumerGroup; this.ClientId = Guid.NewGuid(); this.shortClientId = Client.GetShortId(this.ClientId); } @@ -78,9 +81,6 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio public static string PartitionHub = "partitions"; public static string[] ClientHubs = { "clients0", "clients1", "clients2", "clients3" }; public static string LoadMonitorHub = "loadmonitor"; - public static string PartitionConsumerGroup = "$Default"; - public static string ClientConsumerGroup = "$Default"; - public static string LoadMonitorConsumerGroup = "$Default"; async Task ITransportLayer.StartAsync() { @@ -106,7 +106,7 @@ async Task ITransportLayer.StartAsync() // check that the storage format is supported, and load the relevant FASTER tuning parameters BlobManager.LoadAndCheckStorageFormat(this.parameters.StorageFormat, this.settings, this.host.TraceWarning); - this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.shutdownSource.Token) + this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.consumerGroup, this.shutdownSource.Token) { Host = host, TraceHelper = this.traceHelper, @@ -114,26 +114,75 @@ async Task ITransportLayer.StartAsync() await this.connections.StartAsync(this.parameters); + this.traceHelper.LogInformation("EventHubsTransport is connecting to {fingerPrint} via consumer group {consumerGroupName}", this.connections.Fingerprint, this.consumerGroup); + return this.parameters; } async Task ITransportLayer.StartClientAsync() { - this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this); - var channel = Channel.CreateBounded(new BoundedChannelOptions(500) { SingleReader = true, AllowSynchronousContinuations = true }); - var clientReceivers = this.connections.CreateClientReceivers(this.ClientId, EventHubsTransport.ClientConsumerGroup); + PartitionReceiver[] clientReceivers = null; + + int attempt = 0; + int maxAttempts = 8; + + while (attempt++ < maxAttempts) + { + this.ClientId = Guid.NewGuid(); + this.shortClientId = Client.GetShortId(this.ClientId); + + clientReceivers = this.connections.CreateClientReceivers(this.ClientId, this.consumerGroup); + + try + { + this.clientConnectionsEstablished = Enumerable + .Range(0, EventHubsConnections.NumClientChannels) + .Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i])) + .ToArray(); + + // we must wait for the client receive connections to be established before continuing + // otherwise, we may miss messages that are sent before the client receiver establishes the receive position + await Task.WhenAll(this.clientConnectionsEstablished); + + break; // was successful, so we exit retry loop + } + catch (Microsoft.Azure.EventHubs.QuotaExceededException) when (attempt < maxAttempts) + { + this.traceHelper.LogWarning("EventHubsTransport encountered quota-exceeded exception"); + } + catch (Exception exception) when (attempt < maxAttempts) + { + this.traceHelper.LogInformation("EventHubsTransport failed with exception {exception}", exception); + } + + TimeSpan retryDelay = TimeSpan.FromSeconds(1 + attempt * 10); + this.traceHelper.LogDebug("EventHubsTransport retrying client connection in {retryDelay}", retryDelay); + Task retryDelayTask = Task.Delay(retryDelay); + + foreach (var clientReceiver in clientReceivers) + { + try + { + await clientReceiver.CloseAsync(); + } + + catch (Exception exception) + { + this.traceHelper.LogWarning("EventHubsTransport failed to close partition receiver {clientReceiver} during retry: {exception}", clientReceiver, exception); + } + } + + await retryDelayTask; + } + + this.traceHelper.LogInformation("EventHubsTransport connected to {fingerPrint} via consumer group {consumerGroup}", this.connections.Fingerprint, this.consumerGroup); - this.clientConnectionsEstablished = Enumerable - .Range(0, EventHubsConnections.NumClientChannels) - .Select(i => this.ClientEstablishConnectionAsync(i, clientReceivers[i])) - .ToArray(); - this.clientReceiveLoops = Enumerable .Range(0, EventHubsConnections.NumClientChannels) .Select(i => this.ClientReceiveLoopAsync(i, clientReceivers[i], channel.Writer)) @@ -141,9 +190,7 @@ async Task ITransportLayer.StartClientAsync() this.clientProcessTask = this.ClientProcessLoopAsync(channel.Reader); - // we must wait for the client receive connections to be established before continuing - // otherwise, we may miss messages that are sent before the client receiver establishes the receive position - await Task.WhenAll(this.clientConnectionsEstablished); + this.client = this.host.AddClient(this.ClientId, this.parameters.TaskhubGuid, this); } async Task ITransportLayer.StartWorkersAsync() @@ -170,7 +217,7 @@ async Task StartPartitionHost() this.eventProcessorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync( Guid.NewGuid().ToString(), EventHubsTransport.PartitionHub, - EventHubsTransport.PartitionConsumerGroup, + this.consumerGroup, this.settings.BlobStorageConnection, this.cloudBlobContainer.Name, $"{this.pathPrefix}eh-checkpoints/{(EventHubsTransport.PartitionHub)}/{formattedCreationDate}"); @@ -192,7 +239,7 @@ await this.eventProcessorHost.RegisterEventProcessorFactoryAsync( this.traceHelper.LogInformation($"EventHubsTransport is starting scripted partition host"); this.scriptedEventProcessorHost = new ScriptedEventProcessorHost( EventHubsTransport.PartitionHub, - EventHubsTransport.PartitionConsumerGroup, + this.consumerGroup, this.settings.EventHubsConnection, this.settings.BlobStorageConnection, this.cloudBlobContainer.Name, @@ -216,7 +263,7 @@ async Task StartLoadMonitorHost() this.loadMonitorHost = await this.settings.EventHubsConnection.GetEventProcessorHostAsync( Guid.NewGuid().ToString(), LoadMonitorHub, - LoadMonitorConsumerGroup, + this.consumerGroup, this.settings.BlobStorageConnection, this.cloudBlobContainer.Name, $"{this.pathPrefix}eh-checkpoints/{LoadMonitorHub}"); diff --git a/test/PerformanceTests/host.json b/test/PerformanceTests/host.json index 72df494d..dfb32f78 100644 --- a/test/PerformanceTests/host.json +++ b/test/PerformanceTests/host.json @@ -71,6 +71,10 @@ // set this to control the max size of the orchestration instance cache // "InstanceCacheSizeMB": "50", + // the consumer group to use. Two task hubs can use the same event hubs namespace + // if their consumer groups are different. + "EventHubsConsumerGroup": "$Default", + // set this to true to use the PSF support in Faster. Will soon be obsolete. "UsePSFQueries": "false", From aa67f468a11b7549a2d91b910a9b87bbfda28915 Mon Sep 17 00:00:00 2001 From: sebastianburckhardt Date: Wed, 7 Feb 2024 15:21:11 -0800 Subject: [PATCH 2/2] fix partition deletion problem, update tracing, change configuration parameter --- .../PartitionState/IPartitionState.cs | 3 +- .../Abstractions/TransportAbstraction.cs | 3 +- .../NetheriteOrchestrationServiceSettings.cs | 14 ++-- .../OrchestrationService/Partition.cs | 4 +- .../Faster/FasterStorageProvider.cs | 3 +- .../StorageLayer/Faster/PartitionStorage.cs | 4 +- .../StorageLayer/Faster/StoreWorker.cs | 4 +- .../StorageLayer/Memory/MemoryStorage.cs | 7 +- .../EventHubs/BlobBatchReceiver.cs | 7 ++ .../EventHubs/EventHubsConnections.cs | 17 +++++ .../EventHubs/EventHubsProcessor.cs | 2 +- .../EventHubs/EventHubsTransport.cs | 75 +++++++++++++++++-- .../EventHubs/LoadMonitorProcessor.cs | 2 +- 13 files changed, 122 insertions(+), 23 deletions(-) diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs index 62f52716..98a48939 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/IPartitionState.cs @@ -19,9 +19,10 @@ interface IPartitionState /// The partition. /// An error handler to initiate and/or indicate termination of this partition. /// A fingerprint for the input queue. + /// Initial offset for the input queue, if this partition is being created. /// the input queue position from which to resume input processing /// Indicates that termination was signaled before the operation completed. - Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint); + Task<(long,int)> CreateOrRestoreAsync(Partition localPartition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint, long initialOffset); /// /// Starts processing, after creating or restoring the partition state. diff --git a/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs b/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs index ba66e667..28b1d203 100644 --- a/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs +++ b/src/DurableTask.Netherite/Abstractions/TransportAbstraction.cs @@ -84,13 +84,14 @@ public interface IPartition /// /// A termination object for initiating and/or detecting termination of the partition. /// Fingerprint for the intput queue. + /// Initial offset for the input queue, if this partition is being created. /// The input queue position of the next message to receive. /// /// The termination token source can be used for immediately terminating the partition. /// Also, it can be used to detect that the partition has terminated for any other reason, /// be it cleanly (after StopAsync) or uncleanly (after losing a lease or hitting a fatal error). /// - Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint); + Task<(long,int)> CreateOrRestoreAsync(IPartitionErrorHandler termination, TaskhubParameters parameters, string inputQueueFingerprint, long initialOffset = 0); /// /// Clean shutdown: stop processing, save partition state to storage, and release ownership. diff --git a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs index 7a5eb0fa..1c1ae038 100644 --- a/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs +++ b/src/DurableTask.Netherite/OrchestrationService/NetheriteOrchestrationServiceSettings.cs @@ -52,12 +52,14 @@ public class NetheriteOrchestrationServiceSettings public string LoadInformationAzureTableName { get; set; } = "DurableTaskPartitions"; /// - /// The consumer group to use. By specifying different consumer groups, two task hubs can use - /// the same event hubs namespace at the same time. However, note that this can waste bandwidth since messages - /// are always delivered to all consumer groups (even if only meaningfully processed by one of them). Also, - /// for some event hub plan the number of consumer groups is limited. - /// - public string EventHubsConsumerGroup { get; set; } = "$Default"; + /// If true, use a consumer group with the same name as this task hub, for all event hubs in the namespace. This allows connecting multiple task hubs to the same namespace. + /// + /// + /// This feature is recommended only for small message volumes and a small number of hubs - because all messages are delivered to all connected task hubs, the total + /// message volume grows quadratically. + /// All task hubs connecting to the same namespace must have different names. + /// Note that even though the consumer groups are automatically created for each task hub, they are not automatically deleted when the task hub is deleted. + public bool UseSeparateConsumerGroups { get; set; } /// /// Tuning parameters for the FASTER logs diff --git a/src/DurableTask.Netherite/OrchestrationService/Partition.cs b/src/DurableTask.Netherite/OrchestrationService/Partition.cs index 7f641cce..03b704ef 100644 --- a/src/DurableTask.Netherite/OrchestrationService/Partition.cs +++ b/src/DurableTask.Netherite/OrchestrationService/Partition.cs @@ -91,7 +91,7 @@ public Partition( this.LastTransition = this.CurrentTimeMs; } - public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint) + public async Task<(long, int)> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler, TaskhubParameters parameters, string inputQueueFingerprint, long initialOffset) { EventTraceContext.Clear(); @@ -130,7 +130,7 @@ public Partition( this.PendingTimers = new BatchTimer(this.ErrorHandler.Token, this.TimersFired); // goes to storage to create or restore the partition state - inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint).ConfigureAwait(false); + inputQueuePosition = await this.State.CreateOrRestoreAsync(this, this.ErrorHandler, inputQueueFingerprint, initialOffset).ConfigureAwait(false); // start processing the timers this.PendingTimers.Start($"Timer{this.PartitionId:D2}"); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs index 37ce09a3..0f172d7e 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/FasterStorageProvider.cs @@ -159,8 +159,9 @@ async Task IStorageLayer.CreateTaskhubIfNotExistsAsync() this.traceHelper.TraceProgress("Created new taskhub"); // zap the partition hub so we start from zero queue positions - if (this.settings.TransportChoice == TransportChoices.EventHubs) + if (this.settings.TransportChoice == TransportChoices.EventHubs && !this.settings.UseSeparateConsumerGroups) { + this.traceHelper.TraceProgress("Deleting partition event hub, to ensure all partitions start at sequence number zero"); await EventHubsUtil.DeleteEventHubIfExistsAsync(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, CancellationToken.None); } } diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs index 18dddf85..edbb7511 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/PartitionStorage.cs @@ -72,7 +72,7 @@ async Task TerminationWrapper(Task what) await what; } - public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint) + public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler errorHandler, string inputQueueFingerprint, long initialOffset) { this.partition = partition; this.terminationToken = errorHandler.Token; @@ -146,7 +146,7 @@ async Task TerminationWrapper(Task what) this.TraceHelper.FasterProgress("Creating store"); // this is a fresh partition - await this.TerminationWrapper(this.storeWorker.Initialize(this.log.BeginAddress, inputQueueFingerprint)); + await this.TerminationWrapper(this.storeWorker.Initialize(this.log.BeginAddress, inputQueueFingerprint, initialOffset)); await this.TerminationWrapper(this.storeWorker.TakeFullCheckpointAsync("initial checkpoint").AsTask()); this.TraceHelper.FasterStoreCreated(this.storeWorker.InputQueuePosition, stopwatch.ElapsedMilliseconds); diff --git a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs index 64e0231e..895e80d8 100644 --- a/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs +++ b/src/DurableTask.Netherite/StorageLayer/Faster/StoreWorker.cs @@ -87,12 +87,12 @@ public StoreWorker(TrackedObjectStore store, Partition partition, FasterTraceHel this.effectTracker = new TrackedObjectStoreEffectTracker(this.partition, this, store); } - public async Task Initialize(long initialCommitLogPosition, string fingerprint) + public async Task Initialize(long initialCommitLogPosition, string fingerprint, long initialOffset) { this.partition.ErrorHandler.Token.ThrowIfCancellationRequested(); this.InputQueueFingerprint = fingerprint; - this.InputQueuePosition = (0,0); + this.InputQueuePosition = (initialOffset, 0); this.CommitLogPosition = initialCommitLogPosition; this.store.InitMainSession(); diff --git a/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs b/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs index 8e3ea367..c824b5c3 100644 --- a/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs +++ b/src/DurableTask.Netherite/StorageLayer/Memory/MemoryStorage.cs @@ -52,7 +52,7 @@ public void SubmitEvents(IList entries) base.SubmitBatch(entries); } - public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint) + public async Task<(long,int)> CreateOrRestoreAsync(Partition partition, IPartitionErrorHandler termination, string fingerprint, long initialOffset) { await Task.Yield(); this.partition = partition; @@ -68,6 +68,11 @@ public void SubmitEvents(IList entries) } } + if (initialOffset != 0) + { + throw new NetheriteConfigurationException("memory storage cannot be started with a non-zero initial offset"); + } + this.commitPosition = 1; this.inputQueuePosition = (0,0); return this.inputQueuePosition; diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs index 84acf8f6..256a005e 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/BlobBatchReceiver.cs @@ -28,6 +28,7 @@ class BlobBatchReceiver where TEvent : Event readonly BlobContainerClient containerClient; readonly bool keepUntilConfirmed; readonly bool isClientReceiver; + readonly bool useSeparateConsumerGroups; // Event Hubs discards messages after 24h, so we can throw away batches that are older than that readonly static TimeSpan expirationTimeSpan = TimeSpan.FromHours(24) + TimeSpan.FromMinutes(1); @@ -45,6 +46,7 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper, this.keepUntilConfirmed = keepUntilConfirmed; this.blobDeletions = this.keepUntilConfirmed ? new BlobDeletions(this) : null; this.isClientReceiver = typeof(TEvent) == typeof(ClientEvent); + this.useSeparateConsumerGroups = settings.UseSeparateConsumerGroups; } public async IAsyncEnumerable<(EventData eventData, TEvent[] events, long)> ReceiveEventsAsync( @@ -195,6 +197,11 @@ public BlobBatchReceiver(string traceContext, EventHubsTraceHelper traceHelper, // Ignored packets are very common for clients because multiple clients may share the same partition. We log this only for debug purposes. this.traceHelper.LogDebug("{context} ignored {count} packets for different client", this.traceContext, ignoredPacketCount); } + else if (this.useSeparateConsumerGroups) + { + // Ignored packets are common when using multiple task hubs with separate consumer groups. We log this only for debug purposes. + this.traceHelper.LogDebug("{context} ignored {count} packets for different taskhub", this.traceContext, ignoredPacketCount); + } else { // Ignored packets may indicate misconfiguration (multiple taskhubs using same EH namespace). We create a visible warning. diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs index db16ec49..0b89678f 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsConnections.cs @@ -260,6 +260,23 @@ async Task EnsureLoadMonitorAsync(int retries = EventHubCreationRetries) await this.EnsureLoadMonitorAsync(retries - 1); } + public async Task> GetStartingSequenceNumbers() + { + Task[] tasks = new Task[this.partitionPartitions.Count]; + for (int i = 0; i < this.partitionPartitions.Count; i++) + { + tasks[i] = GetLastEnqueuedSequenceNumber(i); + } + await Task.WhenAll(tasks); + return tasks.Select(t => t.Result).ToList(); + + async Task GetLastEnqueuedSequenceNumber(int i) + { + var info = await this.partitionPartitions[i].client.GetPartitionRuntimeInformationAsync(this.partitionPartitions[i].id); + return info.LastEnqueuedSequenceNumber + 1; + } + } + public static async Task> GetQueuePositionsAsync(ConnectionInfo connectionInfo, string partitionHub) { var client = connectionInfo.CreateEventHubClient(partitionHub); diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs index 78acf837..464caf0f 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsProcessor.cs @@ -221,7 +221,7 @@ async Task StartPartitionAsync(PartitionIncarnation prior // start this partition (which may include waiting for the lease to become available) c.Partition = this.host.AddPartition(this.partitionId, this.sender); - c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters, this.eventHubsTransport.Fingerprint); + c.NextPacketToReceive = await c.Partition.CreateOrRestoreAsync(c.ErrorHandler, this.parameters, this.eventHubsTransport.Fingerprint, this.eventHubsTransport.GetInitialOffset((int)this.partitionId)); this.traceHelper.LogInformation("EventHubsProcessor {eventHubName}/{eventHubPartition} started partition (incarnation {incarnation}), next expected packet is #{nextSeqno}", this.eventHubName, this.eventHubPartition, c.Incarnation, c.NextPacketToReceive); diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs index 4ce41e37..3a723ca3 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/EventHubsTransport.cs @@ -55,11 +55,13 @@ class EventHubsTransport : CloudBlockBlob partitionScript; ScriptedEventProcessorHost scriptedEventProcessorHost; + Offsets offsets; + int shutdownTriggered; public Guid ClientId { get; private set; } - public string Fingerprint => this.connections.Fingerprint; + public string Fingerprint { get; private set; } public bool FatalExceptionObserved { get; private set; } @@ -76,7 +78,7 @@ public EventHubsTransport(TransportAbstraction.IHost host, NetheriteOrchestratio string namespaceName = settings.EventHubsConnection.ResourceName; this.logger = EventHubsTraceHelper.CreateLogger(loggerFactory); this.traceHelper = new EventHubsTraceHelper(this.logger, settings.TransportLogLevelLimit, null, settings.StorageAccountName, settings.HubName, namespaceName); - this.consumerGroup = settings.EventHubsConsumerGroup; + this.consumerGroup = settings.UseSeparateConsumerGroups ? settings.HubName : "$Default"; this.ClientId = Guid.NewGuid(); this.shortClientId = Client.GetShortId(this.ClientId); } @@ -112,17 +114,80 @@ async Task ITransportLayer.StartAsync() this.connections = new EventHubsConnections(this.settings.EventHubsConnection, EventHubsTransport.PartitionHub, EventHubsTransport.ClientHubs, EventHubsTransport.LoadMonitorHub, this.consumerGroup, this.shutdownSource.Token) { - Host = host, + Host = this.host, TraceHelper = this.traceHelper, }; await this.connections.StartAsync(this.parameters); - this.traceHelper.LogInformation("EventHubsTransport is connecting to {fingerPrint} via consumer group {consumerGroupName}", this.connections.Fingerprint, this.consumerGroup); + if (this.settings.UseSeparateConsumerGroups) + { + this.traceHelper.LogInformation($"Determining initial partition offsets for consumer group '{this.consumerGroup}'"); + string formattedFingerprint = this.connections.CreationTimestamp.ToString("o").Replace("/", "-"); + var offsetsBlob = this.cloudBlobContainer.GetBlockBlobReference($"{this.pathPrefix}offsets/{formattedFingerprint}"); + + try + { + var jsonText = await offsetsBlob.DownloadTextAsync(this.shutdownSource.Token); + this.offsets = JsonConvert.DeserializeObject(jsonText); + this.traceHelper.LogInformation($"Loaded initial partition offsets [{string.Join(", ", this.offsets.InitialOffsets)}]"); + } + catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.NotFound) + { + try + { + this.traceHelper.LogDebug("Creating offsets"); + this.offsets = new Offsets() + { + Guid = Guid.NewGuid(), + InitialOffsets = await this.connections.GetStartingSequenceNumbers(), + }; + var jsonText = JsonConvert.SerializeObject(this.offsets, Formatting.Indented, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.None }); + var noOverwrite = AccessCondition.GenerateIfNoneMatchCondition("*"); + await offsetsBlob.UploadTextAsync(jsonText, null, noOverwrite, null, null); + this.traceHelper.LogInformation($"Created initial partition offsets [{string.Join(", ", this.offsets.InitialOffsets)}]"); + } + catch (StorageException) when (ex.RequestInformation.HttpStatusCode == (int)System.Net.HttpStatusCode.Conflict) + { + this.traceHelper.LogDebug("Lost creation race, reloading"); + var jsonText = await offsetsBlob.DownloadTextAsync(this.shutdownSource.Token); + this.offsets = JsonConvert.DeserializeObject(jsonText); + this.traceHelper.LogInformation($"Loaded initial partition offsets on second attempt [{string.Join(", ", this.offsets.InitialOffsets)}]"); + } + } + + this.Fingerprint = $"{this.connections.Fingerprint}-{this.offsets.Guid}"; + } + else + { + this.Fingerprint = this.connections.Fingerprint; + } + + this.traceHelper.LogInformation($"EventHubs transport connected to partition event hubs with fingerprint {this.Fingerprint}"); return this.parameters; } + class Offsets + { + public Guid Guid { get; set; } + + public List InitialOffsets { get; set; } + } + + internal long GetInitialOffset(int partitionId) + { + if (this.offsets != null) + { + return this.offsets.InitialOffsets[partitionId]; + } + else + { + return 0; + } + } + + async Task ITransportLayer.StartClientAsync() { var channel = Channel.CreateBounded(new BoundedChannelOptions(500) @@ -185,7 +250,7 @@ async Task ITransportLayer.StartClientAsync() await retryDelayTask; } - this.traceHelper.LogInformation("EventHubsTransport connected to {fingerPrint} via consumer group {consumerGroup}", this.connections.Fingerprint, this.consumerGroup); + this.traceHelper.LogInformation("EventHubsTransport sucessfully established client connection with {fingerPrint} via {consumerGroup}", this.Fingerprint, this.consumerGroup); this.clientReceiveLoops = Enumerable .Range(0, EventHubsConnections.NumClientChannels) diff --git a/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs b/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs index 42d92b00..80fff3be 100644 --- a/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs +++ b/src/DurableTask.Netherite/TransportLayer/EventHubs/LoadMonitorProcessor.cs @@ -142,7 +142,7 @@ async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumera } finally { - this.traceHelper.LogInformation("LoadMonitor exits receive loop"); + this.traceHelper.LogTrace("LoadMonitor finished processing packets"); } }