diff --git a/docs/pages/pipeline.md b/docs/pages/pipeline.md index 8fe975d6..18ba2a0b 100644 --- a/docs/pages/pipeline.md +++ b/docs/pages/pipeline.md @@ -14,11 +14,9 @@ use Patchlevel\EventSourcing\Pipeline\Middleware\ExcludeEventMiddleware; use Patchlevel\EventSourcing\Pipeline\Middleware\RecalculatePlayheadMiddleware; use Patchlevel\EventSourcing\Pipeline\Middleware\ReplaceEventMiddleware; use Patchlevel\EventSourcing\Pipeline\Pipeline; -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; use Patchlevel\EventSourcing\Pipeline\Target\StoreTarget; $pipeline = new Pipeline( - new StoreSource($oldStore), new StoreTarget($newStore), [ new ExcludeEventMiddleware([PrivacyAdded::class]), @@ -28,6 +26,8 @@ $pipeline = new Pipeline( new RecalculatePlayheadMiddleware(), ] ); + +$pipeline->run($oldStore->load()); ``` !!! danger @@ -53,69 +53,6 @@ There is a source where the data comes from. A target where the data should flow. And any number of middlewares to do something with the data beforehand. -## Source - -The first thing you need is a source of where the data should come from. - -### Store - -The `StoreSource` is the standard source to load all events from the database. - -```php -use Patchlevel\EventSourcing\Pipeline\Source\StoreSource; - -$source = new StoreSource($store); -``` - -### In Memory - -There is an `InMemorySource` that receives the messages in an array. This source can be used to write pipeline tests. - -```php -use Patchlevel\EventSourcing\EventBus\Message; -use Patchlevel\EventSourcing\Pipeline\Source\InMemorySource; - -$source = new InMemorySource([ - new Message( - Profile::class, - '1', - 1, - new ProfileCreated(Email::fromString('david.badura@patchlevel.de')), - ), - // ... -]); -``` - -### Custom Source - -You can also create your own source class. It has to inherit from `Source`. -Here you can, for example, create a migration from another event sourcing system or similar system. - -```php -use Patchlevel\EventSourcing\EventBus\Message; -use Patchlevel\EventSourcing\Pipeline\Source\Source; - -$source = new class implements Source { - /** - * @return Generator - */ - public function load(): Generator - { - yield new Message( - Profile::class, - '1', - 0, - new ProfileCreated('1', ['name' => 'David']) - ); - } - - public function count(): int - { - reutrn 1; - } -} -``` - ## Target After you have a source, you still need the destination of the pipeline. diff --git a/src/Pipeline/Middleware/ChainMiddleware.php b/src/Pipeline/Middleware/ChainMiddleware.php index 50a06d51..bf67db99 100644 --- a/src/Pipeline/Middleware/ChainMiddleware.php +++ b/src/Pipeline/Middleware/ChainMiddleware.php @@ -10,9 +10,9 @@ final class ChainMiddleware implements Middleware { - /** @param iterable $translators */ + /** @param iterable $middlewares */ public function __construct( - private readonly iterable $translators, + private readonly iterable $middlewares, ) { } @@ -21,7 +21,7 @@ public function __invoke(Message $message): array { $messages = [$message]; - foreach ($this->translators as $middleware) { + foreach ($this->middlewares as $middleware) { $messages = $this->process($middleware, $messages); } @@ -33,12 +33,12 @@ public function __invoke(Message $message): array * * @return list */ - private function process(Middleware $translator, array $messages): array + private function process(Middleware $middleware, array $messages): array { $result = []; foreach ($messages as $message) { - $result += $translator($message); + $result += $middleware($message); } return array_values($result); diff --git a/src/Pipeline/Middleware/ClosureMiddleware.php b/src/Pipeline/Middleware/ClosureMiddleware.php new file mode 100644 index 00000000..33956125 --- /dev/null +++ b/src/Pipeline/Middleware/ClosureMiddleware.php @@ -0,0 +1,23 @@ + $callable */ + public function __construct( + private readonly Closure $callable, + ) { + } + + /** @return list */ + public function __invoke(Message $message): array + { + return ($this->callable)($message); + } +} diff --git a/src/Pipeline/Pipeline.php b/src/Pipeline/Pipeline.php index 8fb072c8..af78d852 100644 --- a/src/Pipeline/Pipeline.php +++ b/src/Pipeline/Pipeline.php @@ -21,7 +21,7 @@ final class Pipeline public function __construct( private readonly Target $target, array|Middleware $middlewares = [], - private readonly float|int $bufferSize = 0, + private readonly float|int $bufferSize = 1_000, ) { if (is_array($middlewares)) { $this->middleware = new ChainMiddleware($middlewares); diff --git a/tests/Unit/Pipeline/Middleware/ChainMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/ChainMiddlewareTest.php new file mode 100644 index 00000000..8f6b0ea3 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/ChainMiddlewareTest.php @@ -0,0 +1,43 @@ +prophesize(Middleware::class); + $child1->__invoke($message)->willReturn([$message])->shouldBeCalled(); + + $child2 = $this->prophesize(Middleware::class); + $child2->__invoke($message)->willReturn([$message])->shouldBeCalled(); + + $middleware = new ChainMiddleware([ + $child1->reveal(), + $child2->reveal(), + ]); + + $middleware($message); + } +} diff --git a/tests/Unit/Pipeline/Middleware/ClosureMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/ClosureMiddlewareTest.php new file mode 100644 index 00000000..c81494a7 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/ClosureMiddlewareTest.php @@ -0,0 +1,34 @@ +withHeader(new ArchivedHeader()); + + $result = $middleware($message); + + self::assertSame([], $result); + } + + public function testIncludeEvent(): void + { + $middleware = new ExcludeEventWithHeaderMiddleware(ArchivedHeader::class); + + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ), + ); + + $result = $middleware($message); + + self::assertSame([$message], $result); + } +} diff --git a/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php new file mode 100644 index 00000000..5ddc0f7e --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/FilterEventMiddlewareTest.php @@ -0,0 +1,52 @@ +withHeader(new ArchivedHeader()); + + $result = $middleware($message); + + self::assertSame([$message], $result); + } +} diff --git a/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php new file mode 100644 index 00000000..f152e804 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/RecalculatePlayheadMiddlewareTest.php @@ -0,0 +1,109 @@ +withHeader(new AggregateHeader('profile', '1', 5, new DateTimeImmutable())); + + $result = $middleware($message); + + self::assertCount(1, $result); + self::assertSame('profile', $result[0]->header(AggregateHeader::class)->aggregateName); + self::assertSame(1, $result[0]->header(AggregateHeader::class)->playhead); + } + + public function testRecalculatePlayheadWithSamePlayhead(): void + { + $middleware = new RecalculatePlayheadMiddleware(); + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $message = Message::create($event) + ->withHeader(new AggregateHeader('profile', '1', 1, new DateTimeImmutable())); + + $result = $middleware($message); + + self::assertSame([$message], $result); + } + + public function testRecalculateMultipleMessages(): void + { + $middleware = new RecalculatePlayheadMiddleware(); + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $message = Message::create($event) + ->withHeader(new AggregateHeader('profile', '1', 5, new DateTimeImmutable())); + $result = $middleware($message); + + self::assertCount(1, $result); + self::assertSame('profile', $result[0]->header(AggregateHeader::class)->aggregateName); + self::assertSame(1, $result[0]->header(AggregateHeader::class)->playhead); + + $message = Message::create($event) + ->withHeader(new AggregateHeader('profile', '1', 8, new DateTimeImmutable())); + + $result = $middleware($message); + + self::assertCount(1, $result); + self::assertSame('profile', $result[0]->header(AggregateHeader::class)->aggregateName); + self::assertSame(2, $result[0]->header(AggregateHeader::class)->playhead); + } + + public function testReset(): void + { + $middleware = new RecalculatePlayheadMiddleware(); + + $event = new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ); + + $message = Message::create($event) + ->withHeader(new AggregateHeader('profile', '1', 5, new DateTimeImmutable())); + $result = $middleware($message); + + self::assertCount(1, $result); + self::assertSame('profile', $result[0]->header(AggregateHeader::class)->aggregateName); + self::assertSame(1, $result[0]->header(AggregateHeader::class)->playhead); + + $message = Message::create($event) + ->withHeader(new AggregateHeader('profile', '1', 8, new DateTimeImmutable())); + + $middleware->reset(); + $result = $middleware($message); + + self::assertCount(1, $result); + self::assertSame('profile', $result[0]->header(AggregateHeader::class)->aggregateName); + self::assertSame(1, $result[0]->header(AggregateHeader::class)->playhead); + } +} diff --git a/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php new file mode 100644 index 00000000..d7098a9f --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/ReplaceEventMiddlewareTest.php @@ -0,0 +1,73 @@ +profileId, + ); + }, + ); + + $message = new Message( + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ), + ); + + $result = $middleware($message); + + self::assertCount(1, $result); + + $event = $result[0]->event(); + + self::assertInstanceOf(ProfileVisited::class, $event); + } + + public function testReplaceInvalidClass(): void + { + /** @psalm-suppress InvalidArgument */ + $middleware = new ReplaceEventMiddleware( + MessagePublished::class, + static function (ProfileCreated $event) { + return new ProfileVisited( + $event->profileId, + ); + }, + ); + + $message = new Message( + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ), + ); + + $result = $middleware($message); + + self::assertCount(1, $result); + + $event = $result[0]->event(); + + self::assertInstanceOf(ProfileCreated::class, $event); + } +} diff --git a/tests/Unit/Pipeline/Middleware/UntilEventMiddlewareTest.php b/tests/Unit/Pipeline/Middleware/UntilEventMiddlewareTest.php new file mode 100644 index 00000000..891d22e4 --- /dev/null +++ b/tests/Unit/Pipeline/Middleware/UntilEventMiddlewareTest.php @@ -0,0 +1,54 @@ +withHeader(new AggregateHeader('pofile', '1', 1, new DateTimeImmutable('2020-02-01 00:00:00'))); + + $result = $middleware($message); + + self::assertSame([$message], $result); + } + + public function testNegative(): void + { + $until = new DateTimeImmutable('2020-01-01 00:00:00'); + + $middleware = new UntilEventMiddleware($until); + + $message = Message::create( + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('info@patchlevel.de'), + ), + )->withHeader(new AggregateHeader('pofile', '1', 1, new DateTimeImmutable('2020-02-01 00:00:00'))); + + $result = $middleware($message); + + self::assertSame([], $result); + } +} diff --git a/tests/Unit/Pipeline/PipelineTest.php b/tests/Unit/Pipeline/PipelineTest.php new file mode 100644 index 00000000..2f456f19 --- /dev/null +++ b/tests/Unit/Pipeline/PipelineTest.php @@ -0,0 +1,132 @@ +messages(); + + $target = new InMemoryTarget(); + $pipeline = new Pipeline($target); + + $pipeline->run($messages); + + self::assertSame($messages, $target->messages()); + } + + public function testPipelineWithMiddleware(): void + { + $messages = $this->messages(); + + $target = new InMemoryTarget(); + $pipeline = new Pipeline( + $target, + [ + new ExcludeEventMiddleware([ProfileCreated::class]), + new RecalculatePlayheadMiddleware(), + ], + ); + + $pipeline->run($messages); + + $resultMessages = $target->messages(); + + self::assertCount(3, $resultMessages); + + self::assertInstanceOf(ProfileVisited::class, $resultMessages[0]->event()); + self::assertSame('1', $resultMessages[0]->header(AggregateHeader::class)->aggregateId); + self::assertSame(1, $resultMessages[0]->header(AggregateHeader::class)->playhead); + + self::assertInstanceOf(ProfileVisited::class, $resultMessages[1]->event()); + self::assertSame('1', $resultMessages[1]->header(AggregateHeader::class)->aggregateId); + self::assertSame(2, $resultMessages[1]->header(AggregateHeader::class)->playhead); + + self::assertInstanceOf(ProfileVisited::class, $resultMessages[2]->event()); + self::assertSame('2', $resultMessages[2]->header(AggregateHeader::class)->aggregateId); + self::assertSame(1, $resultMessages[2]->header(AggregateHeader::class)->playhead); + } + + /** @return list */ + private function messages(): array + { + return [ + Message::create( + new ProfileCreated( + ProfileId::fromString('1'), + Email::fromString('hallo@patchlevel.de'), + ), + ) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 1, + new DateTimeImmutable(), + )), + Message::create( + new ProfileVisited( + ProfileId::fromString('1'), + ), + ) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 2, + new DateTimeImmutable(), + )), + Message::create( + new ProfileVisited( + ProfileId::fromString('1'), + ), + ) + ->withHeader(new AggregateHeader( + 'profile', + '1', + 3, + new DateTimeImmutable(), + )), + + Message::create( + new ProfileCreated( + ProfileId::fromString('2'), + Email::fromString('hallo@patchlevel.de'), + ), + ) + ->withHeader(new AggregateHeader( + 'profile', + '2', + 1, + new DateTimeImmutable(), + )), + + Message::create( + new ProfileVisited( + ProfileId::fromString('2'), + ), + ) + ->withHeader(new AggregateHeader( + 'profile', + '2', + 2, + new DateTimeImmutable(), + )), + ]; + } +} diff --git a/tests/Unit/Pipeline/Target/EventBusTargetTest.php b/tests/Unit/Pipeline/Target/EventBusTargetTest.php new file mode 100644 index 00000000..c9f7fd63 --- /dev/null +++ b/tests/Unit/Pipeline/Target/EventBusTargetTest.php @@ -0,0 +1,34 @@ +prophesize(EventBus::class); + $pipelineStore->dispatch($message)->shouldBeCalled(); + + $storeTarget = new EventBusTarget($pipelineStore->reveal()); + + $storeTarget->save($message); + } +} diff --git a/tests/Unit/Pipeline/Target/InMemoryTargetTest.php b/tests/Unit/Pipeline/Target/InMemoryTargetTest.php new file mode 100644 index 00000000..b18bfd0d --- /dev/null +++ b/tests/Unit/Pipeline/Target/InMemoryTargetTest.php @@ -0,0 +1,30 @@ +save($message); + + $messages = $inMemoryTarget->messages(); + + self::assertSame([$message], $messages); + } +} diff --git a/tests/Unit/Pipeline/Target/StoreTargetTest.php b/tests/Unit/Pipeline/Target/StoreTargetTest.php new file mode 100644 index 00000000..187ac642 --- /dev/null +++ b/tests/Unit/Pipeline/Target/StoreTargetTest.php @@ -0,0 +1,34 @@ +prophesize(Store::class); + $pipelineStore->save($message)->shouldBeCalled(); + + $storeTarget = new StoreTarget($pipelineStore->reveal()); + + $storeTarget->save($message); + } +}