Skip to content

Commit

Permalink
TaskSupervisor partitionning
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Nov 5, 2024
1 parent 507c4be commit ff91282
Show file tree
Hide file tree
Showing 43 changed files with 111 additions and 122 deletions.
2 changes: 0 additions & 2 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import Config
# Print only errors during test
config :logger, level: :error

config :archethic, Archethic.TaskSupervisor, enabled: true

config :archethic, :mut_dir, "data_test"

config :archethic, Archethic.BeaconChain.Subset, enabled: false
Expand Down
16 changes: 12 additions & 4 deletions lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule Archethic do
alias Archethic.SelfRepair
alias Archethic.SelfRepair.NetworkChain
alias Archethic.SelfRepair.NetworkView
alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput
Expand All @@ -43,6 +43,14 @@ defmodule Archethic do
:persistent_term.get(:archethic_up, nil) == :up
end

@doc """
Return the via tuple to use in the Task.Supervisor module.
"""
@spec task_supervisors() :: tuple()
def task_supervisors() do
{:via, PartitionSupervisor, {Archethic.TaskSupervisors, self()}}
end

@doc """
Search a transaction by its address
Check locally and fallback to a quorum read
Expand Down Expand Up @@ -126,7 +134,7 @@ defmodule Archethic do
}

Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
validation_nodes,
&P2P.send_message(&1, message),
ordered: false,
Expand Down Expand Up @@ -173,7 +181,7 @@ defmodule Archethic do
nodes
end

TaskSupervisor
Archethic.task_supervisors()
|> Task.Supervisor.start_child(fn ->
message = %NewTransaction{
transaction: tx,
Expand Down Expand Up @@ -265,7 +273,7 @@ defmodule Archethic do
defp get_welcome_node_public_key(_, key), do: key

defp notify_welcome_node(welcome_node_key, address, :already_locked) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(task_supervisors(), fn ->
message = %ValidationError{error: MiningError.new(:transaction_in_mining), address: address}
P2P.send_message(welcome_node_key, message)
end)
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ defmodule Archethic.Application do
transport = Keyword.get(p2p_endpoint_conf, :transport, :tcp)

children = [
{Task.Supervisor, name: Archethic.TaskSupervisor},
{PartitionSupervisor, child_spec: Task.Supervisor, name: Archethic.TaskSupervisors},
Archethic.Telemetry,
{Registry, keys: :duplicate, name: Archethic.PubSubRegistry},
DBSupervisor,
Expand Down
6 changes: 2 additions & 4 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ defmodule Archethic.BeaconChain do
alias Archethic.P2P.Message.NotFound
alias Archethic.P2P.Message.TransactionSummaryList

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.DB
Expand Down Expand Up @@ -125,7 +123,7 @@ defmodule Archethic.BeaconChain do
@spec load_slot(Slot.t(), Crypto.key()) :: :ok | :error
def load_slot(slot = %Slot{subset: subset, slot_time: slot_time}, node_public_key) do
if slot_time == SlotTimer.previous_slot(DateTime.utc_now()) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(Archethic.task_supervisors(), fn ->
case validate_slot(slot) do
:ok ->
Logger.debug("New beacon slot loaded - #{inspect(slot)}",
Expand Down Expand Up @@ -321,7 +319,7 @@ defmodule Archethic.BeaconChain do
# download the summaries
result =
Task.Supervisor.async_stream(
TaskSupervisor,
Archethic.task_supervisors(),
summaries_by_node,
fn {node, addresses} ->
fetch_beacon_summaries(node, addresses)
Expand Down
4 changes: 1 addition & 3 deletions lib/archethic/beacon_chain/network_coordinates.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do
alias Archethic.SelfRepair
alias Archethic.Utils

alias Archethic.TaskSupervisor

@doc """
Return the timeout to determine network patches
It is equivalent to 4m30s in production. 4.5s in dev.
Expand Down Expand Up @@ -226,7 +224,7 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do

defp stream_network_stats(summary_time, beacon_nodes, timeout) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
Archethic.task_supervisors(),
beacon_nodes,
fn node ->
P2P.send_message(node, %GetNetworkStats{summary_time: summary_time}, timeout)
Expand Down
4 changes: 1 addition & 3 deletions lib/archethic/beacon_chain/slot/validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ defmodule Archethic.BeaconChain.Slot.Validation do
alias Archethic.P2P
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

require Logger
Expand All @@ -21,7 +19,7 @@ defmodule Archethic.BeaconChain.Slot.Validation do
@spec valid_transaction_attestations?(Slot.t()) :: boolean()
def valid_transaction_attestations?(%Slot{transaction_attestations: transaction_attestations}) do
Task.Supervisor.async_stream(
TaskSupervisor,
Archethic.task_supervisors(),
transaction_attestations,
&valid_transaction_attestation/1,
ordered: false,
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Archethic.BeaconChain.Subset do
alias Archethic.P2P.Message.ReplicationAttestationMessage

alias Archethic.PubSub
alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.Utils
Expand Down Expand Up @@ -387,7 +387,7 @@ defmodule Archethic.BeaconChain.Subset do
network_patches_timeout = NetworkCoordinates.timeout()

patch_task =
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
get_network_patches(time, subset, network_patches_timeout)
end)

Expand Down
7 changes: 4 additions & 3 deletions lib/archethic/beacon_chain/subset/p2p_sampling.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

@type p2p_view :: {available? :: boolean(), latency :: non_neg_integer()}

@doc """
Expand All @@ -29,7 +27,10 @@ defmodule Archethic.BeaconChain.Subset.P2PSampling do
def get_p2p_views(nodes, nodes_availability_times) when is_list(nodes) do
timeout = 1_000

Task.Supervisor.async_stream_nolink(TaskSupervisor, nodes, &do_sample_p2p_view(&1, timeout),
Task.Supervisor.async_stream_nolink(
Archethic.task_supervisors(),
nodes,
&do_sample_p2p_view(&1, timeout),
on_timeout: :kill_task
)
|> Enum.with_index()
Expand Down
4 changes: 1 addition & 3 deletions lib/archethic/beacon_chain/update.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ defmodule Archethic.BeaconChain.Update do
alias Archethic.P2P.Node
alias Archethic.P2P.Message.RegisterBeaconUpdates

alias Archethic.TaskSupervisor

def start_link(args \\ [], opts \\ [name: __MODULE__]) do
GenServer.start_link(__MODULE__, args, opts)
end
Expand Down Expand Up @@ -52,7 +50,7 @@ defmodule Archethic.BeaconChain.Update do
state
else
Task.Supervisor.async_stream(
TaskSupervisor,
Archethic.task_supervisors(),
nodes_to_subscribe,
fn node ->
{P2P.send_message(node, message), node.first_public_key}
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/contracts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ defmodule Archethic.Contracts do
}

task =
Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
try do
# TODO: logs
logs = []
Expand Down Expand Up @@ -290,7 +290,7 @@ defmodule Archethic.Contracts do

queued_calls =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
genesis_addresses,
fn genesis_address ->
genesis_address
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ defmodule Archethic.Contracts.Interpreter.Library.Common.HttpImpl do
alias Archethic.Tag
alias Archethic.Contracts.Interpreter.Library
alias Archethic.Contracts.Interpreter.Library.Common.Http
alias Archethic.TaskSupervisor

use Tag

Expand Down Expand Up @@ -129,7 +128,7 @@ defmodule Archethic.Contracts.Interpreter.Library.Common.HttpImpl do
"body" => request_body
}
) do
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
with :ok <- validate_request(url, method, headers, request_body),
headers <- Map.to_list(headers),
{:ok, uri} <- URI.new(url),
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/contracts/loader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ defmodule Archethic.Contracts.Loader do
defp resolve_genesis_address(recipients, authorized_nodes, protocol_version)
when protocol_version <= 7 do
Task.Supervisor.async_stream(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
recipients,
fn address ->
nodes = Election.chain_storage_nodes(address, authorized_nodes)
Expand Down
3 changes: 1 addition & 2 deletions lib/archethic/governance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Archethic.Governance do
alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionData.Recipient

alias Archethic.TaskSupervisor
alias Archethic.Utils

@proposal_tx_select_fields [
Expand Down Expand Up @@ -118,7 +117,7 @@ defmodule Archethic.Governance do
with true <- Utils.key_in_node_list?(storage_nodes, Crypto.first_node_public_key()),
{:ok, prop} <- get_code_proposal(prop_address),
true <- Code.enough_code_approval?(prop) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(Archethic.task_supervisors(), fn ->
if Code.valid_integration?(prop) do
Code.deploy_proposal_testnet(prop)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule Archethic.Mining do

aggregated_responses =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
storage_nodes,
&P2P.send_message(&1, message),
max_concurrency: nb_storage_nodes,
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/mining/distributed_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,7 @@ defmodule Archethic.Mining.DistributedWorkflow do

results =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
storage_nodes,
&P2P.send_message(&1, message),
ordered: false,
Expand Down Expand Up @@ -1223,7 +1223,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
# Notify error to the welcome node
message = %ValidationError{error: error, address: tx_address}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
P2P.send_message(welcome_node, message)
:ok
end)
Expand All @@ -1235,7 +1235,7 @@ defmodule Archethic.Mining.DistributedWorkflow do
end

defp time_offset(ref_time, timeout) do
# If time offset is negative (timeout is already passed based on ref_time)
# If time offset is negative (timeout is already passed based on ref_time)
# this function returns 0. GenStateMachine works with timeout = 0 and directly create
# an event in the Process message box after all external message already in the box
ref_time
Expand Down
7 changes: 3 additions & 4 deletions lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ defmodule Archethic.Mining.PendingTransactionValidation do
alias Archethic.SharedSecrets
alias Archethic.SharedSecrets.NodeRenewal

alias Archethic.TaskSupervisor
alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.TransactionData
Expand Down Expand Up @@ -829,13 +828,13 @@ defmodule Archethic.Mining.PendingTransactionValidation do

# fetch in parallel the data we need
tasks = [
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
fetch_previous_tx_genesis_address(tx)
end),
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
TransactionChain.fetch_genesis_address(token_address, storage_nodes)
end),
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
TransactionChain.fetch_transaction(token_address, storage_nodes)
end)
]
Expand Down
3 changes: 1 addition & 2 deletions lib/archethic/mining/smart_contract_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ defmodule Archethic.Mining.SmartContractValidation do

alias Archethic.TransactionChain.TransactionData
alias Archethic.TransactionChain.TransactionData.Recipient
alias Archethic.TaskSupervisor

alias Crontab.CronExpression.Parser, as: CronParser
alias Crontab.DateChecker, as: CronDateChecker
Expand Down Expand Up @@ -54,7 +53,7 @@ defmodule Archethic.Mining.SmartContractValidation do
default_error =
Error.new(:invalid_recipients_execution, "Failed to validate call due to timeout")

TaskSupervisor
Archethic.task_supervisors()
|> Task.Supervisor.async_stream_nolink(
recipients,
&request_contract_validation(&1, transaction, validation_time),
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/mining/standalone_workflow.ex
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do

results =
Task.Supervisor.async_stream_nolink(
Archethic.TaskSupervisor,
Archethic.task_supervisors(),
replication_nodes,
&P2P.send_message(&1, message),
ordered: false,
Expand Down Expand Up @@ -246,7 +246,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
# Notify error to the welcome node
message = %ValidationError{address: tx_address, error: error}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
P2P.send_message(
Crypto.last_node_public_key(),
message
Expand Down Expand Up @@ -323,7 +323,7 @@ defmodule Archethic.Mining.StandaloneWorkflow do
# Notify error to the welcome node
message = %ValidationError{error: Error.new(:timeout), address: tx.address}

Task.Supervisor.async_nolink(Archethic.TaskSupervisor, fn ->
Task.Supervisor.async_nolink(Archethic.task_supervisors(), fn ->
P2P.send_message(welcome_node, message)
:ok
end)
Expand Down
10 changes: 4 additions & 6 deletions lib/archethic/mining/transaction_context.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ defmodule Archethic.Mining.TransactionContext do
alias Archethic.P2P.Message.Ping
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.UnspentOutput
Expand Down Expand Up @@ -89,7 +87,7 @@ defmodule Archethic.Mining.TransactionContext do
previous_storage_nodes = Election.chain_storage_nodes(previous_address, authorized_nodes)

Task.Supervisor.async(
TaskSupervisor,
Archethic.task_supervisors(),
fn ->
# Timeout of 4 sec because the coordinator node wait 5 sec to get the context
# from the cross validation nodes
Expand All @@ -115,17 +113,17 @@ defmodule Archethic.Mining.TransactionContext do
|> Election.chain_storage_nodes(authorized_nodes)
|> Election.get_synchronized_nodes_before(previous_summary_time)

Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async(Archethic.task_supervisors(), fn ->
genesis_address
|> TransactionChain.fetch_unspent_outputs(genesis_nodes)
|> Enum.to_list()
end)
end

defp request_nodes_view(node_public_keys) do
Task.Supervisor.async(TaskSupervisor, fn ->
Task.Supervisor.async(Archethic.task_supervisors(), fn ->
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
Archethic.task_supervisors(),
node_public_keys,
fn node_public_key ->
{node_public_key, P2P.send_message(node_public_key, %Ping{}, 1000)}
Expand Down
Loading

0 comments on commit ff91282

Please sign in to comment.