Skip to content

Commit

Permalink
Merge pull request #164 from jakubkulhan/0.6.x-consume-should-on-proc…
Browse files Browse the repository at this point in the history
…ess-one-message-at-a-time

Consume should only process one message at a time
  • Loading branch information
WyriHaximus authored Feb 18, 2025
2 parents a168f09 + 20d7d7e commit 625753e
Showing 1 changed file with 38 additions and 3 deletions.
41 changes: 38 additions & 3 deletions src/Channel.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
use Evenement\EventEmitterTrait;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use SplQueue;
use function React\Async\async;
use function React\Async\await;

Expand Down Expand Up @@ -60,6 +61,12 @@ class Channel implements ChannelInterface, EventEmitterInterface
/** @var callable[] */
private $deliverCallbacks = [];

/** @var bool[] */
private $deliveryBusy = [];

/** @var SplQueue[] */
private $deliveryQueue = [];

/** @var callable[] */
private $ackCallbacks = [];

Expand Down Expand Up @@ -227,6 +234,8 @@ public function consume(callable $callback, string $queue = "", string $consumer

if ($response instanceof MethodBasicConsumeOkFrame) {
$this->deliverCallbacks[$response->consumerTag] = $callback;
$this->deliveryQueue[$response->consumerTag] = new SplQueue();
$this->deliveryBusy[$response->consumerTag] = false;
return $response;

}
Expand Down Expand Up @@ -337,6 +346,7 @@ public function cancel(string $consumerTag, bool $nowait = false): bool|\Bunny\P
{
$response = $this->cancelImpl($consumerTag, $nowait);
unset($this->deliverCallbacks[$consumerTag]);
unset($this->deliveryQueue[$consumerTag]);
return $response;
}

Expand Down Expand Up @@ -452,6 +462,8 @@ public function onFrameReceived(AbstractFrame $frame): void
// $this->client = null;
// break consumers' reference cycle
$this->deliverCallbacks = [];
$this->deliveryQueue = [];
$this->deliveryBusy = [];

} elseif ($frame instanceof MethodBasicReturnFrame) {
$this->returnFrame = $frame;
Expand Down Expand Up @@ -589,9 +601,8 @@ private function onBodyComplete(): void
$content
);

$callback = $this->deliverCallbacks[$this->deliverFrame->consumerTag];

$callback($message, $this, $this->client);
$this->deliveryQueue[$this->deliverFrame->consumerTag]->enqueue($message);
$this->deliveryTick($this->deliverFrame->consumerTag);
}

$this->deliverFrame = null;
Expand Down Expand Up @@ -620,5 +631,29 @@ private function onBodyComplete(): void
throw new \LogicException("Either return or deliver frame has to be handled here.");
}
}

private function deliveryTick(string $consumerTag): void
{
if ($this->deliveryBusy[$consumerTag] === true || $this->deliveryQueue[$consumerTag]->isEmpty()) {
return;
}

$this->deliveryBusy[$consumerTag] = true;
$message = $this->deliveryQueue[$consumerTag]->dequeue();
$callback = $this->deliverCallbacks[$consumerTag];

$outcome = $callback($message, $this, $this->client);

if (!($outcome instanceof PromiseInterface)) {
$this->deliveryBusy[$consumerTag] = false;
$this->deliveryTick($consumerTag);
return;
}

$outcome->finally(function () use ($consumerTag) {
$this->deliveryBusy[$consumerTag] = false;
$this->deliveryTick($consumerTag);
});
}
}

0 comments on commit 625753e

Please sign in to comment.