From 20d7d7e34e9e501a8a523052c2a1e8414c55a9f9 Mon Sep 17 00:00:00 2001 From: Cees-Jan Kiewiet Date: Sat, 15 Feb 2025 22:52:14 +0100 Subject: [PATCH] Consume should only process one message at a time While consumer all pre-fetched messages concurrently is useful, the expected behavior for consume is to only handle one message at a time. Considering introducing a stream method for the concurrently handling of messages. --- src/Channel.php | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/Channel.php b/src/Channel.php index 9aadd92..0f74ce6 100644 --- a/src/Channel.php +++ b/src/Channel.php @@ -24,6 +24,7 @@ use Evenement\EventEmitterTrait; use React\Promise\Deferred; use React\Promise\PromiseInterface; +use SplQueue; use function React\Async\async; use function React\Async\await; @@ -60,6 +61,12 @@ class Channel implements ChannelInterface, EventEmitterInterface /** @var callable[] */ private $deliverCallbacks = []; + /** @var bool[] */ + private $deliveryBusy = []; + + /** @var SplQueue[] */ + private $deliveryQueue = []; + /** @var callable[] */ private $ackCallbacks = []; @@ -227,6 +234,8 @@ public function consume(callable $callback, string $queue = "", string $consumer if ($response instanceof MethodBasicConsumeOkFrame) { $this->deliverCallbacks[$response->consumerTag] = $callback; + $this->deliveryQueue[$response->consumerTag] = new SplQueue(); + $this->deliveryBusy[$response->consumerTag] = false; return $response; } @@ -337,6 +346,7 @@ public function cancel(string $consumerTag, bool $nowait = false): bool|\Bunny\P { $response = $this->cancelImpl($consumerTag, $nowait); unset($this->deliverCallbacks[$consumerTag]); + unset($this->deliveryQueue[$consumerTag]); return $response; } @@ -452,6 +462,8 @@ public function onFrameReceived(AbstractFrame $frame): void // $this->client = null; // break consumers' reference cycle $this->deliverCallbacks = []; + $this->deliveryQueue = []; + $this->deliveryBusy = []; } elseif ($frame instanceof MethodBasicReturnFrame) { $this->returnFrame = $frame; @@ -589,9 +601,8 @@ private function onBodyComplete(): void $content ); - $callback = $this->deliverCallbacks[$this->deliverFrame->consumerTag]; - - $callback($message, $this, $this->client); + $this->deliveryQueue[$this->deliverFrame->consumerTag]->enqueue($message); + $this->deliveryTick($this->deliverFrame->consumerTag); } $this->deliverFrame = null; @@ -620,5 +631,29 @@ private function onBodyComplete(): void throw new \LogicException("Either return or deliver frame has to be handled here."); } } + + private function deliveryTick(string $consumerTag): void + { + if ($this->deliveryBusy[$consumerTag] === true || $this->deliveryQueue[$consumerTag]->isEmpty()) { + return; + } + + $this->deliveryBusy[$consumerTag] = true; + $message = $this->deliveryQueue[$consumerTag]->dequeue(); + $callback = $this->deliverCallbacks[$consumerTag]; + + $outcome = $callback($message, $this, $this->client); + + if (!($outcome instanceof PromiseInterface)) { + $this->deliveryBusy[$consumerTag] = false; + $this->deliveryTick($consumerTag); + return; + } + + $outcome->finally(function () use ($consumerTag) { + $this->deliveryBusy[$consumerTag] = false; + $this->deliveryTick($consumerTag); + }); + } }