Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure Service Bus control queue option #1082

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions docs/guide/durability/ravendb.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,48 @@ public static class RecordTeamHandler
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/RavenDbTests/transactional_middleware.cs#L47-L59' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_ravendb_side_effects' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## System Control Queues

The RavenDb integration to Wolverine does not yet come with a built in database control queue
mechanism, so you will need to add that from external messaging brokers as in this example
using Azure Service Bus:

<!-- snippet: sample_enabling_azure_service_bus_control_queues -->
<a id='snippet-sample_enabling_azure_service_bus_control_queues'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// One way or another, you're probably pulling the Azure Service Bus
// connection string out of configuration
var azureServiceBusConnectionString = builder
.Configuration
.GetConnectionString("azure-service-bus")!;

// Connect to the broker in the simplest possible way
opts.UseAzureServiceBus(azureServiceBusConnectionString)
.AutoProvision()

// This enables Wolverine to use temporary Azure Service Bus
// queues created at runtime for communication between
// Wolverine nodes
.EnableWolverineControlQueues();

});
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L193-L216' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_enabling_azure_service_bus_control_queues' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

For local development, there is also an option to let Wolverine just use its TCP transport
as a control endpoint with this configuration option:

```csharp
WolverineOptions.UseTcpForControlEndpoint();
```

In the option above, Wolverine is just looking for an unused port, and assigning that found port
as the listener for the node being bootstrapped.

## RavenOps Side Effects

The `RavenOps` static class can be used as a convenience for RavenDb integration with Wolverine:
Expand Down
4 changes: 2 additions & 2 deletions docs/guide/messaging/expiration.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public async Task message_expiration(IMessageBus bus)
await bus.SendAsync(new StatusUpdate("Okay"), new DeliveryOptions { DeliverBy = DateTime.Today.AddHours(15) });
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L387-L399' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_message_expiration_by_message' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L418-L430' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_message_expiration_by_message' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## By Subscriber
Expand Down Expand Up @@ -83,7 +83,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L253-L280' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_delivery_expiration_rules_per_subscriber' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L284-L311' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_delivery_expiration_rules_per_subscriber' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## By Message Type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L339-L384' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_routing_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L370-L415' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_routing_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Route to Topics and Subscriptions
Expand Down
34 changes: 34 additions & 0 deletions docs/guide/messaging/transports/azureservicebus/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,40 @@ to notice that in the Azure Portal.

And also see the next section.

## Wolverine Control Queues

You can opt into using temporary Azure Service Bus queues for intra-node communication
that Wolverine needs for leader election and background worker distribution. Using Azure
Service Bus for this feature is more efficient than the built in database control
queues that Wolverine uses otherwise, and is necessary for message storage options like
RavenDb that do not have a built in control queue mechanism.

<!-- snippet: sample_enabling_azure_service_bus_control_queues -->
<a id='snippet-sample_enabling_azure_service_bus_control_queues'></a>
```cs
var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// One way or another, you're probably pulling the Azure Service Bus
// connection string out of configuration
var azureServiceBusConnectionString = builder
.Configuration
.GetConnectionString("azure-service-bus")!;

// Connect to the broker in the simplest possible way
opts.UseAzureServiceBus(azureServiceBusConnectionString)
.AutoProvision()

// This enables Wolverine to use temporary Azure Service Bus
// queues created at runtime for communication between
// Wolverine nodes
.EnableWolverineControlQueues();

});
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L193-L216' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_enabling_azure_service_bus_control_queues' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Disabling System Queues

If your application will not have permissions to create temporary queues in Azure Service Bus, you will probably want
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L285-L307' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_listener_configuration_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L316-L338' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_listener_configuration_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Note that any of these settings would be overridden by specific configuration to
Expand Down
4 changes: 2 additions & 2 deletions docs/guide/messaging/transports/azureservicebus/publishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L224-L248' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_publishing_to_specific_azure_service_bus_queue' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L255-L279' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_publishing_to_specific_azure_service_bus_queue' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->


Expand Down Expand Up @@ -60,7 +60,7 @@ builder.UseWolverine(opts =>
using var host = builder.Build();
await host.StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L312-L334' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_subscriber_configuration_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L343-L365' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_conventional_subscriber_configuration_for_azure_service_bus' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Note that any of these settings would be overridden by specific configuration to
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/runtime.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ opts.ListenToAzureServiceBusQueue("incoming")
opts.PublishAllMessages().ToAzureServiceBusQueue("outgoing")
.UseDurableOutbox();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L205-L215' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_durable_endpoint' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Transports/Azure/Wolverine.AzureServiceBus.Tests/DocumentationSamples.cs#L236-L246' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_durable_endpoint' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Or use policies to do this in one fell swoop (which may not be what you actually want, but you could do this!):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,37 @@ public async Task configure_subscription_filter()
await host.StartAsync();
}

public async Task configure_control_queues()
{
#region sample_enabling_azure_service_bus_control_queues

var builder = Host.CreateApplicationBuilder();
builder.UseWolverine(opts =>
{
// One way or another, you're probably pulling the Azure Service Bus
// connection string out of configuration
var azureServiceBusConnectionString = builder
.Configuration
.GetConnectionString("azure-service-bus")!;

// Connect to the broker in the simplest possible way
opts.UseAzureServiceBus(azureServiceBusConnectionString)
.AutoProvision()

// This enables Wolverine to use temporary Azure Service Bus
// queues created at runtime for communication between
// Wolverine nodes
.EnableWolverineControlQueues();


});

#endregion

using var host = builder.Build();
await host.StartAsync();
}

public async Task configure_durable_listener()
{
var builder = Host.CreateApplicationBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ public static AzureServiceBusConfiguration UseAzureServiceBusTesting(this Wolver

var connectionString = File.ReadAllText(path).Trim();

return options.UseAzureServiceBus(connectionString);
return options.UseAzureServiceBus(connectionString).AutoProvision();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using IntegrationTests;
using Npgsql;
using Weasel.Postgresql;
using Wolverine.ComplianceTests;
using Wolverine.Postgresql;
using Xunit.Abstractions;

namespace Wolverine.AzureServiceBus.Tests;

public class leader_election : LeadershipElectionCompliance
{
public leader_election(ITestOutputHelper output) : base(output)
{
}

protected override async Task beforeBuildingHost()
{
await using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await conn.DropSchemaAsync("registry");
await conn.CloseAsync();
}

protected override void configureNode(WolverineOptions opts)
{
opts.UseAzureServiceBusTesting().EnableWolverineControlQueues();
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "registry");
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Azure.Messaging.ServiceBus.Administration;
using JasperFx.Core;
using Wolverine.AzureServiceBus.Internal;
using Wolverine.Configuration;
using Wolverine.Transports;

namespace Wolverine.AzureServiceBus;
Expand Down Expand Up @@ -90,4 +92,28 @@ public AzureServiceBusConfiguration SystemQueuesAreEnabled(bool enabled)
Transport.SystemQueuesEnabled = enabled;
return this;
}

/// <summary>
/// Utilize an Azure Service Bus queue as the control queue between Wolverine nodes
/// This is more efficient than the built in Wolverine database control
/// queues if Azure Service Bus is an option
/// </summary>
/// <returns></returns>
public AzureServiceBusConfiguration EnableWolverineControlQueues()
{
var queueName = "wolverine.control." + Options.Durability.AssignedNodeNumber;

var queue = Transport.Queues[queueName];

queue.Options.AutoDeleteOnIdle = 5.Minutes();
queue.Mode = EndpointMode.BufferedInMemory;
queue.IsListener = true;
queue.EndpointName = "Control";
queue.IsUsedForReplies = true;
queue.Role = EndpointRole.System;

Options.Transports.NodeControlEndpoint = queue;

return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public override string SanitizeIdentifier(string identifier)
}

/// <summary>
/// Is this transport connection allowed to build and use response and retry queues
/// Is this transport connection allowed to build and use response, retry, and control queues
/// for just this node?
/// </summary>
public bool SystemQueuesEnabled { get; set; } = true;
Expand Down Expand Up @@ -95,9 +95,8 @@ protected override void tryBuildSystemEndpoints(IWolverineRuntime runtime)
retryQueue.IsListener = true;
retryQueue.EndpointName = RetryEndpointName;
retryQueue.Role = EndpointRole.System;

RetryQueue = retryQueue;

}

public override Endpoint? ReplyEndpoint()
Expand Down
Loading