Skip to content

Commit

Permalink
Merge branch '2.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed May 31, 2021
2 parents 8f1aa71 + 428e901 commit e259e67
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions src/IO/SwooleIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
use Psr\Log\LoggerInterface;
use Psr\Log\LogLevel;
use Swoole\Coroutine\Client;
use const SWOOLE_SOCK_TCP;

Expand Down Expand Up @@ -70,7 +71,7 @@ class SwooleIO extends AbstractIO
protected $brokenChannel;

/**
* @var LoggerInterface
* @var null|LoggerInterface
*/
protected $logger;

Expand All @@ -84,8 +85,16 @@ class SwooleIO extends AbstractIO
*/
private $sock;

/**
* @var string
*/
private $buffer = '';

/**
* @var bool
*/
private $exited = false;

/**
* @throws \InvalidArgumentException when readWriteTimeout argument does not 2x the heartbeat
*/
Expand Down Expand Up @@ -124,6 +133,7 @@ public function heartbeat()
}

if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($heartbeat)) {
$this->exited = true;
$this->close();
break;
}
Expand All @@ -138,7 +148,7 @@ public function heartbeat()
$this->write_heartbeat();
}
} catch (\Throwable $exception) {
$this->logger && $this->logger->error((string) $exception);
$this->log((string) $exception);
}
}
});
Expand Down Expand Up @@ -182,8 +192,8 @@ public function read($len)

/**
* @param string $data
* @throws AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws AMQPRuntimeException
* @return mixed|void
*/
public function write($data)
Expand All @@ -203,7 +213,7 @@ public function check_heartbeat()

public function close()
{
$this->logger && $this->logger->error('Connection closed, wait to restart in next time.');
$this->log('Connection closed, wait to restart in next time.');
$this->pushChannel->close();
$this->sock->close();
}
Expand Down Expand Up @@ -271,7 +281,7 @@ public function loop(): void
$readChannel->close();
}
} finally {
$this->logger && $this->logger->error('Recv loop broken, wait to restart in next time. The reason is ' . $reason);
$this->log('Recv loop broken, wait to restart in next time. The reason is ' . $reason);
$this->brokenChannel->push(true);
$chan->close();
$client->close();
Expand Down Expand Up @@ -300,11 +310,11 @@ public function loop(): void

$res = $client->send($data);
if ($res === false) {
$this->logger && $this->logger->error('Send data failed. The reason is ' . $client->errMsg);
$this->log('Send data failed. The reason is ' . $client->errMsg);
}
}
} finally {
$this->logger && $this->logger->error('Send loop broken, wait to restart in next time. The reason is ' . $reason);
$this->log('Send loop broken, wait to restart in next time. The reason is ' . $reason);
$this->brokenChannel->push(true);
$chan->close();
$client->close();
Expand Down Expand Up @@ -360,4 +370,16 @@ protected function makeChannel(): Channel
{
return new Channel(65535);
}

protected function log(string $message)
{
if ($this->logger) {
$level = LogLevel::ERROR;
if ($this->exited) {
$level = LogLevel::WARNING;
}

$this->logger->log($level, $message);
}
}
}

0 comments on commit e259e67

Please sign in to comment.