Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch processing working *with* external transports. Closes GH-1076 #1084

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/Transports/Kafka/BatchMessaging/BatchMessaging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TargetFrameworks>net8.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.7"/>
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0"/>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Wolverine.Kafka\Wolverine.Kafka.csproj" />
</ItemGroup>

</Project>
6 changes: 6 additions & 0 deletions src/Transports/Kafka/BatchMessaging/BatchMessaging.http
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
@BatchMessaging_HostAddress = http://localhost:5089

GET {{BatchMessaging_HostAddress}}/weatherforecast/
Accept: application/json

###
52 changes: 52 additions & 0 deletions src/Transports/Kafka/BatchMessaging/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using Confluent.Kafka;
using Oakton;
using Wolverine;
using Wolverine.Kafka;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

builder.Host.UseWolverine(opts =>
{
opts.UseKafka("localhost:9092").AutoProvision();

opts.PublishAllMessages().ToKafkaTopic("topic_0");

opts.BatchMessagesOf<TestMessage>();
opts.ListenToKafkaTopic("topic_0");
});

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}

app.MapPost("/test", async (IMessageBus bus) =>
{
var message = new TestMessage();
await bus.PublishAsync(message);
await bus.PublishAsync(message);
// results in:
// No known handler for TestMessage#08dced0c-3834-b4c6-54d7-e075bf020000 from kafka://topic/topic_0
})
.WithOpenApi();

return await app.RunOaktonCommands(args);

public partial class Program {}


public record TestMessage;

public class TestMessagesHandler
{
public void Handle(TestMessage[] messages)
{
Console.WriteLine("Messages received");
}
}
41 changes: 41 additions & 0 deletions src/Transports/Kafka/BatchMessaging/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:3391",
"sslPort": 44303
}
},
"profiles": {
"http": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "http://localhost:5089",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": true,
"launchUrl": "swagger",
"applicationUrl": "https://localhost:7028;http://localhost:5089",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "swagger",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
}
}
9 changes: 9 additions & 0 deletions src/Transports/Kafka/BatchMessaging/appsettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
<TargetFrameworks>net8.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand All @@ -21,6 +22,7 @@

<ItemGroup>
<ProjectReference Include="..\..\..\Testing\Wolverine.ComplianceTests\Wolverine.ComplianceTests.csproj" />
<ProjectReference Include="..\BatchMessaging\BatchMessaging.csproj" />
<ProjectReference Include="..\Wolverine.Kafka\Wolverine.Kafka.csproj"/>
</ItemGroup>

Expand All @@ -30,4 +32,8 @@
</Compile>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
<PackageReference Include="Alba" Version="8.0.0" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Alba;
using Oakton;
using Shouldly;
using Wolverine.Tracking;

namespace Wolverine.Kafka.Tests;

public class batch_processing_with_kafka
{
[Fact]
public async Task end_to_end()
{
OaktonEnvironment.AutoStartHost = true;

await using var host = await AlbaHost.For<Program>(_ => {});

IScenarioResult result = null!;

Func<IMessageContext, Task> execute = async _ =>
{
result = await host.Scenario(x => { x.Post.Url("/test"); });
};

var tracked = await host
.TrackActivity()
.WaitForMessageToBeReceivedAt<TestMessage[]>(host)
.ExecuteAndWaitAsync(execute);

tracked.FindSingleTrackedMessageOfType<TestMessage[]>()
.Length.ShouldBe(2);
}
}
1 change: 0 additions & 1 deletion src/Wolverine/Runtime/Handlers/HandlerGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ namespace Wolverine.Runtime.Handlers;

public partial class HandlerGraph : ICodeFileCollectionWithServices, IWithFailurePolicies
{
public static readonly string Context = "context";
private readonly List<HandlerCall> _calls = new();
private readonly object _compilingLock = new();

Expand Down
3 changes: 3 additions & 0 deletions src/Wolverine/WolverineOptions.Batching.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public LocalQueueConfiguration BatchMessagesOf(Type elementType, Action<Batching
{
throw new ArgumentNullException(nameof(elementType));
}

// GH-1076
HandlerGraph.RegisterMessageType(elementType);

var options = new BatchingOptions(elementType);
var localQueue = Transports.GetOrCreate<LocalTransport>().FindQueueForMessageType(elementType);
Expand Down
7 changes: 7 additions & 0 deletions wolverine.sln
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.RavenDb", "src\Pe
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RavenDbTests", "src\Persistence\RavenDbTests\RavenDbTests.csproj", "{71B152DD-7A0B-4935-B8B1-1060E674D23D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BatchMessaging", "src\Transports\Kafka\BatchMessaging\BatchMessaging.csproj", "{B035801D-E786-4AAA-858A-0770D88116D6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -610,6 +612,10 @@ Global
{71B152DD-7A0B-4935-B8B1-1060E674D23D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{71B152DD-7A0B-4935-B8B1-1060E674D23D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{71B152DD-7A0B-4935-B8B1-1060E674D23D}.Release|Any CPU.Build.0 = Release|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B035801D-E786-4AAA-858A-0770D88116D6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{24497E6A-D6B1-4C80-ABFB-57FFAD5070C4} = {96119B5E-B5F0-400A-9580-B342EBE26212}
Expand Down Expand Up @@ -717,5 +723,6 @@ Global
{1A34A78B-F6AB-41A9-8B90-384C6E1DBC63} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{AAFFC067-D110-45FF-9FA0-8E02F77D9D14} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{71B152DD-7A0B-4935-B8B1-1060E674D23D} = {7A9E0EAE-9ABF-40F6-9DB9-8FB1243F4210}
{B035801D-E786-4AAA-858A-0770D88116D6} = {63E9B289-95E8-4F2B-A064-156971A6853C}
EndGlobalSection
EndGlobal
Loading