diff --git a/src/Subscription/Engine/DefaultMessageLoader.php b/src/Subscription/Engine/DefaultMessageLoader.php new file mode 100644 index 00000000..87c53978 --- /dev/null +++ b/src/Subscription/Engine/DefaultMessageLoader.php @@ -0,0 +1,32 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream + { + return $this->store->load(new Criteria(new FromIndexCriterion($startIndex))); + } + + public function lastIndex(): int + { + $stream = $this->store->load(null, 1, null, true); + + return $stream->index() ?: 0; + } +} diff --git a/src/Subscription/Engine/DefaultSubscriptionEngine.php b/src/Subscription/Engine/DefaultSubscriptionEngine.php index daa93f90..d6b59f1f 100644 --- a/src/Subscription/Engine/DefaultSubscriptionEngine.php +++ b/src/Subscription/Engine/DefaultSubscriptionEngine.php @@ -6,10 +6,6 @@ use Closure; use Patchlevel\EventSourcing\Message\Message; -use Patchlevel\EventSourcing\Metadata\Event\EventMetadataFactory; -use Patchlevel\EventSourcing\Store\Criteria\Criteria; -use Patchlevel\EventSourcing\Store\Criteria\EventsCriterion; -use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy; @@ -18,14 +14,12 @@ use Patchlevel\EventSourcing\Subscription\Store\LockableSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionCriteria; use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore; -use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor; use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessorRepository; use Patchlevel\EventSourcing\Subscription\Subscription; use Psr\Log\LoggerInterface; use Throwable; -use function array_keys; use function count; use function in_array; use function sprintf; @@ -34,14 +28,20 @@ final class DefaultSubscriptionEngine implements SubscriptionEngine { private bool $processing = false; + private readonly MessageLoader $messageLoader; + public function __construct( - private readonly Store $messageStore, + Store|MessageLoader $messageStore, private readonly SubscriptionStore $subscriptionStore, private readonly SubscriberAccessorRepository $subscriberRepository, private readonly RetryStrategy $retryStrategy = new ClockBasedRetryStrategy(), private readonly LoggerInterface|null $logger = null, - private readonly EventMetadataFactory|null $eventMetadataFactory = null, ) { + if ($messageStore instanceof MessageLoader) { + $this->messageLoader = $messageStore; + } else { + $this->messageLoader = new DefaultMessageLoader($messageStore); + } } public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result @@ -71,7 +71,7 @@ function (array $subscriptions) use ($skipBooting): Result { /** @var list $errors */ $errors = []; - $latestIndex = $this->latestIndex(); + $latestIndex = $this->messageLoader->lastIndex(); foreach ($subscriptions as $subscription) { $subscriber = $this->subscriber($subscription->id()); @@ -192,15 +192,7 @@ function ($subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - - $events = $this->events($subscriptions); - - if ($events) { - $criteria = $criteria->add(new EventsCriterion($events)); - } - - $stream = $this->messageStore->load($criteria); + $stream = $this->messageLoader->load($startIndex, $subscriptions); foreach ($stream as $message) { $index = $stream->index(); @@ -364,15 +356,7 @@ function (array $subscriptions) use ($limit): ProcessedResult { $messageCounter = 0; try { - $criteria = new Criteria(new FromIndexCriterion($startIndex)); - - $events = $this->events($subscriptions); - - if ($events) { - $criteria = $criteria->add(new EventsCriterion($events)); - } - - $stream = $this->messageStore->load($criteria); + $stream = $this->messageLoader->load($startIndex, $subscriptions); foreach ($stream as $message) { $index = $stream->index(); @@ -933,7 +917,7 @@ function (array $subscriptions): void { if ($subscriber->setupMethod() === null && $subscriber->runMode() === RunMode::FromNow) { if ($latestIndex === null) { - $latestIndex = $this->latestIndex(); + $latestIndex = $this->messageLoader->lastIndex(); } $subscription->changePosition($latestIndex); @@ -953,13 +937,6 @@ function (array $subscriptions): void { ); } - private function latestIndex(): int - { - $stream = $this->messageStore->load(null, 1, null, true); - - return $stream->index() ?: 0; - } - /** @param list $subscriptions */ private function lowestSubscriptionPosition(array $subscriptions): int { @@ -1008,44 +985,4 @@ private function handleError(Subscription $subscription, Throwable $throwable): $subscription->error($throwable); $this->subscriptionStore->update($subscription); } - - /** - * @param list $subscriptions - * - * @return list - */ - private function events(array $subscriptions): array - { - if ($this->eventMetadataFactory === null) { - return []; - } - - $eventNames = []; - - foreach ($subscriptions as $subscription) { - $subscriber = $this->subscriber($subscription->id()); - - if (!$subscriber instanceof MetadataSubscriberAccessor) { - return []; - } - - $events = $subscriber->events(); - - foreach ($events as $event) { - if ($event === '*') { - return []; - } - - $metadata = $this->eventMetadataFactory->metadata($event); - - $eventNames[$metadata->name] = true; - - foreach ($metadata->aliases as $alias) { - $eventNames[$alias] = true; - } - } - } - - return array_keys($eventNames); - } } diff --git a/src/Subscription/Engine/EventFilteredMessageLoader.php b/src/Subscription/Engine/EventFilteredMessageLoader.php new file mode 100644 index 00000000..9d137058 --- /dev/null +++ b/src/Subscription/Engine/EventFilteredMessageLoader.php @@ -0,0 +1,84 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream + { + $criteria = new Criteria(new FromIndexCriterion($startIndex)); + + $events = $this->events($subscriptions); + + if ($events !== []) { + $criteria = $criteria->add(new EventsCriterion($events)); + } + + return $this->store->load($criteria); + } + + /** + * @param list $subscriptions + * + * @return list + */ + private function events(array $subscriptions): array + { + $eventNames = []; + + foreach ($subscriptions as $subscription) { + $subscriber = $this->subscriberRepository->get($subscription->id()); + + if (!$subscriber instanceof MetadataSubscriberAccessor) { + return []; + } + + $events = $subscriber->events(); + + foreach ($events as $event) { + if ($event === '*') { + return []; + } + + $metadata = $this->eventMetadataFactory->metadata($event); + + $eventNames[$metadata->name] = true; + + foreach ($metadata->aliases as $alias) { + $eventNames[$alias] = true; + } + } + } + + return array_keys($eventNames); + } + + public function lastIndex(): int + { + $stream = $this->store->load(null, 1, null, true); + + return $stream->index() ?: 0; + } +} diff --git a/src/Subscription/Engine/MessageLoader.php b/src/Subscription/Engine/MessageLoader.php new file mode 100644 index 00000000..1bdf5a65 --- /dev/null +++ b/src/Subscription/Engine/MessageLoader.php @@ -0,0 +1,16 @@ + $subscriptions */ + public function load(int $startIndex, array $subscriptions): Stream; + + public function lastIndex(): int; +} diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index 2dc0631d..4f41d000 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -14,6 +14,7 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Store\DoctrineSubscriptionStore; use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository; @@ -70,16 +71,21 @@ public function setUp(): void $this->repository->save($profile); + $subscriberAccessorRepository = new MetadataSubscriberAccessorRepository( + [ + new ProfileProjector($connection), + new SendEmailProcessor(), + ], + ); + $this->subscriptionEngine = new DefaultSubscriptionEngine( - $this->store, - $subscriptionStore, - new MetadataSubscriberAccessorRepository( - [ - new ProfileProjector($connection), - new SendEmailProcessor(), - ], + new EventFilteredMessageLoader( + $this->store, + new AttributeEventMetadataFactory(), + $subscriberAccessorRepository, ), - eventMetadataFactory: new AttributeEventMetadataFactory(), + $subscriptionStore, + $subscriberAccessorRepository, ); } diff --git a/tests/Integration/Subscription/SubscriptionTest.php b/tests/Integration/Subscription/SubscriptionTest.php index 56db3700..4ca5035b 100644 --- a/tests/Integration/Subscription/SubscriptionTest.php +++ b/tests/Integration/Subscription/SubscriptionTest.php @@ -25,6 +25,7 @@ use Patchlevel\EventSourcing\Store\DoctrineDbalStore; use Patchlevel\EventSourcing\Subscription\Engine\CatchUpSubscriptionEngine; use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine; +use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredMessageLoader; use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria; use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy; use Patchlevel\EventSourcing\Subscription\RunMode; @@ -94,11 +95,12 @@ public function testHappyPath(): void $schemaDirector->create(); + $subscriberRepository = new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]); + $engine = new DefaultSubscriptionEngine( - $store, + new EventFilteredMessageLoader($store, new AttributeEventMetadataFactory(), $subscriberRepository), $subscriptionStore, - new MetadataSubscriberAccessorRepository([new ProfileProjection($this->projectionConnection)]), - eventMetadataFactory: new AttributeEventMetadataFactory(), + $subscriberRepository, ); self::assertEquals(