Skip to content

Commit

Permalink
Merge pull request #574 from patchlevel/subscription-engine-result
Browse files Browse the repository at this point in the history
add subscription engine result
  • Loading branch information
DavidBadura authored Apr 12, 2024
2 parents a90bbdc + c7a7fee commit b573bcc
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 155 deletions.
2 changes: 1 addition & 1 deletion docs/pages/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ $schemaDirector = new DoctrineSchemaDirector($connection, $store);
$cli->addCommands([
new Command\DatabaseCreateCommand($connection, $doctrineHelper),
new Command\DatabaseDropCommand($connection, $doctrineHelper),
new Command\SubscriptionBootCommand($subscriptionEngine, $store),
new Command\SubscriptionBootCommand($subscriptionEngine),
new Command\SubscriptionPauseCommand($subscriptionEngine),
new Command\SubscriptionRunCommand($subscriptionEngine, $store),
new Command\SubscriptionTeardownCommand($subscriptionEngine),
Expand Down
48 changes: 6 additions & 42 deletions src/Console/Command/SubscriptionBootCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@

use Closure;
use Patchlevel\EventSourcing\Console\InputHelper;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\SubscriptionStore;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
use Patchlevel\Worker\DefaultWorker;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
Expand All @@ -23,13 +19,6 @@
)]
final class SubscriptionBootCommand extends SubscriptionCommand
{
public function __construct(
SubscriptionEngine $engine,
private readonly Store $store,
) {
parent::__construct($engine);
}

public function configure(): void
{
parent::configure();
Expand Down Expand Up @@ -86,34 +75,23 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$criteria = $this->subscriptionEngineCriteria($input);
$criteria = $this->resolveCriteriaIntoCriteriaWithOnlyIds($criteria);

if ($this->store instanceof SubscriptionStore) {
$this->store->setupSubscription();
}

if ($setup) {
$this->engine->setup($criteria);
}

$logger = new ConsoleLogger($output);

$finished = false;

$worker = DefaultWorker::create(
function (Closure $stop) use ($criteria, $messageLimit, &$finished, $sleep): void {
$this->engine->boot($criteria, $messageLimit);

if ($this->isBootingFinished($criteria)) {
$finished = true;
$stop();
function (Closure $stop) use ($criteria, $messageLimit, &$finished): void {
$result = $this->engine->boot($criteria, $messageLimit);

if (!$result->streamFinished) {
return;
}

if (!$this->store instanceof SubscriptionStore) {
return;
}

$this->store->wait($sleep);
$finished = true;
$stop();
},
[
'runLimit' => $runLimit,
Expand All @@ -123,22 +101,8 @@ function (Closure $stop) use ($criteria, $messageLimit, &$finished, $sleep): voi
$logger,
);

$supportSubscription = $this->store instanceof SubscriptionStore && $this->store->supportSubscription();
$worker->run($supportSubscription ? 0 : $sleep);
$worker->run($sleep);

return $finished ? 0 : 1;
}

private function isBootingFinished(SubscriptionEngineCriteria $criteria): bool
{
$subscriptions = $this->engine->subscriptions($criteria);

foreach ($subscriptions as $subscription) {
if ($subscription->isBooting()) {
return false;
}
}

return true;
}
}
Loading

0 comments on commit b573bcc

Please sign in to comment.