Skip to content

Commit

Permalink
Merge pull request #35 from AlexeyKosov/v6-compatibility
Browse files Browse the repository at this point in the history
V6 compatibility
  • Loading branch information
makasim authored Oct 4, 2021
2 parents 2a23934 + 892d5b5 commit b82867c
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 27 deletions.
1 change: 0 additions & 1 deletion DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public function getConfigTreeBuilder()
->booleanNode('remove')->defaultTrue()->end()
->scalarNode('connection')->defaultValue('default')->cannotBeEmpty()->end()
->scalarNode('index_name')->isRequired()->cannotBeEmpty()->end()
->scalarNode('type_name')->isRequired()->cannotBeEmpty()->end()
->scalarNode('model_class')->isRequired()->cannotBeEmpty()->end()
->scalarNode('model_id')->defaultValue('id')->cannotBeEmpty()->end()
->scalarNode('repository_method')->defaultValue('find')->cannotBeEmpty()->end()
Expand Down
7 changes: 3 additions & 4 deletions DependencyInjection/EnqueueElasticaExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public function load(array $configs, ContainerBuilder $container)
->addTag('kernel.event_subscriber')
;

$container->register('enqueue_elastica.queue_pager_perister', QueuePagerPersister::class)
$container->register('enqueue_elastica.queue_pager_persister', QueuePagerPersister::class)
->addArgument(new Reference('enqueue_elastica.context'))
->addArgument(new Reference('fos_elastica.persister_registry'))
->addArgument(new Reference('event_dispatcher'))
Expand All @@ -67,9 +67,8 @@ public function load(array $configs, ContainerBuilder $container)

foreach ($config['doctrine']['queue_listeners'] as $listenerConfig) {
$listenerId = sprintf(
'enqueue_elastica.doctrine_queue_listener.%s.%s',
$listenerConfig['index_name'],
$listenerConfig['type_name']
'enqueue_elastica.doctrine_queue_listener.%s',
$listenerConfig['index_name']
);

$container->register($listenerId, SyncIndexWithObjectChangeListener::class)
Expand Down
12 changes: 4 additions & 8 deletions Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\Persistence\ManagerRegistry;

final class SyncIndexWithObjectChangeProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface
{
Expand Down Expand Up @@ -49,9 +49,6 @@ public function process(Message $message, Context $context): Result
if (false == isset($data['index_name'])) {
return Result::reject('The message data misses index_name');
}
if (false == isset($data['type_name'])) {
return Result::reject('The message data misses type_name');
}
if (false == isset($data['repository_method'])) {
return Result::reject('The message data misses repository_method');
}
Expand All @@ -60,11 +57,10 @@ public function process(Message $message, Context $context): Result
$modelClass = $data['model_class'];
$id = $data['id'];
$index = $data['index_name'];
$type = $data['type_name'];
$repositoryMethod = $data['repository_method'];

$repository = $this->doctrine->getManagerForClass($modelClass)->getRepository($modelClass);
$persister = $this->persisterRegistry->getPersister($index, $type);
$persister = $this->persisterRegistry->getPersister($index);

switch ($action) {
case self::UPDATE_ACTION:
Expand All @@ -75,7 +71,7 @@ public function process(Message $message, Context $context): Result
}

if ($persister->handlesObject($object)) {
if ($this->indexable->isObjectIndexable($index, $type, $object)) {
if ($this->indexable->isObjectIndexable($index, $object)) {
$persister->replaceOne($object);
} else {
$persister->deleteOne($object);
Expand All @@ -90,7 +86,7 @@ public function process(Message $message, Context $context): Result
return Result::ack(sprintf('The object "%s" with id "%s" could not be found.', $modelClass, $id));
}

if ($persister->handlesObject($object) && $this->indexable->isObjectIndexable($index, $type, $object)) {
if ($persister->handlesObject($object) && $this->indexable->isObjectIndexable($index, $object)) {
$persister->insertOne($object);
}

Expand Down
3 changes: 1 addition & 2 deletions Doctrine/SyncIndexWithObjectChangeListener.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php
namespace Enqueue\ElasticaBundle\Doctrine;

use Doctrine\Common\Persistence\Event\LifecycleEventArgs;
use Doctrine\Persistence\Event\LifecycleEventArgs;
use Doctrine\ORM\Event\PostFlushEventArgs;
use Enqueue\ElasticaBundle\Doctrine\Queue\Commands;
use Enqueue\ElasticaBundle\Doctrine\Queue\SyncIndexWithObjectChangeProcessor as SyncProcessor;
Expand Down Expand Up @@ -100,7 +100,6 @@ private function sendUpdateIndexMessage($action, $id)
'model_id' => $this->config['model_id'],
'id' => $id,
'index_name' => $this->config['index_name'],
'type_name' => $this->config['type_name'],
'repository_method' => $this->config['repository_method'],
]));

Expand Down
3 changes: 1 addition & 2 deletions Persister/Listener/PurgePopulateQueueListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use FOS\ElasticaBundle\Persister\Event\PrePersistEvent;
use Interop\Queue\Context;
use FOS\ElasticaBundle\Persister\Event\Events;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

class PurgePopulateQueueListener implements EventSubscriberInterface
Expand Down Expand Up @@ -44,7 +43,7 @@ public function purgePopulateQueue(PrePersistEvent $event)
public static function getSubscribedEvents()
{
return [
Events::PRE_PERSIST => 'purgePopulateQueue',
PrePersistEvent::class => 'purgePopulateQueue',
];
}
}
9 changes: 4 additions & 5 deletions Persister/QueuePagerPersister.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use Enqueue\ElasticaBundle\Queue\Commands;
use Enqueue\Util\JSON;
use FOS\ElasticaBundle\Persister\Event\Events;
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\Event\PostPersistEvent;
use FOS\ElasticaBundle\Persister\Event\PrePersistEvent;
Expand Down Expand Up @@ -55,10 +54,10 @@ public function insert(PagerInterface $pager, array $options = array())

$pager->setCurrentPage($options['first_page']);

$objectPersister = $this->registry->getPersister($options['indexName'], $options['typeName']);
$objectPersister = $this->registry->getPersister($options['indexName']);

$event = new PrePersistEvent($pager, $objectPersister, $options);
$this->dispatcher->dispatch($event, Events::PRE_PERSIST);
$this->dispatcher->dispatch($event);
$pager = $event->getPager();
$options = $event->getOptions();

Expand Down Expand Up @@ -122,7 +121,7 @@ public function insert(PagerInterface $pager, array $options = array())
$errorMessage,
$data['options']
);
$this->dispatcher->dispatch($event, Events::POST_ASYNC_INSERT_OBJECTS);
$this->dispatcher->dispatch($event);
}

if (microtime(true) > $limitTime) {
Expand All @@ -131,6 +130,6 @@ public function insert(PagerInterface $pager, array $options = array())
}

$event = new PostPersistEvent($pager, $objectPersister, $options);
$this->dispatcher->dispatch($event, Events::POST_PERSIST);
$this->dispatcher->dispatch($event);
}
}
5 changes: 1 addition & 4 deletions Queue/PopulateProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,12 @@ public function process(Message $message, Context $context): Result
if (!isset($data['options']['indexName'])) {
return Result::reply($this->createReplyMessage($context, $message, 0,'The message is invalid. Missing indexName option.'));
}
if (!isset($data['options']['typeName'])) {
return Result::reply($this->createReplyMessage($context, $message, 0,'The message is invalid. Missing typeName option.'));
}

$options = $data['options'];
$options['first_page'] = $data['page'];
$options['last_page'] = $data['page'];

$provider = $this->pagerProviderRegistry->getProvider($options['indexName'], $options['typeName']);
$provider = $this->pagerProviderRegistry->getProvider($options['indexName']);
$pager = $provider->provide($options);
$pager->setMaxPerPage($options['max_per_page']);
$pager->setCurrentPage($options['first_page']);
Expand Down
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
"require": {
"php": "^7.1",
"symfony/framework-bundle": "^4.0|^5.0",
"friendsofsymfony/elastica-bundle": "^5.0|^6.0",
"friendsofsymfony/elastica-bundle": "^6.0",
"enqueue/enqueue-bundle": "^0.10"
},
"require-dev": {
"doctrine/orm": "^2.0"
},
"autoload": {
"psr-4": { "Enqueue\\ElasticaBundle\\": "" }
},
Expand Down

0 comments on commit b82867c

Please sign in to comment.