Skip to content

Commit

Permalink
Merge pull request #662 from patchlevel/only-retry-relevant-subscript…
Browse files Browse the repository at this point in the history
…ions

only retry relevant subscriptions
  • Loading branch information
DavidBadura authored Dec 29, 2024
2 parents 774ca7e + 26d177c commit 94a73ae
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 13 deletions.
19 changes: 6 additions & 13 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use Throwable;

use function count;
use function in_array;
use function sprintf;

final class DefaultSubscriptionEngine implements SubscriptionEngine
Expand Down Expand Up @@ -60,7 +59,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk
);

$this->discoverNewSubscriptions();
$this->retrySubscriptions($criteria);
$this->retrySubscriptions($criteria, Status::New);

return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
Expand Down Expand Up @@ -169,7 +168,7 @@ public function boot(
);

$this->discoverNewSubscriptions();
$this->retrySubscriptions($criteria);
$this->retrySubscriptions($criteria, Status::Booting);

return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
Expand Down Expand Up @@ -340,7 +339,7 @@ public function run(

$this->discoverNewSubscriptions();
$this->markDetachedSubscriptions($criteria);
$this->retrySubscriptions($criteria);
$this->retrySubscriptions($criteria, Status::Active);

return $this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
Expand Down Expand Up @@ -901,15 +900,15 @@ function (SubscriptionCollection $subscriptions): void {
);
}

private function retrySubscriptions(SubscriptionEngineCriteria $criteria): void
private function retrySubscriptions(SubscriptionEngineCriteria $criteria, Status $previousStatus): void
{
$this->subscriptionManager->findForUpdate(
new SubscriptionCriteria(
ids: $criteria->ids,
groups: $criteria->groups,
status: [Status::Error],
),
function (SubscriptionCollection $subscriptions): void {
function (SubscriptionCollection $subscriptions) use ($previousStatus): void {
/** @var Subscription $subscription */
foreach ($subscriptions as $subscription) {
$error = $subscription->subscriptionError();
Expand All @@ -918,13 +917,7 @@ function (SubscriptionCollection $subscriptions): void {
continue;
}

$retryable = in_array(
$error->previousStatus,
[Status::New, Status::Booting, Status::Active],
true,
);

if (!$retryable) {
if ($error->previousStatus !== $previousStatus) {
continue;
}

Expand Down
52 changes: 52 additions & 0 deletions tests/Unit/Subscription/Engine/DefaultSubscriptionEngineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3050,6 +3050,58 @@ public function subscribe(): void
self::assertEquals(1, $update2->retryAttempt());
}

#[DataProvider('statusProvider')]
public function testShouldNotRetryOtherStatus(string $method, string $status): void
{
$subscriptionId = 'test';
$subscriber = new #[Subscriber('test', RunMode::FromBeginning)]
class {
};

$streamableStore = $this->prophesize(Store::class);

$subscription = new Subscription(
$subscriptionId,
Subscription::DEFAULT_GROUP,
RunMode::FromBeginning,
Status::Error,
0,
new SubscriptionError('ERROR', Status::from($status)),
);

$subscriptionStore = new DummySubscriptionStore([$subscription]);

$retryStrategy = $this->prophesize(RetryStrategy::class);
$retryStrategy->shouldRetry($subscription)->shouldNotBeCalled();

$engine = new DefaultSubscriptionEngine(
$streamableStore->reveal(),
$subscriptionStore,
new MetadataSubscriberAccessorRepository([$subscriber]),
$retryStrategy->reveal(),
new NullLogger(),
);

$result = match ($method) {
'setup' => $engine->setup(),
'boot' => $engine->boot(),
'run' => $engine->run(),
};

self::assertCount(0, $result->errors);
self::assertEquals([], $subscriptionStore->updatedSubscriptions);
}

public static function statusProvider(): Generator
{
yield 'setup_booting' => ['setup', 'booting'];
yield 'setup_active' => ['setup', 'active'];
yield 'boot_new' => ['boot', 'new'];
yield 'boot_active' => ['boot', 'active'];
yield 'run_new' => ['run', 'new'];
yield 'run_booting' => ['run', 'booting'];
}

public function testShouldNotRetry(): void
{
$subscriptionId = 'test';
Expand Down

0 comments on commit 94a73ae

Please sign in to comment.