diff --git a/docs/guide/durability/ravendb.md b/docs/guide/durability/ravendb.md index 98255258a..9ccdc0464 100644 --- a/docs/guide/durability/ravendb.md +++ b/docs/guide/durability/ravendb.md @@ -144,6 +144,48 @@ public static class RecordTeamHandler snippet source | anchor +## System Control Queues + +The RavenDb integration to Wolverine does not yet come with a built in database control queue +mechanism, so you will need to add that from external messaging brokers as in this example +using Azure Service Bus: + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + // One way or another, you're probably pulling the Azure Service Bus + // connection string out of configuration + var azureServiceBusConnectionString = builder + .Configuration + .GetConnectionString("azure-service-bus")!; + + // Connect to the broker in the simplest possible way + opts.UseAzureServiceBus(azureServiceBusConnectionString) + .AutoProvision() + + // This enables Wolverine to use temporary Azure Service Bus + // queues created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + +}); +``` +snippet source | anchor + + +For local development, there is also an option to let Wolverine just use its TCP transport +as a control endpoint with this configuration option: + +```csharp +WolverineOptions.UseTcpForControlEndpoint(); +``` + +In the option above, Wolverine is just looking for an unused port, and assigning that found port +as the listener for the node being bootstrapped. + ## RavenOps Side Effects The `RavenOps` static class can be used as a convenience for RavenDb integration with Wolverine: diff --git a/docs/guide/messaging/expiration.md b/docs/guide/messaging/expiration.md index 02603d4cf..8fad89988 100644 --- a/docs/guide/messaging/expiration.md +++ b/docs/guide/messaging/expiration.md @@ -47,7 +47,7 @@ public async Task message_expiration(IMessageBus bus) await bus.SendAsync(new StatusUpdate("Okay"), new DeliveryOptions { DeliverBy = DateTime.Today.AddHours(15) }); } ``` -snippet source | anchor +snippet source | anchor ## By Subscriber @@ -83,7 +83,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## By Message Type diff --git a/docs/guide/messaging/transports/azureservicebus/conventional-routing.md b/docs/guide/messaging/transports/azureservicebus/conventional-routing.md index 40c263e2e..36a088edb 100644 --- a/docs/guide/messaging/transports/azureservicebus/conventional-routing.md +++ b/docs/guide/messaging/transports/azureservicebus/conventional-routing.md @@ -54,7 +54,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor ## Route to Topics and Subscriptions diff --git a/docs/guide/messaging/transports/azureservicebus/index.md b/docs/guide/messaging/transports/azureservicebus/index.md index d506d9a52..8db6ab721 100644 --- a/docs/guide/messaging/transports/azureservicebus/index.md +++ b/docs/guide/messaging/transports/azureservicebus/index.md @@ -63,6 +63,40 @@ to notice that in the Azure Portal. And also see the next section. +## Wolverine Control Queues + +You can opt into using temporary Azure Service Bus queues for intra-node communication +that Wolverine needs for leader election and background worker distribution. Using Azure +Service Bus for this feature is more efficient than the built in database control +queues that Wolverine uses otherwise, and is necessary for message storage options like +RavenDb that do not have a built in control queue mechanism. + + + +```cs +var builder = Host.CreateApplicationBuilder(); +builder.UseWolverine(opts => +{ + // One way or another, you're probably pulling the Azure Service Bus + // connection string out of configuration + var azureServiceBusConnectionString = builder + .Configuration + .GetConnectionString("azure-service-bus")!; + + // Connect to the broker in the simplest possible way + opts.UseAzureServiceBus(azureServiceBusConnectionString) + .AutoProvision() + + // This enables Wolverine to use temporary Azure Service Bus + // queues created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + +}); +``` +snippet source | anchor + + ## Disabling System Queues If your application will not have permissions to create temporary queues in Azure Service Bus, you will probably want diff --git a/docs/guide/messaging/transports/azureservicebus/listening.md b/docs/guide/messaging/transports/azureservicebus/listening.md index 2fd571d8c..78f5046ad 100644 --- a/docs/guide/messaging/transports/azureservicebus/listening.md +++ b/docs/guide/messaging/transports/azureservicebus/listening.md @@ -73,7 +73,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that any of these settings would be overridden by specific configuration to diff --git a/docs/guide/messaging/transports/azureservicebus/publishing.md b/docs/guide/messaging/transports/azureservicebus/publishing.md index 615dc3a3b..b73f54ec2 100644 --- a/docs/guide/messaging/transports/azureservicebus/publishing.md +++ b/docs/guide/messaging/transports/azureservicebus/publishing.md @@ -28,7 +28,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor @@ -60,7 +60,7 @@ builder.UseWolverine(opts => using var host = builder.Build(); await host.StartAsync(); ``` -snippet source | anchor +snippet source | anchor Note that any of these settings would be overridden by specific configuration to diff --git a/docs/guide/runtime.md b/docs/guide/runtime.md index 18c3a84f6..790a8bcb4 100644 --- a/docs/guide/runtime.md +++ b/docs/guide/runtime.md @@ -181,7 +181,7 @@ opts.ListenToAzureServiceBusQueue("incoming") opts.PublishAllMessages().ToAzureServiceBusQueue("outgoing") .UseDurableOutbox(); ``` -snippet source | anchor +snippet source | anchor Or use policies to do this in one fell swoop (which may not be what you actually want, but you could do this!): diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs index 7e11c820a..d7715f901 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs @@ -188,6 +188,37 @@ public async Task configure_subscription_filter() await host.StartAsync(); } + public async Task configure_control_queues() + { + #region sample_enabling_azure_service_bus_control_queues + + var builder = Host.CreateApplicationBuilder(); + builder.UseWolverine(opts => + { + // One way or another, you're probably pulling the Azure Service Bus + // connection string out of configuration + var azureServiceBusConnectionString = builder + .Configuration + .GetConnectionString("azure-service-bus")!; + + // Connect to the broker in the simplest possible way + opts.UseAzureServiceBus(azureServiceBusConnectionString) + .AutoProvision() + + // This enables Wolverine to use temporary Azure Service Bus + // queues created at runtime for communication between + // Wolverine nodes + .EnableWolverineControlQueues(); + + + }); + + #endregion + + using var host = builder.Build(); + await host.StartAsync(); + } + public async Task configure_durable_listener() { var builder = Host.CreateApplicationBuilder(); diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs index 6fb08b96f..73509c6fd 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/TestingExtensions.cs @@ -14,6 +14,6 @@ public static AzureServiceBusConfiguration UseAzureServiceBusTesting(this Wolver var connectionString = File.ReadAllText(path).Trim(); - return options.UseAzureServiceBus(connectionString); + return options.UseAzureServiceBus(connectionString).AutoProvision(); } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs new file mode 100644 index 000000000..3547443b0 --- /dev/null +++ b/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/leader_election.cs @@ -0,0 +1,29 @@ +using IntegrationTests; +using Npgsql; +using Weasel.Postgresql; +using Wolverine.ComplianceTests; +using Wolverine.Postgresql; +using Xunit.Abstractions; + +namespace Wolverine.AzureServiceBus.Tests; + +public class leader_election : LeadershipElectionCompliance +{ + public leader_election(ITestOutputHelper output) : base(output) + { + } + + protected override async Task beforeBuildingHost() + { + await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("registry"); + await conn.CloseAsync(); + } + + protected override void configureNode(WolverineOptions opts) + { + opts.UseAzureServiceBusTesting().EnableWolverineControlQueues(); + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "registry"); + } +} \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs index 784667bf5..8832e0efe 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusConfiguration.cs @@ -1,5 +1,7 @@ using Azure.Messaging.ServiceBus.Administration; +using JasperFx.Core; using Wolverine.AzureServiceBus.Internal; +using Wolverine.Configuration; using Wolverine.Transports; namespace Wolverine.AzureServiceBus; @@ -90,4 +92,28 @@ public AzureServiceBusConfiguration SystemQueuesAreEnabled(bool enabled) Transport.SystemQueuesEnabled = enabled; return this; } + + /// + /// Utilize an Azure Service Bus queue as the control queue between Wolverine nodes + /// This is more efficient than the built in Wolverine database control + /// queues if Azure Service Bus is an option + /// + /// + public AzureServiceBusConfiguration EnableWolverineControlQueues() + { + var queueName = "wolverine.control." + Options.Durability.AssignedNodeNumber; + + var queue = Transport.Queues[queueName]; + + queue.Options.AutoDeleteOnIdle = 5.Minutes(); + queue.Mode = EndpointMode.BufferedInMemory; + queue.IsListener = true; + queue.EndpointName = "Control"; + queue.IsUsedForReplies = true; + queue.Role = EndpointRole.System; + + Options.Transports.NodeControlEndpoint = queue; + + return this; + } } \ No newline at end of file diff --git a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs index f5d3699c1..9a0137f02 100644 --- a/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs +++ b/src/Transports/Azure/Wolverine.AzureServiceBus/AzureServiceBusTransport.cs @@ -39,7 +39,7 @@ public override string SanitizeIdentifier(string identifier) } /// - /// Is this transport connection allowed to build and use response and retry queues + /// Is this transport connection allowed to build and use response, retry, and control queues /// for just this node? /// public bool SystemQueuesEnabled { get; set; } = true; @@ -95,9 +95,8 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime) retryQueue.IsListener = true; retryQueue.EndpointName = RetryEndpointName; retryQueue.Role = EndpointRole.System; - + RetryQueue = retryQueue; - } public override Endpoint? ReplyEndpoint()