diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index c1fbddd9b..121c82ad9 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -36,7 +36,7 @@ public function __construct( ) { } - public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result { $criteria ??= new SubscriptionEngineCriteria(); @@ -47,19 +47,22 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk $this->discoverNewSubscriptions(); $this->retrySubscriptions($criteria); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::New], ), - function (array $subscriptions) use ($skipBooting): void { + function (array $subscriptions) use ($skipBooting): Result { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions to setup, finish setup.'); - return; + return new Result(); } + /** @var list $errors */ + $errors = []; + $latestIndex = $this->latestIndex(); foreach ($subscriptions as $subscription) { @@ -118,8 +121,16 @@ function (array $subscriptions) use ($skipBooting): void { )); $this->handleError($subscription, $e); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } } + + return new Result($errors); }, ); } @@ -127,7 +138,7 @@ function (array $subscriptions) use ($skipBooting): void { public function boot( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void { + ): ProcessedResult { $criteria ??= new SubscriptionEngineCriteria(); $this->logger?->info( @@ -137,19 +148,22 @@ public function boot( $this->discoverNewSubscriptions(); $this->retrySubscriptions($criteria); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::Booting], ), - function ($subscriptions) use ($limit): void { + function ($subscriptions) use ($limit): ProcessedResult { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions in booting status, finish booting.'); - return; + return new ProcessedResult(0); } + /** @var list $errors */ + $errors = []; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); $this->logger?->debug( @@ -192,7 +206,13 @@ function ($subscriptions) use ($limit): void { continue; } - $this->handleMessage($index, $message, $subscription); + $error = $this->handleMessage($index, $message, $subscription); + + if (!$error) { + continue; + } + + $errors[] = $error; } $messageCounter++; @@ -212,7 +232,9 @@ function ($subscriptions) use ($limit): void { ), ); - return; + return new ProcessedResult( + $messageCounter, + ); } } } finally { @@ -258,6 +280,11 @@ function ($subscriptions) use ($limit): void { } $this->logger?->info('Subscription Engine: Finish booting.'); + + return new ProcessedResult( + $messageCounter, + $errors, + ); }, ); } @@ -265,7 +292,7 @@ function ($subscriptions) use ($limit): void { public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void { + ): ProcessedResult { $criteria ??= new SubscriptionEngineCriteria(); $this->logger?->info('Subscription Engine: Start processing.'); @@ -274,19 +301,22 @@ public function run( $this->markDetachedSubscriptions($criteria); $this->retrySubscriptions($criteria); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::Active], ), - function (array $subscriptions) use ($limit): void { + function (array $subscriptions) use ($limit): ProcessedResult { if (count($subscriptions) === 0) { $this->logger?->info('Subscription Engine: No subscriptions to process, finish processing.'); - return; + return new ProcessedResult(0); } + /** @var list $errors */ + $errors = []; + $startIndex = $this->lowestSubscriptionPosition($subscriptions); $this->logger?->debug( @@ -328,12 +358,21 @@ function (array $subscriptions) use ($limit): void { continue; } - $this->handleMessage($index, $message, $subscription); + $error = $this->handleMessage($index, $message, $subscription); + + if (!$error) { + continue; + } + + $errors[] = $error; } $messageCounter++; - $this->logger?->debug(sprintf('Subscription Engine: Current event stream position: %s', $index)); + $this->logger?->debug(sprintf( + 'Subscription Engine: Current event stream position: %s', + $index, + )); if ($limit !== null && $messageCounter >= $limit) { $this->logger?->info( @@ -343,7 +382,7 @@ function (array $subscriptions) use ($limit): void { ), ); - return; + return new ProcessedResult($messageCounter, $errors); } } } finally { @@ -385,11 +424,13 @@ function (array $subscriptions) use ($limit): void { $endIndex, ), ); + + return new ProcessedResult($messageCounter, $errors); }, ); } - public function teardown(SubscriptionEngineCriteria|null $criteria = null): void + public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); @@ -397,13 +438,16 @@ public function teardown(SubscriptionEngineCriteria|null $criteria = null): void $this->logger?->info('Subscription Engine: Start teardown detached subscriptions.'); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, status: [Status::Detached], ), - function (array $subscriptions): void { + function (array $subscriptions): Result { + /** @var list $errors */ + $errors = []; + foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -451,6 +495,13 @@ function (array $subscriptions): void { $e->getMessage(), ), ); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); + continue; } @@ -465,22 +516,27 @@ function (array $subscriptions): void { } $this->logger?->info('Subscription Engine: Finish teardown.'); + + return new Result($errors); }, ); } - public function remove(SubscriptionEngineCriteria|null $criteria = null): void + public function remove(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); $this->discoverNewSubscriptions(); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, ), - function (array $subscriptions): void { + function (array $subscriptions): Result { + /** @var list $errors */ + $errors = []; + foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -519,6 +575,12 @@ function (array $subscriptions): void { $e->getMessage(), ), ); + + $errors[] = new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } $this->subscriptionStore->remove($subscription); @@ -527,17 +589,19 @@ function (array $subscriptions): void { sprintf('Subscription Engine: Subscription "%s" removed.', $subscription->id()), ); } + + return new Result($errors); }, ); } - public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); $this->discoverNewSubscriptions(); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, @@ -548,14 +612,16 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): vo Status::Finished, ], ), - function (array $subscriptions): void { - /** @var Subscription $subscription */ + function (array $subscriptions): Result { foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); if (!$subscriber) { $this->logger?->debug( - sprintf('Subscription Engine: Subscriber for "%s" not found, skipped.', $subscription->id()), + sprintf( + 'Subscription Engine: Subscriber for "%s" not found, skipped.', + $subscription->id(), + ), ); continue; @@ -587,17 +653,19 @@ function (array $subscriptions): void { $subscription->id(), )); } + + return new Result(); }, ); } - public function pause(SubscriptionEngineCriteria|null $criteria = null): void + public function pause(SubscriptionEngineCriteria|null $criteria = null): Result { $criteria ??= new SubscriptionEngineCriteria(); $this->discoverNewSubscriptions(); - $this->findForUpdate( + return $this->findForUpdate( new SubscriptionCriteria( ids: $criteria->ids, groups: $criteria->groups, @@ -607,14 +675,17 @@ public function pause(SubscriptionEngineCriteria|null $criteria = null): void Status::Error, ], ), - function (array $subscriptions): void { + function (array $subscriptions): Result { /** @var Subscription $subscription */ foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); if (!$subscriber) { $this->logger?->debug( - sprintf('Subscription Engine: Subscriber for "%s" not found, skipped.', $subscription->id()), + sprintf( + 'Subscription Engine: Subscriber for "%s" not found, skipped.', + $subscription->id(), + ), ); continue; @@ -629,6 +700,8 @@ function (array $subscriptions): void { $subscription->id(), )); } + + return new Result(); }, ); } @@ -648,7 +721,7 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): ); } - private function handleMessage(int $index, Message $message, Subscription $subscription): void + private function handleMessage(int $index, Message $message, Subscription $subscription): Error|null { $subscriber = $this->subscriber($subscription->id()); @@ -670,7 +743,7 @@ private function handleMessage(int $index, Message $message, Subscription $subsc ), ); - return; + return null; } try { @@ -690,7 +763,11 @@ private function handleMessage(int $index, Message $message, Subscription $subsc $this->handleError($subscription, $e); - return; + return new Error( + $subscription->id(), + $e->getMessage(), + $e, + ); } $subscription->changePosition($index); @@ -704,6 +781,8 @@ private function handleMessage(int $index, Message $message, Subscription $subsc $message->event()::class, ), ); + + return null; } private function subscriber(string $subscriberId): SubscriberAccessor|null @@ -846,20 +925,27 @@ private function lowestSubscriptionPosition(array $subscriptions): int return $min; } - /** @param Closure(list):void $closure */ - private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure): void + /** + * @param Closure(list):T $closure + * + * @return T + * + * @template T + */ + private function findForUpdate(SubscriptionCriteria $criteria, Closure $closure): mixed { if (!$this->subscriptionStore instanceof LockableSubscriptionStore) { - $closure($this->subscriptionStore->find($criteria)); - - return; + return $closure($this->subscriptionStore->find($criteria)); } - $this->subscriptionStore->inLock(function () use ($closure, $criteria): void { - $subscriptions = $this->subscriptionStore->find($criteria); + return $this->subscriptionStore->inLock( + /** @return T */ + function () use ($closure, $criteria): mixed { + $subscriptions = $this->subscriptionStore->find($criteria); - $closure($subscriptions); - }); + return $closure($subscriptions); + }, + ); } private function handleError(Subscription $subscription, Throwable $throwable): void diff --git a/src/Subscription/Engine/Error.php b/src/Subscription/Engine/Error.php new file mode 100644 index 000000000..c28021a6a --- /dev/null +++ b/src/Subscription/Engine/Error.php @@ -0,0 +1,17 @@ + $errors */ + public function __construct( + public readonly int $processedMessages, + public readonly array $errors = [], + ) { + } +} diff --git a/src/Subscription/Engine/Result.php b/src/Subscription/Engine/Result.php new file mode 100644 index 000000000..d644bb17d --- /dev/null +++ b/src/Subscription/Engine/Result.php @@ -0,0 +1,14 @@ + $errors */ + public function __construct( + public readonly array $errors = [], + ) { + } +} diff --git a/src/Subscription/Engine/SubscriptionEngine.php b/src/Subscription/Engine/SubscriptionEngine.php index 0e4229eaa..bdba70f12 100644 --- a/src/Subscription/Engine/SubscriptionEngine.php +++ b/src/Subscription/Engine/SubscriptionEngine.php @@ -8,7 +8,7 @@ interface SubscriptionEngine { - public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void; + public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result; /** * @param positive-int|null $limit @@ -18,7 +18,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk public function boot( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void; + ): ProcessedResult; /** * @param positive-int|null $limit @@ -28,15 +28,15 @@ public function boot( public function run( SubscriptionEngineCriteria|null $criteria = null, int|null $limit = null, - ): void; + ): ProcessedResult; - public function teardown(SubscriptionEngineCriteria|null $criteria = null): void; + public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result; - public function remove(SubscriptionEngineCriteria|null $criteria = null): void; + public function remove(SubscriptionEngineCriteria|null $criteria = null): Result; - public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void; + public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result; - public function pause(SubscriptionEngineCriteria|null $criteria = null): void; + public function pause(SubscriptionEngineCriteria|null $criteria = null): Result; /** @return list */ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array; diff --git a/src/Subscription/Store/DoctrineSubscriptionStore.php b/src/Subscription/Store/DoctrineSubscriptionStore.php index 63a6d5c44..661d833d9 100644 --- a/src/Subscription/Store/DoctrineSubscriptionStore.php +++ b/src/Subscription/Store/DoctrineSubscriptionStore.php @@ -182,12 +182,21 @@ public function remove(Subscription $subscription): void $this->connection->delete($this->tableName, ['id' => $subscription->id()]); } - public function inLock(Closure $closure): void + /** + * @param Closure():T $closure + * + * @return T + * + * @throws TransactionCommitNotPossible + * + * @template T + */ + public function inLock(Closure $closure): mixed { $this->connection->beginTransaction(); try { - $closure(); + return $closure(); } finally { try { $this->connection->commit(); diff --git a/src/Subscription/Store/LockableSubscriptionStore.php b/src/Subscription/Store/LockableSubscriptionStore.php index 48d62366e..2163aa6a7 100644 --- a/src/Subscription/Store/LockableSubscriptionStore.php +++ b/src/Subscription/Store/LockableSubscriptionStore.php @@ -8,5 +8,14 @@ interface LockableSubscriptionStore extends SubscriptionStore { - public function inLock(Closure $closure): void; + /** + * @param Closure():T $closure + * + * @return T + * + * @throws TransactionCommitNotPossible + * + * @template T + */ + public function inLock(Closure $closure): mixed; } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 603dd9bce..290bb8761 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -105,8 +105,14 @@ public function testHappyPath(): void $engine->subscriptions(), ); - $engine->setup(); - $engine->boot(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals( [ @@ -124,7 +130,10 @@ public function testHappyPath(): void $profile = Profile::create(ProfileId::fromString('1'), 'John'); $repository->save($profile); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals( [ @@ -150,7 +159,8 @@ public function testHappyPath(): void self::assertSame('1', $result['id']); self::assertSame('John', $result['name']); - $engine->remove(); + $result = $engine->remove(); + self::assertEquals([], $result->errors); self::assertEquals( [ @@ -213,8 +223,12 @@ public function testErrorHandling(): void ), ); - $engine->setup(); - $engine->boot(); + $result = $engine->setup(); + self::assertEquals([], $result->errors); + + $result = $engine->boot(); + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -228,7 +242,16 @@ public function testErrorHandling(): void $repository->save($profile); $subscriber->subscribeError = true; - $engine->run(); + + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -237,7 +260,10 @@ public function testErrorHandling(): void self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus); self::assertEquals(0, $subscription->retryAttempt()); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -248,7 +274,15 @@ public function testErrorHandling(): void $clock->sleep(5); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -259,7 +293,15 @@ public function testErrorHandling(): void $clock->sleep(10); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -278,7 +320,15 @@ public function testErrorHandling(): void self::assertEquals(null, $subscription->subscriptionError()); self::assertEquals(0, $subscription->retryAttempt()); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals('error_producer', $error->subscriptionId); + self::assertEquals('subscribe error', $error->message); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); @@ -290,7 +340,10 @@ public function testErrorHandling(): void $clock->sleep(5); $subscriber->subscribeError = false; - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); $subscription = self::findSubscription($engine->subscriptions(), 'error_producer'); diff --git a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php index e6ce5068c..3742081df 100644 --- a/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php +++ b/tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php @@ -55,10 +55,11 @@ public function testNothingToSetup(): void logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); self::assertEquals([], $store->addedSubscriptions); self::assertEquals([], $store->updatedSubscriptions); + self::assertEquals([], $result->errors); } public function testSetupWithoutCreateMethod(): void @@ -82,7 +83,9 @@ class { logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -131,7 +134,9 @@ public function create(): void logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -187,7 +192,15 @@ public function create(): void logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals( [ @@ -236,7 +249,9 @@ class { logger: new NullLogger(), ); - $engine->setup(null, true); + $result = $engine->setup(null, true); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -276,7 +291,9 @@ class { logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -315,7 +332,9 @@ class { logger: new NullLogger(), ); - $engine->setup(); + $result = $engine->setup(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -342,7 +361,10 @@ public function testNothingToBoot(): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([], $store->addedSubscriptions); self::assertEquals([], $store->updatedSubscriptions); @@ -367,7 +389,10 @@ class { logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -416,7 +441,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -440,6 +468,74 @@ public function handle(Message $message): void self::assertSame($message, $subscriber->message); } + public function testBootWithError(): void + { + $subscriptionId = 'test'; + $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] + class { + public function __construct( + public readonly RuntimeException $exception = new RuntimeException('ERROR'), + ) { + } + + #[Subscribe(ProfileVisited::class)] + public function handle(Message $message): void + { + throw $this->exception; + } + }; + + $subscriptionStore = new DummySubscriptionStore([ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Booting, + ), + ]); + + $message = new Message(new ProfileVisited(ProfileId::fromString('test'))); + + $streamableStore = $this->prophesize(Store::class); + $streamableStore->load($this->criteria())->willReturn(new ArrayStream([$message]))->shouldBeCalledOnce(); + + $engine = new DefaultSubscriptionEngine( + $streamableStore->reveal(), + $subscriptionStore, + new MetadataSubscriberAccessorRepository([$subscriber]), + logger: new NullLogger(), + ); + + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); + + self::assertEquals( + [ + new Subscription( + $subscriptionId, + Subscription::DEFAULT_GROUP, + RunMode::FromBeginning, + Status::Error, + 0, + new SubscriptionError( + 'ERROR', + Status::Booting, + ThrowableToErrorContextTransformer::transform($subscriber->exception), + ), + ), + ], + $subscriptionStore->updatedSubscriptions, + ); + } + public function testBootWithLimit(): void { $subscriptionId = 'test'; @@ -475,7 +571,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(new SubscriptionEngineCriteria(), 1); + $result = $engine->boot(new SubscriptionEngineCriteria(), 1); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->addedSubscriptions); @@ -546,7 +645,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -620,7 +722,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(2, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -677,7 +782,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->boot(); + $result = $engine->boot(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -716,7 +824,10 @@ class { logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -763,7 +874,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -817,7 +931,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(new SubscriptionEngineCriteria(), 1); + $result = $engine->run(new SubscriptionEngineCriteria(), 1); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -886,7 +1003,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -947,7 +1067,16 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals( [ @@ -968,7 +1097,7 @@ public function handle(Message $message): void ); } - public function testRunningMarkOutdated(): void + public function testRunningMarkDetached(): void { $subscriptionId = 'test'; @@ -991,7 +1120,10 @@ public function testRunningMarkOutdated(): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1027,7 +1159,10 @@ public function testRunningWithoutActiveSubscribers(): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); } @@ -1069,7 +1204,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(2, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1119,7 +1257,10 @@ public function handle(Message $message): void logger: new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1158,7 +1299,9 @@ class { logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1195,7 +1338,9 @@ class { logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1234,7 +1379,9 @@ public function drop(): void logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1274,7 +1421,15 @@ public function drop(): void logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([], $subscriptionStore->removedSubscriptions); @@ -1302,7 +1457,9 @@ public function testTeardownWithoutSubscriber(): void logger: new NullLogger(), ); - $engine->teardown(); + $result = $engine->teardown(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([], $subscriptionStore->removedSubscriptions); @@ -1325,7 +1482,9 @@ class { logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1368,7 +1527,9 @@ public function drop(): void logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1399,7 +1560,9 @@ class { logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1436,7 +1599,15 @@ public function drop(): void logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1463,7 +1634,9 @@ public function testRemoveWithoutSubscriber(): void logger: new NullLogger(), ); - $engine->remove(); + $result = $engine->remove(); + + self::assertEquals([], $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); self::assertEquals([$subscription], $subscriptionStore->removedSubscriptions); @@ -1486,7 +1659,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1525,7 +1700,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1538,7 +1715,7 @@ class { ], $subscriptionStore->updatedSubscriptions); } - public function testReactivateOutdated(): void + public function testReactivateDetached(): void { $subscriptionId = 'test'; $subscriber = new #[Subscriber('test', RunMode::FromBeginning)] @@ -1563,7 +1740,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1600,7 +1779,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1637,7 +1818,9 @@ class { logger: new NullLogger(), ); - $engine->reactivate(); + $result = $engine->reactivate(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1666,7 +1849,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1703,7 +1888,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1740,7 +1927,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1779,7 +1968,9 @@ class { logger: new NullLogger(), ); - $engine->pause(); + $result = $engine->pause(); + + self::assertEquals([], $result->errors); self::assertEquals([ new Subscription( @@ -1870,7 +2061,16 @@ public function subscribe(): void new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(1, $result->processedMessages); + self::assertCount(1, $result->errors); + + $error = $result->errors[0]; + + self::assertEquals($subscriptionId, $error->subscriptionId); + self::assertEquals('ERROR2', $error->message); + self::assertInstanceOf(RuntimeException::class, $error->throwable); self::assertCount(2, $subscriptionStore->updatedSubscriptions); @@ -1923,7 +2123,10 @@ class { new NullLogger(), ); - $engine->run(); + $result = $engine->run(); + + self::assertEquals(0, $result->processedMessages); + self::assertCount(0, $result->errors); self::assertEquals([], $subscriptionStore->updatedSubscriptions); }