From e92cef4360e9c3e9837564d963356bc52e29c4cb Mon Sep 17 00:00:00 2001 From: Chralu Date: Thu, 6 Feb 2025 12:27:33 +0100 Subject: [PATCH] feat(P2PView): :sparkles: Store P2PView historic data. --- lib/archethic/p2p/mem_table.ex | 1 + lib/archethic/p2p/p2p_view.ex | 296 +++++++++++++++++++++++++++ test/archethic/p2p/p2p_view_test.exs | 230 +++++++++++++++++++++ 3 files changed, 527 insertions(+) create mode 100644 lib/archethic/p2p/p2p_view.ex create mode 100644 test/archethic/p2p/p2p_view_test.exs diff --git a/lib/archethic/p2p/mem_table.ex b/lib/archethic/p2p/mem_table.ex index 4b61706b7..cea91d3ec 100644 --- a/lib/archethic/p2p/mem_table.ex +++ b/lib/archethic/p2p/mem_table.ex @@ -229,6 +229,7 @@ defmodule Archethic.P2P.MemTable do @doc """ List the P2P nodes """ + # TODO add date en parametre. retourner tout les noeuds ou enrollment_date < date @spec list_nodes() :: list(Node.t()) def list_nodes do :ets.foldl( diff --git a/lib/archethic/p2p/p2p_view.ex b/lib/archethic/p2p/p2p_view.ex new file mode 100644 index 000000000..1771f50cf --- /dev/null +++ b/lib/archethic/p2p/p2p_view.ex @@ -0,0 +1,296 @@ +defmodule Archethic.P2P.P2PView do + defstruct [ + :geo_patch, + :available?, + :avg_availability + ] + + @type t :: %{ + geo_patch: binary(), + available?: boolean(), + avg_availability: float() + } + + @archethic_db_p2pview :archethic_db_p2pview + + require Logger + use GenServer + + @spec start_link(Keyword.t()) :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def init(_) do + setup_ets_table() + {:ok, %{}} + end + + defp setup_ets_table, do: :ets.new(@archethic_db_p2pview, [:ordered_set, :named_table]) + + @spec get_summary(timestamp :: DateTime.t()) :: list(t()) + def get_summary(timestamp) do + DateTime.to_unix(timestamp) + |> read_nodes() + |> deserialize() + end + + @spec update_node( + changes :: Keyword.t(), + start_timestamp :: DateTime.t(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + def update_node(changes, start_timestamp, index_at_timestamp) do + unix_start_timestamp = DateTime.to_unix(start_timestamp) + + changes = changes |> Enum.map(fn {key, value} -> {key, {value, true}} end) + + GenServer.call( + __MODULE__, + {:update_node, changes, unix_start_timestamp, index_at_timestamp} + ) + end + + @spec add_node( + node :: t(), + start_timestamp :: DateTime.t(), + index_at_timestamp :: (DateTime.t() -> integer()) + ) :: :ok + def add_node(node, start_timestamp, index_at_timestamp) do + unix_start_timestamp = DateTime.to_unix(start_timestamp) + node_bin = serialize_node(node, true) + GenServer.call(__MODULE__, {:add_node, node_bin, unix_start_timestamp, index_at_timestamp}) + end + + def handle_call({:update_node, changes, unix_start_timestamp, index_at_timestamp}, _from, state) do + do_update_node(changes, unix_start_timestamp, index_at_timestamp) + {:reply, :ok, state} + end + + def handle_call({:add_node, node_bin, unix_start_timestamp, index_at_timestamp}, _from, state) do + do_add_node(node_bin, unix_start_timestamp, index_at_timestamp) + {:reply, :ok, state} + end + + defp do_update_node(_, :"$end_of_table", _), do: :ok + defp do_update_node([], _, _), do: :ok + + defp do_update_node(changes, unix_timestamp, index_at_timestamp) do + node_index = index_at_timestamp.(DateTime.from_unix!(unix_timestamp)) + bin_p2p_view = read_nodes(unix_timestamp) + + {prefix, bin_node, suffix} = get_bin_node(bin_p2p_view, node_index) + + changes_to_apply = + changes + |> Enum.filter(&should_apply_change?(&1, bin_node)) + + updated_node = + bin_node + |> apply_changes_to_node(changes_to_apply) + + (prefix <> updated_node <> suffix) + |> write_nodes(unix_timestamp) + + changes_to_apply = + changes_to_apply |> Enum.map(fn {key, {value, _}} -> {key, {value, false}} end) + + do_update_node( + changes_to_apply, + :ets.next(@archethic_db_p2pview, unix_timestamp), + index_at_timestamp + ) + end + + defp should_apply_change?({key, {_, changed?}}, bin_node) do + {_, previously_changed?} = get_bin_node_property(bin_node, key) + changed? == true || previously_changed? != 1 + end + + defp do_add_node(_, :"$end_of_table", _), do: :ok + + defp do_add_node(node_bin, unix_timestamp, index_at_timestamp) do + node_index = index_at_timestamp.(DateTime.from_unix!(unix_timestamp)) + + read_nodes(unix_timestamp) + |> insert_bin_node(node_index, node_bin) + |> write_nodes(unix_timestamp) + + do_add_node( + node_bin, + :ets.next(@archethic_db_p2pview, unix_timestamp), + index_at_timestamp + ) + end + + # TODO decliner avec enregistrement sur fichier + defp read_nodes(unix_timestamp) do + case :ets.prev(@archethic_db_p2pview, unix_timestamp + 1) do + :"$end_of_table" -> + <<>> + + ^unix_timestamp = index -> + :ets.lookup_element(@archethic_db_p2pview, index, 2) + + index -> + :ets.lookup_element(@archethic_db_p2pview, index, 2) + |> reset_bin_change_bits() + end + end + + # TODO decliner avec enregistrement sur fichier + defp write_nodes(nodes, unix_timestamp) do + :ets.insert( + @archethic_db_p2pview, + {unix_timestamp, nodes} + ) + + :ok + end + + @bin_node_byte_size 8 + + defp serialize(p2p_view, are_new_nodes?, acc \\ <<>>) + + defp serialize([], _, acc), do: acc + + defp serialize([node | rest], are_new_nodes?, acc) do + node_bin = serialize_node(node, are_new_nodes?) + + serialize( + rest, + are_new_nodes?, + acc <> node_bin + ) + end + + defp serialize_node( + %__MODULE__{ + geo_patch: geo_patch, + available?: available?, + avg_availability: avg_availability + }, + is_new_node? + ) do + [{:geo_patch, geo_patch}, {:available?, available?}, {:avg_availability, avg_availability}] + |> Enum.reduce( + <<>>, + &(&2 <> serialize_boolean(is_new_node?) <> serialize_node_property(&1)) + ) + end + + defp serialize_node_property({:geo_patch, value}), do: <> + defp serialize_node_property({:available?, value}), do: serialize_boolean(value) + defp serialize_node_property({:avg_availability, value}), do: <> + + defp serialize_boolean(true), do: <<1::8>> + defp serialize_boolean(false), do: <<0::8>> + + defp deserialize(rest, acc \\ []) + + defp deserialize(<<>>, acc) do + acc |> Enum.reverse() + end + + defp deserialize( + <>, + acc + ) do + node = deserialize_node(node_bin) + + deserialize(rest, [node | acc]) + end + + defp deserialize_node( + <<_::8, geo_patch::binary-size(3), _::8, available?, _::8, avg_availability::8>> + ) do + %__MODULE__{ + geo_patch: geo_patch, + available?: available? == 1, + avg_availability: avg_availability / 100 + } + end + + # Helper functions for bin_node manipulation in binary form + + defp reset_bin_change_bits(bin_p2p_view, acc \\ <<>>) + + defp reset_bin_change_bits(<<>>, acc), do: acc + + defp reset_bin_change_bits( + bin_p2p_view, + acc + ) do + <<_::8, geo_patch::binary-size(3), _::8, available?::8, _::8, avg_availability::8, + rest::binary>> = bin_p2p_view + + acc = + <> + + reset_bin_change_bits(rest, acc) + end + + defp get_bin_node( + bin_p2p_view, + index + ) do + prefix_size = @bin_node_byte_size * index + + <> = bin_p2p_view + + {prefix, bin_node, suffix} + end + + defp get_bin_node_property( + <>, + :geo_patch + ) do + {<>, geo_patch_changed?} + end + + defp get_bin_node_property( + <<_::32, available_changed?::8, available?::8, _::binary>>, + :available + ) do + {<>, available_changed?} + end + + defp get_bin_node_property( + <<_::48, avg_availability_changed?::8, avg_availability::8>>, + :avg_availability + ) do + {<>, avg_availability_changed?} + end + + defp insert_bin_node( + bin_p2p_view, + index, + bin_node + ) do + prefix_size = @bin_node_byte_size * index + + <> = bin_p2p_view + + prefix <> bin_node <> suffix + end + + defp apply_changes_to_node( + bin_node, + changes + ) do + [:geo_patch, :available, :avg_availability] + |> Enum.reduce(<<>>, fn key, acc -> + acc <> + case changes[key] do + nil -> + {value, changed?} = get_bin_node_property(bin_node, key) + <> + + {value, changed?} -> + serialize_boolean(changed?) <> serialize_node_property({key, value}) + end + end) + end +end diff --git a/test/archethic/p2p/p2p_view_test.exs b/test/archethic/p2p/p2p_view_test.exs new file mode 100644 index 000000000..5e19081b8 --- /dev/null +++ b/test/archethic/p2p/p2p_view_test.exs @@ -0,0 +1,230 @@ +defmodule Archethic.P2P.P2PViewTest do + use ArchethicCase + + alias Archethic.P2P.P2PView + + # import ArchethicCase + + # Doit stocker sur disque les années précédentes + + setup do + {:ok, _pid} = P2PView.start_link() + :ok + end + + @spec init_p2pview(id :: non_neg_integer()) :: P2PView.t() + defp init_p2pview(id) do + %P2PView{ + geo_patch: "AAA", + available?: true, + avg_availability: id / 10 + } + end + + @doc """ + Gets P2PView nodes from unix timestamp. + """ + def get_summary(timestamp) do + P2PView.get_summary(DateTime.from_unix!(timestamp)) + end + + describe "get_summary/1" do + test "should return the requested timestamp data when available" do + node1 = init_p2pview(1) + node2 = init_p2pview(2) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date3 = DateTime.add(date0, 3) + + # Given + # 0 : [node1] + # 3 : [node1, node2] + P2PView.add_node(node1, date0, fn _ -> 0 end) + P2PView.add_node(node2, date3, fn _ -> 1 end) + + assert [node1, node2] == P2PView.get_summary(date3) + end + + test "should return the previous timestamp state when no data at requested timestamp" do + node1 = init_p2pview(1) + node2 = init_p2pview(2) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date2 = DateTime.add(date0, 2) + date3 = DateTime.add(date0, 3) + + # Given + # 0 : [node1] + # 2 : [node1, node2] + P2PView.add_node(node1, date0, fn _ -> 0 end) + P2PView.add_node(node2, date2, fn _ -> 1 end) + + assert [node1, node2] == P2PView.get_summary(date3) + end + end + + describe "add_node/3" do + test "should update first record and the following ones" do + node1 = init_p2pview(1) + node2 = init_p2pview(2) + node3 = init_p2pview(3) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date3 = DateTime.add(date0, 3) + + # Given + # 0 : [node1] + # 3 : [node1, node2] + P2PView.add_node(node1, date0, fn _ -> 0 end) + P2PView.add_node(node2, date3, fn _ -> 1 end) + + # When adding a new node from timestamp 1 + assert :ok == + P2PView.add_node( + node3, + date0, + fn + ^date0 -> 1 + _ -> 2 + end + ) + + assert [node1, node3] == P2PView.get_summary(date0) + assert [node1, node2, node3] == P2PView.get_summary(date3) + end + + test "should create first record and update the following ones" do + node1 = init_p2pview(1) + node2 = init_p2pview(2) + node3 = init_p2pview(3) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date1 = DateTime.add(date0, 1) + date3 = DateTime.add(date0, 3) + + # Given + # 0 : [node1] + # 3 : [node1, node2] + P2PView.add_node(node1, date0, fn _ -> 0 end) + P2PView.add_node(node2, date3, fn _ -> 1 end) + + # When adding a new node from timestamp 1 + assert :ok == + P2PView.add_node( + node3, + date1, + fn + ^date1 -> 0 + _ -> 1 + end + ) + + assert [node1] == P2PView.get_summary(date0) + assert [node3, node1] == P2PView.get_summary(date1) + assert [node1, node3, node2] == P2PView.get_summary(date3) + end + end + + describe "update_node/2" do + test "should update first record and the following ones" do + node1 = init_p2pview(1) + node2 = init_p2pview(2) + node3 = init_p2pview(3) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date1 = DateTime.add(date0, 1) + date3 = DateTime.add(date0, 3) + + # Given + # 0 : [node1] + # 1 : [node1, node2] + # 3 : [node1, node2, node3] + P2PView.add_node(node1, date0, fn _ -> 0 end) + P2PView.add_node(node2, date1, fn _ -> 1 end) + P2PView.add_node(node3, date3, fn _ -> 2 end) + + # When updating node 2 from timestamp 3 + assert :ok == + P2PView.update_node( + [avg_availability: 0.5], + date1, + fn _ -> 1 end + ) + + updated_node_2 = %{node2 | avg_availability: 0.5} + + assert [node1] == P2PView.get_summary(date0) + assert [node1, updated_node_2] == P2PView.get_summary(date1) + assert [node1, updated_node_2, node3] == P2PView.get_summary(date3) + end + end + + test "should create first record and update the following ones" do + node1 = init_p2pview(1) + node2 = init_p2pview(2) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date1 = DateTime.add(date0, 1) + date2 = DateTime.add(date0, 2) + date3 = DateTime.add(date0, 3) + + # Given + # 0 : [node1] + # 1 : [node1, node2] + # 3 : [node1, node2] + P2PView.add_node(node1, date0, fn _ -> 0 end) + P2PView.add_node(node2, date1, fn _ -> 1 end) + + # When updating node 2 from timestamp 2 + assert :ok == + P2PView.update_node( + [avg_availability: 0.5], + date2, + fn _ -> 1 end + ) + + updated_node_2 = %{node2 | avg_availability: 0.5} + + assert [node1] == P2PView.get_summary(date0) + assert [node1, node2] == P2PView.get_summary(date1) + assert [node1, updated_node_2] == P2PView.get_summary(date2) + assert [node1, updated_node_2] == P2PView.get_summary(date3) + end + + test "should update nodes properties until another mutation is met" do + node1 = init_p2pview(1) + + date0 = DateTime.truncate(DateTime.utc_now(), :second) + date1 = DateTime.add(date0, 1) + date2 = DateTime.add(date0, 2) + + # Given + # 0 : [node1] + P2PView.add_node(node1, date0, fn _ -> 0 end) + + # Given a mutation on timestamp 1 + P2PView.update_node( + [geo_patch: "CCC"], + date1, + fn _ -> 0 end + ) + + P2PView.update_node( + [avg_availability: 0.5], + date2, + fn _ -> 0 end + ) + + # When updating node 1 from timestamp 0 + assert :ok == + P2PView.update_node( + [geo_patch: "BBB", avg_availability: 0.3], + date0, + fn _ -> 0 end + ) + + assert [%{node1 | avg_availability: 0.3, geo_patch: "BBB"}] == P2PView.get_summary(date0) + assert [%{node1 | avg_availability: 0.3, geo_patch: "CCC"}] == P2PView.get_summary(date1) + assert [%{node1 | avg_availability: 0.5, geo_patch: "CCC"}] == P2PView.get_summary(date2) + end +end