diff --git a/lib/archethic/self_repair/sync.ex b/lib/archethic/self_repair/sync.ex index 0c19f4e33..3cb8bec68 100644 --- a/lib/archethic/self_repair/sync.ex +++ b/lib/archethic/self_repair/sync.ex @@ -480,7 +480,7 @@ defmodule Archethic.SelfRepair.Sync do previous_summary_time ) - consolidated_attestation = consolidate_recipients(attestation, tx) + consolidated_attestation = consolidate_recipients(attestation, tx, download_nodes) {consolidated_attestation, tx, inputs} end, max_concurrency: System.schedulers_online() * 2, @@ -507,29 +507,15 @@ defmodule Archethic.SelfRepair.Sync do movements_addresses: movements_addresses } }, - %Transaction{validation_stamp: %ValidationStamp{recipients: recipients = [_ | _]}} + %Transaction{validation_stamp: %ValidationStamp{recipients: recipients = [_ | _]}}, + download_nodes ) do - authorized_nodes = P2P.authorized_and_available_nodes() + resolved_addresses = + Enum.chunk_every(movements_addresses, 2) |> Enum.map(&List.to_tuple/1) |> Map.new() consolidated_movements_addresses = recipients - |> Task.async_stream( - fn recipient -> - genesis_nodes = Election.chain_storage_nodes(recipient, authorized_nodes) - - case TransactionChain.fetch_genesis_address(recipient, genesis_nodes, - acceptance_resolver: :accept_different_genesis - ) do - {:ok, genesis_address} -> - [recipient, genesis_address] - - {:error, reason} -> - raise SelfRepair.Error, - function: "consolidate_recipients", - message: "Failed to fetch genesis address with error #{inspect(reason)}", - address: recipient - end - end, + |> Task.async_stream(&do_consolidate_recipient(&1, resolved_addresses, download_nodes), max_concurrency: length(recipients) ) |> Stream.flat_map(fn {:ok, addresses} -> addresses end) @@ -543,7 +529,33 @@ defmodule Archethic.SelfRepair.Sync do %ReplicationAttestation{attestation | transaction_summary: adjusted_summary} end - defp consolidate_recipients(attestation, _tx), do: attestation + defp consolidate_recipients(attestation, _tx, _), do: attestation + + defp do_consolidate_recipient(recipient, resolved_addresses, download_nodes) do + case Map.fetch(resolved_addresses, recipient) do + {:ok, genesis} -> + [recipient, genesis] + + _ -> + genesis_nodes = Election.chain_storage_nodes(recipient, download_nodes) + + case TransactionChain.fetch_genesis_address(recipient, genesis_nodes, + acceptance_resolver: :accept_different_genesis + ) do + {:ok, genesis_address} -> + [recipient, genesis_address] + + {:error, :acceptance_failed} -> + [recipient, recipient] + + {:error, reason} -> + raise SelfRepair.Error, + function: "consolidate_recipients", + message: "Failed to fetch genesis address with error #{inspect(reason)}", + address: recipient + end + end + end defp sync_node(end_of_node_synchronizations) do end_of_node_synchronizations diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 704661ec7..b8fdfcf31 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -821,7 +821,7 @@ defmodule Archethic.TransactionChain do It queries the the network for genesis address. """ @spec fetch_genesis_address(address :: binary(), nodes :: list(Node.t()), opts :: Keyword.t()) :: - {:ok, binary()} | {:error, :network_issue} + {:ok, binary()} | {:error, :network_issue} | {:error, :acceptance_failed} def fetch_genesis_address(address, nodes, opts \\ []) when is_binary(address) do case find_genesis_address(address) do {:error, :not_found} -> @@ -851,11 +851,8 @@ defmodule Archethic.TransactionChain do timeout: timeout, acceptance_resolver: acceptance_resolver ) do - {:ok, %GenesisAddress{address: genesis_address}} -> - {:ok, genesis_address} - - _ -> - {:error, :network_issue} + {:ok, %GenesisAddress{address: genesis_address}} -> {:ok, genesis_address} + error -> error end res -> diff --git a/test/archethic/bootstrap/network_init_test.exs b/test/archethic/bootstrap/network_init_test.exs index 6a9b8f3b8..041203da5 100644 --- a/test/archethic/bootstrap/network_init_test.exs +++ b/test/archethic/bootstrap/network_init_test.exs @@ -332,8 +332,8 @@ defmodule Archethic.Bootstrap.NetworkInitTest do _, %GetTransactionChainLength{}, _ -> %TransactionChainLength{length: 1} - _, %GetGenesisAddress{}, _ -> - {:ok, %NotFound{}} + _, %GetGenesisAddress{address: address}, _ -> + {:ok, %GenesisAddress{address: address, timestamp: DateTime.utc_now()}} end) P2P.add_and_connect_node(%Node{ diff --git a/test/archethic/bootstrap_test.exs b/test/archethic/bootstrap_test.exs index 05e005cef..054f8449b 100644 --- a/test/archethic/bootstrap_test.exs +++ b/test/archethic/bootstrap_test.exs @@ -18,6 +18,7 @@ defmodule Archethic.BootstrapTest do EncryptedStorageNonce, GetBootstrappingNodes, GetGenesisAddress, + GenesisAddress, GetLastTransactionAddress, GetStorageNonce, GetTransaction, @@ -126,14 +127,11 @@ defmodule Archethic.BootstrapTest do _, %GetTransactionChainLength{}, _ -> %TransactionChainLength{length: 1} - _, %GetGenesisAddress{}, _ -> - {:ok, %NotFound{}} + _, %GetGenesisAddress{address: address}, _ -> + {:ok, %GenesisAddress{address: address, timestamp: DateTime.utc_now()}} _, %GetCurrentReplicationAttestations{}, _ -> - {:ok, - %CurrentReplicationAttestations{ - replication_attestations: [] - }} + {:ok, %CurrentReplicationAttestations{replication_attestations: []}} end) {:ok, daily_nonce_agent} = Agent.start_link(fn -> %{} end)