diff --git a/src/Bunny/Async/Client.php b/src/Bunny/Async/Client.php index 41ac1e0..d644042 100644 --- a/src/Bunny/Async/Client.php +++ b/src/Bunny/Async/Client.php @@ -297,7 +297,16 @@ public function addAwaitCallback(callable $callback) */ public function onDataAvailable() { - $this->read(); + try { + $this->read(); + } catch (\Throwable $e) { + foreach ($this->awaitCallbacks as $k => $callback) { + if ($callback($e) === true) { + unset($this->awaitCallbacks[$k]); + break; + } + } + } while (($frame = $this->reader->consumeFrame($this->readBuffer)) !== null) { foreach ($this->awaitCallbacks as $k => $callback) { diff --git a/src/Bunny/Client.php b/src/Bunny/Client.php index c059963..99e1548 100644 --- a/src/Bunny/Client.php +++ b/src/Bunny/Client.php @@ -191,7 +191,7 @@ public function run($maxSeconds = null) $now = microtime(true); $nextStreamSelectTimeout = ($this->lastWrite ?: $now) + $this->options["heartbeat"]; if (!isset($nextHeartbeat)) { - $nextHeartbeat = $now + $this->options["heartbeat"]; + $nextHeartbeat = $nextStreamSelectTimeout;; } if ($stopTime !== null && $stopTime < $nextStreamSelectTimeout) { $nextStreamSelectTimeout = $stopTime;