From 7d48b067ab4ae73b476f5869ac5a38d3e446cbeb Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 16 Oct 2024 08:09:49 -0500 Subject: [PATCH 1/2] Ability to use Azure Service Bus temp queues as control queues. Closes GH-993 --- docs/guide/durability/ravendb.md | 42 +++++++++++++++++++ .../transports/azureservicebus/index.md | 10 +++++ .../DocumentationSamples.cs | 31 ++++++++++++++ .../TestingExtensions.cs | 2 +- .../leader_election.cs | 29 +++++++++++++ .../AzureServiceBusConfiguration.cs | 26 ++++++++++++ .../AzureServiceBusTransport.cs | 5 +-- 7 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs diff --git a/docs/guide/durability/ravendb.md b/docs/guide/durability/ravendb.md index 98255258a..9ccdc0464 100644 --- a/docs/guide/durability/ravendb.md +++ b/docs/guide/durability/ravendb.md @@ -144,6 +144,48 @@ public static class RecordTeamHandler snippet source | anchor +## System Control Queues + +The RavenDb integration to Wolverine does not yet come with a built in database control queue +mechanism, so you will need to add that from external messaging brokers as in this example +using Azure Service Bus: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + // One way or another, you're probably pulling the Azure Service Bus + // connection string out of configuration + var azureServiceBusConnectionString = builder + .Configuration + .GetConnectionString("azure-service-bus")!; + + // Connect to the broker in the simplest possible way + opts.UseAzureServiceBus(azureServiceBusConnectionString) + .AutoProvision() + + // This enables Wolverine to use temporary Azure Service Bus + // queues created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + +}); +``` +snippet source | anchor + + +For local development, there is also an option to let Wolverine just use its TCP transport +as a control endpoint with this configuration option: + +```csharp +WolverineOptions.UseTcpForControlEndpoint(); +``` + +In the option above, Wolverine is just looking for an unused port, and assigning that found port +as the listener for the node being bootstrapped. + ## RavenOps Side Effects The `RavenOps` static class can be used as a convenience for RavenDb integration with Wolverine: diff --git a/docs/guide/messaging/transports/azureservicebus/index.md b/docs/guide/messaging/transports/azureservicebus/index.md index d506d9a52..46ba6f8fa 100644 --- a/docs/guide/messaging/transports/azureservicebus/index.md +++ b/docs/guide/messaging/transports/azureservicebus/index.md @@ -63,6 +63,16 @@ to notice that in the Azure Portal. And also see the next section. +## Wolverine Control Queues + +You can opt into using temporary Azure Service Bus queues for intra-node communication +that Wolverine needs for leader election and background worker distribution. Using Azure +Service Bus for this feature is more efficient than the built in database control +queues that Wolverine uses otherwise, and is necessary for message storage options like +RavenDb that do not have a built in control queue mechanism. + +snippet: sample_enabling_azure_service_bus_control_queues + ## Disabling System Queues If your application will not have permissions to create temporary queues in Azure Service Bus, you will probably want diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs index 7e11c820a..d7715f901 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs @@ -188,6 +188,37 @@ public async Task configure_subscription_filter() await host.StartAsync(); } + public async Task configure_control_queues() + { + #region sample_enabling_azure_service_bus_control_queues + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + // One way or another, you're probably pulling the Azure Service Bus + // connection string out of configuration + var azureServiceBusConnectionString = builder + .Configuration + .GetConnectionString("azure-service-bus")!; + + // Connect to the broker in the simplest possible way + opts.UseAzureServiceBus(azureServiceBusConnectionString) + .AutoProvision() + + // This enables Wolverine to use temporary Azure Service Bus + // queues created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + + + }); + + #endregion + + using var host = builder.Build(); + await host.StartAsync(); + } + public async Task configure_durable_listener() { var builder = Host.CreateApplicationBuilder(); diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs index 6fb08b96f..73509c6fd 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs @@ -14,6 +14,6 @@ public static AzureServiceBusConfiguration UseAzureServiceBusTesting(this Wolver var connectionString = File.ReadAllText(path).Trim(); - return options.UseAzureServiceBus(connectionString); + return options.UseAzureServiceBus(connectionString).AutoProvision(); } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs new file mode 100644 index 000000000..3547443b0 --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs @@ -0,0 +1,29 @@ +using IntegrationTests; +using Npgsql; +using Weasel.Postgresql; +using Wolverine.ComplianceTests; +using Wolverine.Postgresql; +using Xunit.Abstractions; + +namespace Wolverine.AzureServiceBus.Tests; + +public class leader_election : LeadershipElectionCompliance +{ + public leader_election(ITestOutputHelper output) : base(output) + { + } + + protected override async Task beforeBuildingHost() + { + await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("registry"); + await conn.CloseAsync(); + } + + protected override void configureNode(WolverineOptions opts) + { + opts.UseAzureServiceBusTesting().EnableWolverineControlQueues(); + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "registry"); + } +} \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs index 784667bf5..8832e0efe 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs @@ -1,5 +1,7 @@ using Azure.Messaging.ServiceBus.Administration; +using JasperFx.Core; using Wolverine.AzureServiceBus.Internal; +using Wolverine.Configuration; using Wolverine.Transports; namespace Wolverine.AzureServiceBus; @@ -90,4 +92,28 @@ public AzureServiceBusConfiguration SystemQueuesAreEnabled(bool enabled) Transport.SystemQueuesEnabled = enabled; return this; } + + /// + /// Utilize an Azure Service Bus queue as the control queue between Wolverine nodes + /// This is more efficient than the built in Wolverine database control + /// queues if Azure Service Bus is an option + /// + /// + public AzureServiceBusConfiguration EnableWolverineControlQueues() + { + var queueName = "wolverine.control." + Options.Durability.AssignedNodeNumber; + + var queue = Transport.Queues[queueName]; + + queue.Options.AutoDeleteOnIdle = 5.Minutes(); + queue.Mode = EndpointMode.BufferedInMemory; + queue.IsListener = true; + queue.EndpointName = "Control"; + queue.IsUsedForReplies = true; + queue.Role = EndpointRole.System; + + Options.Transports.NodeControlEndpoint = queue; + + return this; + } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs index f5d3699c1..9a0137f02 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs @@ -39,7 +39,7 @@ public override string SanitizeIdentifier(string identifier) } /// - /// Is this transport connection allowed to build and use response and retry queues + /// Is this transport connection allowed to build and use response, retry, and control queues /// for just this node? /// public bool SystemQueuesEnabled { get; set; } = true; @@ -95,9 +95,8 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) retryQueue.IsListener = true; retryQueue.EndpointName = RetryEndpointName; retryQueue.Role = EndpointRole.System; - + RetryQueue = retryQueue; - } public override Endpoint? ReplyEndpoint() From dce7537aa2bfc902e336f5e0de7ab4e5938fc538 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 16 Oct 2024 08:10:42 -0500 Subject: [PATCH 2/2] doc snippet updates --- docs/guide/messaging/expiration.md | 4 +-- .../azureservicebus/conventional-routing.md | 2 +- .../transports/azureservicebus/index.md | 26 ++++++++++++++++++- .../transports/azureservicebus/listening.md | 2 +- .../transports/azureservicebus/publishing.md | 4 +-- docs/guide/runtime.md | 2 +- 6 files changed, 32 insertions(+), 8 deletions(-) diff --git a/docs/guide/messaging/expiration.md b/docs/guide/messaging/expiration.md index 02603d4cf..8fad89988 100644 --- a/docs/guide/messaging/expiration.md +++ b/docs/guide/messaging/expiration.md @@ -47,7 +47,7 @@ public async Task message_expiration(IMessageBus bus) await bus.SendAsync(new StatusUpdate("Okay"), new DeliveryOptions { DeliverBy = DateTime.Today.AddHours(15) }); } ``` -snippet source | anchor +snippet source | anchor ## By Subscriber @@ -83,7 +83,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## By Message Type diff --git a/docs/guide/messaging/transports/azureservicebus/conventional-routing.md b/docs/guide/messaging/transports/azureservicebus/conventional-routing.md index 40c263e2e..36a088edb 100644 --- a/docs/guide/messaging/transports/azureservicebus/conventional-routing.md +++ b/docs/guide/messaging/transports/azureservicebus/conventional-routing.md @@ -54,7 +54,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## Route to Topics and Subscriptions diff --git a/docs/guide/messaging/transports/azureservicebus/index.md b/docs/guide/messaging/transports/azureservicebus/index.md index 46ba6f8fa..8db6ab721 100644 --- a/docs/guide/messaging/transports/azureservicebus/index.md +++ b/docs/guide/messaging/transports/azureservicebus/index.md @@ -71,7 +71,31 @@ Service Bus for this feature is more efficient than the built in database contro queues that Wolverine uses otherwise, and is necessary for message storage options like RavenDb that do not have a built in control queue mechanism. -snippet: sample_enabling_azure_service_bus_control_queues + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + // One way or another, you're probably pulling the Azure Service Bus + // connection string out of configuration + var azureServiceBusConnectionString = builder + .Configuration + .GetConnectionString("azure-service-bus")!; + + // Connect to the broker in the simplest possible way + opts.UseAzureServiceBus(azureServiceBusConnectionString) + .AutoProvision() + + // This enables Wolverine to use temporary Azure Service Bus + // queues created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + +}); +``` +snippet source | anchor + ## Disabling System Queues diff --git a/docs/guide/messaging/transports/azureservicebus/listening.md b/docs/guide/messaging/transports/azureservicebus/listening.md index 2fd571d8c..78f5046ad 100644 --- a/docs/guide/messaging/transports/azureservicebus/listening.md +++ b/docs/guide/messaging/transports/azureservicebus/listening.md @@ -73,7 +73,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that any of these settings would be overridden by specific configuration to diff --git a/docs/guide/messaging/transports/azureservicebus/publishing.md b/docs/guide/messaging/transports/azureservicebus/publishing.md index 615dc3a3b..b73f54ec2 100644 --- a/docs/guide/messaging/transports/azureservicebus/publishing.md +++ b/docs/guide/messaging/transports/azureservicebus/publishing.md @@ -28,7 +28,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor @@ -60,7 +60,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that any of these settings would be overridden by specific configuration to diff --git a/docs/guide/runtime.md b/docs/guide/runtime.md index 18c3a84f6..790a8bcb4 100644 --- a/docs/guide/runtime.md +++ b/docs/guide/runtime.md @@ -181,7 +181,7 @@ opts.ListenToAzureServiceBusQueue("incoming") opts.PublishAllMessages().ToAzureServiceBusQueue("outgoing") .UseDurableOutbox(); ``` -snippet source | anchor +snippet source | anchor Or use policies to do this in one fell swoop (which may not be what you actually want, but you could do this!):