From e257dd3f0c72567090e7a6a87a740aad0b49f4c1 Mon Sep 17 00:00:00 2001 From: "Nathan.ZH" <449063862@qq.com> Date: Thu, 12 Aug 2021 10:16:00 +0800 Subject: [PATCH 1/9] =?UTF-8?q?Support=20requeue=20the=20message=20when=20?= =?UTF-8?q?return=20`NACK`=20for=20`AMQP`=20consumer.=E3=80=82=20(#3932)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Update CHANGELOG-2.2.md Co-authored-by: 李铭昕 <715557344@qq.com> --- src/Consumer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Consumer.php b/src/Consumer.php index 7684555..fc9c85b 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -204,7 +204,7 @@ protected function getCallback(ConsumerMessageInterface $consumerMessage, AMQPMe } if ($result === Result::NACK) { $this->logger->debug($deliveryTag . ' uacked.'); - return $channel->basic_nack($deliveryTag); + return $channel->basic_nack($deliveryTag, false, $consumerMessage->isRequeue()); } if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) { $this->logger->debug($deliveryTag . ' requeued.'); From 8f5f4348e89fab415f84bd9b6fd3039414a6eff5 Mon Sep 17 00:00:00 2001 From: Hyman Date: Sat, 28 Aug 2021 17:36:30 +0800 Subject: [PATCH 2/9] Support delayed message exchange for AMQP. (#3987) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 李铭昕 <715557344@qq.com> --- src/Message/ConsumerDelayedMessageTrait.php | 30 ++++++++++++++ src/Message/ProducerDelayedMessageTrait.php | 43 +++++++++++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 src/Message/ConsumerDelayedMessageTrait.php create mode 100644 src/Message/ProducerDelayedMessageTrait.php diff --git a/src/Message/ConsumerDelayedMessageTrait.php b/src/Message/ConsumerDelayedMessageTrait.php new file mode 100644 index 0000000..bb81857 --- /dev/null +++ b/src/Message/ConsumerDelayedMessageTrait.php @@ -0,0 +1,30 @@ +setQueue((string) $this->getQueue()) + ->setArguments(new AMQPTable(['x-dead-letter-exchange' => 'delayed'])); + } +} diff --git a/src/Message/ProducerDelayedMessageTrait.php b/src/Message/ProducerDelayedMessageTrait.php new file mode 100644 index 0000000..10cf8b6 --- /dev/null +++ b/src/Message/ProducerDelayedMessageTrait.php @@ -0,0 +1,43 @@ +properties['application_headers'] = new AMQPTable([$name => $millisecond]); + return $this; + } + + /** + * Overwrite. + */ + public function getExchangeBuilder(): ExchangeBuilder + { + return (new ExchangeBuilder())->setExchange((string) $this->getExchange()) + ->setType('x-delayed-message') + ->setArguments(new AMQPTable(['x-delayed-type' => $this->getType()])); + } +} From 1478c3fbc491e64c2353ada82d8d596ec970eb83 Mon Sep 17 00:00:00 2001 From: "Nathan.ZH" <449063862@qq.com> Date: Fri, 10 Sep 2021 12:12:02 +0800 Subject: [PATCH 3/9] Added method `ConsumerDelayedMessageTrait::getDeadLetterExchange()`. (#4040) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Added method `ConsumerDelayedMessageTrait::getDeadLetterExchange()` which used to rewrite `x-dead-letter-exchange` by yourself. Co-authored-by: 张玉波 Co-authored-by: 李铭昕 <715557344@qq.com> --- src/Message/ConsumerDelayedMessageTrait.php | 7 +++++- tests/ConsumerTest.php | 11 +++++++++ tests/Stub/Delay2Consumer.php | 20 +++++++++++++++++ tests/Stub/DelayConsumer.php | 25 +++++++++++++++++++++ 4 files changed, 62 insertions(+), 1 deletion(-) create mode 100644 tests/Stub/Delay2Consumer.php create mode 100644 tests/Stub/DelayConsumer.php diff --git a/src/Message/ConsumerDelayedMessageTrait.php b/src/Message/ConsumerDelayedMessageTrait.php index bb81857..9827321 100644 --- a/src/Message/ConsumerDelayedMessageTrait.php +++ b/src/Message/ConsumerDelayedMessageTrait.php @@ -25,6 +25,11 @@ trait ConsumerDelayedMessageTrait public function getQueueBuilder(): QueueBuilder { return (new QueueBuilder())->setQueue((string) $this->getQueue()) - ->setArguments(new AMQPTable(['x-dead-letter-exchange' => 'delayed'])); + ->setArguments(new AMQPTable(['x-dead-letter-exchange' => $this->getDeadLetterExchange()])); + } + + protected function getDeadLetterExchange(): string + { + return 'delayed'; } } diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php index d6af64a..2eac0c6 100644 --- a/tests/ConsumerTest.php +++ b/tests/ConsumerTest.php @@ -18,6 +18,8 @@ use Hyperf\Utils\Reflection\ClassInvoker; use HyperfTest\Amqp\Stub\AMQPConnectionStub; use HyperfTest\Amqp\Stub\ContainerStub; +use HyperfTest\Amqp\Stub\Delay2Consumer; +use HyperfTest\Amqp\Stub\DelayConsumer; use Mockery; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; @@ -56,4 +58,13 @@ public function testWaitChannel() $chan->close(); $invoker->wait_channel(1); } + + public function testRewriteDelayMessage() + { + $consumer = new DelayConsumer(); + $this->assertSame('x-delayed', (new ClassInvoker($consumer))->getDeadLetterExchange()); + + $consumer = new Delay2Consumer(); + $this->assertSame('delayed', (new ClassInvoker($consumer))->getDeadLetterExchange()); + } } diff --git a/tests/Stub/Delay2Consumer.php b/tests/Stub/Delay2Consumer.php new file mode 100644 index 0000000..186ea21 --- /dev/null +++ b/tests/Stub/Delay2Consumer.php @@ -0,0 +1,20 @@ + Date: Mon, 18 Oct 2021 09:16:14 +0800 Subject: [PATCH 4/9] Release v2.2.12 (#4142) --- src/IO/SwooleIO.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/IO/SwooleIO.php b/src/IO/SwooleIO.php index 21c0dd6..cf0ceaf 100644 --- a/src/IO/SwooleIO.php +++ b/src/IO/SwooleIO.php @@ -77,7 +77,7 @@ public function connect() public function read($len) { - do { + while (true) { if ($len <= strlen($this->buffer)) { $data = substr($this->buffer, 0, $len); $this->buffer = substr($this->buffer, $len); @@ -96,7 +96,7 @@ public function read($len) } $this->buffer .= $buffer; - } while (true); + } } public function write($data) From 94fb0c46667dc134c2682498f30499e32d319630 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= <715557344@qq.com> Date: Thu, 28 Oct 2021 17:13:49 +0800 Subject: [PATCH 5/9] Added versions (v1.0, v2.0, v3.0) support for `psr/log`. --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 1f71108..41847c9 100644 --- a/composer.json +++ b/composer.json @@ -24,7 +24,7 @@ "php-amqplib/php-amqplib": "^3.0", "psr/container": "^1.0|^2.0", "psr/event-dispatcher": "^1.0", - "psr/log": "^1.0" + "psr/log": "^1.0|^2.0|^3.0" }, "suggest": { "hyperf/di": "Required to use annotations.", From ec3a68b46410a5a18f0837e26a4bec114ebec6b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= Date: Mon, 13 Dec 2021 09:53:18 +0800 Subject: [PATCH 6/9] Fixed fatal error for declaration when using amqplib 3.1.1 (#4346) --- src/IO/SwooleIO.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/IO/SwooleIO.php b/src/IO/SwooleIO.php index cf0ceaf..acb7b0b 100644 --- a/src/IO/SwooleIO.php +++ b/src/IO/SwooleIO.php @@ -117,7 +117,7 @@ public function close() $this->sock && $this->sock->close(); } - public function select($sec, $usec) + public function select($sec, $usec = 0) { return 1; } From 015e129ea7849451af12c497c855ff02bc0fa39a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= Date: Mon, 13 Dec 2021 11:14:24 +0800 Subject: [PATCH 7/9] Fixed bug that amqp io has been bound to more than one coroutine when out of buffer. (#4347) --- src/AMQPConnection.php | 48 +++++++++++++++++-- src/Exception/LoopBrokenException.php | 16 +++++++ src/Exception/SendChannelClosedException.php | 16 +++++++ src/Exception/SendChannelTimeoutException.php | 16 +++++++ 4 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 src/Exception/LoopBrokenException.php create mode 100644 src/Exception/SendChannelClosedException.php create mode 100644 src/Exception/SendChannelTimeoutException.php diff --git a/src/AMQPConnection.php b/src/AMQPConnection.php index d6a420a..6e4de53 100644 --- a/src/AMQPConnection.php +++ b/src/AMQPConnection.php @@ -11,6 +11,9 @@ */ namespace Hyperf\Amqp; +use Hyperf\Amqp\Exception\LoopBrokenException; +use Hyperf\Amqp\Exception\SendChannelClosedException; +use Hyperf\Amqp\Exception\SendChannelTimeoutException; use Hyperf\Engine\Channel; use Hyperf\Utils\Channel\ChannelManager; use Hyperf\Utils\Coordinator\Constants; @@ -76,6 +79,8 @@ class AMQPConnection extends AbstractConnection */ protected $exited = false; + protected Channel $chan; + public function __construct( string $user, string $password, @@ -91,6 +96,7 @@ public function __construct( ) { $this->channelManager = new ChannelManager(16); $this->channelManager->get(0, true); + $this->chan = $this->channelManager->make(65535); parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout); @@ -102,7 +108,13 @@ public function write($data) { $this->loop(); - parent::write($data); + $this->chan->push($data, 5); + if ($this->chan->isClosing()) { + throw new SendChannelClosedException('Writing data failed, because send channel closed.'); + } + if ($this->chan->isTimeout()) { + throw new SendChannelTimeoutException('Writing data failed, because send channel timeout.'); + } } /** @@ -192,6 +204,8 @@ public function close($reply_code = 0, $reply_text = '', $method_sig = [0, 0]) try { $res = parent::close($reply_code, $reply_text, $method_sig); } finally { + $this->setIsConnected(false); + $this->chan->close(); $this->channelManager->flush(); } return $res; @@ -219,6 +233,34 @@ protected function loop(): void $this->loop = true; + Coroutine::create(function () { + try { + while (true) { + $data = $this->chan->pop(-1); + if ($this->chan->isClosing()) { + throw new SendChannelClosedException('Write failed, because send channel closed.'); + } + + if ($data === false || $data === '') { + throw new LoopBrokenException('Push channel broken or write empty string for connection.'); + } + + parent::write($data); + } + } catch (\Throwable $exception) { + $level = $this->exited ? 'warning' : 'error'; + $this->logger && $this->logger->log($level, 'Send loop broken. The reason is ' . (string) $exception); + } finally { + $this->loop = false; + if (! $this->exited) { + // When loop broken, AMQPConnection will not be able to communicate with AMQP server. + // So flush all recv channels to ensure closing AMQP connections quickly. + $this->channelManager->flush(); + $this->close(); + } + } + }); + Coroutine::create(function () { try { while (true) { @@ -275,13 +317,13 @@ protected function heartbeat(): void try { // PING - if ($this->isConnected()) { + if ($this->isConnected() && $this->chan->isEmpty()) { $pkt = new AMQPWriter(); $pkt->write_octet(8); $pkt->write_short(0); $pkt->write_long(0); $pkt->write_octet(0xCE); - $this->getIO()->write($pkt->getvalue()); + $this->chan->push($pkt->getvalue(), 0.001); } } catch (\Throwable $exception) { $this->logger && $this->logger->error((string) $exception); diff --git a/src/Exception/LoopBrokenException.php b/src/Exception/LoopBrokenException.php new file mode 100644 index 0000000..454ef2b --- /dev/null +++ b/src/Exception/LoopBrokenException.php @@ -0,0 +1,16 @@ + Date: Wed, 15 Dec 2021 16:18:55 +0800 Subject: [PATCH 8/9] Use `Swoole\Coroutine\Socket` in `Hyperf\Amqp\IO\SwooleIO`. (#4360) * No longer uses `Swoole\Coroutine\Client`, but uses `Swoole\Coroutine\Socket`, which is more stable and has better performance in `Hyperf\Amqp\IO\SwooleIO`. --- src/ConnectionFactory.php | 3 ++- src/IO/SwooleIO.php | 50 +++++++++++++++------------------------ 2 files changed, 21 insertions(+), 32 deletions(-) diff --git a/src/ConnectionFactory.php b/src/ConnectionFactory.php index c09c52b..8f72223 100644 --- a/src/ConnectionFactory.php +++ b/src/ConnectionFactory.php @@ -98,7 +98,8 @@ public function make(array $config): AMQPConnection $io = new SwooleIO( $host, $port, - $params->getConnectionTimeout() + $params->getConnectionTimeout(), + $params->getReadWriteTimeout(), ); $connection = new AMQPConnection( diff --git a/src/IO/SwooleIO.php b/src/IO/SwooleIO.php index acb7b0b..062baec 100644 --- a/src/IO/SwooleIO.php +++ b/src/IO/SwooleIO.php @@ -15,8 +15,7 @@ use PhpAmqpLib\Exception\AMQPRuntimeException; use PhpAmqpLib\Wire\AMQPWriter; use PhpAmqpLib\Wire\IO\AbstractIO; -use Swoole\Coroutine\Client; -use const SWOOLE_SOCK_TCP; +use Swoole\Coroutine\Socket; class SwooleIO extends AbstractIO { @@ -40,17 +39,17 @@ class SwooleIO extends AbstractIO /** * @var int */ - protected $heartbeat; + protected $readWriteTimeout; /** - * @var null|Client + * @var int */ - private $sock; + protected $heartbeat; /** - * @var string + * @var null|Socket */ - private $buffer = ''; + private $sock; /** * @throws \InvalidArgumentException when readWriteTimeout argument does not 2x the heartbeat @@ -58,11 +57,13 @@ class SwooleIO extends AbstractIO public function __construct( string $host, int $port, - int $connectionTimeout + int $connectionTimeout, + int $readWriteTimeout = 3 ) { $this->host = $host; $this->port = $port; $this->connectionTimeout = $connectionTimeout; + $this->readWriteTimeout = $readWriteTimeout; } /** @@ -77,34 +78,21 @@ public function connect() public function read($len) { - while (true) { - if ($len <= strlen($this->buffer)) { - $data = substr($this->buffer, 0, $len); - $this->buffer = substr($this->buffer, $len); - - return $data; - } - - if (! $this->sock->isConnected()) { - throw new AMQPConnectionClosedException('Broken pipe or closed connection. ' . $this->sock->errMsg); - } - - $buffer = $this->sock->recv(-1); - - if ($buffer === '') { - throw new AMQPConnectionClosedException('Connection is closed. The reason is ' . $this->sock->errMsg); - } - - $this->buffer .= $buffer; + $data = $this->sock->recvAll($len, $this->readWriteTimeout); + if ($data === false || strlen($data) !== $len) { + throw new AMQPConnectionClosedException('Read data failed, The reason is ' . $this->sock->errMsg); } + + return $data; } public function write($data) { - $buffer = $this->sock->send($data); + $len = $this->sock->sendAll($data, $this->readWriteTimeout); - if ($buffer === false) { - throw new AMQPConnectionClosedException('Error sending data'); + /* @phpstan-ignore-next-line */ + if ($data === false || strlen($data) !== $len) { + throw new AMQPConnectionClosedException('Send data failed, The reason is ' . $this->sock->errMsg); } } @@ -134,7 +122,7 @@ public function reenableHeartbeat() protected function makeClient() { - $sock = new Client(SWOOLE_SOCK_TCP); + $sock = new Socket(AF_INET, SOCK_STREAM, 0); if (! $sock->connect($this->host, $this->port, $this->connectionTimeout)) { throw new AMQPRuntimeException( sprintf('Error Connecting to server: %s ', $sock->errMsg), From 8669c57a6ea4f4debff84edcedb4e4bba8d830b1 Mon Sep 17 00:00:00 2001 From: Nick Date: Mon, 20 Dec 2021 15:11:22 +0800 Subject: [PATCH 9/9] Fixed bug that `AMQPConnection` does not adapt `php7.3`. --- src/AMQPConnection.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/AMQPConnection.php b/src/AMQPConnection.php index 6e4de53..cb47c98 100644 --- a/src/AMQPConnection.php +++ b/src/AMQPConnection.php @@ -79,7 +79,10 @@ class AMQPConnection extends AbstractConnection */ protected $exited = false; - protected Channel $chan; + /** + * @var Channel + */ + protected $chan; public function __construct( string $user,