Skip to content

Commit

Permalink
add subscription engine result
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidBadura committed Apr 12, 2024
1 parent 07c4a52 commit 75fb435
Show file tree
Hide file tree
Showing 9 changed files with 518 additions and 112 deletions.
174 changes: 130 additions & 44 deletions src/Subscription/Engine/DefaultSubscriptionEngine.php

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions src/Subscription/Engine/Error.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

use Throwable;

final class Error
{
public function __construct(
public readonly string $subscriptionId,
public readonly string $message,
public readonly Throwable $throwable,
) {
}
}
15 changes: 15 additions & 0 deletions src/Subscription/Engine/ProcessedResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

final class ProcessedResult
{
/** @param list<Error> $errors */
public function __construct(
public readonly int $processedMessages,
public readonly array $errors = [],
) {
}
}
14 changes: 14 additions & 0 deletions src/Subscription/Engine/Result.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Subscription\Engine;

final class Result
{
/** @param list<Error> $errors */
public function __construct(
public readonly array $errors = [],
) {
}
}
14 changes: 7 additions & 7 deletions src/Subscription/Engine/SubscriptionEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

interface SubscriptionEngine
{
public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): void;
public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $skipBooting = false): Result;

/**
* @param positive-int|null $limit
Expand All @@ -18,7 +18,7 @@ public function setup(SubscriptionEngineCriteria|null $criteria = null, bool $sk
public function boot(
SubscriptionEngineCriteria|null $criteria = null,
int|null $limit = null,
): void;
): ProcessedResult;

/**
* @param positive-int|null $limit
Expand All @@ -28,15 +28,15 @@ public function boot(
public function run(
SubscriptionEngineCriteria|null $criteria = null,
int|null $limit = null,
): void;
): ProcessedResult;

public function teardown(SubscriptionEngineCriteria|null $criteria = null): void;
public function teardown(SubscriptionEngineCriteria|null $criteria = null): Result;

public function remove(SubscriptionEngineCriteria|null $criteria = null): void;
public function remove(SubscriptionEngineCriteria|null $criteria = null): Result;

public function reactivate(SubscriptionEngineCriteria|null $criteria = null): void;
public function reactivate(SubscriptionEngineCriteria|null $criteria = null): Result;

public function pause(SubscriptionEngineCriteria|null $criteria = null): void;
public function pause(SubscriptionEngineCriteria|null $criteria = null): Result;

/** @return list<Subscription> */
public function subscriptions(SubscriptionEngineCriteria|null $criteria = null): array;
Expand Down
13 changes: 11 additions & 2 deletions src/Subscription/Store/DoctrineSubscriptionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,21 @@ public function remove(Subscription $subscription): void
$this->connection->delete($this->tableName, ['id' => $subscription->id()]);
}

public function inLock(Closure $closure): void
/**
* @param Closure():T $closure
*
* @return T
*
* @throws TransactionCommitNotPossible
*
* @template T
*/
public function inLock(Closure $closure): mixed
{
$this->connection->beginTransaction();

try {
$closure();
return $closure();
} finally {
try {
$this->connection->commit();
Expand Down
11 changes: 10 additions & 1 deletion src/Subscription/Store/LockableSubscriptionStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,14 @@

interface LockableSubscriptionStore extends SubscriptionStore
{
public function inLock(Closure $closure): void;
/**
* @param Closure():T $closure
*
* @return T
*
* @throws TransactionCommitNotPossible
*
* @template T
*/
public function inLock(Closure $closure): mixed;
}
77 changes: 65 additions & 12 deletions tests/Integration/Subscription/SubscriptionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,14 @@ public function testHappyPath(): void
$engine->subscriptions(),
);

$engine->setup();
$engine->boot();
$result = $engine->setup();

self::assertEquals([], $result->errors);

$result = $engine->boot();

self::assertEquals(0, $result->processedMessages);
self::assertEquals([], $result->errors);

self::assertEquals(
[
Expand All @@ -124,7 +130,10 @@ public function testHappyPath(): void
$profile = Profile::create(ProfileId::fromString('1'), 'John');
$repository->save($profile);

$engine->run();
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals([], $result->errors);

self::assertEquals(
[
Expand All @@ -150,7 +159,8 @@ public function testHappyPath(): void
self::assertSame('1', $result['id']);
self::assertSame('John', $result['name']);

$engine->remove();
$result = $engine->remove();
self::assertEquals([], $result->errors);

self::assertEquals(
[
Expand Down Expand Up @@ -213,8 +223,12 @@ public function testErrorHandling(): void
),
);

$engine->setup();
$engine->boot();
$result = $engine->setup();
self::assertEquals([], $result->errors);

$result = $engine->boot();
self::assertEquals(0, $result->processedMessages);
self::assertEquals([], $result->errors);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand All @@ -228,7 +242,16 @@ public function testErrorHandling(): void
$repository->save($profile);

$subscriber->subscribeError = true;
$engine->run();

$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertCount(1, $result->errors);

$error = $result->errors[0];

self::assertEquals('error_producer', $error->subscriptionId);
self::assertEquals('subscribe error', $error->message);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand All @@ -237,7 +260,10 @@ public function testErrorHandling(): void
self::assertEquals(Status::Active, $subscription->subscriptionError()?->previousStatus);
self::assertEquals(0, $subscription->retryAttempt());

$engine->run();
$result = $engine->run();

self::assertEquals(0, $result->processedMessages);
self::assertEquals([], $result->errors);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand All @@ -248,7 +274,15 @@ public function testErrorHandling(): void

$clock->sleep(5);

$engine->run();
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertCount(1, $result->errors);

$error = $result->errors[0];

self::assertEquals('error_producer', $error->subscriptionId);
self::assertEquals('subscribe error', $error->message);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand All @@ -259,7 +293,15 @@ public function testErrorHandling(): void

$clock->sleep(10);

$engine->run();
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertCount(1, $result->errors);

$error = $result->errors[0];

self::assertEquals('error_producer', $error->subscriptionId);
self::assertEquals('subscribe error', $error->message);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand All @@ -278,7 +320,15 @@ public function testErrorHandling(): void
self::assertEquals(null, $subscription->subscriptionError());
self::assertEquals(0, $subscription->retryAttempt());

$engine->run();
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertCount(1, $result->errors);

$error = $result->errors[0];

self::assertEquals('error_producer', $error->subscriptionId);
self::assertEquals('subscribe error', $error->message);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand All @@ -290,7 +340,10 @@ public function testErrorHandling(): void
$clock->sleep(5);
$subscriber->subscribeError = false;

$engine->run();
$result = $engine->run();

self::assertEquals(1, $result->processedMessages);
self::assertEquals([], $result->errors);

$subscription = self::findSubscription($engine->subscriptions(), 'error_producer');

Expand Down
Loading

0 comments on commit 75fb435

Please sign in to comment.