From ee3a0cbbb74f6c5c65e4589346d6ea912890b3cd Mon Sep 17 00:00:00 2001 From: sescobb27 Date: Fri, 28 Dec 2018 13:39:02 -0500 Subject: [PATCH] [WIP] distributing repo_poller worker jobs across multiple nodes --- .../lib/repo_poller/application.ex | 5 +- apps/repo_poller/lib/repo_poller/config.ex | 13 ++++ .../lib/repo_poller/custer_connector.ex | 62 +++++++++++++++++++ apps/repo_poller/lib/repo_poller/poller.ex | 8 ++- .../lib/repo_poller/poller_supervisor.ex | 35 +++++------ .../lib/repo_poller/setup_queue_worker.ex | 2 +- .../lib/repo_poller/setup_supervisor.ex | 3 +- apps/repo_poller/mix.exs | 3 +- mix.lock | 3 + 9 files changed, 109 insertions(+), 25 deletions(-) create mode 100644 apps/repo_poller/lib/repo_poller/custer_connector.ex diff --git a/apps/repo_poller/lib/repo_poller/application.ex b/apps/repo_poller/lib/repo_poller/application.ex index a508105..725ba02 100644 --- a/apps/repo_poller/lib/repo_poller/application.ex +++ b/apps/repo_poller/lib/repo_poller/application.ex @@ -5,7 +5,7 @@ defmodule RepoPoller.Application do use Application - alias RepoPoller.{SetupSupervisor, Config} + alias RepoPoller.{SetupSupervisor, Config, ClusterConnector} def start(_type, _args) do # List all child processes to be supervised @@ -15,6 +15,9 @@ defmodule RepoPoller.Application do children = [ {BugsBunny.PoolSupervisor, [rabbitmq_config: rabbitmq_config, rabbitmq_conn_pool: rabbitmq_conn_pool]}, + {Horde.Registry, [name: RepoPoller.DistributedRegistry, keys: :unique]}, + {Horde.Supervisor, [name: RepoPoller.DistributedSupervisor, strategy: :one_for_one]}, + {ClusterConnector, []}, {SetupSupervisor, []} ] diff --git a/apps/repo_poller/lib/repo_poller/config.ex b/apps/repo_poller/lib/repo_poller/config.ex index b1f777a..f1eb8e0 100644 --- a/apps/repo_poller/lib/repo_poller/config.ex +++ b/apps/repo_poller/lib/repo_poller/config.ex @@ -52,4 +52,17 @@ defmodule RepoPoller.Config do def get_database_reconnection_interval() do Application.get_env(:repo_poller, :database_reconnect, 5000) end + + def get_nodes() do + case Application.get_env(:repo_poller, :poller_nodes) do + nil -> + "POLLER_NODES" + |> System.get_env() + |> String.split(",") + |> Enum.map(&String.to_atom/1) + + nodes -> + nodes + end + end end diff --git a/apps/repo_poller/lib/repo_poller/custer_connector.ex b/apps/repo_poller/lib/repo_poller/custer_connector.ex new file mode 100644 index 0000000..531b294 --- /dev/null +++ b/apps/repo_poller/lib/repo_poller/custer_connector.ex @@ -0,0 +1,62 @@ +defmodule RepoPoller.ClusterConnector do + use GenServer + + require Logger + + alias RepoPoller.Config + + def start_link(args) do + GenServer.start_link(__MODULE__, args, []) + end + + def init(_) do + nodes = nodes() + join_cluster(nodes) + schedule_self_heal() + + {:ok, nil} + end + + def handle_info(:self_heal, nil) do + cluster_nodes = nodes() + + nodes = + Node.list() + |> Enum.filter(fn node -> + Enum.member?(cluster_nodes, node) + end) + + (cluster_nodes -- nodes) + |> join_cluster() + + schedule_self_heal() + {:noreply, nil} + end + + defp schedule_self_heal() do + Process.send_after(self(), :self_heal, 5000) + end + + defp nodes() do + Config.get_nodes() |> List.delete(Node.self()) + end + + defp join_cluster(nodes) when is_list(nodes) do + Enum.each(nodes, fn node -> + Logger.info("joining node #{inspect(node)}") + join_cluster(node) + end) + end + + defp join_cluster(node) do + Horde.Cluster.join_hordes( + RepoPoller.DistributedSupervisor, + {RepoPoller.DistributedSupervisor, node} + ) + + Horde.Cluster.join_hordes( + RepoPoller.DistributedRegistry, + {RepoPoller.DistributedRegistry, node} + ) + end +end diff --git a/apps/repo_poller/lib/repo_poller/poller.ex b/apps/repo_poller/lib/repo_poller/poller.ex index ff01200..15aa5f5 100644 --- a/apps/repo_poller/lib/repo_poller/poller.ex +++ b/apps/repo_poller/lib/repo_poller/poller.ex @@ -28,11 +28,11 @@ defmodule RepoPoller.Poller do ############## def start_link({%{name: repo_name}, _adapter, _pool_id} = args) do - GenServer.start_link(__MODULE__, args, name: String.to_atom(repo_name)) + GenServer.start_link(__MODULE__, args, name: via_tuple(repo_name)) end def start_link({_caller, %{name: repo_name}, _adapter, _pool_id} = args) do - GenServer.start_link(__MODULE__, args, name: String.to_atom(repo_name)) + GenServer.start_link(__MODULE__, args, name: via_tuple(repo_name)) end @doc false @@ -223,4 +223,8 @@ defmodule RepoPoller.Poller do exchange = Config.get_rabbitmq_exchange() Config.get_rabbitmq_client().publish(channel, exchange, queue, payload, config) end + + defp via_tuple(name) do + {:via, Horde.Registry, {RepoPoller.DistributedRegistry, String.to_atom(name)}} + end end diff --git a/apps/repo_poller/lib/repo_poller/poller_supervisor.ex b/apps/repo_poller/lib/repo_poller/poller_supervisor.ex index 4c435eb..32c9725 100644 --- a/apps/repo_poller/lib/repo_poller/poller_supervisor.ex +++ b/apps/repo_poller/lib/repo_poller/poller_supervisor.ex @@ -1,28 +1,22 @@ defmodule RepoPoller.PollerSupervisor do - use DynamicSupervisor - alias RepoPoller.Poller alias Domain.Repos.Repo alias RepoPoller.Config - def start_link(args \\ []) do - name = Keyword.get(args, :name, __MODULE__) - DynamicSupervisor.start_link(__MODULE__, [], name: name) - end - - def init(_) do - DynamicSupervisor.init(strategy: :one_for_one) - end - def start_child(%Repo{} = repo) do pool_id = Config.get_connection_pool_id() adapter = setup_adapter(repo.adapter) - DynamicSupervisor.start_child(__MODULE__, %{ + # Horde doesn't support :transient children yet: https://github.com/derekkraan/horde/issues/36 + # Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{ + # id: "poller_#{repo.name}", + # start: {Poller, :start_link, [{repo, adapter, pool_id}]}, + # restart: :transient + # }) + Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{ id: "poller_#{repo.name}", - start: {Poller, :start_link, [{repo, adapter, pool_id}]}, - restart: :transient + start: {Poller, :start_link, [{repo, adapter, pool_id}]} }) end @@ -37,10 +31,15 @@ defmodule RepoPoller.PollerSupervisor do adapter = setup_adapter(adapter) - DynamicSupervisor.start_child(__MODULE__, %{ + # Horde doesn't support :transient children yet: https://github.com/derekkraan/horde/issues/36 + # Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{ + # id: "poller_#{repo.name}", + # start: {Poller, :start_link, [{repo, adapter, pool_id}]}, + # restart: :transient + # }) + Horde.Supervisor.start_child(RepoPoller.DistributedSupervisor, %{ id: "poller_#{repo.name}", - start: {Poller, :start_link, [{repo, adapter, pool_id}]}, - restart: :transient + start: {Poller, :start_link, [{repo, adapter, pool_id}]} }) end @@ -56,7 +55,7 @@ defmodule RepoPoller.PollerSupervisor do {:error, "Couldn't find repository process."} pid when is_pid(pid) -> - DynamicSupervisor.terminate_child(__MODULE__, pid) + Horde.Supervisor.terminate_child(__MODULE__, pid) end rescue _ -> {:error, "Couldn't find repository process."} diff --git a/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex b/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex index dcfde97..ac31940 100644 --- a/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex +++ b/apps/repo_poller/lib/repo_poller/setup_queue_worker.ex @@ -18,6 +18,6 @@ defmodule RepoPoller.SetupQueueWorker do exchange_options: [durable: true] ) - {:ok, :ignore} + :ignore end end diff --git a/apps/repo_poller/lib/repo_poller/setup_supervisor.ex b/apps/repo_poller/lib/repo_poller/setup_supervisor.ex index a5318de..1d4d461 100644 --- a/apps/repo_poller/lib/repo_poller/setup_supervisor.ex +++ b/apps/repo_poller/lib/repo_poller/setup_supervisor.ex @@ -1,7 +1,7 @@ defmodule RepoPoller.SetupSupervisor do use Supervisor - alias RepoPoller.{PollerSupervisor, SetupWorker, SetupQueueWorker, Config} + alias RepoPoller.{SetupWorker, SetupQueueWorker, Config} def start_link(args \\ []) do name = Keyword.get(args, :name, __MODULE__) @@ -10,7 +10,6 @@ defmodule RepoPoller.SetupSupervisor do def init(_) do children = [ - {PollerSupervisor, []}, {SetupWorker, []} ] diff --git a/apps/repo_poller/mix.exs b/apps/repo_poller/mix.exs index 015c1a0..7455a25 100644 --- a/apps/repo_poller/mix.exs +++ b/apps/repo_poller/mix.exs @@ -33,7 +33,8 @@ defmodule RepoPoller.MixProject do {:domain, in_umbrella: true}, {:poison, "~> 4.0"}, {:httpoison, "~> 1.3.0", override: true}, - {:mox, "~> 0.4", only: :test} + {:mox, "~> 0.4", only: :test}, + {:horde, git: "https://github.com/derekkraan/horde", branch: "master"} # {:dep_from_hexpm, "~> 0.3.0"}, # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}, # {:sibling_app_in_umbrella, in_umbrella: true}, diff --git a/mix.lock b/mix.lock index 1e15d46..d885869 100644 --- a/mix.lock +++ b/mix.lock @@ -5,6 +5,7 @@ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm"}, "certifi": {:hex, :certifi, "2.3.1", "d0f424232390bf47d82da8478022301c561cf6445b5b5fb6a84d49a9e76d2639", [:rebar3], [{:parse_trans, "3.2.0", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm"}, "credo": {:hex, :credo, "0.10.0", "66234a95effaf9067edb19fc5d0cd5c6b461ad841baac42467afed96c78e5e9e", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, + "delta_crdt": {:hex, :delta_crdt, "0.3.1", "a50bff0460da2d9b0559a68c0116bf8f0c3847de272bdce4a391aae2063902d3", [:mix], [], "hexpm"}, "dialyxir": {:hex, :dialyxir, "1.0.0-rc.3", "774306f84973fc3f1e2e8743eeaa5f5d29b117f3916e5de74c075c02f1b8ef55", [:mix], [], "hexpm"}, "distillery": {:hex, :distillery, "2.0.5", "f387ea3fdec9f3e4216e8f8157760ba41bda1ce26c8986662ebb164a60a4658e", [:mix], [{:artificery, "~> 0.2", [hex: :artificery, repo: "hexpm", optional: false]}], "hexpm"}, "ex_docker_build": {:hex, :ex_docker_build, "0.6.0", "696b61c0ed4947f9104fc304a57aa2293a31d1e82f9692c82115579951df800f", [:mix], [{:hackney, "~> 1.13.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.3.0", [hex: :httpoison, repo: "hexpm", optional: false]}, {:poison, "~> 4.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, @@ -12,6 +13,7 @@ "exjsx": {:hex, :exjsx, "4.0.0", "60548841e0212df401e38e63c0078ec57b33e7ea49b032c796ccad8cde794b5c", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, repo: "hexpm", optional: false]}], "hexpm"}, "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"}, "hackney": {:hex, :hackney, "1.13.0", "24edc8cd2b28e1c652593833862435c80661834f6c9344e84b6a2255e7aeef03", [:rebar3], [{:certifi, "2.3.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.2", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, + "horde": {:git, "https://github.com/derekkraan/horde", "78a2d65548c6aa479a9625e608280b0d36facc65", [branch: "master"]}, "httpoison": {:hex, :httpoison, "1.3.1", "7ac607311f5f706b44e8b3fab736d0737f2f62a31910ccd9afe7227b43edb7f0", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"}, "idna": {:hex, :idna, "5.1.2", "e21cb58a09f0228a9e0b95eaa1217f1bcfc31a1aaa6e1fdf2f53a33f7dbd9494", [:rebar3], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jason": {:hex, :jason, "1.1.1", "d3ccb840dfb06f2f90a6d335b536dd074db748b3e7f5b11ab61d239506585eb2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, @@ -34,4 +36,5 @@ "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm"}, "tentacat": {:hex, :tentacat, "1.1.0", "d389f6b5b36e45c052d29848fc3a984ba0159fae31519a8e35d24653337412de", [:mix], [{:exjsx, "~> 4.0", [hex: :exjsx, repo: "hexpm", optional: false]}, {:httpoison, "~> 0.8", [hex: :httpoison, repo: "hexpm", optional: false]}], "hexpm"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [:rebar3], [], "hexpm"}, + "xxhash": {:hex, :xxhash, "0.2.1", "ab0893a8124f3c11116c57e500485dc5f67817d1d4c44f0fff41f3fd3c590607", [:mix], [], "hexpm"}, }