Skip to content

Commit

Permalink
Merge pull request #21 from esl/declare_queues
Browse files Browse the repository at this point in the history
[add] declare queues on startup
  • Loading branch information
filipevarjao authored Dec 27, 2018
2 parents 015433c + 7a784b1 commit 705752f
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 1 deletion.
33 changes: 33 additions & 0 deletions apps/bugs_bunny/lib/bugs_bunny.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule BugsBunny do
require Logger

alias BugsBunny.Worker.RabbitConnection, as: Conn

@type f :: ({:ok, AMQP.Channel.t()} | {:error, :disconected | :out_of_channels} -> any())
Expand Down Expand Up @@ -70,4 +72,35 @@ defmodule BugsBunny do
fun.(error)
end
end

@spec create_queue_with_bind(
module(),
AMQP.Basic.queue(),
AMQP.Basic.exchange(),
type :: atom(),
keyword()
) :: :ok | AMQP.Basic.error() | {:error, any()}
def create_queue_with_bind(adapter, pool_id, queue, exchange, type \\ :direct, options \\ []) do
queue_options = Keyword.get(options, :queue_options, [])
exchange_options = Keyword.get(options, :exchange_options, [])
bind_options = Keyword.get(options, :bind_options, [])
conn_worker = get_connection_worker(pool_id)

do_with_conn(conn_worker, fn
{:ok, channel} ->
with {:ok, _} <- adapter.declare_queue(channel, queue, queue_options),
:ok <- Logger.info("queue: #{queue} successfully declared"),
:ok <- adapter.declare_exchange(channel, exchange, type, exchange_options),
:ok <- Logger.info("exchange #{exchange} successfully declared"),
:ok <- adapter.queue_bind(channel, queue, exchange, bind_options),
:ok <- Logger.info("#{queue} successfully bound to #{exchange}") do
:ok
else
{:error, _} = error -> error
end

{:error, _} = error ->
error
end)
end
end
6 changes: 6 additions & 0 deletions apps/bugs_bunny/lib/bugs_bunny/clients/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ defmodule BugsBunny.Clients.Adapter do
{:ok, String.t()} | AMQP.Basic.error()
@callback ack(AMQP.Channel.t(), String.t(), keyword()) :: :ok | AMQP.Basic.error()
@callback reject(AMQP.Channel.t(), String.t(), keyword()) :: :ok | AMQP.Basic.error()
@callback declare_queue(AMQP.Channel.t(), AMQP.Basic.queue(), keyword()) ::
{:ok, map()} | AMQP.Basic.error()
@callback queue_bind(AMQP.Channel.t(), AMQP.Basic.queue(), AMQP.Basic.exchange(), keyword()) ::
:ok | AMQP.Basic.error()
@callback declare_exchange(AMQP.Channel.t(), AMQP.Basic.exchange(), type :: atom, keyword()) ::
:ok | AMQP.Basic.error()
end
15 changes: 15 additions & 0 deletions apps/bugs_bunny/lib/bugs_bunny/clients/fake_rabbitmq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,19 @@ defmodule BugsBunny.FakeRabbitMQ do
# Connection.close(conn)
:ok
end

@impl true
def declare_queue(_channel, _queue, _options \\ []) do
{:ok, %{}}
end

@impl true
def declare_exchange(_channel, _exchange, _type \\ :direct, _options \\ []) do
:ok
end

@impl true
def queue_bind(_channel, _queue, _exchange, _options \\ []) do
:ok
end
end
23 changes: 23 additions & 0 deletions apps/bugs_bunny/lib/bugs_bunny/clients/rabbitmq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,27 @@ defmodule BugsBunny.RabbitMQ do
def close_connection(conn) do
Connection.close(conn)
end

@impl true
def declare_queue(channel, queue \\ "", options \\ []) do
Queue.declare(channel, queue, options)
end

@impl true
def declare_exchange(channel, exchange, type \\ :direct, options \\ [])

def declare_exchange(_channel, "", _type, _options), do: :ok

def declare_exchange(channel, exchange, type, options) do
Exchange.declare(channel, exchange, type, options)
end

@impl true
def queue_bind(channel, queue, exchange, options \\ [])

def queue_bind(_channel, _queue, "", _options), do: :ok

def queue_bind(channel, queue, exchange, options) do
Queue.bind(channel, queue, exchange, options)
end
end
38 changes: 38 additions & 0 deletions apps/bugs_bunny/test/integration/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,42 @@ defmodule BugsBunny.Integration.ApiTest do

assert logs =~ "[Rabbit] channel lost, attempting to reconnect reason: :normal"
end

test "creates queue with exchange and bindings", %{pool_id: pool_id} do
assert :ok =
BugsBunny.create_queue_with_bind(
RabbitMQ,
pool_id,
"test_queue",
"test_exchange",
:direct,
queue_options: [auto_delete: true],
exchange_options: [auto_delete: true]
)

BugsBunny.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = AMQP.Basic.publish(channel, "test_exchange", "", "Hello, World!")
assert {:ok, "Hello, World!", _meta} = AMQP.Basic.get(channel, "test_queue")
assert {:ok, _} = AMQP.Queue.delete(channel, "test_queue")
end)
end

test "should not fail when binding and declaring default exchange", %{pool_id: pool_id} do
assert :ok =
BugsBunny.create_queue_with_bind(
RabbitMQ,
pool_id,
"test2_queue",
"",
:direct,
queue_options: [auto_delete: true],
exchange_options: [auto_delete: true]
)

BugsBunny.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = AMQP.Basic.publish(channel, "", "test2_queue", "Hello, World!")
assert {:ok, "Hello, World!", _meta} = AMQP.Basic.get(channel, "test2_queue")
assert {:ok, _} = AMQP.Queue.delete(channel, "test2_queue")
end)
end
end
23 changes: 23 additions & 0 deletions apps/repo_poller/lib/repo_poller/setup_queue_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule RepoPoller.SetupQueueWorker do
use GenServer, restart: :temporary

alias RepoPoller.Config

def start_link(_) do
GenServer.start_link(__MODULE__, :ok, [])
end

def init(_) do
pool_id = Config.get_connection_pool_id()
client = Config.get_rabbitmq_client()
queue = Config.get_rabbitmq_queue()
exchange = Config.get_rabbitmq_exchange()

BugsBunny.create_queue_with_bind(client, pool_id, queue, exchange, :direct,
queue_options: [durable: true],
exchange_options: [durable: true]
)

{:ok, :ignore}
end
end
8 changes: 7 additions & 1 deletion apps/repo_poller/lib/repo_poller/setup_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule RepoPoller.SetupSupervisor do
use Supervisor

alias RepoPoller.{PollerSupervisor, SetupWorker}
alias RepoPoller.{PollerSupervisor, SetupWorker, SetupQueueWorker, Config}

def start_link(args \\ []) do
name = Keyword.get(args, :name, __MODULE__)
Expand All @@ -14,6 +14,12 @@ defmodule RepoPoller.SetupSupervisor do
{SetupWorker, []}
]

children =
case Config.get_rabbitmq_config() do
[] -> children
_ -> [{SetupQueueWorker, []} | children]
end

opts = [strategy: :rest_for_one]
Supervisor.init(children, opts)
end
Expand Down
55 changes: 55 additions & 0 deletions apps/repo_poller/test/integration/setup_queue_worker_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
defmodule RepoPoller.SetupQueueWorkerTest do
use ExUnit.Case, async: true

alias RepoPoller.SetupQueueWorker

@moduletag :integration
@queue "test.queue"

setup do
caller = self()

rabbitmq_config = [
channels: 1,
port: String.to_integer(System.get_env("POLLER_RMQ_PORT") || "5672"),
queue: @queue,
exchange: "my_exchange",
caller: caller
]

rabbitmq_conn_pool = [
:rabbitmq_conn_pool,
pool_id: :setup_queue_pool,
name: {:local, :setup_queue_pool},
worker_module: BugsBunny.Worker.RabbitConnection,
size: 1,
max_overflow: 0
]

Application.put_env(:repo_poller, :rabbitmq_config, rabbitmq_config)
Application.put_env(:repo_poller, :rabbitmq_conn_pool, rabbitmq_conn_pool)
Application.put_env(:repo_poller, :database, Domain.Service.MockDatabase)

start_supervised!(%{
id: BugsBunny.PoolSupervisorTest,
start:
{BugsBunny.PoolSupervisor, :start_link,
[
[rabbitmq_config: rabbitmq_config, rabbitmq_conn_pool: rabbitmq_conn_pool],
BugsBunny.PoolSupervisorTest
]},
type: :supervisor
})

{:ok, pool_id: :setup_queue_pool}
end

test "declare queue on startup", %{pool_id: pool_id} do
_worker_pid = start_supervised!(SetupQueueWorker)
BugsBunny.with_channel(pool_id, fn {:ok, channel} ->
assert :ok = AMQP.Basic.publish(channel, "my_exchange", "", "Hello, World!")
assert {:ok, "Hello, World!", _meta} = AMQP.Basic.get(channel, @queue)
assert {:ok, _} = AMQP.Queue.delete(channel, @queue)
end)
end
end

0 comments on commit 705752f

Please sign in to comment.