Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The [ReadAggregate] support & more validation on [Aggregate] usage #1290

Merged
merged 2 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/guide/durability/marten/event-sourcing.md
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,22 @@ public static class RaiseIfValidatedHandler

To mark a Marten event stream as archived from a Wolverine aggregate handler, just append the special Marten [Archived](https://martendb.io/events/archiving.html#archived-event)
event to the stream just like you would in any other aggregate handler.

## Reading the Latest Version of an Aggregate

::: info
This is using Marten's [FetchLatest(https://martendb.io/events/projections/read-aggregates.html#fetchlatest) API]() and is limited to single stream
projections.
:::

If you want to inject the current state of an event sourced aggregate as a parameter into
a message handler method strictly for information and don't need the heavier "aggregate handler workflow," use the `[ReadAggregate]` attribute like this:

snippet: sample_using_ReadAggregate_in_messsage_handlers

If the aggregate doesn't exist, the HTTP request will stop with a 404 status code.
The aggregate/stream identity is found with the same rules as the `[Entity]` or `[Aggregate]` attributes:

1. You can specify a particular request body property name or route argument
2. Look for a request body property or route argument named "EntityTypeId"
3. Look for a request body property or route argument named "Id" or "id"
20 changes: 20 additions & 0 deletions docs/guide/http/marten.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,25 @@ public static (UpdatedAggregate, Events) ConfirmDifferent(ConfirmOrder command,
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Orders.cs#L268-L282' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_returning_updated_aggregate_as_response_from_http_endpoint' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Reading the Latest Version of an Aggregate

::: info
This is using Marten's [FetchLatest(https://martendb.io/events/projections/read-aggregates.html#fetchlatest) API]() and is limited to single stream
projections.
:::

If you want to inject the current state of an event sourced aggregate as a parameter into
an HTTP endpoint method, use the `[ReadAggregate]` attribute like this:

snippet: sample_using_ReadAggregate_in_HTTP

If the aggregate doesn't exist, the HTTP request will stop with a 404 status code.
The aggregate/stream identity is found with the same rules as the `[Entity]` or `[Aggregate]` attributes:

1. You can specify a particular request body property name or route argument
2. Look for a request body property or route argument named "EntityTypeId"
3. Look for a request body property or route argument named "Id" or "id"

### Compiled Query Resource Writer Policy

Marten integration comes with an `IResourceWriterPolicy` policy that handles compiled queries as return types.
Expand Down Expand Up @@ -359,3 +378,4 @@ public class ApprovedInvoicedCompiledQuery : ICompiledListQuery<Invoice>
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Marten/Documents.cs#L109-L119' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_compiled_query_return_query' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

6 changes: 6 additions & 0 deletions src/Http/Wolverine.Http.Marten/AggregateAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public AggregateAttribute(string routeOrParameterName)

public override Variable Modify(HttpChain chain, ParameterInfo parameter, IServiceContainer container)
{
if (chain.Method.Method.GetParameters().Where(x => x.HasAttribute<AggregateAttribute>()).Count() > 1)
{
throw new InvalidOperationException(
"It is only possible (today) to use a single [Aggregate] attribute on an HTTP handler method. Maybe use [ReadAggregate] if all you need is the projected data");
}

chain.Metadata.Produces(404);

AggregateType = parameter.ParameterType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Diagnostics;
using Alba;
using Marten;
using Shouldly;
using WolverineWebApi.Marten;

namespace Wolverine.Http.Tests.Marten;

public class using_read_aggregate_attribute(AppFixture fixture) : IntegrationContext(fixture)
{
[Fact]
public async Task happy_path_reading_aggregate()
{
var id = Guid.NewGuid();

// Creating a new order
await Scenario(x =>
{
x.Post.Json(new StartOrderWithId(id, ["Socks", "Shoes", "Shirt"])).ToUrl("/orders/create4");
});

var result = await Host.GetAsJson<Order>("/orders/latest/" + id);
result.Items.Keys.ShouldContain("Socks");
}

[Fact]
public async Task sad_path_no_aggregate_return_404()
{
await Scenario(x =>
{
x.Get.Url("/orders/latest/" + Guid.NewGuid());
x.StatusCodeShouldBe(404);
});
}
}
2 changes: 1 addition & 1 deletion src/Http/Wolverine.Http.Tests/Wolverine.Http.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<IsPackable>false</IsPackable>
<TargetFrameworks>net8.0;net9.0</TargetFrameworks>
<TargetFrameworks>net9.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand Down
7 changes: 7 additions & 0 deletions src/Http/WolverineWebApi/Marten/Orders.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,11 @@ [new OrderConfirmed()]
}

#endregion

#region sample_using_ReadAggregate_in_HTTP

[WolverineGet("/orders/latest/{id}")]
public static Order GetLatest(Guid id, [ReadAggregate] Order order) => order;

#endregion
}
11 changes: 4 additions & 7 deletions src/Http/WolverineWebApi/WolverineWebApi.csproj
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFrameworks>net9.0</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Lamar.Diagnostics" Version="14.0.1" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.2" />
Expand All @@ -18,19 +20,14 @@
<ProjectReference Include="..\Wolverine.Http\Wolverine.Http.csproj" />
</ItemGroup>

<ItemGroup Condition="'$(targetframework)' == 'net7.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="[7.0.1, 9.0.0)" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="[7.0.1, 9.0.0)" />
</ItemGroup>

<ItemGroup Condition="'$(targetframework)' == 'net8.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="8.0.11" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.11" />
</ItemGroup>

<ItemGroup Condition="'$(targetframework)' == 'net9.0'">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="9.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.2" />
</ItemGroup>

<ItemGroup>
Expand Down
95 changes: 95 additions & 0 deletions src/Persistence/MartenTests/read_aggregate_attribute_usage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using IntegrationTests;
using JasperFx.CodeGeneration;
using Marten;
using Marten.Events.Projections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Oakton.Resources;
using Shouldly;
using Wolverine;
using Wolverine.Marten;

namespace MartenTests;

public class read_aggregate_attribute_usage: PostgresqlContext, IAsyncLifetime
{
private IHost theHost;
private IDocumentStore theStore;
private Guid theStreamId;

public async Task InitializeAsync()
{
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;

opts.Discovery.DisableConventionalDiscovery().IncludeType(typeof(FindLettersHandler));

opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.Projections.Snapshot<LetterAggregate>(SnapshotLifecycle.Async);

m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine();

opts.Services.AddResourceSetupOnStartup();
}).StartAsync();

theStore = theHost.Services.GetRequiredService<IDocumentStore>();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
theHost.Dispose();
}

[Fact]
public async Task use_end_to_end_happy_past()
{
var streamId = Guid.NewGuid();
using (var session = theStore.LightweightSession())
{
session.Events.StartStream<LetterAggregate>(streamId, new AEvent(), new AEvent(), new CEvent());
await session.SaveChangesAsync();

var latest = await session.Events.FetchLatest<LetterAggregate>(streamId);
latest.ShouldNotBeNull();
}

var envelope = await theHost.MessageBus().InvokeAsync<LetterAggregateEnvelope>(new FindAggregate(streamId));
envelope.Inner.ACount.ShouldBe(2);
envelope.Inner.CCount.ShouldBe(1);
}

[Fact]
public async Task end_to_end_sad_path()
{
var envelope = await theHost.MessageBus().InvokeAsync<LetterAggregateEnvelope>(new FindAggregate(Guid.NewGuid()));
envelope.ShouldBeNull();
}
}



public record LetterAggregateEnvelope(LetterAggregate Inner);

#region sample_using_ReadAggregate_in_messsage_handlers

public record FindAggregate(Guid Id);

public static class FindLettersHandler
{
// This is admittedly just some weak sauce testing support code
public static LetterAggregateEnvelope Handle(
FindAggregate command,
[ReadAggregate] LetterAggregate aggregate)

=> new LetterAggregateEnvelope(aggregate);
}

#endregion
95 changes: 95 additions & 0 deletions src/Persistence/Wolverine.Marten/ReadAggregateAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System.Reflection;
using JasperFx.CodeGeneration;
using JasperFx.CodeGeneration.Frames;
using JasperFx.CodeGeneration.Model;
using JasperFx.Core.Reflection;
using Marten;
using Marten.Events;
using Wolverine.Attributes;
using Wolverine.Configuration;
using Wolverine.Marten.Persistence.Sagas;
using Wolverine.Persistence;
using Wolverine.Runtime;

namespace Wolverine.Marten;

/// <summary>
/// Use Marten's FetchLatest() API to retrieve the parameter value
/// </summary>
public class ReadAggregateAttribute : WolverineParameterAttribute
{
public ReadAggregateAttribute()
{
ValueSource = ValueSource.Anything;
}

public ReadAggregateAttribute(string argumentName) : base(argumentName)
{
ValueSource = ValueSource.Anything;
}

/// <summary>
/// Is the existence of this aggregate required for the rest of the handler action or HTTP endpoint
/// execution to continue? Default is true.
/// </summary>
public bool Required { get; set; } = true;

public override Variable Modify(IChain chain, ParameterInfo parameter, IServiceContainer container, GenerationRules rules)
{
// I know it's goofy that this refers to the saga, but it should work fine here too
var idType = new MartenPersistenceFrameProvider().DetermineSagaIdType(parameter.ParameterType, container);

if (!tryFindIdentityVariable(chain, parameter, idType, out var identity))
{
throw new InvalidEntityLoadUsageException(this, parameter);
}

var frame = new FetchLatestAggregateFrame(parameter.ParameterType, identity);

if (Required)
{
var otherFrames = chain.AddStopConditionIfNull(frame.Aggregate);

var block = new LoadEntityFrameBlock(frame.Aggregate, otherFrames);
chain.Middleware.Add(block);

return block.Mirror;
}

chain.Middleware.Add(frame);

return frame.Aggregate;
}
}

internal class FetchLatestAggregateFrame : AsyncFrame
{
private readonly Variable _identity;
private Variable _session;
private Variable _token;

public FetchLatestAggregateFrame(Type aggregateType, Variable identity)
{
_identity = identity;
Aggregate = new Variable(aggregateType, this);
}

public Variable Aggregate { get; }

public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
{
_session = chain.FindVariable(typeof(IDocumentSession));
yield return _session;

_token = chain.FindVariable(typeof(CancellationToken));
yield return _token;

yield return _identity;
}

public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
{
writer.Write($"var {Aggregate.Usage} = await {_session.Usage}.Events.{nameof(IEventStore.FetchLatest)}<{Aggregate.VariableType.FullNameInCode()}>({_identity.Usage}, {_token.Usage});");
Next?.GenerateCode(method, writer);
}
}
30 changes: 30 additions & 0 deletions src/Wolverine/Attributes/WolverineParameterAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using JasperFx.CodeGeneration;
using JasperFx.CodeGeneration.Frames;
using JasperFx.CodeGeneration.Model;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Wolverine.Configuration;
using Wolverine.Runtime;
Expand Down Expand Up @@ -55,4 +56,33 @@ internal static void TryApply(MethodCall call, IServiceContainer container, Gene
}
}
}

protected bool tryFindIdentityVariable(IChain chain, ParameterInfo parameter, Type idType, out Variable variable)
{
if (ArgumentName.IsNotEmpty())
{
if (chain.TryFindVariable(ArgumentName, ValueSource, idType, out variable))
{
return true;
}
}

if (chain.TryFindVariable(parameter.ParameterType.Name + "Id", ValueSource, idType, out variable))
{
return true;
}

if (chain.TryFindVariable("Id", ValueSource, idType, out variable))
{
return true;
}

if (chain.TryFindVariable("id", ValueSource, idType, out variable))
{
return true;
}

variable = default;
return false;
}
}
Loading
Loading