Skip to content

Commit

Permalink
add failed status in subscription engine
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Dec 30, 2024
1 parent 36630de commit 692ad42
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 10 deletions.
16 changes: 13 additions & 3 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ stateDiagram-v2
Error --> Booting
Error --> Active
Error --> Paused
Error --> [*]
Error --> Failed
Failed --> New
Failed --> Booting
Failed --> Active
Failed --> [*]
Detached --> Active
Detached --> [*]
```
Expand Down Expand Up @@ -632,9 +636,15 @@ If an error occurs in a subscriber, then the subscription is set to Error.
This can happen in the create process, in the boot process or in the run process.
This subscription will then no longer boot/run until the subscription is reactivate or retried.

The subscription engine has a retry strategy to retry subscriptions that have failed.
The subscription engine has a retry strategy to retry subscriptions that have an error.
It tries to reactivate the subscription after a certain time and a certain number of attempts.
If this does not work, the subscription is set to error and must be manually reactivated.
If this does not work, the subscription changes the status to failed.

### Failed

If the retry strategy says that the subscription should not be retried anymore,
e.g. the maximum number of retry attempts has been reached, then the subscription is set to failed.
The subscription will be now ignored by the subscription engine in all future runs.

There are two options here:

Expand Down
9 changes: 8 additions & 1 deletion src/Subscription/Engine/DefaultSubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ConditionalRetryStrategy;
use Patchlevel\EventSourcing\Subscription\RetryStrategy\RetryStrategy;
use Patchlevel\EventSourcing\Subscription\RunMode;
use Patchlevel\EventSourcing\Subscription\Status;
Expand Down Expand Up @@ -667,6 +668,7 @@ public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Re
groups: $criteria->groups,
status: [
Status::Error,
Status::Failed,
Status::Detached,
Status::Paused,
Status::Finished,
Expand Down Expand Up @@ -984,7 +986,12 @@ private function discoverNewSubscriptions(): void

private function handleError(Subscription $subscription, Throwable $throwable): void
{
$subscription->error($throwable);
if ($this->retryStrategy instanceof ConditionalRetryStrategy && !$this->retryStrategy->canRetry($subscription)) {
$subscription->failed($throwable);
} else {
$subscription->error($throwable);
}

$this->subscriptionManager->update($subscription);

if (!isset($this->batching[$subscription->id()])) {
Expand Down
9 changes: 7 additions & 2 deletions src/Subscription/RetryStrategy/ClockBasedRetryStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use function round;
use function sprintf;

final class ClockBasedRetryStrategy implements RetryStrategy
final class ClockBasedRetryStrategy implements ConditionalRetryStrategy
{
public const DEFAULT_BASE_DELAY = 5;
public const DEFAULT_DELAY_FACTOR = 2;
Expand All @@ -30,9 +30,14 @@ public function __construct(
) {
}

public function canRetry(Subscription $subscription): bool
{
return $subscription->retryAttempt() < $this->maxAttempts;
}

public function shouldRetry(Subscription $subscription): bool
{
if ($subscription->retryAttempt() >= $this->maxAttempts) {
if ($this->canRetry($subscription) === false) {
return false;
}

Expand Down
12 changes: 12 additions & 0 deletions src/Subscription/RetryStrategy/ConditionalRetryStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\RetryStrategy;

use Patchlevel\EventSourcing\Subscription\Subscription;

interface ConditionalRetryStrategy extends RetryStrategy
{
public function canRetry(Subscription $subscription): bool;
}
7 changes: 6 additions & 1 deletion src/Subscription/RetryStrategy/NoRetryStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@

use Patchlevel\EventSourcing\Subscription\Subscription;

final class NoRetryStrategy implements RetryStrategy
final class NoRetryStrategy implements ConditionalRetryStrategy
{
public function shouldRetry(Subscription $subscription): bool
{
return false;
}

public function canRetry(Subscription $subscription): bool
{
return false;
}
}
1 change: 1 addition & 0 deletions src/Subscription/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ enum Status: string
case Finished = 'finished';
case Detached = 'detached';
case Error = 'error';
case Failed = 'failed';
}
19 changes: 19 additions & 0 deletions src/Subscription/Subscription.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ public function isError(): bool
return $this->status === Status::Error;
}

public function failed(Throwable|string $error): void
{
$previousStatus = $this->status;
$this->status = Status::Failed;

if ($error instanceof Throwable) {
$this->error = SubscriptionError::fromThrowable($previousStatus, $error);

return;
}

$this->error = new SubscriptionError($error, $previousStatus);
}

public function isFailed(): bool
{
return $this->status === Status::Failed;
}

public function retryAttempt(): int
{
return $this->retryAttempt;
Expand Down
33 changes: 30 additions & 3 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public function testErrorHandling(): void

$subscriber->subscribeError = true;

// first run, error

$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -269,6 +271,8 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

// second run, time has not passed yet, no retry, no error

$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
Expand All @@ -281,8 +285,9 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

$clock->sleep(5);
// third run, time has passed, 1. retry, error again

$clock->sleep(5);
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -300,8 +305,9 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(1, $subscription->retryAttempt());

$clock->sleep(10);
// fourth run, time has passed, 2. retry, max retries reached, failed

$clock->sleep(10);
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -314,11 +320,28 @@ public function testErrorHandling(): void

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

self::assertEquals(Status::Error, $subscription->status());
self::assertEquals(Status::Failed, $subscription->status());
self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage);
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(2, $subscription->retryAttempt());

// fifth run, time has passed, skip failed subscription

$clock->sleep(20);
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals([], $result->errors);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

self::assertEquals(Status::Failed, $subscription->status());
self::assertEquals('subscribe error', $subscription->subscriptionError()?->errorMessage);
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(2, $subscription->retryAttempt());

// reactivated subscription

$engine->reactivate(new SubscriptionEngineCriteria(
ids: ['error_producer'],
));
Expand All @@ -329,6 +352,8 @@ public function testErrorHandling(): void
self::assertEquals(null, $subscription->subscriptionError());
self::assertEquals(0, $subscription->retryAttempt());

// sixth run, error again

$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
Expand All @@ -346,6 +371,8 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

// seventh run, time has passed, error fixed, 1. retry, no error

$clock->sleep(5);
$subscriber->subscribeError = false;

Expand Down
Loading

0 comments on commit 692ad42

Please sign in to comment.