Skip to content

Commit

Permalink
For OTP<=25.1 use polling instead of monitor (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
am-kantox authored Jan 20, 2025
1 parent 72fac6d commit 34b4f61
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 82 deletions.
150 changes: 113 additions & 37 deletions lib/finitomata/distributed/group_monitor.ex
Original file line number Diff line number Diff line change
@@ -1,49 +1,125 @@
defmodule Finitomata.Distributed.GroupMonitor do
@moduledoc false
use GenServer
major = System.otp_release()

alias Finitomata.Distributed.Supervisor, as: Sup

def start_link(id) do
GenServer.start_link(__MODULE__, id, name: sup_name(id))
otp_version =
try do
{:ok, contents} = File.read(Path.join([:code.root_dir(), "releases", major, "OTP_VERSION"]))
String.split(contents, "\n", trim: true)
else
[full] -> full
_ -> major
catch
:error, _ -> major
end
|> String.split(".")
|> case do
[major] -> [major, 0, 0]
[major, minor] -> [major, minor, 0]
[major, minor, patch | _] -> [major, minor, patch]
end
|> Enum.join(".")

def count(id), do: GenServer.call(sup_name(id), :count)
if Version.compare(otp_version, "25.1.0") == :lt do
defmodule Finitomata.Distributed.GroupMonitor do
@moduledoc false
use GenServer

defp sup_name(id), do: Module.concat(id, "GroupMonitor")
alias Finitomata.Distributed.Supervisor, as: Sup

@impl GenServer
def init(id) do
{_reference, _pids} =
id
|> Sup.group()
|> :pg.monitor()
@update_interval Application.compile_env(:finitomata, :pg_update_interval, 500)

{:ok, 0}
end
def start_link(id) do
GenServer.start_link(__MODULE__, id, name: sup_name(id))
end

@impl GenServer
def handle_info({ref, :join, group, pids}, counter) do
{:noreply,
Enum.reduce(pids, counter, fn pid, counter ->
id = Sup.ungroup(group)
name = GenServer.call(pid, :name)
Sup.put(id, name, %{node: :erlang.node(pid), pid: pid, ref: ref})
counter + 1
end)}
end
def count(id), do: GenServer.call(sup_name(id), :count)
def members(id), do: id |> Sup.group() |> :pg.get_members()

defp sup_name(id), do: Module.concat(id, "GroupMonitor")

@impl GenServer
def init(id) do
Process.send_after(self(), {:update, id}, @update_interval)

{:ok, {0, MapSet.new([])}}
end

@impl GenServer
def handle_info({:update, id}, {count, members}) do
updated_members = id |> Sup.group() |> :pg.get_members() |> MapSet.new()

joined = MapSet.difference(updated_members, members)
handle_join(id, joined)

left = MapSet.difference(members, updated_members)
handle_leave(id, left)

Process.send_after(self(), {:update, id}, @update_interval)
{:noreply, {count, updated_members}}
end

@impl GenServer
def handle_info({_ref, :leave, group, pids}, counter) do
deleted =
group
|> Sup.ungroup()
|> Sup.delete_by_pids(pids)
|> map_size()
defp handle_join(id, pids) do
for pid <- pids do
name = GenServer.call(pid, :name)
Sup.put(id, name, %{node: :erlang.node(pid), pid: pid, ref: make_ref()})
end
end

{:noreply, counter - deleted}
defp handle_leave(id, pids) do
Sup.delete_by_pids(id, pids)
end

@impl GenServer
def handle_call(:count, _from, {count, members}), do: {:reply, count, {count, members}}
end
else
defmodule Finitomata.Distributed.GroupMonitor do
@moduledoc false
use GenServer

alias Finitomata.Distributed.Supervisor, as: Sup

def start_link(id) do
GenServer.start_link(__MODULE__, id, name: sup_name(id))
end

def count(id), do: GenServer.call(sup_name(id), :count)
def members(id), do: id |> Sup.group() |> :pg.get_members()

defp sup_name(id), do: Module.concat(id, "GroupMonitor")

@impl GenServer
def init(id) do
{_reference, _pids} =
id
|> Sup.group()
|> :pg.monitor()

@impl GenServer
def handle_call(:count, _from, state), do: {:reply, state, state}
{:ok, 0}
end

@impl GenServer
def handle_info({ref, :join, group, pids}, count) do
{:noreply,
Enum.reduce(pids, count, fn pid, counter ->
id = Sup.ungroup(group)
name = GenServer.call(pid, :name)
Sup.put(id, name, %{node: :erlang.node(pid), pid: pid, ref: ref})
counter + 1
end)}
end

@impl GenServer
def handle_info({_ref, :leave, group, pids}, count) do
deleted =
group
|> Sup.ungroup()
|> Sup.delete_by_pids(pids)
|> map_size()

{:noreply, count - deleted}
end

@impl GenServer
def handle_call(:count, _from, count), do: {:reply, count, count}
end
end
10 changes: 9 additions & 1 deletion lib/finitomata/distributed/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,22 @@ defmodule Finitomata.Distributed.Supervisor do
)
end

def delete_by_pids(id, pids) do
def delete_by_pids(id, pids) when is_list(pids) do
{match, mismatch} =
Agent.get(agent(id), &split_with(&1, fn {_, %{pid: pid}} -> pid in pids end))

Agent.update(agent(id), fn _ -> mismatch end)
match
end

def delete_by_pids(id, %MapSet{} = pids) do
{match, mismatch} =
Agent.get(agent(id), &split_with(&1, fn {_, %{pid: pid}} -> MapSet.member?(pids, pid) end))

Agent.update(agent(id), fn _ -> mismatch end)
match
end

def put(id, name, %{pid: pid} = data) when is_pid(pid) do
Agent.update(agent(id), &Map.put(&1, name, data))
end
Expand Down
29 changes: 27 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
defmodule Finitomata.MixProject do
use Mix.Project

major = System.otp_release()

otp_version =
try do
{:ok, contents} = File.read(Path.join([:code.root_dir(), "releases", major, "OTP_VERSION"]))
String.split(contents, "\n", trim: true)
else
[full] -> full
_ -> major
catch
:error, _ -> major
end
|> String.split(".")
|> case do
[major] -> [major, 0, 0]
[major, minor] -> [major, minor, 0]
[major, minor, patch | _] -> [major, minor, patch]
end
|> Enum.join(".")

@modern_libs if Version.compare(otp_version, "25.1.0") == :lt,
do: [],
else: [{:enfiladex, "~> 0.1", only: [:dev, :test, :finitomata]}]

@app :finitomata
@version "0.29.10"

def lib?(lib), do: lib in Enum.map(@modern_libs, &elem(&1, 0))

def project do
[
app: @app,
Expand Down Expand Up @@ -52,15 +78,14 @@ defmodule Finitomata.MixProject do
{:telemetry_poller, "~> 1.0", optional: true},
{:telemetria, "~> 0.21", optional: true},
# dev / test
{:enfiladex, "~> 0.1", only: [:dev, :test, :finitomata]},
{:nimble_ownership, "~> 1.0", only: [:dev, :test, :ci, :finitomata]},
{:mox, "~> 1.2", only: [:dev, :test, :ci, :finitomata]},
{:stream_data, "~> 1.0"},
{:observer_cli, "~> 1.5", only: [:dev]},
{:credo, "~> 1.0", only: [:dev, :ci]},
{:dialyxir, "~> 1.0", only: [:dev, :ci], runtime: false},
{:ex_doc, "~> 0.11", only: [:dev]}
]
] ++ @modern_libs
end

defp aliases do
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.42", "f23d856f41919f17cd06a493923a722d87a2d684f143a1e663c04a2b93100682", [:mix], [], "hexpm", "6915b6ca369b5f7346636a2f41c6a6d78b5af419d61a611079189233358b8b8b"},
"enfiladex": {:hex, :enfiladex, "0.3.1", "096b6b6f1eaaea36a1b0f3c75057db53db3b900e12d8620c66ccae2987e5c2db", [:mix], [], "hexpm", "75fb006d1d196dd1b95306fd64c11c72755201e97008e9051753fe0470af5180"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"estructura": {:hex, :estructura, "1.6.1", "724bb572194482095d250795ffe593e5f4e326b6f5dc0c7e5864695f0a9c4182", [:mix], [{:doctest_formatter, "~> 0.2", [hex: :doctest_formatter, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}], "hexpm", "b3424637719071bd9f6b7be583bce09f140795b0352e7c6b3b847c50b2784dd9"},
"estructura": {:hex, :estructura, "1.7.0", "831e767cd8957862f0a641081cef4b7a76c8b3adcf74a280286af1dedc692f38", [:mix], [{:doctest_formatter, "~> 0.2", [hex: :doctest_formatter, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}], "hexpm", "85f8bf458dbadc6f6e790d2c7d3ad6dc19e89c17fef840402df7b7252d906dc4"},
"ex_doc": {:hex, :ex_doc, "0.36.1", "4197d034f93e0b89ec79fac56e226107824adcce8d2dd0a26f5ed3a95efc36b1", [:mix], [{:earmark_parser, "~> 1.4.42", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "d7d26a7cf965dacadcd48f9fa7b5953d7d0cfa3b44fa7a65514427da44eafd89"},
"file_system": {:hex, :file_system, "1.1.0", "08d232062284546c6c34426997dd7ef6ec9f8bbd090eb91780283c9016840e8f", [:mix], [], "hexpm", "bfcf81244f416871f2a2e15c1b515287faa5db9c6bcf290222206d120b3d43f6"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
Expand Down
84 changes: 43 additions & 41 deletions test/infinitomata_test.exs
Original file line number Diff line number Diff line change
@@ -1,52 +1,54 @@
defmodule Infinitomata.Test do
use ExUnit.Case, async: true

@moduletag :distributed

setup do
{_peers, _nodes} = Enfiladex.start_peers(3)
# Enfiladex.block_call_everywhere(Infinitomata, :start_link, [InfiniTest])
Infinitomata.start_link(InfiniTest)
# on_exit(fn -> Enfiladex.stop_peers(peers) end)
Enfiladex.call_everywhere(Infinitomata, :start_link, [InfiniTest])
Process.sleep(1000)
:ok
end

test "kinda stress-test instances (distributed)" do
for i <- 1..100 do
assert match?(
{:ok, pid} when is_pid(pid),
Infinitomata.start_fsm(InfiniTest, "FSM_ST_#{i}", Finitomata.Test.Log, %{
instance: i
})
)

assert :ok = Infinitomata.transition(InfiniTest, "FSM_ST_#{i}", :accept)
assert :ok = Infinitomata.transition(InfiniTest, "FSM_ST_#{i}", :__end__)
if Finitomata.MixProject.lib?(:enfiladex) do
defmodule Infinitomata.Test do
use ExUnit.Case, async: true

@moduletag :distributed

setup do
{_peers, _nodes} = Enfiladex.start_peers(3)
# Enfiladex.block_call_everywhere(Infinitomata, :start_link, [InfiniTest])
Infinitomata.start_link(InfiniTest)
# on_exit(fn -> Enfiladex.stop_peers(peers) end)
Enfiladex.call_everywhere(Infinitomata, :start_link, [InfiniTest])
Process.sleep(1000)
:ok
end
end

test "many instances (distributed)" do
for i <- 1..10 do
Infinitomata.start_fsm(InfiniTest, "FSM_#{i}", Finitomata.Test.Log, %{instance: i})
test "kinda stress-test instances (distributed)" do
for i <- 1..100 do
assert match?(
{:ok, pid} when is_pid(pid),
Infinitomata.start_fsm(InfiniTest, "FSM_ST_#{i}", Finitomata.Test.Log, %{
instance: i
})
)

assert :ok = Infinitomata.transition(InfiniTest, "FSM_ST_#{i}", :accept)
assert :ok = Infinitomata.transition(InfiniTest, "FSM_ST_#{i}", :__end__)
end
end

assert Infinitomata.count(InfiniTest) == 10
test "many instances (distributed)" do
for i <- 1..10 do
Infinitomata.start_fsm(InfiniTest, "FSM_#{i}", Finitomata.Test.Log, %{instance: i})
end

for i <- 1..10 do
Infinitomata.transition(InfiniTest, "FSM_#{i}", :accept)
end
assert Infinitomata.count(InfiniTest) == 10

assert %{"FSM_1" => %{}} = Infinitomata.all(InfiniTest)
for i <- 1..10 do
Infinitomata.transition(InfiniTest, "FSM_#{i}", :accept)
end

for i <- 1..10 do
Infinitomata.transition(InfiniTest, "FSM_#{i}", :__end__)
end
assert %{"FSM_1" => %{}} = Infinitomata.all(InfiniTest)

for i <- 1..10 do
Infinitomata.transition(InfiniTest, "FSM_#{i}", :__end__)
end

Process.sleep(1_000)
Process.sleep(1_000)

assert Infinitomata.count(InfiniTest) == 0
assert Infinitomata.all(InfiniTest) == %{}
assert Infinitomata.count(InfiniTest) == 0
assert Infinitomata.all(InfiniTest) == %{}
end
end
end

0 comments on commit 34b4f61

Please sign in to comment.