Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to override events and expected version #397

Merged
merged 15 commits into from
Dec 27, 2024
Merged
17 changes: 11 additions & 6 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ jobs:
name: "Build and test"
# runs-on: ubuntu-latest
runs-on: self-hosted
strategy:
matrix:
dotnet-version: [ '8.0', '9.0' ]
env:
NUGET_PACKAGES: ${{ github.workspace }}/.nuget/packages
TC_CLOUD_TOKEN: ${{ secrets.TC_TOKEN }}
Expand All @@ -32,27 +35,29 @@ jobs:
name: Setup .NET
uses: actions/setup-dotnet@v4
with:
dotnet-version: |
8.0.x
9.0.x
dotnet-version: ${{ matrix.dotnet-version }}
-
name: Restore
run: |
dotnet restore -p:TargetFramework=net${{ matrix.dotnet-version }} -p:Configuration="Debug CI"
-
name: Build
run: |
dotnet build -c "Debug CI"
dotnet build -c "Debug CI" -f net${{ matrix.dotnet-version }} --no-restore
-
name: Prepare Testcontainers Cloud agent
if: env.TC_CLOUD_TOKEN != ''
uses: atomicjar/testcontainers-cloud-setup-action@main
-
name: Run tests
run: |
dotnet test -c "Debug CI" --no-build
dotnet test -c "Debug CI" --no-build -f net${{ matrix.dotnet-version }}
-
name: Upload Test Results
if: always()
uses: actions/upload-artifact@v4
with:
name: Test Results ${{ matrix.dotnet }}
name: Test Results ${{ matrix.dotnet-version }}
path: |
test-results/**/*.xml
test-results/**/*.trx
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Eventuous HQ OÜ.All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Persistence;
using Eventuous.Shared;
using static Eventuous.CommandServiceDelegates;

namespace Eventuous;
Expand Down Expand Up @@ -38,46 +40,43 @@ public interface IDefineIdentity<out TCommand, out TAggregate, out TState, TId>
ICommandHandlerBuilder<TCommand, TAggregate, TState, TId> GetIdAsync(Func<TCommand, CancellationToken, ValueTask<TId>> getId);
}

public interface IDefineStore<out TCommand, out TAggregate, out TState, TId>
public interface IDefineStore<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event store from the command. It assigns both reader and writer.
/// If not defined, the reader and writer provided by the functional service will be used.
/// </summary>
/// <param name="resolveStore">Function to resolve the event writer</param>
/// <returns></returns>
IDefineExecution<TCommand, TAggregate, TState, TId> ResolveStore(Func<TCommand, IEventStore> resolveStore);
IDefineExecution<TCommand, TAggregate, TState> ResolveStore(Func<TCommand, IEventStore> resolveStore);
}

public interface IDefineReader<out TCommand, out TAggregate, out TState, TId>
public interface IDefineReader<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event reader from the command.
/// If not defined, the reader provided by the functional service will be used.
/// </summary>
/// <param name="resolveReader">Function to resolve the event reader</param>
/// <returns></returns>
IDefineWriter<TCommand, TAggregate, TState, TId> ResolveReader(Func<TCommand, IEventReader> resolveReader);
IDefineWriter<TCommand, TAggregate, TState> ResolveReader(Func<TCommand, IEventReader> resolveReader);
}

public interface IDefineWriter<out TCommand, out TAggregate, out TState, TId>
public interface IDefineWriter<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how to resolve the event writer from the command.
/// If not defined, the writer provided by the functional service will be used.
/// </summary>
/// <param name="resolveWriter">Function to resolve the event writer</param>
/// <returns></returns>
IDefineExecution<TCommand, TAggregate, TState, TId> ResolveWriter(Func<TCommand, IEventWriter> resolveWriter);
IDefineExecution<TCommand, TAggregate, TState> ResolveWriter(Func<TCommand, IEventWriter> resolveWriter);
}

public interface IDefineEventAmendment<out TCommand, out TAggregate, out TState, TId>
Expand All @@ -90,44 +89,42 @@ public interface IDefineEventAmendment<out TCommand, out TAggregate, out TState,
/// </summary>
/// <param name="amendEvent">A function to amend the event</param>
/// <returns></returns>
IDefineStoreOrExecution<TCommand, TAggregate, TState, TId> AmendEvent(AmendEvent<TCommand> amendEvent);
IDefineStoreOrExecution<TCommand, TAggregate, TState> AmendEvent(AmendEvent<TCommand> amendEvent);
}

public interface IDefineExecution<out TCommand, out TAggregate, out TState, TId>
public interface IDefineExecution<out TCommand, out TAggregate, out TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class {
/// <summary>
/// Defines how the command that acts on the aggregate.
/// </summary>
/// <param name="action">A function that executes an operation on an aggregate</param>
/// <returns></returns>
void Act(Action<TAggregate, TCommand> action);
IDefineAppendAmendment<TCommand> Act(Action<TAggregate, TCommand> action);

/// <summary>
/// Defines how the command that acts on the aggregate.
/// </summary>
/// <param name="action">A function that executes an asynchronous operation on an aggregate</param>
/// <returns></returns>
void ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action);
IDefineAppendAmendment<TCommand> ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action);
}

public interface IDefineStoreOrExecution<out TCommand, out TAggregate, out TState, TId>
: IDefineStore<TCommand, TAggregate, TState, TId>,
IDefineReader<TCommand, TAggregate, TState, TId>,
IDefineWriter<TCommand, TAggregate, TState, TId>,
IDefineExecution<TCommand, TAggregate, TState, TId>
public interface IDefineStoreOrExecution<out TCommand, out TAggregate, out TState>
: IDefineStore<TCommand, TAggregate, TState>,
IDefineReader<TCommand, TAggregate, TState>,
IDefineWriter<TCommand, TAggregate, TState>,
IDefineExecution<TCommand, TAggregate, TState>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id
where TCommand : class;

public interface ICommandHandlerBuilder<out TCommand, out TAggregate, out TState, TId>
: IDefineStore<TCommand, TAggregate, TState, TId>,
IDefineReader<TCommand, TAggregate, TState, TId>,
IDefineWriter<TCommand, TAggregate, TState, TId>,
IDefineExecution<TCommand, TAggregate, TState, TId>,
: IDefineStore<TCommand, TAggregate, TState>,
IDefineReader<TCommand, TAggregate, TState>,
IDefineWriter<TCommand, TAggregate, TState>,
IDefineExecution<TCommand, TAggregate, TState>,
IDefineEventAmendment<TCommand, TAggregate, TState, TId>
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
Expand All @@ -152,18 +149,20 @@ public class CommandHandlerBuilder<TCommand, TAggregate, TState, TId>(
)
: IDefineExpectedState<TCommand, TAggregate, TState, TId>,
IDefineIdentity<TCommand, TAggregate, TState, TId>,
IDefineStoreOrExecution<TCommand, TAggregate, TState, TId>,
IDefineStoreOrExecution<TCommand, TAggregate, TState>,
IDefineAppendAmendment<TCommand>,
ICommandHandlerBuilder<TCommand, TAggregate, TState, TId>
where TCommand : class
where TAggregate : Aggregate<TState>
where TState : State<TState>, new()
where TId : Id {
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate, TState>? _action;
Func<TCommand, IEventReader>? _reader;
Func<TCommand, IEventWriter>? _writer;
AmendEvent<TCommand>? _amendEvent;
ExpectedState _expectedState = ExpectedState.Any;
GetIdFromUntypedCommand<TId>? _getId;
HandleUntypedCommand<TAggregate, TState>? _action;
Func<TCommand, IEventReader>? _reader;
Func<TCommand, IEventWriter>? _writer;
AmendEvent<TCommand>? _amendEvent;
ExpectedState _expectedState = ExpectedState.Any;
RegisteredHandler<TAggregate, TState, TId>? _handler;

IDefineIdentity<TCommand, TAggregate, TState, TId> IDefineExpectedState<TCommand, TAggregate, TState, TId>.InState(ExpectedState expectedState) {
_expectedState = expectedState;
Expand All @@ -183,50 +182,60 @@ ICommandHandlerBuilder<TCommand, TAggregate, TState, TId> IDefineIdentity<TComma
return this;
}

void IDefineExecution<TCommand, TAggregate, TState, TId>.Act(Action<TAggregate, TCommand> action) {
IDefineAppendAmendment<TCommand> IDefineExecution<TCommand, TAggregate, TState>.Act(Action<TAggregate, TCommand> action) {
_action = (aggregate, cmd, _) => {
action(aggregate, (TCommand)cmd);

return ValueTask.FromResult(aggregate);
};
service.AddHandler<TCommand>(Build());
_handler = Build();
service.AddHandler<TCommand>(_handler);

return this;
}

void IDefineExecution<TCommand, TAggregate, TState, TId>.ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action) {
IDefineAppendAmendment<TCommand> IDefineExecution<TCommand, TAggregate, TState>.ActAsync(Func<TAggregate, TCommand, CancellationToken, Task> action) {
_action = async (aggregate, cmd, token) => {
await action(aggregate, (TCommand)cmd, token).NoContext();

return aggregate;
};
service.AddHandler<TCommand>(Build());
_handler = Build();
service.AddHandler<TCommand>(_handler);

return this;
}

IDefineExecution<TCommand, TAggregate, TState, TId> IDefineStore<TCommand, TAggregate, TState, TId>.ResolveStore(Func<TCommand, IEventStore> resolveStore) {
IDefineExecution<TCommand, TAggregate, TState> IDefineStore<TCommand, TAggregate, TState>.ResolveStore(Func<TCommand, IEventStore> resolveStore) {
Ensure.NotNull(resolveStore, nameof(resolveStore));
_reader = resolveStore;
_writer = resolveStore;

return this;
}

IDefineWriter<TCommand, TAggregate, TState, TId> IDefineReader<TCommand, TAggregate, TState, TId>.ResolveReader(Func<TCommand, IEventReader> resolveReader) {
IDefineWriter<TCommand, TAggregate, TState> IDefineReader<TCommand, TAggregate, TState>.ResolveReader(Func<TCommand, IEventReader> resolveReader) {
_reader = resolveReader;

return this;
}

IDefineExecution<TCommand, TAggregate, TState, TId> IDefineWriter<TCommand, TAggregate, TState, TId>.ResolveWriter(Func<TCommand, IEventWriter> resolveWriter) {
IDefineExecution<TCommand, TAggregate, TState> IDefineWriter<TCommand, TAggregate, TState>.ResolveWriter(Func<TCommand, IEventWriter> resolveWriter) {
_writer = resolveWriter;

return this;
}

IDefineStoreOrExecution<TCommand, TAggregate, TState, TId> IDefineEventAmendment<TCommand, TAggregate, TState, TId>.AmendEvent(AmendEvent<TCommand> amendEvent) {
IDefineStoreOrExecution<TCommand, TAggregate, TState> IDefineEventAmendment<TCommand, TAggregate, TState, TId>.AmendEvent(AmendEvent<TCommand> amendEvent) {
_amendEvent = amendEvent;

return this;
}

void IDefineAppendAmendment<TCommand>.AmendAppend(AmendAppend<TCommand> amendAppend) {
Ensure.NotNull(_handler, "Handler hasn't been built yet").AmendAppend = (append, cmd) => amendAppend(append, (TCommand)cmd);
}

RegisteredHandler<TAggregate, TState, TId> Build() {
return new(
_expectedState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0.

using System.Reflection;
using Eventuous.Persistence;
using static Eventuous.CommandServiceDelegates;
using static Eventuous.FuncServiceDelegates;

Expand All @@ -16,7 +17,9 @@ record RegisteredHandler<TAggregate, TState, TId>(
ResolveReaderFromCommand ResolveReader,
ResolveWriterFromCommand ResolveWriter,
AmendEventFromCommand? AmendEvent
) where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new();
) where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
public AmendAppend? AmendAppend { get; set; }
}

class HandlersMap<TAggregate, TState, TId> where TAggregate : Aggregate<TState> where TId : Id where TState : State<TState>, new() {
readonly TypeMap<RegisteredHandler<TAggregate, TState, TId>> _typeMap = new();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Persistence;

namespace Eventuous;

using static Diagnostics.ApplicationEventSource;
Expand Down Expand Up @@ -87,8 +89,10 @@ public async Task<Result<TState>> Handle<TCommand>(TCommand command, Cancellatio
// Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length
if (result.Changes.Count == 0) return Result<TState>.FromSuccess(result.State, Array.Empty<Change>(), 0);

var proposed = new ProposedAppend(stream, new(result.OriginalVersion), result.Changes.Select(x => new ProposedEvent(x, new())).ToArray());
var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed;
var writer = registeredHandler.ResolveWriter(command);
var storeResult = await writer.StoreAggregate<TAggregate, TState>(stream, result, Amend, cancellationToken).NoContext();
var storeResult = await writer.Store(final, Amend, cancellationToken).NoContext();
var changes = result.Changes.Select(x => Change.FromEvent(x, _typeMap));
Log.CommandHandled<TCommand>();

Expand Down
Loading
Loading