Skip to content

Commit

Permalink
Added ChannelManager
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Jun 2, 2021
1 parent 5df5a46 commit 564a8ee
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 1 deletion.
1 change: 0 additions & 1 deletion src/AMQPConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
namespace Hyperf\Amqp;

use Hyperf\Engine\Channel;
use Hyperf\Utils\Channel\ChannelManager;
use Hyperf\Utils\Coordinator\Constants;
use Hyperf\Utils\Coordinator\CoordinatorManager;
use Hyperf\Utils\Coroutine;
Expand Down
72 changes: 72 additions & 0 deletions src/ChannelManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://hyperf.wiki
* @contact [email protected]
* @license https://github.com/hyperf/hyperf/blob/master/LICENSE
*/
namespace Hyperf\Amqp;

use Hyperf\Engine\Channel;

class ChannelManager
{
/**
* @var Channel[]
*/
protected $channels = [];

/**
* @var int
*/
protected $size = 1;

public function __construct(int $size = 1)
{
$this->size = $size;
}

public function get(int $id, bool $initialize = false): ?Channel
{
if (isset($this->channels[$id])) {
return $this->channels[$id];
}

if ($initialize) {
return $this->channels[$id] = $this->make($this->size);
}

return null;
}

public function make(int $limit): Channel
{
return new Channel($limit);
}

public function close(int $id): void
{
if ($channel = $this->channels[$id] ?? null) {
$channel->close();
}

unset($this->channels[$id]);
}

public function getChannels(): array
{
return $this->channels;
}

public function flush(): void
{
$channels = $this->getChannels();
foreach ($channels as $id => $channel) {
$this->close($id);
}
}
}

0 comments on commit 564a8ee

Please sign in to comment.