Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored Categories import #192

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Command/AbstractImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected function configure(): void
->addOption('parallel', 'p', InputOption::VALUE_NONE, 'Allow parallel task processing')
->addOption('disable-batch', 'd', InputOption::VALUE_NONE, 'Disable batch processing')
->addOption('batch-size', 's', InputOption::VALUE_OPTIONAL, 'Batch Size', 100)
->addOption('from-page', null, InputOption::VALUE_OPTIONAL, 'From page', 1)
->addOption('max-concurrency', 'c', InputOption::VALUE_OPTIONAL, 'Max process concurrency', 5)
->addOption('batch-after-fetch', 'a', InputOption::VALUE_OPTIONAL, 'Fetch all pages then start processing the batches', true)
->addOption('filter', 'f', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Add filter')
Expand Down
2 changes: 2 additions & 0 deletions src/Command/BatchImportAssociationTypesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Association\AssociationTypePayload;
use Synolia\SyliusAkeneoPlugin\Task\AssociationType\BatchAssociationTypesTask;
use Webmozart\Assert\Assert;

final class BatchImportAssociationTypesCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
2 changes: 2 additions & 0 deletions src/Command/BatchImportAttributesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Attribute\AttributePayload;
use Synolia\SyliusAkeneoPlugin\Task\Attribute\BatchAttributesTask;
use Webmozart\Assert\Assert;

final class BatchImportAttributesCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
50 changes: 50 additions & 0 deletions src/Command/BatchImportCategoriesCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Command;

use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Category\CategoryPayload;
use Synolia\SyliusAkeneoPlugin\Task\Category\BatchCategoriesTask;
use Webmozart\Assert\Assert;

final class BatchImportCategoriesCommand extends AbstractBatchCommand
{
protected static $defaultDescription = 'Import batch categories ids from Akeneo PIM.';

/** @var string */
protected static $defaultName = 'akeneo:batch:categories';

public function __construct(
private ClientFactoryInterface $clientFactory,
private LoggerInterface $logger,
private BatchCategoriesTask $task,
) {
parent::__construct(self::$defaultName);
}

/**
* {@inheritdoc}
*/
protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
$this->logger->debug(self::$defaultName, ['batched_ids' => $ids]);

$payload = new CategoryPayload($this->clientFactory->createFromApiCredentials());
$payload->setIds($ids);

$this->task->__invoke($payload);

return 0;
}
}
2 changes: 2 additions & 0 deletions src/Command/BatchImportProductModelsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\ProductModel\ProductModelPayload;
use Synolia\SyliusAkeneoPlugin\Task\ProductModel\BatchProductModelTask;
use Webmozart\Assert\Assert;

final class BatchImportProductModelsCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
2 changes: 2 additions & 0 deletions src/Command/BatchImportProductsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Product\ProductPayload;
use Synolia\SyliusAkeneoPlugin\Task\Product\BatchProductsTask;
use Webmozart\Assert\Assert;

final class BatchImportProductsCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
50 changes: 19 additions & 31 deletions src/Command/ImportCategoriesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,60 +5,48 @@
namespace Synolia\SyliusAkeneoPlugin\Command;

use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Exceptions\Command\CommandLockedException;
use Synolia\SyliusAkeneoPlugin\Factory\CategoryPipelineFactory;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Factory\PayloadFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Category\CategoryPayload;

final class ImportCategoriesCommand extends Command
final class ImportCategoriesCommand extends AbstractImportCommand
{
use LockableTrait;

private const DESCRIPTION = 'Import Categories from Akeneo PIM.';
protected static $defaultDescription = 'Import Categories from Akeneo PIM.';

/** @var string */
protected static $defaultName = 'akeneo:import:categories';

public function __construct(
private CategoryPipelineFactory $categoryPipelineFactory,
private ClientFactoryInterface $clientFactory,
private LoggerInterface $logger,
CategoryPipelineFactory $pipelineFactory,
LoggerInterface $akeneoLogger,
PayloadFactoryInterface $payloadFactory,
) {
parent::__construct(self::$defaultName);
}

protected function configure(): void
{
$this->setDescription(self::DESCRIPTION);
parent::__construct($akeneoLogger, $payloadFactory, $pipelineFactory, self::$defaultName);
}

/**
* {@inheritdoc}
*/
protected function execute(
InputInterface $input,
OutputInterface $output,
) {
if (!$this->lock()) {
$output->writeln(Messages::commandAlreadyRunning());

return 0;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
try {
$this->preExecute();

$this->logger->notice(self::$defaultName);
/** @var \League\Pipeline\Pipeline $categoryPipeline */
$categoryPipeline = $this->categoryPipelineFactory->create();
$payload = $this->payloadFactory->createFromCommand(CategoryPayload::class, $input, $output);
$this->pipeline->process($payload);

/** @var \Synolia\SyliusAkeneoPlugin\Payload\Category\CategoryPayload $categoryPayload */
$categoryPayload = new CategoryPayload($this->clientFactory->createFromApiCredentials());
$categoryPipeline->process($categoryPayload);
$this->postExecute();
} catch (CommandLockedException $commandLockedException) {
$this->logger->warning($commandLockedException->getMessage());

$this->logger->notice(Messages::endOfCommand(self::$defaultName));
$this->release();
return 1;
}

return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions src/Configuration/ConfigurationContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public function setFilters(array $filters): self;
public function getHandler(): string;

public function setHandler(string $handler): self;

public function getFromPage(): int;

public function setFromPage(int $fromPage): self;
}
14 changes: 14 additions & 0 deletions src/Configuration/ConfigurationContextTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ trait ConfigurationContextTrait

private string $handler = SymfonyProcessTaskHandler::HANDLER_CODE;

private int $fromPage = 1;

public function getBatchSize(): int
{
return $this->batchSize;
Expand Down Expand Up @@ -147,4 +149,16 @@ public function setHandler(string $handler): ConfigurationContextInterface

return $this;
}

public function getFromPage(): int
{
return $this->fromPage;
}

public function setFromPage(int $fromPage): ConfigurationContextInterface
{
$this->fromPage = $fromPage;

return $this;
}
}
10 changes: 6 additions & 4 deletions src/Factory/CategoryPipelineFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use League\Pipeline\Pipeline;
use League\Pipeline\PipelineInterface;
use Synolia\SyliusAkeneoPlugin\Pipeline\Processor;
use Synolia\SyliusAkeneoPlugin\Task\Category\CreateUpdateEntityTask;
use Synolia\SyliusAkeneoPlugin\Task\Category\RetrieveCategoriesTask;
use Synolia\SyliusAkeneoPlugin\Task\Category\ProcessCategoriesTask;
use Synolia\SyliusAkeneoPlugin\Task\SetupTask;
use Synolia\SyliusAkeneoPlugin\Task\TearDownTask;

final class CategoryPipelineFactory extends AbstractPipelineFactory
{
Expand All @@ -17,8 +18,9 @@ public function create(): PipelineInterface
$pipeline = new Pipeline(new Processor($this->dispatcher));

return $pipeline
->pipe($this->taskProvider->get(RetrieveCategoriesTask::class))
->pipe($this->taskProvider->get(CreateUpdateEntityTask::class))
->pipe($this->taskProvider->get(SetupTask::class))
->pipe($this->taskProvider->get(ProcessCategoriesTask::class))
->pipe($this->taskProvider->get(TearDownTask::class))
;
}
}
5 changes: 3 additions & 2 deletions src/Factory/PayloadFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ private function createContext(
->setAllowParallel($isParallelAllowed)
->setBatchingAllowed($isBatchingAllowed)
->setProcessAsSoonAsPossible(filter_var($batchAfterFetch, FILTER_VALIDATE_BOOLEAN))
->setBatchSize((int) $input->getOption('batch-size'))
->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency'))
->setBatchSize((int) $input->getOption('batch-size')) /** @phpstan-ignore-line Cannot cast mixed to int */
->setFromPage((int) $input->getOption('from-page')) /** @phpstan-ignore-line Cannot cast mixed to int */
->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency')) /** @phpstan-ignore-line Cannot cast mixed to int */
->setFilters((array) ($input->getOption('filter') ?: []))
->setHandler($input->getOption('handler') ?? $context->getHandler())
;
Expand Down
5 changes: 2 additions & 3 deletions src/Handler/Task/SymfonyMessengerTaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
Expand Down Expand Up @@ -37,7 +36,7 @@ public function batch(PipelinePayloadInterface $pipelinePayload, array $items):

public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
iterable|PageInterface $handleType,
): void {
$count = 0;
$items = [];
Expand Down Expand Up @@ -83,7 +82,7 @@ private function handleByPage(

private function handleByCursor(
PipelinePayloadInterface $payload,
ResourceCursorInterface $resourceCursor,
iterable $resourceCursor,
int &$count = 0,
array &$items = [],
): void {
Expand Down
5 changes: 2 additions & 3 deletions src/Handler/Task/SymfonyProcessTaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Statement;
Expand Down Expand Up @@ -114,7 +113,7 @@ public function continue(PipelinePayloadInterface $pipelinePayload): void
*/
public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
iterable|PageInterface $handleType,
): void {
$this->processManager->setInstantProcessing($pipelinePayload->getProcessAsSoonAsPossible());
$this->processManager->setNumberOfParallelProcesses($pipelinePayload->getMaxRunningProcessQueueSize());
Expand Down Expand Up @@ -197,7 +196,7 @@ private function handleByPage(

private function handleByCursor(
PipelinePayloadInterface $payload,
ResourceCursorInterface $resourceCursor,
iterable $resourceCursor,
int &$count = 0,
array &$ids = [],
): void {
Expand Down
3 changes: 1 addition & 2 deletions src/Handler/Task/TaskHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Synolia\SyliusAkeneoPlugin\Handler\Task;

use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Symfony\Component\DependencyInjection\Attribute\AutoconfigureTag;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;

Expand All @@ -25,7 +24,7 @@ public function batch(

public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
iterable|PageInterface $handleType,
): void;

public function continue(PipelinePayloadInterface $pipelinePayload): void;
Expand Down
12 changes: 12 additions & 0 deletions src/Message/Batch/CategoryBatchMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Message\Batch;

class CategoryBatchMessage implements BatchMessageInterface
{
public function __construct(public array $items)
{
}
}
35 changes: 35 additions & 0 deletions src/MessageHandler/Batch/CategoryBatchMessageHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\MessageHandler\Batch;

use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Synolia\SyliusAkeneoPlugin\Message\Batch\CategoryBatchMessage;
use Synolia\SyliusAkeneoPlugin\Processor\Resource\Category\CategoryResourceProcessor;
use Synolia\SyliusAkeneoPlugin\Processor\Resource\Exception\MaxResourceProcessorRetryException;

#[AsMessageHandler]
class CategoryBatchMessageHandler
{
public function __construct(
private EventDispatcherInterface $dispatcher,
private CategoryResourceProcessor $resourceProcessor,
) {
}

public function __invoke(CategoryBatchMessage $attributeBatchMessage): void
{
foreach ($attributeBatchMessage->items as $resource) {
try {
$this->resourceProcessor->process($resource);
} catch (MaxResourceProcessorRetryException) {
// Skip the failing line
$this->dispatcher->dispatch(new CategoryBatchMessage([$resource]));

continue;
}
}
}
}
1 change: 1 addition & 0 deletions src/Payload/AbstractPayload.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public function __construct(
$this->isContinue = $commandContext->isContinue();
$this->processAsSoonAsPossible = $commandContext->getProcessAsSoonAsPossible();
$this->handler = $commandContext->getHandler();
$this->fromPage = $commandContext->getFromPage();
}
}

Expand Down
Loading
Loading