Skip to content

Commit

Permalink
Add EventDispatcherFactory with default Retry Strategy (#81)
Browse files Browse the repository at this point in the history
* Add EventDispatcherFactory with default Retry Strategy

* Add EventDispatcherFactory with default Retry Strategy

* Code Styles

* PHPCs fixes

---------

Co-authored-by: Thomas Eimers <[email protected]>
  • Loading branch information
notdefine and Thomas Eimers authored Jun 19, 2024
1 parent af888c9 commit 3b691c0
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 1 deletion.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
"symfony/doctrine-messenger": "^5.4 || ^6.0",
"symfony/event-dispatcher": "^5.4 || ^6.0",
"symfony/messenger": "^5.4 || ^6.0",
"symfony/redis-messenger": "^5.4 || ^6.0"
"symfony/redis-messenger": "^5.4 || ^6.0",
"symfony/dependency-injection": "^5.4 || ^6.0"
},
"require-dev": {
"ext-json": "*",
Expand Down
5 changes: 5 additions & 0 deletions src/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

namespace Xtreamwayz\PsrContainerMessenger;

use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Xtreamwayz\PsrContainerMessenger\Container\AddBusNameStampMiddlewareFactory;
use Xtreamwayz\PsrContainerMessenger\Container\HandleMessageMiddlewareFactory;
use Xtreamwayz\PsrContainerMessenger\Container\MessageBusFactory;
use Xtreamwayz\PsrContainerMessenger\Container\SendMessageMiddlewareFactory;
use Xtreamwayz\PsrContainerMessenger\Event\EventDispatcherDelegatorFactory;

class ConfigProvider
{
Expand Down Expand Up @@ -49,6 +51,9 @@ public function getDependencies(): array

'messenger.command.middleware.add_bus_stamp' => [AddBusNameStampMiddlewareFactory::class, 'messenger.command.bus'],
],
'delegators' => [
EventDispatcher::class => [EventDispatcherDelegatorFactory::class],
],
];
// phpcs:enable
}
Expand Down
77 changes: 77 additions & 0 deletions src/Event/EventDispatcherDelegatorFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Xtreamwayz\PsrContainerMessenger\Event;

use Laminas\ServiceManager\Factory\DelegatorFactoryInterface;
use Laminas\ServiceManager\ServiceLocatorInterface;
use Psr\Container\ContainerInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;

class EventDispatcherDelegatorFactory implements DelegatorFactoryInterface
{
protected const MAX_RETRIES = 3;
protected const WAIT_IN_SECONDS = 5 * 60;
protected const WAIT_MULTIPLIER = 4;
public const FAILED_QUEUE = 'failed';

/**
* @param string $name
*/
public function __invoke(
ContainerInterface $container,
$name,
callable $callback,
?array $options = null
): EventDispatcherInterface {
$config = $container->get('config');
$availableRoutes = [];
foreach ($config['messenger']['buses']['messenger.command.bus']['routes'] as $routes) {
foreach ($routes as $route) {
if ($route === self::FAILED_QUEUE) {
continue;
}
$availableRoutes[$route] = $route;
}
}

/** @var EventDispatcherInterface $eventDispatcher */
$eventDispatcher = $callback();

$strategieContainer = new ContainerBuilder();
$failureContainer = new ContainerBuilder();

$logger = $container->get($config['messenger']['logger']);

$eventDispatcher->addSubscriber(
new SendFailedMessageForRetryListener($container, $strategieContainer, $logger)
);

foreach ($availableRoutes as $availableRoute) {
$strategieContainer->set(
$availableRoute,
new MultiplierRetryStrategy(self::MAX_RETRIES, self::WAIT_IN_SECONDS * 1000, self::WAIT_MULTIPLIER)
);
$failureContainer->set($availableRoute, $container->get(self::FAILED_QUEUE));
}

$eventDispatcher->addSubscriber(
new SendFailedMessageToFailureTransportListener($failureContainer, $logger)
);
return $eventDispatcher;
}

public function createDelegatorWithName(
ServiceLocatorInterface $serviceLocator,
string $name,
string $requestedName,
callable $callback
): EventDispatcherInterface {
return $this($serviceLocator, $name, $callback);
}
}

0 comments on commit 3b691c0

Please sign in to comment.