Skip to content

Commit

Permalink
[WIP] distributing repo_poller worker jobs across multiple nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
sescobb27 committed Dec 28, 2018
1 parent 705752f commit ee3a0cb
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 25 deletions.
5 changes: 4 additions & 1 deletion apps/repo_poller/lib/repo_poller/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule RepoPoller.Application do

use Application

alias RepoPoller.{SetupSupervisor, Config}
alias RepoPoller.{SetupSupervisor, Config, ClusterConnector}

def start(_type, _args) do
# List all child processes to be supervised
Expand All @@ -15,6 +15,9 @@ defmodule RepoPoller.Application do
children = [
{BugsBunny.PoolSupervisor,
[rabbitmq_config: rabbitmq_config, rabbitmq_conn_pool: rabbitmq_conn_pool]},
{Horde.Registry, [name: RepoPoller.DistributedRegistry, keys: :unique]},
{Horde.Supervisor, [name: RepoPoller.DistributedSupervisor, strategy: :one_for_one]},
{ClusterConnector, []},
{SetupSupervisor, []}
]

Expand Down
13 changes: 13 additions & 0 deletions apps/repo_poller/lib/repo_poller/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,17 @@ defmodule RepoPoller.Config do
def get_database_reconnection_interval() do
Application.get_env(:repo_poller, :database_reconnect, 5000)
end

def get_nodes() do
case Application.get_env(:repo_poller, :poller_nodes) do
nil ->
"POLLER_NODES"
|> System.get_env()
|> String.split(",")
|> Enum.map(&String.to_atom/1)

nodes ->
nodes
end
end
end
62 changes: 62 additions & 0 deletions apps/repo_poller/lib/repo_poller/custer_connector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule RepoPoller.ClusterConnector do
use GenServer

require Logger

alias RepoPoller.Config

def start_link(args) do
GenServer.start_link(__MODULE__, args, [])
end

def init(_) do
nodes = nodes()
join_cluster(nodes)
schedule_self_heal()

{:ok, nil}
end

def handle_info(:self_heal, nil) do
cluster_nodes = nodes()

nodes =
Node.list()
|> Enum.filter(fn node ->
Enum.member?(cluster_nodes, node)
end)

(cluster_nodes -- nodes)
|> join_cluster()

schedule_self_heal()
{:noreply, nil}
end

defp schedule_self_heal() do
Process.send_after(self(), :self_heal, 5000)
end

defp nodes() do
Config.get_nodes() |> List.delete(Node.self())
end

defp join_cluster(nodes) when is_list(nodes) do
Enum.each(nodes, fn node ->
Logger.info("joining node #{inspect(node)}")
join_cluster(node)
end)
end

defp join_cluster(node) do
Horde.Cluster.join_hordes(
RepoPoller.DistributedSupervisor,
{RepoPoller.DistributedSupervisor, node}
)

Horde.Cluster.join_hordes(
RepoPoller.DistributedRegistry,
{RepoPoller.DistributedRegistry, node}
)
end
end
8 changes: 6 additions & 2 deletions apps/repo_poller/lib/repo_poller/poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ defmodule RepoPoller.Poller do
##############

def start_link({%{name: repo_name}, _adapter, _pool_id} = args) do
GenServer.start_link(__MODULE__, args, name: String.to_atom(repo_name))
GenServer.start_link(__MODULE__, args, name: via_tuple(repo_name))
end

def start_link({_caller, %{name: repo_name}, _adapter, _pool_id} = args) do
GenServer.start_link(__MODULE__, args, name: String.to_atom(repo_name))
GenServer.start_link(__MODULE__, args, name: via_tuple(repo_name))
end

@doc false
Expand Down Expand Up @@ -223,4 +223,8 @@ defmodule RepoPoller.Poller do
exchange = Config.get_rabbitmq_exchange()
Config.get_rabbitmq_client().publish(channel, exchange, queue, payload, config)
end

defp via_tuple(name) do
{:via, Horde.Registry, {RepoPoller.DistributedRegistry, String.to_atom(name)}}
end
end
35 changes: 17 additions & 18 deletions apps/repo_poller/lib/repo_poller/poller_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
defmodule RepoPoller.PollerSupervisor do
use DynamicSupervisor

alias RepoPoller.Poller
alias Domain.Repos.Repo
alias RepoPoller.Config

def start_link(args \\ []) do
name = Keyword.get(args, :name, __MODULE__)
DynamicSupervisor.start_link(__MODULE__, [], name: name)
end

def init(_) do
DynamicSupervisor.init(strategy: :one_for_one)
end

def start_child(%Repo{} = repo) do
pool_id = Config.get_connection_pool_id()

adapter = setup_adapter(repo.adapter)

DynamicSupervisor.start_child(__MODULE__, %{
# Horde doesn't support :transient children yet: https://github.com/derekkraan/horde/issues/36
# Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
# id: "poller_#{repo.name}",
# start: {Poller, :start_link, [{repo, adapter, pool_id}]},
# restart: :transient
# })
Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
id: "poller_#{repo.name}",
start: {Poller, :start_link, [{repo, adapter, pool_id}]},
restart: :transient
start: {Poller, :start_link, [{repo, adapter, pool_id}]}
})
end

Expand All @@ -37,10 +31,15 @@ defmodule RepoPoller.PollerSupervisor do

adapter = setup_adapter(adapter)

DynamicSupervisor.start_child(__MODULE__, %{
# Horde doesn't support :transient children yet: https://github.com/derekkraan/horde/issues/36
# Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
# id: "poller_#{repo.name}",
# start: {Poller, :start_link, [{repo, adapter, pool_id}]},
# restart: :transient
# })
Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
id: "poller_#{repo.name}",
start: {Poller, :start_link, [{repo, adapter, pool_id}]},
restart: :transient
start: {Poller, :start_link, [{repo, adapter, pool_id}]}
})
end

Expand All @@ -56,7 +55,7 @@ defmodule RepoPoller.PollerSupervisor do
{:error, "Couldn't find repository process."}

pid when is_pid(pid) ->
DynamicSupervisor.terminate_child(__MODULE__, pid)
Horde.Supervisor.terminate_child(__MODULE__, pid)
end
rescue
_ -> {:error, "Couldn't find repository process."}
Expand Down
2 changes: 1 addition & 1 deletion apps/repo_poller/lib/repo_poller/setup_queue_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ defmodule RepoPoller.SetupQueueWorker do
exchange_options: [durable: true]
)

{:ok, :ignore}
:ignore
end
end
3 changes: 1 addition & 2 deletions apps/repo_poller/lib/repo_poller/setup_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule RepoPoller.SetupSupervisor do
use Supervisor

alias RepoPoller.{PollerSupervisor, SetupWorker, SetupQueueWorker, Config}
alias RepoPoller.{SetupWorker, SetupQueueWorker, Config}

def start_link(args \\ []) do
name = Keyword.get(args, :name, __MODULE__)
Expand All @@ -10,7 +10,6 @@ defmodule RepoPoller.SetupSupervisor do

def init(_) do
children = [
{PollerSupervisor, []},
{SetupWorker, []}
]

Expand Down
3 changes: 2 additions & 1 deletion apps/repo_poller/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ defmodule RepoPoller.MixProject do
{:domain, in_umbrella: true},
{:poison, "~> 4.0"},
{:httpoison, "~> 1.3.0", override: true},
{:mox, "~> 0.4", only: :test}
{:mox, "~> 0.4", only: :test},
{:horde, git: "https://github.com/derekkraan/horde", branch: "master"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
# {:sibling_app_in_umbrella, in_umbrella: true},
Expand Down
3 changes: 3 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
"certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
"credo": {:hex, :credo, "0.10.0", "66234a95effaf9067edb19fc5d0cd5c6b461ad841baac42467afed96c78e5e9e", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"delta_crdt": {:hex, :delta_crdt, "0.3.1", "a50bff0460da2d9b0559a68c0116bf8f0c3847de272bdce4a391aae2063902d3", [:mix], [], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.3", "774306f84973fc3f1e2e8743eeaa5f5d29b117f3916e5de74c075c02f1b8ef55", [:mix], [], "hexpm"},
"distillery": {:hex, :distillery, "2.0.5", "f387ea3fdec9f3e4216e8f8157760ba41bda1ce26c8986662ebb164a60a4658e", [:mix], [{:artificery, "~> 0.2", [hex: :artificery, repo: "hexpm", optional: false]}], "hexpm"},
"ex_docker_build": {:hex, :ex_docker_build, "0.6.0", "696b61c0ed4947f9104fc304a57aa2293a31d1e82f9692c82115579951df800f", [:mix], [{:hackney, "~> 1.13.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.3.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:poison, "~> 4.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.9.2", "299ea4903be7cb2959af0f919d258af116736ca8d507f86c12ef2184698e21a0", [:mix], [{:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"},
"hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"horde": {:git, "https://github.com/derekkraan/horde", "78a2d65548c6aa479a9625e608280b0d36facc65", [branch: "master"]},
"httpoison": {:hex, :httpoison, "1.3.1", "7ac607311f5f706b44e8b3fab736d0737f2f62a31910ccd9afe7227b43edb7f0", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
Expand All @@ -34,4 +36,5 @@
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"},
"tentacat": {:hex, :tentacat, "1.1.0", "d389f6b5b36e45c052d29848fc3a984ba0159fae31519a8e35d24653337412de", [:mix], [{:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:httpoison, "~> 0.8", [hex: :httpoison, repo: "hexpm", optional: false]}], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"},
"xxhash": {:hex, :xxhash, "0.2.1", "ab0893a8124f3c11116c57e500485dc5f67817d1d4c44f0fff41f3fd3c590607", [:mix], [], "hexpm"},
}

0 comments on commit ee3a0cb

Please sign in to comment.