Skip to content

Commit

Permalink
Merge branch '2.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Jun 2, 2021
2 parents bdf5396 + 16ba85e commit 5df5a46
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 282 deletions.
1 change: 1 addition & 0 deletions publish/amqp.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'heartbeat' => 3,
'channel_rpc_timeout' => 0.0,
'close_on_destruct' => false,
'max_idle_channels' => 10,
],
],
];
145 changes: 128 additions & 17 deletions src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
*/
namespace Hyperf\Amqp;

use Hyperf\Amqp\IO\SwooleIO;
use Hyperf\Engine\Channel;
use Hyperf\Utils\Channel\ChannelManager;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Psr\Log\LoggerInterface;

Expand All @@ -26,11 +30,6 @@ class AMQPConnection extends AbstractConnection

public const CONFIRM_CHANNEL_POOL_LENGTH = 10000;

/**
* @var bool
*/
public $isBroken = false;

/**
* @var Channel
*/
Expand All @@ -52,9 +51,25 @@ class AMQPConnection extends AbstractConnection
protected $lastChannelId = 0;

/**
* @param null $login_response @deprecated
* @param AbstractIO $io
* @var null|Params
*/
protected $params;

/**
* @var bool
*/
protected $loop = false;

/**
* @var bool
*/
protected $enableHeartbeat = false;

/**
* @var ChannelManager
*/
protected $channelManager;

public function __construct(
string $user,
string $password,
Expand All @@ -68,15 +83,20 @@ public function __construct(
int $connection_timeout = 0,
float $channel_rpc_timeout = 0.0
) {
$this->channelManager = new ChannelManager(16);
$this->channelManager->get(0, true);

parent::__construct($user, $password, $vhost, $insist, $login_method, $login_response, $locale, $io, $heartbeat, $connection_timeout, $channel_rpc_timeout);

$this->pool = new Channel(static::CHANNEL_POOL_LENGTH);
$this->confirmPool = new Channel(static::CONFIRM_CHANNEL_POOL_LENGTH);
Coroutine::create(function () {
if ($this->io instanceof SwooleIO) {
$this->isBroken = $this->io->isBroken();
}
});
}

public function write($data)
{
$this->loop();

parent::write($data);
}

/**
Expand All @@ -85,9 +105,16 @@ public function __construct(
public function setLogger(?LoggerInterface $logger)
{
$this->logger = $logger;
if ($this->io instanceof SwooleIO) {
$this->io->setLogger($logger);
}

return $this;
}

/**
* @return static
*/
public function setParams(Params $params)
{
$this->params = $params;
return $this;
}

Expand All @@ -110,11 +137,19 @@ public function getChannel(): AMQPChannel
return $this->channel($id);
}

public function channel($channel_id = null)
{
$this->channelManager->close($channel_id);
$this->channelManager->get($channel_id, true);

return parent::channel($channel_id); // TODO: Change the autogenerated stub
}

public function getConfirmChannel(): AMQPChannel
{
$id = 0;
$confirm = false;
if (! $this->pool->isEmpty()) {
if (! $this->confirmPool->isEmpty()) {
$id = (int) $this->confirmPool->pop(0.001);
}

Expand All @@ -131,6 +166,14 @@ public function getConfirmChannel(): AMQPChannel

public function releaseChannel(AMQPChannel $channel, bool $confirm = false): void
{
if ($this->params) {
$length = $confirm ? $this->confirmPool->getLength() : $this->pool->getLength();
if ($length > $this->params->getMaxIdleChannels()) {
$channel->close();
return;
}
}

if ($confirm) {
$this->confirmPool->push($channel->getChannelId());
} else {
Expand All @@ -149,4 +192,72 @@ protected function makeChannelId(): int

throw new AMQPRuntimeException('No free channel ids');
}

protected function loop(): void
{
$this->heartbeat();

if ($this->loop) {
return;
}

$this->loop = true;

Coroutine::create(function () {
try {
while (true) {
[$frame_type, $channel, $payload] = $this->wait_frame(0);
$this->channelManager->get($channel)->push([$frame_type, $payload], 0.001);
}
} catch (\Throwable $exception) {
$this->logger && $this->logger->error('Recv loop broken. The reason is ' . (string) $exception);
} finally {
$this->loop = false;
$this->close();
}
});
}

protected function wait_channel($channel_id, $timeout = 0)
{
$chan = $this->channelManager->get($channel_id);

$data = $chan->pop($timeout);
if ($data === false) {
if ($chan->isTimeout()) {
throw new AMQPTimeoutException('Timeout waiting on channel');
}
}

return $data;
}

protected function heartbeat(): void
{
if (! $this->enableHeartbeat && $this->getHeartbeat() > 0) {
$this->enableHeartbeat = true;

Coroutine::create(function () {
while (true) {
if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($this->getHeartbeat())) {
break;
}

try {
// PING
if ($this->isConnected()) {
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->getIO()->write($pkt->getvalue());
}
} catch (\Throwable $exception) {
$this->logger && $this->logger->error((string) $exception);
}
}
});
}
}
}
33 changes: 19 additions & 14 deletions src/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ public function refresh(string $pool)
$count = $config['pool']['connections'] ?? 1;

if (Locker::lock(static::class)) {
for ($i = 0; $i < $count; ++$i) {
$connection = $this->make($config);
$this->connections[$pool][] = $connection;
try {
for ($i = 0; $i < $count; ++$i) {
$connection = $this->make($config);
$this->connections[$pool][] = $connection;
}
} finally {
Locker::unlock(static::class);
}
Locker::unlock(static::class);
}
}

Expand All @@ -61,13 +64,16 @@ public function getConnection(string $pool): AMQPConnection
if (! empty($this->connections[$pool])) {
$index = array_rand($this->connections[$pool]);
$connection = $this->connections[$pool][$index];
if ($connection->isBroken) {
if (! $connection->isConnected()) {
if (Locker::lock(static::class . 'getConnection')) {
unset($this->connections[$pool][$index]);
$connection->close();
$connection = $this->make($this->getConfig($pool));
$this->connections[$pool][] = $connection;
Locker::unlock(static::class . 'getConnection');
try {
unset($this->connections[$pool][$index]);
$connection->close();
$connection = $this->make($this->getConfig($pool));
$this->connections[$pool][] = $connection;
} finally {
Locker::unlock(static::class . 'getConnection');
}
} else {
return $this->getConnection($pool);
}
Expand All @@ -92,9 +98,7 @@ public function make(array $config): AMQPConnection
$io = new SwooleIO(
$host,
$port,
$params->getConnectionTimeout(),
$params->getReadWriteTimeout(),
$params->getHeartbeat()
$params->getConnectionTimeout()
);

$connection = new AMQPConnection(
Expand All @@ -111,7 +115,8 @@ public function make(array $config): AMQPConnection
$params->getChannelRpcTimeout()
);

return $connection->setLogger($this->container->get(StdoutLoggerInterface::class));
return $connection->setParams($params)
->setLogger($this->container->get(StdoutLoggerInterface::class));
}

protected function getConfig(string $pool): array
Expand Down
Loading

0 comments on commit 5df5a46

Please sign in to comment.