Skip to content

Commit

Permalink
Merge pull request #613 from patchlevel/advisory-lock
Browse files Browse the repository at this point in the history
add write lock for event store
  • Loading branch information
DavidBadura authored Jul 16, 2024
2 parents 478d128 + 24fa100 commit 11c7a82
Show file tree
Hide file tree
Showing 8 changed files with 546 additions and 146 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
],
"require": {
"php": "~8.1.0 || ~8.2.0 || ~8.3.0",
"doctrine/dbal": "^3.8.1|^4.0.0",
"doctrine/dbal": "^4.0.0",
"doctrine/migrations": "^3.3.2",
"patchlevel/hydrator": "^1.4.1",
"patchlevel/worker": "^1.2.0",
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ services:
environment:
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=eventstore
expose:
- 5432
ports:
- 5432:5432
30 changes: 27 additions & 3 deletions docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ $connection = DriverManager::getConnection(
## Configure Store

You can create a store with the `DoctrineDbalStore` class.
The store needs a dbal connection, an event serializer, an aggregate registry and a table name.
The store needs a dbal connection, an event serializer, an aggregate registry and some options.

```php
use Doctrine\DBAL\Connection;
Expand All @@ -42,9 +42,19 @@ $store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
null,
['table_name' => 'eventstore'],
[/** options */],
);
```
Following options are available in `DoctrineDbalStore`:

| Option | Type | Default | Description |
|-------------------|------------------|------------|----------------------------------------------|
| table_name | string | eventstore | The name of the table in the database |
| aggregate_id_type | "uuid"|"string" | uuid | The type of the `aggregate_id` column |
| locking | bool | true | If the store should use locking for writing |
| lock_id | int | 133742 | The id of the lock |
| lock_timeout | int | -1 | The timeout of the lock. -1 means no timeout |

## Schema

The table structure of the `DoctrineDbalStore` looks like this:
Expand Down Expand Up @@ -339,6 +349,11 @@ $store->save(...$messages);
!!! note

The saving happens in a transaction, so all messages are saved or none.
The store lock the table for writing during each save by default.

!!! tip

Use transactional method if you want call multiple save methods in a transaction.

### Delete & Update

Expand All @@ -348,7 +363,7 @@ In event sourcing, the events are immutable.
### Transaction

There is also the possibility of executing a function in a transaction.
Then dbal takes care of starting a transaction, committing it and then possibly rollback it again.
The store takes care of starting a transaction, committing it and then possibly rollback it again.

```php
use Patchlevel\EventSourcing\Store\Store;
Expand All @@ -365,6 +380,15 @@ $store->transactional(static function () use ($command, $bankAccountRepository):
$bankAccountRepository->save($accountTo);
});
```
!!! note

The store lock the table for writing during the transaction by default.

!!! tip

If you want save only one aggregate, so you don't have to use the transactional method.
The save method in store/repository is already transactional.

## Learn more

* [How to create events](events.md)
Expand Down
4 changes: 3 additions & 1 deletion phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ parameters:
level: max
paths:
- src
checkGenericClassInNonGenericObjectType: false
ignoreErrors:
-
identifier: missingType.generics
104 changes: 96 additions & 8 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Exception\UniqueConstraintViolationException;
use Doctrine\DBAL\Platforms\MariaDBPlatform;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\DBAL\Platforms\SQLitePlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Types\Type;
Expand Down Expand Up @@ -48,12 +51,19 @@ final class DoctrineDbalStore implements Store, SubscriptionStore, DoctrineSchem
*/
private const MAX_UNSIGNED_SMALL_INT = 65_535;

/**
* Default lock id for advisory lock.
*/
private const DEFAULT_LOCK_ID = 133742;

private readonly HeadersSerializer $headersSerializer;

/** @var array{table_name: string, aggregate_id_type: 'string'|'uuid'} */
/** @var array{table_name: string, aggregate_id_type: 'string'|'uuid', locking: bool, lock_id: int, lock_timeout: int} */
private readonly array $config;

/** @param array{table_name?: string, aggregate_id_type?: 'string'|'uuid'} $config */
private bool $hasLock = false;

/** @param array{table_name?: string, aggregate_id_type?: 'string'|'uuid', locking?: bool, lock_id?: int, lock_timeout?: int} $config */
public function __construct(
private readonly Connection $connection,
private readonly EventSerializer $eventSerializer,
Expand All @@ -65,6 +75,9 @@ public function __construct(
$this->config = array_merge([
'table_name' => 'eventstore',
'aggregate_id_type' => 'uuid',
'locking' => true,
'lock_id' => self::DEFAULT_LOCK_ID,
'lock_timeout' => -1,
], $config);
}

Expand Down Expand Up @@ -155,8 +168,8 @@ public function save(Message ...$messages): void
return;
}

$this->connection->transactional(
function (Connection $connection) use ($messages): void {
$this->transactional(
function () use ($messages): void {
/** @var array<string, int> $achievedUntilPlayhead */
$achievedUntilPlayhead = [];

Expand Down Expand Up @@ -227,7 +240,7 @@ function (Connection $connection) use ($messages): void {
continue;
}

$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
$this->executeSave($columns, $placeholders, $parameters, $types, $this->connection);

$parameters = [];
$placeholders = [];
Expand All @@ -237,13 +250,13 @@ function (Connection $connection) use ($messages): void {
}

if ($position !== 0) {
$this->executeSave($columns, $placeholders, $parameters, $types, $connection);
$this->executeSave($columns, $placeholders, $parameters, $types, $this->connection);
}

foreach ($achievedUntilPlayhead as $key => $playhead) {
[$aggregateName, $aggregateId] = explode('/', $key);

$connection->executeStatement(
$this->connection->executeStatement(
sprintf(
<<<'SQL'
UPDATE %s
Expand Down Expand Up @@ -273,7 +286,18 @@ function (Connection $connection) use ($messages): void {
*/
public function transactional(Closure $function): void
{
$this->connection->transactional($function);
if ($this->hasLock || !$this->config['locking']) {
$this->connection->transactional($function);
} else {
$this->connection->transactional(function () use ($function): void {
$this->lock();
try {
$function();
} finally {
$this->unlock();
}
});
}
}

public function configureSchema(Schema $schema, Connection $connection): void
Expand Down Expand Up @@ -423,4 +447,68 @@ private function executeSave(
throw new UniqueConstraintViolation($e);
}
}

private function lock(): void
{
$this->hasLock = true;

$platform = $this->connection->getDatabasePlatform();

if ($platform instanceof PostgreSQLPlatform) {
$this->connection->executeStatement(
sprintf(
'SELECT pg_advisory_xact_lock(%s)',
$this->config['lock_id'],
),
);

return;
}

if ($platform instanceof MariaDBPlatform || $platform instanceof MySQLPlatform) {
$this->connection->fetchAllAssociative(
sprintf(
'SELECT GET_LOCK("%s", %d)',
$this->config['lock_id'],
$this->config['lock_timeout'],
),
);

return;
}

if ($platform instanceof SQLitePlatform) {
return; // sql locking is not needed because of file locking
}

throw new LockingNotImplemented($platform::class);
}

private function unlock(): void
{
$this->hasLock = false;

$platform = $this->connection->getDatabasePlatform();

if ($platform instanceof PostgreSQLPlatform) {
return; // lock is released automatically after transaction
}

if ($platform instanceof MariaDBPlatform || $platform instanceof MySQLPlatform) {
$this->connection->fetchAllAssociative(
sprintf(
'SELECT RELEASE_LOCK("%s")',
$this->config['lock_id'],
),
);

return;
}

if ($platform instanceof SQLitePlatform) {
return; // sql locking is not needed because of file locking
}

throw new LockingNotImplemented($platform::class);
}
}
21 changes: 21 additions & 0 deletions src/Store/LockingNotImplemented.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store;

use function sprintf;

final class LockingNotImplemented extends StoreException
{
/** @param class-string $platform */
public function __construct(string $platform)
{
parent::__construct(
sprintf(
'Locking is not implemented on platform %s. Disable locking in the store options.',
$platform,
),
);
}
}
49 changes: 49 additions & 0 deletions tests/Integration/Store/StoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,55 @@ public function testSave(): void
self::assertEquals(['profileId' => $profileId->toString(), 'name' => 'test'], json_decode($result1['payload'], true));
}

public function testSaveWithTransactional(): void
{
$profileId = ProfileId::generate();

$messages = [
Message::create(new ProfileCreated($profileId, 'test'))
->withHeader(new AggregateHeader(
'profile',
$profileId->toString(),
1,
new DateTimeImmutable('2020-01-01 00:00:00'),
)),
Message::create(new ProfileCreated($profileId, 'test'))
->withHeader(new AggregateHeader(
'profile',
$profileId->toString(),
2,
new DateTimeImmutable('2020-01-02 00:00:00'),
)),
];

$this->store->transactional(function () use ($messages): void {
$this->store->save(...$messages);
});

/** @var list<array<string, string>> $result */
$result = $this->connection->fetchAllAssociative('SELECT * FROM eventstore');

self::assertCount(2, $result);

$result1 = $result[0];

self::assertEquals($profileId->toString(), $result1['aggregate_id']);
self::assertEquals('profile', $result1['aggregate']);
self::assertEquals('1', $result1['playhead']);
self::assertStringContainsString('2020-01-01 00:00:00', $result1['recorded_on']);
self::assertEquals('profile.created', $result1['event']);
self::assertEquals(['profileId' => $profileId->toString(), 'name' => 'test'], json_decode($result1['payload'], true));

$result2 = $result[1];

self::assertEquals($profileId->toString(), $result2['aggregate_id']);
self::assertEquals('profile', $result2['aggregate']);
self::assertEquals('2', $result2['playhead']);
self::assertStringContainsString('2020-01-02 00:00:00', $result2['recorded_on']);
self::assertEquals('profile.created', $result2['event']);
self::assertEquals(['profileId' => $profileId->toString(), 'name' => 'test'], json_decode($result1['payload'], true));
}

public function testSave10000Messages(): void
{
$profileId = ProfileId::generate();
Expand Down
Loading

0 comments on commit 11c7a82

Please sign in to comment.