diff --git a/lib/archethic/db.ex b/lib/archethic/db.ex index d576888aa..26a5354bc 100644 --- a/lib/archethic/db.ex +++ b/lib/archethic/db.ex @@ -33,7 +33,8 @@ defmodule Archethic.DB do order: :asc | :desc ] ) :: Enumerable.t() - @callback write_transaction(Transaction.t(), storage_type()) :: :ok + @callback write_transaction(Transaction.t(), storage_type()) :: + :ok | {:error, :transaction_already_exists} @callback write_beacon_summary(Summary.t()) :: :ok @callback clear_beacon_summaries() :: :ok @callback write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok diff --git a/lib/archethic/db/embedded_impl.ex b/lib/archethic/db/embedded_impl.ex index df73748eb..114ae37a5 100644 --- a/lib/archethic/db/embedded_impl.ex +++ b/lib/archethic/db/embedded_impl.ex @@ -44,37 +44,31 @@ defmodule Archethic.DB.EmbeddedImpl do @doc """ Write a single transaction and append it to its chain """ - @spec write_transaction(Transaction.t(), DB.storage_type()) :: :ok + @spec write_transaction(Transaction.t(), DB.storage_type()) :: + :ok | {:error, :transaction_already_exists} def write_transaction(tx, storage_type \\ :chain) def write_transaction(tx = %Transaction{}, :chain) do - if ChainIndex.transaction_exists?(tx.address, filepath()) do - {:error, :transaction_already_exists} - else - previous_address = Transaction.previous_address(tx) - - genesis_address = - case ChainIndex.get_tx_entry(previous_address, filepath()) do - {:ok, %{genesis_address: genesis_address}} -> - genesis_address - - {:error, :not_exists} -> - previous_address - end + previous_address = Transaction.previous_address(tx) + + genesis_address = + case ChainIndex.get_tx_entry(previous_address, filepath()) do + {:ok, %{genesis_address: genesis_address}} -> + genesis_address - ChainWriter.append_transaction(genesis_address, tx) + {:error, :not_exists} -> + previous_address + end + case ChainWriter.append_transaction(genesis_address, tx) do # Delete IO transaction if it exists as it is now stored as a chain - delete_io_transaction(tx.address) + :ok -> delete_io_transaction(tx.address) + error -> error end end def write_transaction(tx = %Transaction{}, :io) do - if ChainIndex.transaction_exists?(tx.address, :io, filepath()) do - {:error, :transaction_already_exists} - else - ChainWriter.write_io_transaction(tx, filepath()) - end + ChainWriter.write_io_transaction(tx, filepath()) end defp delete_io_transaction(address) do diff --git a/lib/archethic/db/embedded_impl/chain_writer.ex b/lib/archethic/db/embedded_impl/chain_writer.ex index b50d37bc4..de5888b62 100644 --- a/lib/archethic/db/embedded_impl/chain_writer.ex +++ b/lib/archethic/db/embedded_impl/chain_writer.ex @@ -25,7 +25,8 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do @doc """ Append a transaction to a file for the given genesis address """ - @spec append_transaction(binary(), Transaction.t()) :: :ok + @spec append_transaction(binary(), Transaction.t()) :: + :ok | {:error, :transaction_already_exists} def append_transaction(genesis_address, tx = %Transaction{}) do via_tuple = {:via, PartitionSupervisor, {ChainWriterSupervisor, genesis_address}} GenServer.call(via_tuple, {:append_tx, genesis_address, tx}, :infinity) @@ -34,23 +35,28 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do @doc """ write an io transaction in a file name by it's address """ - @spec write_io_transaction(Transaction.t(), String.t()) :: :ok + @spec write_io_transaction(Transaction.t(), String.t()) :: + :ok | {:error, :transaction_already_exists} def write_io_transaction(tx = %Transaction{address: address}, db_path) do - start = System.monotonic_time() + if ChainIndex.transaction_exists?(tx.address, :io, db_path) do + {:error, :transaction_already_exists} + else + start = System.monotonic_time() - filename = io_path(db_path, address) + filename = io_path(db_path, address) - data = Encoding.encode(tx) + data = Encoding.encode(tx) - File.write!( - filename, - data, - [:exclusive, :binary] - ) + File.write!( + filename, + data, + [:exclusive, :binary] + ) - :telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{ - query: "write_io_transaction" - }) + :telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{ + query: "write_io_transaction" + }) + end end @doc """ @@ -140,17 +146,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do _from, state = %{db_path: db_path} ) do - write_transaction(genesis_address, tx, db_path) - {:reply, :ok, state} - end - - def handle_call( - {:write_io_transaction, tx}, - _from, - state = %{db_path: db_path} - ) do - write_io_transaction(tx, db_path) - {:reply, :ok, state} + if ChainIndex.transaction_exists?(tx.address, db_path) do + {:reply, {:error, :transaction_already_exists}, state} + else + write_transaction(genesis_address, tx, db_path) + {:reply, :ok, state} + end end defp write_transaction(genesis_address, tx, db_path) do diff --git a/lib/archethic/replication.ex b/lib/archethic/replication.ex index 4acace50d..7bd5db4c7 100644 --- a/lib/archethic/replication.ex +++ b/lib/archethic/replication.ex @@ -155,39 +155,46 @@ defmodule Archethic.Replication do tx |> stream_previous_chain(genesis_address, download_nodes) |> Stream.each(fn tx = %Transaction{address: address, type: type} -> - TransactionChain.write_transaction(tx) - - # There is some case where a transaction is not replicated while it should - # because of some latency or network issue. So when we replicate a past chain - # we also ingest the transaction if we are storage node of it - - ingest? = - Transaction.network_type?(type) or - Election.chain_storage_node?(address, first_node_key, download_nodes) or - Election.chain_storage_node?(genesis_address, first_node_key, download_nodes) - - opts = Keyword.delete(ingest_opts, :resolved_addresses) - if ingest?, do: ingest_transaction(tx, genesis_address, opts) + if TransactionChain.write_transaction(tx) == :ok do + # There is some case where a transaction is not replicated while it should + # because of some latency or network issue. So when we replicate a past chain + # we also ingest the transaction if we are storage node of it + + ingest? = + Transaction.network_type?(type) or + Election.chain_storage_node?(address, first_node_key, download_nodes) or + Election.chain_storage_node?(genesis_address, first_node_key, download_nodes) + + opts = Keyword.delete(ingest_opts, :resolved_addresses) + if ingest?, do: ingest_transaction(tx, genesis_address, opts) + end end) |> Stream.run() - TransactionChain.write_transaction(tx) + case TransactionChain.write_transaction(tx) do + :ok -> + :ok = ingest_transaction(tx, genesis_address, ingest_opts) - :ok = ingest_transaction(tx, genesis_address, ingest_opts) + Logger.info("Replication finished", + transaction_address: Base.encode16(address), + transaction_type: type + ) - Logger.info("Replication finished", - transaction_address: Base.encode16(address), - transaction_type: type - ) + PubSub.notify_new_transaction(address, type, timestamp) - PubSub.notify_new_transaction(address, type, timestamp) + :telemetry.execute( + [:archethic, :replication, :full_write], + %{ + duration: System.monotonic_time() - start_time + } + ) - :telemetry.execute( - [:archethic, :replication, :full_write], - %{ - duration: System.monotonic_time() - start_time - } - ) + error -> + Logger.debug("Replication chain aborted (#{inspect(error)})", + transaction_address: Base.encode16(address), + transaction_type: type + ) + end :ok end @@ -285,15 +292,25 @@ defmodule Archethic.Replication do opts \\ [] ) when is_list(opts) do - :ok = TransactionChain.write_transaction(tx, :io) - ingest_transaction(tx, genesis_address, Keyword.put(opts, :io_transaction?, true)) + case TransactionChain.write_transaction(tx, :io) do + :ok -> + ingest_transaction(tx, genesis_address, Keyword.put(opts, :io_transaction?, true)) - Logger.info("Replication finished", - transaction_address: Base.encode16(address), - transaction_type: type - ) + Logger.info("Replication finished", + transaction_address: Base.encode16(address), + transaction_type: type + ) + + PubSub.notify_new_transaction(address, type, timestamp) + + error -> + Logger.debug("Replication IO aborted (#{inspect(error)})", + transaction_address: Base.encode16(address), + transaction_type: type + ) + end - PubSub.notify_new_transaction(address, type, timestamp) + # TODO: Should not ingest if write_transaction failed end defp fetch_context(tx = %Transaction{}) do diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index b8fdfcf31..b5fc3b905 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -1055,7 +1055,7 @@ defmodule Archethic.TransactionChain do Persist only one transaction """ @spec write_transaction(transaction :: Transaction.t(), storage_location :: DB.storage_type()) :: - :ok + :ok | {:error, :transaction_already_exist} def write_transaction( tx = %Transaction{ address: address, @@ -1064,12 +1064,14 @@ defmodule Archethic.TransactionChain do storage_type \\ :chain ) do DB.write_transaction(tx, storage_type) - KOLedger.remove_transaction(address) + |> tap(fn _ -> + KOLedger.remove_transaction(address) - Logger.info("Transaction stored", - transaction_address: Base.encode16(address), - transaction_type: type - ) + Logger.info("Transaction stored", + transaction_address: Base.encode16(address), + transaction_type: type + ) + end) end @doc """ diff --git a/test/archethic/db/embedded_impl_test.exs b/test/archethic/db/embedded_impl_test.exs index e46cf2eb7..ef23bef29 100644 --- a/test/archethic/db/embedded_impl_test.exs +++ b/test/archethic/db/embedded_impl_test.exs @@ -107,6 +107,20 @@ defmodule Archethic.DB.EmbeddedTest do assert File.exists?(filename_chain) assert !File.exists?(filename_io) end + + test "should return an error when transaction already exists in chain storage" do + tx1 = TransactionFactory.create_valid_transaction() + assert :ok == EmbeddedImpl.write_transaction(tx1) + + assert {:error, :transaction_already_exists} == EmbeddedImpl.write_transaction(tx1) + end + + test "should return an error when transaction already exists in io storage" do + tx1 = TransactionFactory.create_valid_transaction() + assert :ok == EmbeddedImpl.write_transaction(tx1, :io) + + assert {:error, :transaction_already_exists} == EmbeddedImpl.write_transaction(tx1, :io) + end end describe "transaction_exists?/2" do