Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipe & reducer #643

Merged
merged 12 commits into from
Nov 7, 2024
Merged

Add pipe & reducer #643

merged 12 commits into from
Nov 7, 2024

Conversation

DavidBadura
Copy link
Member

@DavidBadura DavidBadura commented Oct 18, 2024

Add two new components: Pipe and Reducer

Pipe

The Pipe is a construct that allows you to chain multiple translators.
This can be used to manipulate, filter or expand messages or events.
This can be used for anti-corruption layers, data migration, or to fix errors in the event stream.

$messages = new Pipe(
    [$message1, $message2, /* ... */],
    new ExcludeEventTranslator(ProfileVisited::class)),
);

foreach ($messages as $message) {
  // do something
}

Reducer

The Reducer is a construct that allows you to reduce messages to a state.
This can be used to build temporal projections or to create a read model.

$state = (new Reducer())
    ->initState(['name' => 'unknown'])
    ->match([
        ProfileCreated::class => static function (Message $message): array {
            return ['name' => $message->event()->name];
        },
        NameChanged::class => static function (Message $message): array {
            return ['name' => $message->event()->name];
        },
    ])
    ->reduce(
        new Pipe(
            $store->load(new Criteria(
                new AggregateIdCriterion($profileId->toString()),
                new AggregateNameCriterion('profile'),
            )),
            new UntilEventTranslator(new DateTimeImmutable()),
        ),
    );

@DavidBadura DavidBadura added the enhancement New feature or request label Oct 18, 2024
Copy link

github-actions bot commented Oct 18, 2024

Hello 👋

here is the most recent benchmark result:

SubscriptionEngineBench
=======================

+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
|                           | time (kde mode)                               | memory                                     |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
| subject                   | Tag: <current>  | Tag: base       | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+
| benchHandle10000Events () | 3.082s (±0.00%) | 3.088s (±0.00%) | -0.18%    | 34.234mb        | 34.742mb   | -1.46%      |
+---------------------------+-----------------+-----------------+-----------+-----------------+------------+-------------+

SubscriptionEngineBatchBench
============================

+---------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+
|                           | time (kde mode)                                   | memory                                     |
+---------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+
| subject                   | Tag: <current>    | Tag: base         | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+---------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+
| benchHandle10000Events () | 70.266ms (±0.00%) | 69.019ms (±0.00%) | +1.81%    | 34.234mb        | 34.234mb   | 0.00%       |
+---------------------------+-------------------+-------------------+-----------+-----------------+------------+-------------+

SimpleSetupStreamStoreBench
===========================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 944.900μs (±0.00%) | 892.800μs (±0.00%) | +5.84%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchLoad10000Events ()                | 46.662ms (±0.00%)  | 47.126ms (±0.00%)  | -0.98%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave1Event ()                     | 1.101ms (±0.00%)   | 939.200μs (±0.00%) | +17.21%   | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave10000Events ()                | 226.426ms (±0.00%) | 228.536ms (±0.00%) | -0.92%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave10000Aggregates ()            | 7.727s (±0.00%)    | 7.693s (±0.00%)    | +0.45%    | 34.744mb        | 34.744mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 4.801s (±0.00%)    | 4.792s (±0.00%)    | +0.19%    | 34.744mb        | 34.744mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SnapshotsBench
==============

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad10000EventsMissingSnapshot () | 48.251ms (±0.00%)  | 48.594ms (±0.00%)  | -0.71%    | 33.836mb        | 33.836mb   | 0.00%       |
| benchLoad10000Events ()                | 941.900μs (±0.00%) | 938.400μs (±0.00%) | +0.37%    | 33.836mb        | 33.836mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SplitStreamBench
================

+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                         | time (kde mode)                                     | memory                                     |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                 | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad10000Events () | 4.548ms (±0.00%)   | 4.515ms (±0.00%)   | +0.74%    | 37.072mb        | 37.072mb   | 0.00%       |
| benchSave10000Events () | 356.380ms (±0.00%) | 368.852ms (±0.00%) | -3.38%    | 37.144mb        | 37.144mb   | -0.00%      |
+-------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

PersonalDataBench
=================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 901.500μs (±0.00%) | 896.400μs (±0.00%) | +0.57%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchLoad10000Events ()                | 84.481ms (±0.00%)  | 83.268ms (±0.00%)  | +1.46%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchSave1Event ()                     | 1.542ms (±0.00%)   | 1.466ms (±0.00%)   | +5.16%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchSave10000Events ()                | 266.214ms (±0.00%) | 270.823ms (±0.00%) | -1.70%    | 34.897mb        | 34.897mb   | 0.00%       |
| benchSave10000Aggregates ()            | 11.977s (±0.00%)   | 12.000s (±0.00%)   | -0.19%    | 34.895mb        | 34.895mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 8.939s (±0.00%)    | 8.930s (±0.00%)    | +0.10%    | 35.396mb        | 35.396mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

SimpleSetupBench
================

+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
|                                        | time (kde mode)                                     | memory                                     |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| subject                                | Tag: <current>     | Tag: base          | time-diff | Tag: <current>  | Tag: base  | memory-diff |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+
| benchLoad1Event ()                     | 965.700μs (±0.00%) | 975.800μs (±0.00%) | -1.04%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchLoad10000Events ()                | 50.878ms (±0.00%)  | 48.654ms (±0.00%)  | +4.57%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave1Event ()                     | 1.195ms (±0.00%)   | 944.800μs (±0.00%) | +26.43%   | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave10000Events ()                | 234.461ms (±0.00%) | 236.146ms (±0.00%) | -0.71%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave10000Aggregates ()            | 7.561s (±0.00%)    | 7.795s (±0.00%)    | -3.00%    | 33.765mb        | 33.765mb   | 0.00%       |
| benchSave10000AggregatesTransaction () | 4.796s (±0.00%)    | 4.802s (±0.00%)    | -0.13%    | 33.765mb        | 33.765mb   | 0.00%       |
+----------------------------------------+--------------------+--------------------+-----------+-----------------+------------+-------------+

This comment gets update everytime a new commit comes in!

@DavidBadura DavidBadura changed the title add pipeline again Add pipe & reducer Oct 22, 2024
@DavidBadura DavidBadura added this to the 3.6.0 milestone Oct 22, 2024
@DavidBadura DavidBadura marked this pull request as ready for review October 23, 2024 10:51
src/Message/Pipe.php Outdated Show resolved Hide resolved
@DavidBadura DavidBadura merged commit f289c56 into 3.6.x Nov 7, 2024
40 checks passed
@DavidBadura DavidBadura deleted the pipeline branch November 7, 2024 13:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants