diff --git a/composer.json b/composer.json index bf5d131..96ddda7 100644 --- a/composer.json +++ b/composer.json @@ -24,7 +24,7 @@ "php-amqplib/php-amqplib": "^3.0", "psr/container": "^1.0|^2.0", "psr/event-dispatcher": "^1.0", - "psr/log": "^1.0" + "psr/log": "^1.0|^2.0|^3.0" }, "suggest": { "hyperf/di": "Required to use annotations.", diff --git a/src/AMQPConnection.php b/src/AMQPConnection.php index 2358fb8..cec45b7 100644 --- a/src/AMQPConnection.php +++ b/src/AMQPConnection.php @@ -11,6 +11,9 @@ */ namespace Hyperf\Amqp; +use Hyperf\Amqp\Exception\LoopBrokenException; +use Hyperf\Amqp\Exception\SendChannelClosedException; +use Hyperf\Amqp\Exception\SendChannelTimeoutException; use Hyperf\Engine\Channel; use Hyperf\Utils\Coordinator\Constants; use Hyperf\Utils\Coordinator\CoordinatorManager; @@ -75,6 +78,11 @@ class AMQPConnection extends AbstractConnection */ protected $exited = false; + /** + * @var Channel + */ + protected $chan; + public function __construct( string $user, string $password, @@ -90,6 +98,7 @@ public function __construct( ) { $this->channelManager = new ChannelManager(16); $this->channelManager->get(0, true); + $this->chan = $this->channelManager->make(65535); parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout); @@ -101,7 +110,13 @@ public function write($data) { $this->loop(); - parent::write($data); + $this->chan->push($data, 5); + if ($this->chan->isClosing()) { + throw new SendChannelClosedException('Writing data failed, because send channel closed.'); + } + if ($this->chan->isTimeout()) { + throw new SendChannelTimeoutException('Writing data failed, because send channel timeout.'); + } } /** @@ -191,6 +206,8 @@ public function close($reply_code = 0, $reply_text = '', $method_sig = [0, 0]) try { $res = parent::close($reply_code, $reply_text, $method_sig); } finally { + $this->setIsConnected(false); + $this->chan->close(); $this->channelManager->flush(); } return $res; @@ -218,6 +235,34 @@ protected function loop(): void $this->loop = true; + Coroutine::create(function () { + try { + while (true) { + $data = $this->chan->pop(-1); + if ($this->chan->isClosing()) { + throw new SendChannelClosedException('Write failed, because send channel closed.'); + } + + if ($data === false || $data === '') { + throw new LoopBrokenException('Push channel broken or write empty string for connection.'); + } + + parent::write($data); + } + } catch (\Throwable $exception) { + $level = $this->exited ? 'warning' : 'error'; + $this->logger && $this->logger->log($level, 'Send loop broken. The reason is ' . (string) $exception); + } finally { + $this->loop = false; + if (! $this->exited) { + // When loop broken, AMQPConnection will not be able to communicate with AMQP server. + // So flush all recv channels to ensure closing AMQP connections quickly. + $this->channelManager->flush(); + $this->close(); + } + } + }); + Coroutine::create(function () { try { while (true) { @@ -274,13 +319,13 @@ protected function heartbeat(): void try { // PING - if ($this->isConnected()) { + if ($this->isConnected() && $this->chan->isEmpty()) { $pkt = new AMQPWriter(); $pkt->write_octet(8); $pkt->write_short(0); $pkt->write_long(0); $pkt->write_octet(0xCE); - $this->getIO()->write($pkt->getvalue()); + $this->chan->push($pkt->getvalue(), 0.001); } } catch (\Throwable $exception) { $this->logger && $this->logger->error((string) $exception); diff --git a/src/ConnectionFactory.php b/src/ConnectionFactory.php index c09c52b..8f72223 100644 --- a/src/ConnectionFactory.php +++ b/src/ConnectionFactory.php @@ -98,7 +98,8 @@ public function make(array $config): AMQPConnection $io = new SwooleIO( $host, $port, - $params->getConnectionTimeout() + $params->getConnectionTimeout(), + $params->getReadWriteTimeout(), ); $connection = new AMQPConnection( diff --git a/src/Consumer.php b/src/Consumer.php index 7684555..fc9c85b 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -204,7 +204,7 @@ protected function getCallback(ConsumerMessageInterface $consumerMessage, AMQPMe } if ($result === Result::NACK) { $this->logger->debug($deliveryTag . ' uacked.'); - return $channel->basic_nack($deliveryTag); + return $channel->basic_nack($deliveryTag, false, $consumerMessage->isRequeue()); } if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) { $this->logger->debug($deliveryTag . ' requeued.'); diff --git a/src/Exception/LoopBrokenException.php b/src/Exception/LoopBrokenException.php new file mode 100644 index 0000000..454ef2b --- /dev/null +++ b/src/Exception/LoopBrokenException.php @@ -0,0 +1,16 @@ +host = $host; $this->port = $port; $this->connectionTimeout = $connectionTimeout; + $this->readWriteTimeout = $readWriteTimeout; } /** @@ -77,34 +78,21 @@ public function connect() public function read($len) { - do { - if ($len <= strlen($this->buffer)) { - $data = substr($this->buffer, 0, $len); - $this->buffer = substr($this->buffer, $len); - - return $data; - } - - if (! $this->sock->isConnected()) { - throw new AMQPConnectionClosedException('Broken pipe or closed connection. ' . $this->sock->errMsg); - } - - $buffer = $this->sock->recv(-1); - - if ($buffer === '') { - throw new AMQPConnectionClosedException('Connection is closed. The reason is ' . $this->sock->errMsg); - } + $data = $this->sock->recvAll($len, $this->readWriteTimeout); + if ($data === false || strlen($data) !== $len) { + throw new AMQPConnectionClosedException('Read data failed, The reason is ' . $this->sock->errMsg); + } - $this->buffer .= $buffer; - } while (true); + return $data; } public function write($data) { - $buffer = $this->sock->send($data); + $len = $this->sock->sendAll($data, $this->readWriteTimeout); - if ($buffer === false) { - throw new AMQPConnectionClosedException('Error sending data'); + /* @phpstan-ignore-next-line */ + if ($data === false || strlen($data) !== $len) { + throw new AMQPConnectionClosedException('Send data failed, The reason is ' . $this->sock->errMsg); } } @@ -117,7 +105,7 @@ public function close() $this->sock && $this->sock->close(); } - public function select($sec, $usec) + public function select($sec, $usec = 0) { return 1; } @@ -134,7 +122,7 @@ public function reenableHeartbeat() protected function makeClient() { - $sock = new Client(SWOOLE_SOCK_TCP); + $sock = new Socket(AF_INET, SOCK_STREAM, 0); if (! $sock->connect($this->host, $this->port, $this->connectionTimeout)) { throw new AMQPRuntimeException( sprintf('Error Connecting to server: %s ', $sock->errMsg), diff --git a/src/Message/ConsumerDelayedMessageTrait.php b/src/Message/ConsumerDelayedMessageTrait.php new file mode 100644 index 0000000..9827321 --- /dev/null +++ b/src/Message/ConsumerDelayedMessageTrait.php @@ -0,0 +1,35 @@ +setQueue((string) $this->getQueue()) + ->setArguments(new AMQPTable(['x-dead-letter-exchange' => $this->getDeadLetterExchange()])); + } + + protected function getDeadLetterExchange(): string + { + return 'delayed'; + } +} diff --git a/src/Message/ProducerDelayedMessageTrait.php b/src/Message/ProducerDelayedMessageTrait.php new file mode 100644 index 0000000..10cf8b6 --- /dev/null +++ b/src/Message/ProducerDelayedMessageTrait.php @@ -0,0 +1,43 @@ +properties['application_headers'] = new AMQPTable([$name => $millisecond]); + return $this; + } + + /** + * Overwrite. + */ + public function getExchangeBuilder(): ExchangeBuilder + { + return (new ExchangeBuilder())->setExchange((string) $this->getExchange()) + ->setType('x-delayed-message') + ->setArguments(new AMQPTable(['x-delayed-type' => $this->getType()])); + } +} diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php index d6af64a..2eac0c6 100644 --- a/tests/ConsumerTest.php +++ b/tests/ConsumerTest.php @@ -18,6 +18,8 @@ use Hyperf\Utils\Reflection\ClassInvoker; use HyperfTest\Amqp\Stub\AMQPConnectionStub; use HyperfTest\Amqp\Stub\ContainerStub; +use HyperfTest\Amqp\Stub\Delay2Consumer; +use HyperfTest\Amqp\Stub\DelayConsumer; use Mockery; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; @@ -56,4 +58,13 @@ public function testWaitChannel() $chan->close(); $invoker->wait_channel(1); } + + public function testRewriteDelayMessage() + { + $consumer = new DelayConsumer(); + $this->assertSame('x-delayed', (new ClassInvoker($consumer))->getDeadLetterExchange()); + + $consumer = new Delay2Consumer(); + $this->assertSame('delayed', (new ClassInvoker($consumer))->getDeadLetterExchange()); + } } diff --git a/tests/Stub/Delay2Consumer.php b/tests/Stub/Delay2Consumer.php new file mode 100644 index 0000000..186ea21 --- /dev/null +++ b/tests/Stub/Delay2Consumer.php @@ -0,0 +1,20 @@ +