Skip to content

Commit

Permalink
Implemented the state management
Browse files Browse the repository at this point in the history
  • Loading branch information
sebprt committed Nov 2, 2023
1 parent 247b399 commit bca6690
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 33 deletions.
79 changes: 79 additions & 0 deletions src/Feature/State/Builder/RabbitMQBuilder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Satellite\Feature\State\Builder;

use PhpParser\Builder;
use PhpParser\Node;
use PhpParser\Node\Identifier;

final class RabbitMQBuilder implements Builder
{
private ?Node\Expr $exchange = null;
private ?Node\Expr $lineThreshold = null;

public function __construct(
private readonly Node\Expr $stepCode,
private readonly Node\Expr $stepLabel,
private readonly Node\Expr $topic,
) {}

public function withExchange(
Node\Expr $exchange,
): self {
$this->exchange = $exchange;

return $this;
}

public function withThreshold(
Node\Expr $lineThreshold,
): self {
$this->lineThreshold = $lineThreshold;

return $this;
}

public function getNode(): Node\Expr
{
$args = [
new Node\Arg(
new Node\Expr\StaticCall(
class: new Node\Name\FullyQualified('Kiboko\\Component\\Flow\\RabbitMQ\\StateManager'),
name: 'withAuthentication',
args: array_merge([
new Node\Arg(
value: new Node\Expr\New_(
class: new Node\Name\FullyQualified(
'Bunny\\Client',
),
),
name: new Node\Identifier('connection')
),
new Node\Arg(
value: $this->topic,
name: new Node\Identifier('topic')
),
$this->lineThreshold != null ? new Node\Arg(
value: $this->lineThreshold,
name: new Node\Identifier('lineThreshold')
) : null,
$this->exchange != null ? new Node\Arg(
value: $this->exchange,
name: new Node\Identifier('exchange')
) : null,
]),
),
name: new Node\Identifier('manager'),
),
new Node\Arg($this->stepCode, name: new Node\Identifier('stepCode')),
new Node\Arg($this->stepLabel, name: new Node\Identifier('stepLabel')),
];

return new Node\Expr\New_(
class: new Node\Name\FullyQualified('Kiboko\\Component\\Flow\\RabbitMQ\\State'),
args: $args,
);
}
}
29 changes: 10 additions & 19 deletions src/Feature/State/Builder/State.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,29 @@

namespace Kiboko\Component\Satellite\Feature\State\Builder;

use Kiboko\Contract\Configurator\StepBuilderInterface;
use PhpParser\Builder;
use PhpParser\Node;

final class State implements StepBuilderInterface
final class State implements Builder
{
private ?Node\Expr $logger = null;
private ?Node\Expr $rejection = null;
private ?Node\Expr $state = null;

public function getNode(): Node\Stmt
public function withState(Node\Expr $state): self
{
return new Node\Stmt\Nop();
}

public function withLogger(Node\Expr $logger): StepBuilderInterface
{
$this->logger = $logger;
$this->state = $state;

return $this;
}

public function withRejection(Node\Expr $rejection): StepBuilderInterface
private static function nullState(): Node\Expr
{
$this->rejection = $rejection;

return $this;
return new Node\Expr\New_(
new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullState::class)
);
}

public function withState(Node\Expr $state): StepBuilderInterface
public function getNode(): Node\Expr
{
$this->state = $state;

return $this;
return $this->state ?? self::nullState();
}
}
25 changes: 13 additions & 12 deletions src/Feature/State/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ public function getConfigTreeBuilder(): TreeBuilder
/* @phpstan-ignore-next-line */
$builder->getRootNode()
->children()
->arrayNode('destinations')
->fixXmlConfig('destination')
->requiresAtLeastOneElement()
->cannotBeEmpty()
->ignoreExtraKeys()
->arrayPrototype()
->children()
->append((new Configuration\RedisConfiguration())->getConfigTreeBuilder()->getRootNode())
->append((new Configuration\MemcachedConfiguration())->getConfigTreeBuilder()->getRootNode())
->end()
->end()
->end()
->arrayNode('destinations')
->fixXmlConfig('destination')
->requiresAtLeastOneElement()
->cannotBeEmpty()
->ignoreExtraKeys()
->arrayPrototype()
->children()
->append((new Configuration\RedisConfiguration())->getConfigTreeBuilder()->getRootNode())
->append((new Configuration\MemcachedConfiguration())->getConfigTreeBuilder()->getRootNode())
->append((new Configuration\RabbitMQConfiguration())->getConfigTreeBuilder()->getRootNode())
->end()
->end()
->end()
->end()
;

Expand Down
72 changes: 72 additions & 0 deletions src/Feature/State/Configuration/RabbitMQConfiguration.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Satellite\Feature\State\Configuration;

use Symfony\Component\Config;
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
use function Kiboko\Component\SatelliteToolbox\Configuration\asExpression;
use function Kiboko\Component\SatelliteToolbox\Configuration\isExpression;

final class RabbitMQConfiguration implements Config\Definition\ConfigurationInterface
{
public function getConfigTreeBuilder(): TreeBuilder
{
$builder = new TreeBuilder('rabbitmq');

/* @phpstan-ignore-next-line */
$builder->getRootNode()
->children()
->variableNode('host')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->isRequired()
->end()
->variableNode('port')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->isRequired()
->end()
->variableNode('user')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->end()
->variableNode('password')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->end()
->variableNode('vhost')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->isRequired()
->end()
->variableNode('exchange')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->end()
->variableNode('topic')
->validate()
->ifTrue(isExpression())
->then(asExpression())
->end()
->isRequired()
->end()
->end()
;

return $builder;
}
}
72 changes: 72 additions & 0 deletions src/Feature/State/Factory/RabbitMQFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Satellite\Feature\State\Factory;

use Kiboko\Component\Satellite\ExpressionLanguage as Satellite;
use Kiboko\Component\Satellite\Feature\State;
use Kiboko\Contract\Configurator;
use Symfony\Component\Config\Definition\ConfigurationInterface;
use Symfony\Component\Config\Definition\Exception as Symfony;
use Symfony\Component\Config\Definition\Processor;
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;

use function Kiboko\Component\SatelliteToolbox\Configuration\compileValueWhenExpression;

final readonly class RabbitMQFactory implements Configurator\FactoryInterface
{
private Processor $processor;
private ConfigurationInterface $configuration;

public function __construct(
private ExpressionLanguage $interpreter = new Satellite\ExpressionLanguage(),
) {
$this->processor = new Processor();
$this->configuration = new State\Configuration\RabbitMQConfiguration();
}

public function configuration(): ConfigurationInterface
{
return $this->configuration;
}

/**
* @throws Configurator\ConfigurationExceptionInterface
*/
public function normalize(array $config): array
{
try {
return $this->processor->processConfiguration($this->configuration, $config);
} catch (Symfony\InvalidTypeException|Symfony\InvalidConfigurationException $exception) {
throw new Configurator\InvalidConfigurationException($exception->getMessage(), 0, $exception);
}
}

public function validate(array $config): bool
{
try {
$this->processor->processConfiguration($this->configuration, $config);

return true;
} catch (\Exception) {
}

return false;
}

public function compile(array $config): Repository\RabbitMQRepository
{
$builder = new State\Builder\RabbitMQBuilder(
stepCode: compileValueWhenExpression($this->interpreter, uniqid()),
stepLabel: compileValueWhenExpression($this->interpreter, uniqid()),
topic: compileValueWhenExpression($this->interpreter, $config['topic']),
);

if (\array_key_exists('exchange', $config)) {
$builder->withExchange(compileValueWhenExpression($this->interpreter, $config['exchange']));
}

return new Repository\RabbitMQRepository($builder);
}
}
26 changes: 26 additions & 0 deletions src/Feature/State/Factory/Repository/RabbitMQRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Kiboko\Component\Satellite\Feature\State\Factory\Repository;

use Kiboko\Component\Satellite\Feature\State;
use Kiboko\Contract\Configurator;

final class RabbitMQRepository implements Configurator\RepositoryInterface
{
use State\RepositoryTrait;

public function __construct(private readonly State\Builder\RabbitMQBuilder $builder)
{
$this->files = [];
$this->packages = [
'php-etl/rabbitmq-flow',
];
}

public function getBuilder(): State\Builder\RabbitMQBuilder
{
return $this->builder;
}
}
2 changes: 1 addition & 1 deletion src/Feature/State/Repository.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Kiboko\Contract\Configurator;
use PhpParser\Node;

final class Repository implements Configurator\StepRepositoryInterface
final class Repository implements Configurator\RepositoryInterface
{
use RepositoryTrait;

Expand Down
25 changes: 24 additions & 1 deletion src/Feature/State/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Kiboko\Component\Satellite\Feature\State;

use Kiboko\Component\Satellite\ExpressionLanguage as Satellite;
use Kiboko\Component\Satellite\Feature\State\Repository;
use Kiboko\Contract\Configurator;
use Kiboko\Contract\Configurator\Feature;
use Symfony\Component\Config\Definition\Exception as Symfony;
Expand Down Expand Up @@ -61,7 +62,29 @@ public function validate(array $config): bool
public function compile(array $config): Repository
{
$builder = new Builder\State();
$repository = new Repository($builder);

return new Repository($builder);
try {
if (!\array_key_exists('destinations', $config)
|| (is_countable($config['destinations']) ? \count($config['destinations']) : 0) <= 0
) {
return $repository;
}

foreach ($config['destinations'] as $destination) {
if (\array_key_exists('rabbitmq', $destination)) {
$factory = new Factory\RabbitMQFactory($this->interpreter);

$rabbitmqRepository = $factory->compile($destination['rabbitmq']);

$repository->merge($rabbitmqRepository);
$builder->withState($rabbitmqRepository->getBuilder()->getNode());
}
}

return $repository;
} catch (Symfony\InvalidTypeException|Symfony\InvalidConfigurationException $exception) {
throw new Configurator\InvalidConfigurationException(message: $exception->getMessage(), previous: $exception);
}
}
}

0 comments on commit bca6690

Please sign in to comment.