diff --git a/apps/bugs_bunny/lib/bugs_bunny.ex b/apps/bugs_bunny/lib/bugs_bunny.ex index fdcdda2..224b5cd 100644 --- a/apps/bugs_bunny/lib/bugs_bunny.ex +++ b/apps/bugs_bunny/lib/bugs_bunny.ex @@ -1,4 +1,6 @@ defmodule BugsBunny do + require Logger + alias BugsBunny.Worker.RabbitConnection, as: Conn @type f :: ({:ok, AMQP.Channel.t()} | {:error, :disconected | :out_of_channels} -> any()) @@ -70,4 +72,35 @@ defmodule BugsBunny do fun.(error) end end + + @spec create_queue_with_bind( + module(), + AMQP.Basic.queue(), + AMQP.Basic.exchange(), + type :: atom(), + keyword() + ) :: :ok | AMQP.Basic.error() | {:error, any()} + def create_queue_with_bind(adapter, pool_id, queue, exchange, type \\ :direct, options \\ []) do + queue_options = Keyword.get(options, :queue_options, []) + exchange_options = Keyword.get(options, :exchange_options, []) + bind_options = Keyword.get(options, :bind_options, []) + conn_worker = get_connection_worker(pool_id) + + do_with_conn(conn_worker, fn + {:ok, channel} -> + with {:ok, _} <- adapter.declare_queue(channel, queue, queue_options), + :ok <- Logger.info("queue: #{queue} successfully declared"), + :ok <- adapter.declare_exchange(channel, exchange, type, exchange_options), + :ok <- Logger.info("exchange #{exchange} successfully declared"), + :ok <- adapter.queue_bind(channel, queue, exchange, bind_options), + :ok <- Logger.info("#{queue} successfully bound to #{exchange}") do + :ok + else + {:error, _} = error -> error + end + + {:error, _} = error -> + error + end) + end end diff --git a/apps/bugs_bunny/lib/bugs_bunny/clients/adapter.ex b/apps/bugs_bunny/lib/bugs_bunny/clients/adapter.ex index 214714f..4a2e29f 100644 --- a/apps/bugs_bunny/lib/bugs_bunny/clients/adapter.ex +++ b/apps/bugs_bunny/lib/bugs_bunny/clients/adapter.ex @@ -8,4 +8,10 @@ defmodule BugsBunny.Clients.Adapter do {:ok, String.t()} | AMQP.Basic.error() @callback ack(AMQP.Channel.t(), String.t(), keyword()) :: :ok | AMQP.Basic.error() @callback reject(AMQP.Channel.t(), String.t(), keyword()) :: :ok | AMQP.Basic.error() + @callback declare_queue(AMQP.Channel.t(), AMQP.Basic.queue(), keyword()) :: + {:ok, map()} | AMQP.Basic.error() + @callback queue_bind(AMQP.Channel.t(), AMQP.Basic.queue(), AMQP.Basic.exchange(), keyword()) :: + :ok | AMQP.Basic.error() + @callback declare_exchange(AMQP.Channel.t(), AMQP.Basic.exchange(), type :: atom, keyword()) :: + :ok | AMQP.Basic.error() end diff --git a/apps/bugs_bunny/lib/bugs_bunny/clients/fake_rabbitmq.ex b/apps/bugs_bunny/lib/bugs_bunny/clients/fake_rabbitmq.ex index 8bf7e4e..c3292bc 100644 --- a/apps/bugs_bunny/lib/bugs_bunny/clients/fake_rabbitmq.ex +++ b/apps/bugs_bunny/lib/bugs_bunny/clients/fake_rabbitmq.ex @@ -49,4 +49,19 @@ defmodule BugsBunny.FakeRabbitMQ do # Connection.close(conn) :ok end + + @impl true + def declare_queue(_channel, _queue, _options \\ []) do + {:ok, %{}} + end + + @impl true + def declare_exchange(_channel, _exchange, _type \\ :direct, _options \\ []) do + :ok + end + + @impl true + def queue_bind(_channel, _queue, _exchange, _options \\ []) do + :ok + end end diff --git a/apps/bugs_bunny/lib/bugs_bunny/clients/rabbitmq.ex b/apps/bugs_bunny/lib/bugs_bunny/clients/rabbitmq.ex index 7b7d930..1678faf 100644 --- a/apps/bugs_bunny/lib/bugs_bunny/clients/rabbitmq.ex +++ b/apps/bugs_bunny/lib/bugs_bunny/clients/rabbitmq.ex @@ -36,4 +36,27 @@ defmodule BugsBunny.RabbitMQ do def close_connection(conn) do Connection.close(conn) end + + @impl true + def declare_queue(channel, queue \\ "", options \\ []) do + Queue.declare(channel, queue, options) + end + + @impl true + def declare_exchange(channel, exchange, type \\ :direct, options \\ []) + + def declare_exchange(_channel, "", _type, _options), do: :ok + + def declare_exchange(channel, exchange, type, options) do + Exchange.declare(channel, exchange, type, options) + end + + @impl true + def queue_bind(channel, queue, exchange, options \\ []) + + def queue_bind(_channel, _queue, "", _options), do: :ok + + def queue_bind(channel, queue, exchange, options) do + Queue.bind(channel, queue, exchange, options) + end end diff --git a/apps/bugs_bunny/test/integration/api_test.exs b/apps/bugs_bunny/test/integration/api_test.exs index 415bc68..2929fd6 100644 --- a/apps/bugs_bunny/test/integration/api_test.exs +++ b/apps/bugs_bunny/test/integration/api_test.exs @@ -135,4 +135,42 @@ defmodule BugsBunny.Integration.ApiTest do assert logs =~ "[Rabbit] channel lost, attempting to reconnect reason: :normal" end + + test "creates queue with exchange and bindings", %{pool_id: pool_id} do + assert :ok = + BugsBunny.create_queue_with_bind( + RabbitMQ, + pool_id, + "test_queue", + "test_exchange", + :direct, + queue_options: [auto_delete: true], + exchange_options: [auto_delete: true] + ) + + BugsBunny.with_channel(pool_id, fn {:ok, channel} -> + assert :ok = AMQP.Basic.publish(channel, "test_exchange", "", "Hello, World!") + assert {:ok, "Hello, World!", _meta} = AMQP.Basic.get(channel, "test_queue") + assert {:ok, _} = AMQP.Queue.delete(channel, "test_queue") + end) + end + + test "should not fail when binding and declaring default exchange", %{pool_id: pool_id} do + assert :ok = + BugsBunny.create_queue_with_bind( + RabbitMQ, + pool_id, + "test2_queue", + "", + :direct, + queue_options: [auto_delete: true], + exchange_options: [auto_delete: true] + ) + + BugsBunny.with_channel(pool_id, fn {:ok, channel} -> + assert :ok = AMQP.Basic.publish(channel, "", "test2_queue", "Hello, World!") + assert {:ok, "Hello, World!", _meta} = AMQP.Basic.get(channel, "test2_queue") + assert {:ok, _} = AMQP.Queue.delete(channel, "test2_queue") + end) + end end diff --git a/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex b/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex new file mode 100644 index 0000000..dcfde97 --- /dev/null +++ b/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex @@ -0,0 +1,23 @@ +defmodule RepoPoller.SetupQueueWorker do + use GenServer, restart: :temporary + + alias RepoPoller.Config + + def start_link(_) do + GenServer.start_link(__MODULE__, :ok, []) + end + + def init(_) do + pool_id = Config.get_connection_pool_id() + client = Config.get_rabbitmq_client() + queue = Config.get_rabbitmq_queue() + exchange = Config.get_rabbitmq_exchange() + + BugsBunny.create_queue_with_bind(client, pool_id, queue, exchange, :direct, + queue_options: [durable: true], + exchange_options: [durable: true] + ) + + {:ok, :ignore} + end +end diff --git a/apps/repo_poller/lib/repo_poller/setup_supervisor.ex b/apps/repo_poller/lib/repo_poller/setup_supervisor.ex index 7c5619b..a5318de 100644 --- a/apps/repo_poller/lib/repo_poller/setup_supervisor.ex +++ b/apps/repo_poller/lib/repo_poller/setup_supervisor.ex @@ -1,7 +1,7 @@ defmodule RepoPoller.SetupSupervisor do use Supervisor - alias RepoPoller.{PollerSupervisor, SetupWorker} + alias RepoPoller.{PollerSupervisor, SetupWorker, SetupQueueWorker, Config} def start_link(args \\ []) do name = Keyword.get(args, :name, __MODULE__) @@ -14,6 +14,12 @@ defmodule RepoPoller.SetupSupervisor do {SetupWorker, []} ] + children = + case Config.get_rabbitmq_config() do + [] -> children + _ -> [{SetupQueueWorker, []} | children] + end + opts = [strategy: :rest_for_one] Supervisor.init(children, opts) end diff --git a/apps/repo_poller/test/integration/setup_queue_worker_test.exs b/apps/repo_poller/test/integration/setup_queue_worker_test.exs new file mode 100644 index 0000000..3b1ae60 --- /dev/null +++ b/apps/repo_poller/test/integration/setup_queue_worker_test.exs @@ -0,0 +1,55 @@ +defmodule RepoPoller.SetupQueueWorkerTest do + use ExUnit.Case, async: true + + alias RepoPoller.SetupQueueWorker + + @moduletag :integration + @queue "test.queue" + + setup do + caller = self() + + rabbitmq_config = [ + channels: 1, + port: String.to_integer(System.get_env("POLLER_RMQ_PORT") || "5672"), + queue: @queue, + exchange: "my_exchange", + caller: caller + ] + + rabbitmq_conn_pool = [ + :rabbitmq_conn_pool, + pool_id: :setup_queue_pool, + name: {:local, :setup_queue_pool}, + worker_module: BugsBunny.Worker.RabbitConnection, + size: 1, + max_overflow: 0 + ] + + Application.put_env(:repo_poller, :rabbitmq_config, rabbitmq_config) + Application.put_env(:repo_poller, :rabbitmq_conn_pool, rabbitmq_conn_pool) + Application.put_env(:repo_poller, :database, Domain.Service.MockDatabase) + + start_supervised!(%{ + id: BugsBunny.PoolSupervisorTest, + start: + {BugsBunny.PoolSupervisor, :start_link, + [ + [rabbitmq_config: rabbitmq_config, rabbitmq_conn_pool: rabbitmq_conn_pool], + BugsBunny.PoolSupervisorTest + ]}, + type: :supervisor + }) + + {:ok, pool_id: :setup_queue_pool} + end + + test "declare queue on startup", %{pool_id: pool_id} do + _worker_pid = start_supervised!(SetupQueueWorker) + BugsBunny.with_channel(pool_id, fn {:ok, channel} -> + assert :ok = AMQP.Basic.publish(channel, "my_exchange", "", "Hello, World!") + assert {:ok, "Hello, World!", _meta} = AMQP.Basic.get(channel, @queue) + assert {:ok, _} = AMQP.Queue.delete(channel, @queue) + end) + end +end