diff --git a/src/AMQPConnection.php b/src/AMQPConnection.php index c45795e..7325650 100644 --- a/src/AMQPConnection.php +++ b/src/AMQPConnection.php @@ -12,7 +12,6 @@ namespace Hyperf\Amqp; use Hyperf\Engine\Channel; -use Hyperf\Utils\Channel\ChannelManager; use Hyperf\Utils\Coordinator\Constants; use Hyperf\Utils\Coordinator\CoordinatorManager; use Hyperf\Utils\Coroutine; diff --git a/src/ChannelManager.php b/src/ChannelManager.php new file mode 100644 index 0000000..b96cce4 --- /dev/null +++ b/src/ChannelManager.php @@ -0,0 +1,72 @@ +size = $size; + } + + public function get(int $id, bool $initialize = false): ?Channel + { + if (isset($this->channels[$id])) { + return $this->channels[$id]; + } + + if ($initialize) { + return $this->channels[$id] = $this->make($this->size); + } + + return null; + } + + public function make(int $limit): Channel + { + return new Channel($limit); + } + + public function close(int $id): void + { + if ($channel = $this->channels[$id] ?? null) { + $channel->close(); + } + + unset($this->channels[$id]); + } + + public function getChannels(): array + { + return $this->channels; + } + + public function flush(): void + { + $channels = $this->getChannels(); + foreach ($channels as $id => $channel) { + $this->close($id); + } + } +}