From 838c3a4bb6bed38d7788b64b34aea06498c24b9e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 17 Dec 2018 14:57:02 +0200 Subject: [PATCH] Upgrade to Enqueue 0.9 --- DependencyInjection/Configuration.php | 2 +- .../EnqueueElasticaExtension.php | 63 ++++++++++++------- .../SyncIndexWithObjectChangeProcessor.php | 44 ++++--------- .../SyncIndexWithObjectChangeListener.php | 7 +-- .../Listener/PurgePopulateQueueListener.php | 10 +-- Persister/QueuePagerPersister.php | 7 +-- Queue/PopulateProcessor.php | 54 ++++------------ Resources/config/services.yml | 37 ----------- composer.json | 10 +-- 9 files changed, 78 insertions(+), 156 deletions(-) delete mode 100644 Resources/config/services.yml diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index da5a154..64e3230 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -17,7 +17,7 @@ public function getConfigTreeBuilder() $rootNode ->children() ->booleanNode('enabled')->defaultValue(true)->end() - ->scalarNode('context')->defaultValue('enqueue.transport.context')->cannotBeEmpty()->end() + ->scalarNode('transport')->defaultValue('%enqueue.default_transport%')->cannotBeEmpty()->isRequired()->end() ->arrayNode('doctrine') ->children() ->scalarNode('driver')->defaultValue('orm')->cannotBeEmpty() diff --git a/DependencyInjection/EnqueueElasticaExtension.php b/DependencyInjection/EnqueueElasticaExtension.php index 3022d3e..c614a07 100644 --- a/DependencyInjection/EnqueueElasticaExtension.php +++ b/DependencyInjection/EnqueueElasticaExtension.php @@ -2,10 +2,14 @@ namespace Enqueue\ElasticaBundle\DependencyInjection; +use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor; use Enqueue\ElasticaBundle\Doctrine\SyncIndexWithObjectChangeListener; -use Symfony\Component\Config\FileLocator; +use Enqueue\ElasticaBundle\Persister\Listener\PurgePopulateQueueListener; +use Enqueue\ElasticaBundle\Persister\QueuePagerPersister; +use Enqueue\ElasticaBundle\Queue\PopulateProcessor; +use Enqueue\Symfony\DependencyInjection\TransportFactory; +use Enqueue\Symfony\DiUtils; use Symfony\Component\DependencyInjection\ContainerBuilder; -use Symfony\Component\DependencyInjection\Loader\YamlFileLoader; use Symfony\Component\DependencyInjection\Reference; use Symfony\Component\HttpKernel\DependencyInjection\Extension; @@ -22,13 +26,44 @@ public function load(array $configs, ContainerBuilder $container) return; } - $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); - $loader->load('services.yml'); + $transport = $container->getParameterBag()->resolveValue($config['transport']); - $container->setAlias('enqueue_elastica.context', $config['context']); + $diUtils = new DiUtils(TransportFactory::MODULE, $transport); + $container->setAlias('enqueue_elastica.context', $diUtils->format('context')); + + $container->register('enqueue_elastica.populate_processor', PopulateProcessor::class) + ->addArgument(new Reference('fos_elastica.pager_provider_registry')) + ->addArgument(new Reference('fos_elastica.pager_persister_registry')) + + ->addTag('enqueue.command_subscriber', ['client' => $transport]) + ->addTag('enqueue.transport.processor', ['transport' => $transport]) + ; + + $container->register('enqueue_elastica.purge_populate_queue_listener', PurgePopulateQueueListener::class) + ->addArgument(new Reference('enqueue_elastica.context')) + + ->addTag('kernel.event_subscriber') + ; + + $container->register('enqueue_elastica.queue_pager_perister', QueuePagerPersister::class) + ->addArgument(new Reference('enqueue_elastica.context')) + ->addArgument(new Reference('fos_elastica.persister_registry')) + ->addArgument(new Reference('event_dispatcher')) + + ->addTag('fos_elastica.pager_persister', ['persisterName' => 'queue']) + ; - $doctrineDriver = $config['doctrine']['driver']; if (false == empty($config['doctrine']['queue_listeners'])) { + $doctrineDriver = $config['doctrine']['driver']; + + $container->register('enqueue_elastica.doctrine.sync_index_with_object_change_processor', SyncIndexWithObjectChangeProcessor::class) + ->addArgument(new Reference($this->getManagerRegistry($doctrineDriver))) + ->addArgument(new Reference('fos_elastica.persister_registry')) + ->addArgument(new Reference('fos_elastica.indexable')) + ->addTag('enqueue.command_subscriber', ['client' => $transport]) + ->addTag('enqueue.transport.processor', ['transport' => $transport]) + ; + foreach ($config['doctrine']['queue_listeners'] as $listenerConfig) { $listenerId = sprintf( 'enqueue_elastica.doctrine_queue_listener.%s.%s', @@ -45,19 +80,8 @@ public function load(array $configs, ContainerBuilder $container) ; } } - - $serviceId = 'enqueue_elastica.doctrine.sync_index_with_object_change_processor'; - $managerRegistry = $this->getManagerRegistry($doctrineDriver); - $container - ->getDefinition($serviceId) - ->replaceArgument(0, new Reference($managerRegistry)); } - /** - * @param string $driver - * - * @return string - */ private function getManagerRegistry(string $driver): string { switch ($driver) { @@ -70,11 +94,6 @@ private function getManagerRegistry(string $driver): string } } - /** - * @param string $driver - * - * @return string - */ private function getEventSubscriber(string $driver): string { switch ($driver) { diff --git a/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php b/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php index 6f7549d..1f818ca 100644 --- a/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php +++ b/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php @@ -7,12 +7,12 @@ use Enqueue\Util\JSON; use FOS\ElasticaBundle\Persister\PersisterRegistry; use FOS\ElasticaBundle\Provider\IndexableInterface; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Message; +use Interop\Queue\Processor; use Doctrine\Common\Persistence\ManagerRegistry; -final class SyncIndexWithObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface +final class SyncIndexWithObjectChangeProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface { const INSERT_ACTION = 'insert'; @@ -20,19 +20,10 @@ final class SyncIndexWithObjectChangeProcessor implements PsrProcessor, CommandS const REMOVE_ACTION = 'remove'; - /** - * @var PersisterRegistry - */ private $persisterRegistry; - /** - * @var IndexableInterface - */ private $indexable; - /** - * @var ManagerRegistry - */ private $doctrine; public function __construct(ManagerRegistry $doctrine, PersisterRegistry $persisterRegistry, IndexableInterface $indexable) @@ -42,10 +33,7 @@ public function __construct(ManagerRegistry $doctrine, PersisterRegistry $persis $this->doctrine = $doctrine; } - /** - * {@inheritdoc} - */ - public function process(PsrMessage $message, PsrContext $context) + public function process(Message $message, Context $context): Result { $data = JSON::decode($message->getBody()); @@ -94,7 +82,7 @@ public function process(PsrMessage $message, PsrContext $context) } } - return self::ACK; + return Result::ack(); case self::INSERT_ACTION: if (false == $object = $repository->{$repositoryMethod}($id)) { $persister->deleteById($id); @@ -106,33 +94,27 @@ public function process(PsrMessage $message, PsrContext $context) $persister->insertOne($object); } - return self::ACK; + return Result::ack(); case self::REMOVE_ACTION: $persister->deleteById($id); - return self::ACK; + return Result::ack(); default: return Result::reject(sprintf('The action "%s" is not supported', $action)); } } - /** - * {@inheritdoc} - */ - public static function getSubscribedCommand() + public static function getSubscribedCommand(): array { return [ - 'processorName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE, - 'queueName' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE, - 'queueNameHardcoded' => true, + 'command' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE, + 'queue' => Commands::SYNC_INDEX_WITH_OBJECT_CHANGE, + 'prefix_queue' => false, 'exclusive' => true, ]; } - /** - * {@inheritdoc} - */ - public static function getSubscribedQueues() + public static function getSubscribedQueues(): array { return [Commands::SYNC_INDEX_WITH_OBJECT_CHANGE]; } diff --git a/Doctrine/SyncIndexWithObjectChangeListener.php b/Doctrine/SyncIndexWithObjectChangeListener.php index 3c6421f..d286a94 100644 --- a/Doctrine/SyncIndexWithObjectChangeListener.php +++ b/Doctrine/SyncIndexWithObjectChangeListener.php @@ -5,14 +5,11 @@ use Enqueue\ElasticaBundle\Doctrine\Queue\Commands; use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor; use Enqueue\Util\JSON; -use Interop\Queue\PsrContext; +use Interop\Queue\Context; use Doctrine\Common\EventSubscriber; final class SyncIndexWithObjectChangeListener implements EventSubscriber { - /** - * @var PsrContext - */ private $context; /** @@ -25,7 +22,7 @@ final class SyncIndexWithObjectChangeListener implements EventSubscriber */ private $config; - public function __construct(PsrContext $context, $modelClass, array $config) + public function __construct(Context $context, $modelClass, array $config) { $this->context = $context; $this->modelClass = $modelClass; diff --git a/Persister/Listener/PurgePopulateQueueListener.php b/Persister/Listener/PurgePopulateQueueListener.php index e8f4231..052e80b 100644 --- a/Persister/Listener/PurgePopulateQueueListener.php +++ b/Persister/Listener/PurgePopulateQueueListener.php @@ -2,21 +2,15 @@ namespace Enqueue\ElasticaBundle\Persister\Listener; use FOS\ElasticaBundle\Persister\Event\PrePersistEvent; -use Interop\Queue\PsrContext; +use Interop\Queue\Context; use FOS\ElasticaBundle\Persister\Event\Events; use Symfony\Component\EventDispatcher\EventSubscriberInterface; class PurgePopulateQueueListener implements EventSubscriberInterface { - /** - * @var PsrContext - */ private $context; - /** - * @param PsrContext $context - */ - public function __construct(PsrContext $context) + public function __construct(Context $context) { $this->context = $context; } diff --git a/Persister/QueuePagerPersister.php b/Persister/QueuePagerPersister.php index 7c97525..91ef724 100644 --- a/Persister/QueuePagerPersister.php +++ b/Persister/QueuePagerPersister.php @@ -10,16 +10,13 @@ use FOS\ElasticaBundle\Persister\PagerPersisterInterface; use FOS\ElasticaBundle\Persister\PersisterRegistry; use FOS\ElasticaBundle\Provider\PagerInterface; -use Interop\Queue\PsrContext; +use Interop\Queue\Context; use Symfony\Component\EventDispatcher\EventDispatcherInterface; final class QueuePagerPersister implements PagerPersisterInterface { const NAME = 'queue'; - /** - * @var PsrContext - */ private $context; /** @@ -32,7 +29,7 @@ final class QueuePagerPersister implements PagerPersisterInterface */ private $dispatcher; - public function __construct(PsrContext $context, PersisterRegistry $registry, EventDispatcherInterface $dispatcher) + public function __construct(Context $context, PersisterRegistry $registry, EventDispatcherInterface $dispatcher) { $this->context = $context; $this->dispatcher = $dispatcher; diff --git a/Queue/PopulateProcessor.php b/Queue/PopulateProcessor.php index b39b347..409cf32 100644 --- a/Queue/PopulateProcessor.php +++ b/Queue/PopulateProcessor.php @@ -7,21 +7,15 @@ use FOS\ElasticaBundle\Persister\InPlacePagerPersister; use FOS\ElasticaBundle\Persister\PagerPersisterRegistry; use FOS\ElasticaBundle\Provider\PagerProviderRegistry; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Message; +use Interop\Queue\Processor; use Enqueue\Util\JSON; -final class PopulateProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface +final class PopulateProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface { - /** - * @var PagerProviderRegistry - */ private $pagerProviderRegistry; - /** - * @var PagerPersisterRegistry - */ private $pagerPersisterRegistry; public function __construct( @@ -32,10 +26,7 @@ public function __construct( $this->pagerProviderRegistry = $pagerProviderRegistry; } - /** - * {@inheritdoc} - */ - public function process(PsrMessage $message, PsrContext $context) + public function process(Message $message, Context $context): Result { if ($message->isRedelivered()) { $replyMessage = $this->createReplyMessage($context, $message, 0,'The message was redelivered. Chances are that something has gone wrong.'); @@ -81,14 +72,7 @@ public function process(PsrMessage $message, PsrContext $context) } } - /** - * @param PsrContext $context - * @param PsrMessage $message - * @param int $objectsCount - * @param \Throwable $e - * @return PsrMessage - */ - private function createExceptionReplyMessage(PsrContext $context, PsrMessage $message, $objectsCount, \Throwable $e) + private function createExceptionReplyMessage(Context $context, Message $message, int $objectsCount, \Throwable $e): Message { $errorMessage = sprintf( 'The queue processor has failed to process the message with exception: %s: %s in file %s at line %s.', @@ -101,15 +85,7 @@ private function createExceptionReplyMessage(PsrContext $context, PsrMessage $me return $this->createReplyMessage($context, $message, $errorMessage); } - /** - * @param PsrContext $context - * @param PsrMessage $message - * @param int $objectsCount - * @param string|null $error - * - * @return PsrMessage - */ - private function createReplyMessage(PsrContext $context, PsrMessage $message, $objectsCount, $error = null) + private function createReplyMessage(Context $context, Message $message, int $objectsCount, string $error = null): Message { $replyMessage = $context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); $replyMessage->setProperty('fos-populate-objects-count', $objectsCount); @@ -121,23 +97,17 @@ private function createReplyMessage(PsrContext $context, PsrMessage $message, $o return $replyMessage; } - /** - * {@inheritdoc} - */ - public static function getSubscribedCommand() + public static function getSubscribedCommand(): array { return [ - 'processorName' => Commands::POPULATE, - 'queueName' => Commands::POPULATE, - 'queueNameHardcoded' => true, + 'command' => Commands::POPULATE, + 'queue' => Commands::POPULATE, + 'prefix_queue' => false, 'exclusive' => true, ]; } - /** - * {@inheritdoc} - */ - public static function getSubscribedQueues() + public static function getSubscribedQueues(): array { return [Commands::POPULATE]; } diff --git a/Resources/config/services.yml b/Resources/config/services.yml deleted file mode 100644 index e900cd3..0000000 --- a/Resources/config/services.yml +++ /dev/null @@ -1,37 +0,0 @@ -services: - enqueue_elastica.populate_processor: - class: 'Enqueue\ElasticaBundle\Queue\PopulateProcessor' - public: true - arguments: - - '@fos_elastica.pager_provider_registry' - - '@fos_elastica.pager_persister_registry' - tags: - - { name: "enqueue.client.processor" } - - enqueue_elastica.doctrine.sync_index_with_object_change_processor: - class: 'Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor' - public: true - arguments: - - '@doctrine' - - '@fos_elastica.persister_registry' - - '@fos_elastica.indexable' - tags: - - { name: "enqueue.client.processor" } - - enqueue_elastica.purge_populate_queue_listener: - class: 'Enqueue\ElasticaBundle\Persister\Listener\PurgePopulateQueueListener' - public: true - arguments: - - '@enqueue_elastica.context' - tags: - - { name: 'kernel.event_subscriber' } - - enqueue_elastica.queue_pager_perister: - class: 'Enqueue\ElasticaBundle\Persister\QueuePagerPersister' - public: true - arguments: - - '@enqueue_elastica.context' - - '@fos_elastica.persister_registry' - - '@event_dispatcher' - tags: - - { name: 'fos_elastica.pager_persister', persisterName: 'queue' } diff --git a/composer.json b/composer.json index add21ca..7391d3d 100644 --- a/composer.json +++ b/composer.json @@ -5,10 +5,10 @@ "keywords": ["elasticsearch", "elastica", "fos", "performance"], "license": "MIT", "require": { - "php": "^7", - "symfony/framework-bundle": "^3|^4", - "friendsofsymfony/elastica-bundle": "^4.1|^5", - "enqueue/enqueue-bundle": "^0.8" + "php": "^7.1", + "symfony/framework-bundle": "^3.4|^4", + "friendsofsymfony/elastica-bundle": "^5", + "enqueue/enqueue-bundle": "^0.9" }, "autoload": { "psr-4": { "Enqueue\\ElasticaBundle\\": "" } @@ -16,7 +16,7 @@ "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "0.8.x-dev" + "dev-master": "0.9.x-dev" } } }