Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] distributing repo_poller worker jobs across multiple nodes #22

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/repo_poller/lib/repo_poller/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule RepoPoller.Application do

use Application

alias RepoPoller.{SetupSupervisor, Config}
alias RepoPoller.{SetupSupervisor, Config, ClusterConnector}

def start(_type, _args) do
# List all child processes to be supervised
Expand All @@ -15,6 +15,9 @@ defmodule RepoPoller.Application do
children = [
{BugsBunny.PoolSupervisor,
[rabbitmq_config: rabbitmq_config, rabbitmq_conn_pool: rabbitmq_conn_pool]},
{Horde.Registry, [name: RepoPoller.DistributedRegistry, keys: :unique]},
{Horde.Supervisor, [name: RepoPoller.DistributedSupervisor, strategy: :one_for_one]},
{ClusterConnector, []},
{SetupSupervisor, []}
]

Expand Down
13 changes: 13 additions & 0 deletions apps/repo_poller/lib/repo_poller/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,17 @@ defmodule RepoPoller.Config do
def get_database_reconnection_interval() do
Application.get_env(:repo_poller, :database_reconnect, 5000)
end

def get_nodes() do
case Application.get_env(:repo_poller, :poller_nodes) do
nil ->
"POLLER_NODES"
|> System.get_env()
|> String.split(",")
|> Enum.map(&String.to_atom/1)

nodes ->
nodes
end
end
end
62 changes: 62 additions & 0 deletions apps/repo_poller/lib/repo_poller/custer_connector.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
defmodule RepoPoller.ClusterConnector do
use GenServer

require Logger

alias RepoPoller.Config

def start_link(args) do
GenServer.start_link(__MODULE__, args, [])
end

def init(_) do
nodes = nodes()
join_cluster(nodes)
schedule_self_heal()

{:ok, nil}
end

def handle_info(:self_heal, nil) do
cluster_nodes = nodes()

nodes =
Node.list()
|> Enum.filter(fn node ->
Enum.member?(cluster_nodes, node)
end)

(cluster_nodes -- nodes)
|> join_cluster()

schedule_self_heal()
{:noreply, nil}
end

defp schedule_self_heal() do
Process.send_after(self(), :self_heal, 5000)
end

defp nodes() do
Config.get_nodes() |> List.delete(Node.self())
end

defp join_cluster(nodes) when is_list(nodes) do
Enum.each(nodes, fn node ->
Logger.info("joining node #{inspect(node)}")
join_cluster(node)
end)
end

defp join_cluster(node) do
Horde.Cluster.join_hordes(
RepoPoller.DistributedSupervisor,
{RepoPoller.DistributedSupervisor, node}
)

Horde.Cluster.join_hordes(
RepoPoller.DistributedRegistry,
{RepoPoller.DistributedRegistry, node}
)
end
end
8 changes: 6 additions & 2 deletions apps/repo_poller/lib/repo_poller/poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ defmodule RepoPoller.Poller do
##############

def start_link({%{name: repo_name}, _adapter, _pool_id} = args) do
GenServer.start_link(__MODULE__, args, name: String.to_atom(repo_name))
GenServer.start_link(__MODULE__, args, name: via_tuple(repo_name))
end

def start_link({_caller, %{name: repo_name}, _adapter, _pool_id} = args) do
GenServer.start_link(__MODULE__, args, name: String.to_atom(repo_name))
GenServer.start_link(__MODULE__, args, name: via_tuple(repo_name))
end

@doc false
Expand Down Expand Up @@ -223,4 +223,8 @@ defmodule RepoPoller.Poller do
exchange = Config.get_rabbitmq_exchange()
Config.get_rabbitmq_client().publish(channel, exchange, queue, payload, config)
end

defp via_tuple(name) do
{:via, Horde.Registry, {RepoPoller.DistributedRegistry, String.to_atom(name)}}
end
end
35 changes: 17 additions & 18 deletions apps/repo_poller/lib/repo_poller/poller_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,28 +1,22 @@
defmodule RepoPoller.PollerSupervisor do
use DynamicSupervisor

alias RepoPoller.Poller
alias Domain.Repos.Repo
alias RepoPoller.Config

def start_link(args \\ []) do
name = Keyword.get(args, :name, __MODULE__)
DynamicSupervisor.start_link(__MODULE__, [], name: name)
end

def init(_) do
DynamicSupervisor.init(strategy: :one_for_one)
end

def start_child(%Repo{} = repo) do
pool_id = Config.get_connection_pool_id()

adapter = setup_adapter(repo.adapter)

DynamicSupervisor.start_child(__MODULE__, %{
# Horde doesn't support :transient children yet: https://github.com/derekkraan/horde/issues/36
# Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
# id: "poller_#{repo.name}",
# start: {Poller, :start_link, [{repo, adapter, pool_id}]},
# restart: :transient
# })
Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
id: "poller_#{repo.name}",
start: {Poller, :start_link, [{repo, adapter, pool_id}]},
restart: :transient
start: {Poller, :start_link, [{repo, adapter, pool_id}]}
})
end

Expand All @@ -37,10 +31,15 @@ defmodule RepoPoller.PollerSupervisor do

adapter = setup_adapter(adapter)

DynamicSupervisor.start_child(__MODULE__, %{
# Horde doesn't support :transient children yet: https://github.com/derekkraan/horde/issues/36
# Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
# id: "poller_#{repo.name}",
# start: {Poller, :start_link, [{repo, adapter, pool_id}]},
# restart: :transient
# })
Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{
id: "poller_#{repo.name}",
start: {Poller, :start_link, [{repo, adapter, pool_id}]},
restart: :transient
start: {Poller, :start_link, [{repo, adapter, pool_id}]}
})
end

Expand All @@ -56,7 +55,7 @@ defmodule RepoPoller.PollerSupervisor do
{:error, "Couldn't find repository process."}

pid when is_pid(pid) ->
DynamicSupervisor.terminate_child(__MODULE__, pid)
Horde.Supervisor.terminate_child(__MODULE__, pid)
end
rescue
_ -> {:error, "Couldn't find repository process."}
Expand Down
2 changes: 1 addition & 1 deletion apps/repo_poller/lib/repo_poller/setup_queue_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@ defmodule RepoPoller.SetupQueueWorker do
exchange_options: [durable: true]
)

{:ok, :ignore}
:ignore
end
end
3 changes: 1 addition & 2 deletions apps/repo_poller/lib/repo_poller/setup_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule RepoPoller.SetupSupervisor do
use Supervisor

alias RepoPoller.{PollerSupervisor, SetupWorker, SetupQueueWorker, Config}
alias RepoPoller.{SetupWorker, SetupQueueWorker, Config}

def start_link(args \\ []) do
name = Keyword.get(args, :name, __MODULE__)
Expand All @@ -10,7 +10,6 @@ defmodule RepoPoller.SetupSupervisor do

def init(_) do
children = [
{PollerSupervisor, []},
{SetupWorker, []}
]

Expand Down
3 changes: 2 additions & 1 deletion apps/repo_poller/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ defmodule RepoPoller.MixProject do
{:domain, in_umbrella: true},
{:poison, "~> 4.0"},
{:httpoison, "~> 1.3.0", override: true},
{:mox, "~> 0.4", only: :test}
{:mox, "~> 0.4", only: :test},
{:horde, git: "https://github.com/derekkraan/horde", branch: "master"}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
# {:sibling_app_in_umbrella, in_umbrella: true},
Expand Down
3 changes: 3 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"},
"certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"},
"credo": {:hex, :credo, "0.10.0", "66234a95effaf9067edb19fc5d0cd5c6b461ad841baac42467afed96c78e5e9e", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"delta_crdt": {:hex, :delta_crdt, "0.3.1", "a50bff0460da2d9b0559a68c0116bf8f0c3847de272bdce4a391aae2063902d3", [:mix], [], "hexpm"},
"dialyxir": {:hex, :dialyxir, "1.0.0-rc.3", "774306f84973fc3f1e2e8743eeaa5f5d29b117f3916e5de74c075c02f1b8ef55", [:mix], [], "hexpm"},
"distillery": {:hex, :distillery, "2.0.5", "f387ea3fdec9f3e4216e8f8157760ba41bda1ce26c8986662ebb164a60a4658e", [:mix], [{:artificery, "~> 0.2", [hex: :artificery, repo: "hexpm", optional: false]}], "hexpm"},
"ex_docker_build": {:hex, :ex_docker_build, "0.6.0", "696b61c0ed4947f9104fc304a57aa2293a31d1e82f9692c82115579951df800f", [:mix], [{:hackney, "~> 1.13.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.3.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:poison, "~> 4.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
"excoveralls": {:hex, :excoveralls, "0.9.2", "299ea4903be7cb2959af0f919d258af116736ca8d507f86c12ef2184698e21a0", [:mix], [{:hackney, ">= 0.12.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"},
"hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"horde": {:git, "https://github.com/derekkraan/horde", "78a2d65548c6aa479a9625e608280b0d36facc65", [branch: "master"]},
"httpoison": {:hex, :httpoison, "1.3.1", "7ac607311f5f706b44e8b3fab736d0737f2f62a31910ccd9afe7227b43edb7f0", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
Expand All @@ -34,4 +36,5 @@
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"},
"tentacat": {:hex, :tentacat, "1.1.0", "d389f6b5b36e45c052d29848fc3a984ba0159fae31519a8e35d24653337412de", [:mix], [{:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:httpoison, "~> 0.8", [hex: :httpoison, repo: "hexpm", optional: false]}], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"},
"xxhash": {:hex, :xxhash, "0.2.1", "ab0893a8124f3c11116c57e500485dc5f67817d1d4c44f0fff41f3fd3c590607", [:mix], [], "hexpm"},
}