Skip to content

Commit

Permalink
Use Swoole\Coroutine\Socket in Hyperf\Amqp\IO\SwooleIO. (#4360)
Browse files Browse the repository at this point in the history
* No longer uses `Swoole\Coroutine\Client`, but uses `Swoole\Coroutine\Socket`, which is more stable and has better performance in `Hyperf\Amqp\IO\SwooleIO`.
  • Loading branch information
limingxinleo authored Dec 15, 2021
1 parent 015e129 commit 3d6749e
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 32 deletions.
3 changes: 2 additions & 1 deletion src/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public function make(array $config): AMQPConnection
$io = new SwooleIO(
$host,
$port,
$params->getConnectionTimeout()
$params->getConnectionTimeout(),
$params->getReadWriteTimeout(),
);

$connection = new AMQPConnection(
Expand Down
50 changes: 19 additions & 31 deletions src/IO/SwooleIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Swoole\Coroutine\Client;
use const SWOOLE_SOCK_TCP;
use Swoole\Coroutine\Socket;

class SwooleIO extends AbstractIO
{
Expand All @@ -40,29 +39,31 @@ class SwooleIO extends AbstractIO
/**
* @var int
*/
protected $heartbeat;
protected $readWriteTimeout;

/**
* @var null|Client
* @var int
*/
private $sock;
protected $heartbeat;

/**
* @var string
* @var null|Socket
*/
private $buffer = '';
private $sock;

/**
* @throws \InvalidArgumentException when readWriteTimeout argument does not 2x the heartbeat
*/
public function __construct(
string $host,
int $port,
int $connectionTimeout
int $connectionTimeout,
int $readWriteTimeout = 3
) {
$this->host = $host;
$this->port = $port;
$this->connectionTimeout = $connectionTimeout;
$this->readWriteTimeout = $readWriteTimeout;
}

/**
Expand All @@ -77,34 +78,21 @@ public function connect()

public function read($len)
{
while (true) {
if ($len <= strlen($this->buffer)) {
$data = substr($this->buffer, 0, $len);
$this->buffer = substr($this->buffer, $len);

return $data;
}

if (! $this->sock->isConnected()) {
throw new AMQPConnectionClosedException('Broken pipe or closed connection. ' . $this->sock->errMsg);
}

$buffer = $this->sock->recv(-1);

if ($buffer === '') {
throw new AMQPConnectionClosedException('Connection is closed. The reason is ' . $this->sock->errMsg);
}

$this->buffer .= $buffer;
$data = $this->sock->recvAll($len, $this->readWriteTimeout);
if ($data === false || strlen($data) !== $len) {
throw new AMQPConnectionClosedException('Read data failed, The reason is ' . $this->sock->errMsg);
}

return $data;
}

public function write($data)
{
$buffer = $this->sock->send($data);
$len = $this->sock->sendAll($data, $this->readWriteTimeout);

if ($buffer === false) {
throw new AMQPConnectionClosedException('Error sending data');
/* @phpstan-ignore-next-line */
if ($data === false || strlen($data) !== $len) {
throw new AMQPConnectionClosedException('Send data failed, The reason is ' . $this->sock->errMsg);
}
}

Expand Down Expand Up @@ -134,7 +122,7 @@ public function reenableHeartbeat()

protected function makeClient()
{
$sock = new Client(SWOOLE_SOCK_TCP);
$sock = new Socket(AF_INET, SOCK_STREAM, 0);
if (! $sock->connect($this->host, $this->port, $this->connectionTimeout)) {
throw new AMQPRuntimeException(
sprintf('Error Connecting to server: %s ', $sock->errMsg),
Expand Down

0 comments on commit 3d6749e

Please sign in to comment.