Skip to content

Commit

Permalink
Supporting of M:N relation (many type of messages in many channels) r…
Browse files Browse the repository at this point in the history
…equired (#224)

* fixed according to issue.

* Improve tests

---------

Co-authored-by: Alexander Makarov <[email protected]>
Co-authored-by: viktorprogger <[email protected]>
Co-authored-by: Sergei Predvoditelev <[email protected]>
  • Loading branch information
4 people authored Dec 8, 2024
1 parent ace417c commit 3602957
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
15 changes: 8 additions & 7 deletions src/Middleware/Consume/ConsumeMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
final class ConsumeMiddlewareDispatcher
{
/**
* Contains a middleware pipeline handler.
* Contains a middleware pipeline handlers.
*
* @var MiddlewareConsumeStack|null The middleware stack.
* @var MiddlewareConsumeStack[] The middleware stack divided by message types.
*/
private ?MiddlewareConsumeStack $stack = null;
private array $stack = [];

/**
* @var array[]|callable[]|MiddlewareConsumeInterface[]|string[]
Expand All @@ -37,11 +37,12 @@ public function dispatch(
ConsumeRequest $request,
MessageHandlerConsumeInterface $finishHandler
): ConsumeRequest {
if ($this->stack === null) {
$this->stack = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler);
$handlerName = $request->getMessage()->getHandlerName();
if (!array_key_exists($handlerName, $this->stack)) {
$this->stack[$handlerName] = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler);
}

return $this->stack->handleConsume($request);
return $this->stack[$handlerName]->handleConsume($request);
}

/**
Expand All @@ -68,7 +69,7 @@ public function withMiddlewares(array $middlewareDefinitions): self

// Fixes a memory leak.
unset($instance->stack);
$instance->stack = null;
$instance->stack = [];

return $instance;
}
Expand Down
9 changes: 8 additions & 1 deletion tests/Integration/MessageConsumingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@
final class MessageConsumingTest extends TestCase
{
private array $messagesProcessed;
private array $messagesProcessedSecond;

public function testMessagesConsumed(): void
{
$this->messagesProcessed = [];
$this->messagesProcessedSecond = [];

$container = $this->createMock(ContainerInterface::class);
$worker = new Worker(
['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()],
[
'test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData(),
'test2' => fn (MessageInterface $message): mixed => $this->messagesProcessedSecond[] = $message->getData(),
],
new NullLogger(),
new Injector($container),
$container,
Expand All @@ -38,9 +43,11 @@ public function testMessagesConsumed(): void
$messages = [1, 'foo', 'bar-baz'];
foreach ($messages as $message) {
$worker->process(new Message('test', $message), $this->getQueue());
$worker->process(new Message('test2', $message), $this->getQueue());
}

$this->assertEquals($messages, $this->messagesProcessed);
$this->assertEquals($messages, $this->messagesProcessedSecond);
}

public function testMessagesConsumedByHandlerClass(): void
Expand Down

0 comments on commit 3602957

Please sign in to comment.