diff --git a/src/Feature/State/Builder/RabbitMQBuilder.php b/src/Feature/State/Builder/RabbitMQBuilder.php new file mode 100644 index 00000000..aaf33db6 --- /dev/null +++ b/src/Feature/State/Builder/RabbitMQBuilder.php @@ -0,0 +1,79 @@ +exchange = $exchange; + + return $this; + } + + public function withThreshold( + Node\Expr $lineThreshold, + ): self { + $this->lineThreshold = $lineThreshold; + + return $this; + } + + public function getNode(): Node\Expr + { + $args = [ + new Node\Arg( + new Node\Expr\StaticCall( + class: new Node\Name\FullyQualified('Kiboko\\Component\\Flow\\RabbitMQ\\StateManager'), + name: 'withAuthentication', + args: array_merge([ + new Node\Arg( + value: new Node\Expr\New_( + class: new Node\Name\FullyQualified( + 'Bunny\\Client', + ), + ), + name: new Node\Identifier('connection') + ), + new Node\Arg( + value: $this->topic, + name: new Node\Identifier('topic') + ), + $this->lineThreshold != null ? new Node\Arg( + value: $this->lineThreshold, + name: new Node\Identifier('lineThreshold') + ) : null, + $this->exchange != null ? new Node\Arg( + value: $this->exchange, + name: new Node\Identifier('exchange') + ) : null, + ]), + ), + name: new Node\Identifier('manager'), + ), + new Node\Arg($this->stepCode, name: new Node\Identifier('stepCode')), + new Node\Arg($this->stepLabel, name: new Node\Identifier('stepLabel')), + ]; + + return new Node\Expr\New_( + class: new Node\Name\FullyQualified('Kiboko\\Component\\Flow\\RabbitMQ\\State'), + args: $args, + ); + } +} diff --git a/src/Feature/State/Builder/State.php b/src/Feature/State/Builder/State.php index 3e16e6d3..1e5385b1 100644 --- a/src/Feature/State/Builder/State.php +++ b/src/Feature/State/Builder/State.php @@ -4,38 +4,29 @@ namespace Kiboko\Component\Satellite\Feature\State\Builder; -use Kiboko\Contract\Configurator\StepBuilderInterface; +use PhpParser\Builder; use PhpParser\Node; -final class State implements StepBuilderInterface +final class State implements Builder { - private ?Node\Expr $logger = null; - private ?Node\Expr $rejection = null; private ?Node\Expr $state = null; - public function getNode(): Node\Stmt + public function withState(Node\Expr $state): self { - return new Node\Stmt\Nop(); - } - - public function withLogger(Node\Expr $logger): StepBuilderInterface - { - $this->logger = $logger; + $this->state = $state; return $this; } - public function withRejection(Node\Expr $rejection): StepBuilderInterface + private static function nullState(): Node\Expr { - $this->rejection = $rejection; - - return $this; + return new Node\Expr\New_( + new Node\Name\FullyQualified(\Kiboko\Contract\Pipeline\NullState::class) + ); } - public function withState(Node\Expr $state): StepBuilderInterface + public function getNode(): Node\Expr { - $this->state = $state; - - return $this; + return $this->state ?? self::nullState(); } } diff --git a/src/Feature/State/Configuration.php b/src/Feature/State/Configuration.php index 8e79b9a1..be852657 100644 --- a/src/Feature/State/Configuration.php +++ b/src/Feature/State/Configuration.php @@ -16,18 +16,19 @@ public function getConfigTreeBuilder(): TreeBuilder /* @phpstan-ignore-next-line */ $builder->getRootNode() ->children() - ->arrayNode('destinations') - ->fixXmlConfig('destination') - ->requiresAtLeastOneElement() - ->cannotBeEmpty() - ->ignoreExtraKeys() - ->arrayPrototype() - ->children() - ->append((new Configuration\RedisConfiguration())->getConfigTreeBuilder()->getRootNode()) - ->append((new Configuration\MemcachedConfiguration())->getConfigTreeBuilder()->getRootNode()) - ->end() - ->end() - ->end() + ->arrayNode('destinations') + ->fixXmlConfig('destination') + ->requiresAtLeastOneElement() + ->cannotBeEmpty() + ->ignoreExtraKeys() + ->arrayPrototype() + ->children() + ->append((new Configuration\RedisConfiguration())->getConfigTreeBuilder()->getRootNode()) + ->append((new Configuration\MemcachedConfiguration())->getConfigTreeBuilder()->getRootNode()) + ->append((new Configuration\RabbitMQConfiguration())->getConfigTreeBuilder()->getRootNode()) + ->end() + ->end() + ->end() ->end() ; diff --git a/src/Feature/State/Configuration/RabbitMQConfiguration.php b/src/Feature/State/Configuration/RabbitMQConfiguration.php new file mode 100644 index 00000000..3ad79997 --- /dev/null +++ b/src/Feature/State/Configuration/RabbitMQConfiguration.php @@ -0,0 +1,72 @@ +getRootNode() + ->children() + ->variableNode('host') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->isRequired() + ->end() + ->variableNode('port') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->isRequired() + ->end() + ->variableNode('user') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->end() + ->variableNode('password') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->end() + ->variableNode('vhost') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->isRequired() + ->end() + ->variableNode('exchange') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->end() + ->variableNode('topic') + ->validate() + ->ifTrue(isExpression()) + ->then(asExpression()) + ->end() + ->isRequired() + ->end() + ->end() + ; + + return $builder; + } +} diff --git a/src/Feature/State/Factory/RabbitMQFactory.php b/src/Feature/State/Factory/RabbitMQFactory.php new file mode 100644 index 00000000..b9ade8ca --- /dev/null +++ b/src/Feature/State/Factory/RabbitMQFactory.php @@ -0,0 +1,72 @@ +processor = new Processor(); + $this->configuration = new State\Configuration\RabbitMQConfiguration(); + } + + public function configuration(): ConfigurationInterface + { + return $this->configuration; + } + + /** + * @throws Configurator\ConfigurationExceptionInterface + */ + public function normalize(array $config): array + { + try { + return $this->processor->processConfiguration($this->configuration, $config); + } catch (Symfony\InvalidTypeException|Symfony\InvalidConfigurationException $exception) { + throw new Configurator\InvalidConfigurationException($exception->getMessage(), 0, $exception); + } + } + + public function validate(array $config): bool + { + try { + $this->processor->processConfiguration($this->configuration, $config); + + return true; + } catch (\Exception) { + } + + return false; + } + + public function compile(array $config): Repository\RabbitMQRepository + { + $builder = new State\Builder\RabbitMQBuilder( + stepCode: compileValueWhenExpression($this->interpreter, uniqid()), + stepLabel: compileValueWhenExpression($this->interpreter, uniqid()), + topic: compileValueWhenExpression($this->interpreter, $config['topic']), + ); + + if (\array_key_exists('exchange', $config)) { + $builder->withExchange(compileValueWhenExpression($this->interpreter, $config['exchange'])); + } + + return new Repository\RabbitMQRepository($builder); + } +} diff --git a/src/Feature/State/Factory/Repository/RabbitMQRepository.php b/src/Feature/State/Factory/Repository/RabbitMQRepository.php new file mode 100644 index 00000000..0f867a05 --- /dev/null +++ b/src/Feature/State/Factory/Repository/RabbitMQRepository.php @@ -0,0 +1,26 @@ +files = []; + $this->packages = [ + 'php-etl/rabbitmq-flow', + ]; + } + + public function getBuilder(): State\Builder\RabbitMQBuilder + { + return $this->builder; + } +} diff --git a/src/Feature/State/Repository.php b/src/Feature/State/Repository.php index 76249882..b7cb3407 100644 --- a/src/Feature/State/Repository.php +++ b/src/Feature/State/Repository.php @@ -7,7 +7,7 @@ use Kiboko\Contract\Configurator; use PhpParser\Node; -final class Repository implements Configurator\StepRepositoryInterface +final class Repository implements Configurator\RepositoryInterface { use RepositoryTrait; diff --git a/src/Feature/State/Service.php b/src/Feature/State/Service.php index 72720cba..4bc8a39e 100644 --- a/src/Feature/State/Service.php +++ b/src/Feature/State/Service.php @@ -5,6 +5,7 @@ namespace Kiboko\Component\Satellite\Feature\State; use Kiboko\Component\Satellite\ExpressionLanguage as Satellite; +use Kiboko\Component\Satellite\Feature\State\Repository; use Kiboko\Contract\Configurator; use Kiboko\Contract\Configurator\Feature; use Symfony\Component\Config\Definition\Exception as Symfony; @@ -61,7 +62,29 @@ public function validate(array $config): bool public function compile(array $config): Repository { $builder = new Builder\State(); + $repository = new Repository($builder); - return new Repository($builder); + try { + if (!\array_key_exists('destinations', $config) + || (is_countable($config['destinations']) ? \count($config['destinations']) : 0) <= 0 + ) { + return $repository; + } + + foreach ($config['destinations'] as $destination) { + if (\array_key_exists('rabbitmq', $destination)) { + $factory = new Factory\RabbitMQFactory($this->interpreter); + + $rabbitmqRepository = $factory->compile($destination['rabbitmq']); + + $repository->merge($rabbitmqRepository); + $builder->withState($rabbitmqRepository->getBuilder()->getNode()); + } + } + + return $repository; + } catch (Symfony\InvalidTypeException|Symfony\InvalidConfigurationException $exception) { + throw new Configurator\InvalidConfigurationException(message: $exception->getMessage(), previous: $exception); + } } }