Skip to content

Commit

Permalink
Merge branch '2.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Dec 23, 2021
2 parents 0e6a5df + 8669c57 commit 0353cf2
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 38 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"php-amqplib/php-amqplib": "^3.0",
"psr/container": "^1.0|^2.0",
"psr/event-dispatcher": "^1.0",
"psr/log": "^1.0"
"psr/log": "^1.0|^2.0|^3.0"
},
"suggest": {
"hyperf/di": "Required to use annotations.",
Expand Down
51 changes: 48 additions & 3 deletions src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
*/
namespace Hyperf\Amqp;

use Hyperf\Amqp\Exception\LoopBrokenException;
use Hyperf\Amqp\Exception\SendChannelClosedException;
use Hyperf\Amqp\Exception\SendChannelTimeoutException;
use Hyperf\Engine\Channel;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
Expand Down Expand Up @@ -75,6 +78,11 @@ class AMQPConnection extends AbstractConnection
*/
protected $exited = false;

/**
* @var Channel
*/
protected $chan;

public function __construct(
string $user,
string $password,
Expand All @@ -90,6 +98,7 @@ public function __construct(
) {
$this->channelManager = new ChannelManager(16);
$this->channelManager->get(0, true);
$this->chan = $this->channelManager->make(65535);

parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout);

Expand All @@ -101,7 +110,13 @@ public function write($data)
{
$this->loop();

parent::write($data);
$this->chan->push($data, 5);
if ($this->chan->isClosing()) {
throw new SendChannelClosedException('Writing data failed, because send channel closed.');
}
if ($this->chan->isTimeout()) {
throw new SendChannelTimeoutException('Writing data failed, because send channel timeout.');
}
}

/**
Expand Down Expand Up @@ -191,6 +206,8 @@ public function close($reply_code = 0, $reply_text = '', $method_sig = [0, 0])
try {
$res = parent::close($reply_code, $reply_text, $method_sig);
} finally {
$this->setIsConnected(false);
$this->chan->close();
$this->channelManager->flush();
}
return $res;
Expand Down Expand Up @@ -218,6 +235,34 @@ protected function loop(): void

$this->loop = true;

Coroutine::create(function () {
try {
while (true) {
$data = $this->chan->pop(-1);
if ($this->chan->isClosing()) {
throw new SendChannelClosedException('Write failed, because send channel closed.');
}

if ($data === false || $data === '') {
throw new LoopBrokenException('Push channel broken or write empty string for connection.');
}

parent::write($data);
}
} catch (\Throwable $exception) {
$level = $this->exited ? 'warning' : 'error';
$this->logger && $this->logger->log($level, 'Send loop broken. The reason is ' . (string) $exception);
} finally {
$this->loop = false;
if (! $this->exited) {
// When loop broken, AMQPConnection will not be able to communicate with AMQP server.
// So flush all recv channels to ensure closing AMQP connections quickly.
$this->channelManager->flush();
$this->close();
}
}
});

Coroutine::create(function () {
try {
while (true) {
Expand Down Expand Up @@ -274,13 +319,13 @@ protected function heartbeat(): void

try {
// PING
if ($this->isConnected()) {
if ($this->isConnected() && $this->chan->isEmpty()) {
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->getIO()->write($pkt->getvalue());
$this->chan->push($pkt->getvalue(), 0.001);
}
} catch (\Throwable $exception) {
$this->logger && $this->logger->error((string) $exception);
Expand Down
3 changes: 2 additions & 1 deletion src/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public function make(array $config): AMQPConnection
$io = new SwooleIO(
$host,
$port,
$params->getConnectionTimeout()
$params->getConnectionTimeout(),
$params->getReadWriteTimeout(),
);

$connection = new AMQPConnection(
Expand Down
2 changes: 1 addition & 1 deletion src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected function getCallback(ConsumerMessageInterface $consumerMessage, AMQPMe
}
if ($result === Result::NACK) {
$this->logger->debug($deliveryTag . ' uacked.');
return $channel->basic_nack($deliveryTag);
return $channel->basic_nack($deliveryTag, false, $consumerMessage->isRequeue());
}
if ($consumerMessage->isRequeue() && $result === Result::REQUEUE) {
$this->logger->debug($deliveryTag . ' requeued.');
Expand Down
16 changes: 16 additions & 0 deletions src/Exception/LoopBrokenException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Exception;

class LoopBrokenException extends \RuntimeException
{
}
16 changes: 16 additions & 0 deletions src/Exception/SendChannelClosedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Exception;

class SendChannelClosedException extends \RuntimeException
{
}
16 changes: 16 additions & 0 deletions src/Exception/SendChannelTimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Exception;

class SendChannelTimeoutException extends \RuntimeException
{
}
52 changes: 20 additions & 32 deletions src/IO/SwooleIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Swoole\Coroutine\Client;
use const SWOOLE_SOCK_TCP;
use Swoole\Coroutine\Socket;

class SwooleIO extends AbstractIO
{
Expand All @@ -40,29 +39,31 @@ class SwooleIO extends AbstractIO
/**
* @var int
*/
protected $heartbeat;
protected $readWriteTimeout;

/**
* @var null|Client
* @var int
*/
private $sock;
protected $heartbeat;

/**
* @var string
* @var null|Socket
*/
private $buffer = '';
private $sock;

/**
* @throws \InvalidArgumentException when readWriteTimeout argument does not 2x the heartbeat
*/
public function __construct(
string $host,
int $port,
int $connectionTimeout
int $connectionTimeout,
int $readWriteTimeout = 3
) {
$this->host = $host;
$this->port = $port;
$this->connectionTimeout = $connectionTimeout;
$this->readWriteTimeout = $readWriteTimeout;
}

/**
Expand All @@ -77,34 +78,21 @@ public function connect()

public function read($len)
{
do {
if ($len <= strlen($this->buffer)) {
$data = substr($this->buffer, 0, $len);
$this->buffer = substr($this->buffer, $len);

return $data;
}

if (! $this->sock->isConnected()) {
throw new AMQPConnectionClosedException('Broken pipe or closed connection. ' . $this->sock->errMsg);
}

$buffer = $this->sock->recv(-1);

if ($buffer === '') {
throw new AMQPConnectionClosedException('Connection is closed. The reason is ' . $this->sock->errMsg);
}
$data = $this->sock->recvAll($len, $this->readWriteTimeout);
if ($data === false || strlen($data) !== $len) {
throw new AMQPConnectionClosedException('Read data failed, The reason is ' . $this->sock->errMsg);
}

$this->buffer .= $buffer;
} while (true);
return $data;
}

public function write($data)
{
$buffer = $this->sock->send($data);
$len = $this->sock->sendAll($data, $this->readWriteTimeout);

if ($buffer === false) {
throw new AMQPConnectionClosedException('Error sending data');
/* @phpstan-ignore-next-line */
if ($data === false || strlen($data) !== $len) {
throw new AMQPConnectionClosedException('Send data failed, The reason is ' . $this->sock->errMsg);
}
}

Expand All @@ -117,7 +105,7 @@ public function close()
$this->sock && $this->sock->close();
}

public function select($sec, $usec)
public function select($sec, $usec = 0)
{
return 1;
}
Expand All @@ -134,7 +122,7 @@ public function reenableHeartbeat()

protected function makeClient()
{
$sock = new Client(SWOOLE_SOCK_TCP);
$sock = new Socket(AF_INET, SOCK_STREAM, 0);
if (! $sock->connect($this->host, $this->port, $this->connectionTimeout)) {
throw new AMQPRuntimeException(
sprintf('Error Connecting to server: %s ', $sock->errMsg),
Expand Down
35 changes: 35 additions & 0 deletions src/Message/ConsumerDelayedMessageTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Message;

use Hyperf\Amqp\Builder\QueueBuilder;
use PhpAmqpLib\Wire\AMQPTable;

/**
* @method ConsumerMessage getQueue()
*/
trait ConsumerDelayedMessageTrait
{
/**
* Overwrite.
*/
public function getQueueBuilder(): QueueBuilder
{
return (new QueueBuilder())->setQueue((string) $this->getQueue())
->setArguments(new AMQPTable(['x-dead-letter-exchange' => $this->getDeadLetterExchange()]));
}

protected function getDeadLetterExchange(): string
{
return 'delayed';
}
}
43 changes: 43 additions & 0 deletions src/Message/ProducerDelayedMessageTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp\Message;

use Hyperf\Amqp\Builder\ExchangeBuilder;
use PhpAmqpLib\Wire\AMQPTable;

/**
* @method ProducerMessage getExchange()
* @method ProducerMessage getType()
* @property ProducerMessage $properties
*/
trait ProducerDelayedMessageTrait
{
/**
* Set the delay time.
* @return $this
*/
public function setDelayMs(int $millisecond, string $name = 'x-delay'): self
{
$this->properties['application_headers'] = new AMQPTable([$name => $millisecond]);
return $this;
}

/**
* Overwrite.
*/
public function getExchangeBuilder(): ExchangeBuilder
{
return (new ExchangeBuilder())->setExchange((string) $this->getExchange())
->setType('x-delayed-message')
->setArguments(new AMQPTable(['x-delayed-type' => $this->getType()]));
}
}
11 changes: 11 additions & 0 deletions tests/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use Hyperf\Utils\Reflection\ClassInvoker;
use HyperfTest\Amqp\Stub\AMQPConnectionStub;
use HyperfTest\Amqp\Stub\ContainerStub;
use HyperfTest\Amqp\Stub\Delay2Consumer;
use HyperfTest\Amqp\Stub\DelayConsumer;
use Mockery;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
Expand Down Expand Up @@ -56,4 +58,13 @@ public function testWaitChannel()
$chan->close();
$invoker->wait_channel(1);
}

public function testRewriteDelayMessage()
{
$consumer = new DelayConsumer();
$this->assertSame('x-delayed', (new ClassInvoker($consumer))->getDeadLetterExchange());

$consumer = new Delay2Consumer();
$this->assertSame('delayed', (new ClassInvoker($consumer))->getDeadLetterExchange());
}
}
Loading

0 comments on commit 0353cf2

Please sign in to comment.