Skip to content

Commit

Permalink
Tx replication sometimes fails.
Browse files Browse the repository at this point in the history
Fixes #1650
  • Loading branch information
Chralu authored and Neylix committed Feb 3, 2025
1 parent 840d5b6 commit 21303b3
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 85 deletions.
3 changes: 2 additions & 1 deletion lib/archethic/db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ defmodule Archethic.DB do
order: :asc | :desc
]
) :: Enumerable.t()
@callback write_transaction(Transaction.t(), storage_type()) :: :ok
@callback write_transaction(Transaction.t(), storage_type()) ::
:ok | {:error, :transaction_already_exists}
@callback write_beacon_summary(Summary.t()) :: :ok
@callback clear_beacon_summaries() :: :ok
@callback write_beacon_summaries_aggregate(SummaryAggregate.t()) :: :ok
Expand Down
36 changes: 15 additions & 21 deletions lib/archethic/db/embedded_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,31 @@ defmodule Archethic.DB.EmbeddedImpl do
@doc """
Write a single transaction and append it to its chain
"""
@spec write_transaction(Transaction.t(), DB.storage_type()) :: :ok
@spec write_transaction(Transaction.t(), DB.storage_type()) ::
:ok | {:error, :transaction_already_exists}
def write_transaction(tx, storage_type \\ :chain)

def write_transaction(tx = %Transaction{}, :chain) do
if ChainIndex.transaction_exists?(tx.address, filepath()) do
{:error, :transaction_already_exists}
else
previous_address = Transaction.previous_address(tx)

genesis_address =
case ChainIndex.get_tx_entry(previous_address, filepath()) do
{:ok, %{genesis_address: genesis_address}} ->
genesis_address

{:error, :not_exists} ->
previous_address
end
previous_address = Transaction.previous_address(tx)

genesis_address =
case ChainIndex.get_tx_entry(previous_address, filepath()) do
{:ok, %{genesis_address: genesis_address}} ->
genesis_address

ChainWriter.append_transaction(genesis_address, tx)
{:error, :not_exists} ->
previous_address
end

case ChainWriter.append_transaction(genesis_address, tx) do
# Delete IO transaction if it exists as it is now stored as a chain
delete_io_transaction(tx.address)
:ok -> delete_io_transaction(tx.address)
error -> error
end
end

def write_transaction(tx = %Transaction{}, :io) do
if ChainIndex.transaction_exists?(tx.address, :io, filepath()) do
{:error, :transaction_already_exists}
else
ChainWriter.write_io_transaction(tx, filepath())
end
ChainWriter.write_io_transaction(tx, filepath())
end

defp delete_io_transaction(address) do
Expand Down
49 changes: 25 additions & 24 deletions lib/archethic/db/embedded_impl/chain_writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do
@doc """
Append a transaction to a file for the given genesis address
"""
@spec append_transaction(binary(), Transaction.t()) :: :ok
@spec append_transaction(binary(), Transaction.t()) ::
:ok | {:error, :transaction_already_exists}
def append_transaction(genesis_address, tx = %Transaction{}) do
via_tuple = {:via, PartitionSupervisor, {ChainWriterSupervisor, genesis_address}}
GenServer.call(via_tuple, {:append_tx, genesis_address, tx}, :infinity)
Expand All @@ -34,23 +35,28 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do
@doc """
write an io transaction in a file name by it's address
"""
@spec write_io_transaction(Transaction.t(), String.t()) :: :ok
@spec write_io_transaction(Transaction.t(), String.t()) ::
:ok | {:error, :transaction_already_exists}
def write_io_transaction(tx = %Transaction{address: address}, db_path) do
start = System.monotonic_time()
if ChainIndex.transaction_exists?(tx.address, :io, db_path) do
{:error, :transaction_already_exists}
else
start = System.monotonic_time()

filename = io_path(db_path, address)
filename = io_path(db_path, address)

data = Encoding.encode(tx)
data = Encoding.encode(tx)

File.write!(
filename,
data,
[:exclusive, :binary]
)
File.write!(
filename,
data,
[:exclusive, :binary]
)

:telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{
query: "write_io_transaction"
})
:telemetry.execute([:archethic, :db], %{duration: System.monotonic_time() - start}, %{
query: "write_io_transaction"
})
end
end

@doc """
Expand Down Expand Up @@ -140,17 +146,12 @@ defmodule Archethic.DB.EmbeddedImpl.ChainWriter do
_from,
state = %{db_path: db_path}
) do
write_transaction(genesis_address, tx, db_path)
{:reply, :ok, state}
end

def handle_call(
{:write_io_transaction, tx},
_from,
state = %{db_path: db_path}
) do
write_io_transaction(tx, db_path)
{:reply, :ok, state}
if ChainIndex.transaction_exists?(tx.address, db_path) do
{:reply, {:error, :transaction_already_exists}, state}
else
write_transaction(genesis_address, tx, db_path)
{:reply, :ok, state}
end
end

defp write_transaction(genesis_address, tx, db_path) do
Expand Down
83 changes: 50 additions & 33 deletions lib/archethic/replication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,39 +155,46 @@ defmodule Archethic.Replication do
tx
|> stream_previous_chain(genesis_address, download_nodes)
|> Stream.each(fn tx = %Transaction{address: address, type: type} ->
TransactionChain.write_transaction(tx)

# There is some case where a transaction is not replicated while it should
# because of some latency or network issue. So when we replicate a past chain
# we also ingest the transaction if we are storage node of it

ingest? =
Transaction.network_type?(type) or
Election.chain_storage_node?(address, first_node_key, download_nodes) or
Election.chain_storage_node?(genesis_address, first_node_key, download_nodes)

opts = Keyword.delete(ingest_opts, :resolved_addresses)
if ingest?, do: ingest_transaction(tx, genesis_address, opts)
if TransactionChain.write_transaction(tx) == :ok do
# There is some case where a transaction is not replicated while it should
# because of some latency or network issue. So when we replicate a past chain
# we also ingest the transaction if we are storage node of it

ingest? =
Transaction.network_type?(type) or
Election.chain_storage_node?(address, first_node_key, download_nodes) or
Election.chain_storage_node?(genesis_address, first_node_key, download_nodes)

opts = Keyword.delete(ingest_opts, :resolved_addresses)
if ingest?, do: ingest_transaction(tx, genesis_address, opts)

Check warning on line 169 in lib/archethic/replication.ex

View workflow job for this annotation

GitHub Actions / Build and test

Function body is nested too deep (max depth is 2, was 3).
end
end)
|> Stream.run()

TransactionChain.write_transaction(tx)
case TransactionChain.write_transaction(tx) do
:ok ->
:ok = ingest_transaction(tx, genesis_address, ingest_opts)

:ok = ingest_transaction(tx, genesis_address, ingest_opts)
Logger.info("Replication finished",
transaction_address: Base.encode16(address),
transaction_type: type
)

Logger.info("Replication finished",
transaction_address: Base.encode16(address),
transaction_type: type
)
PubSub.notify_new_transaction(address, type, timestamp)

PubSub.notify_new_transaction(address, type, timestamp)
:telemetry.execute(
[:archethic, :replication, :full_write],
%{
duration: System.monotonic_time() - start_time
}
)

:telemetry.execute(
[:archethic, :replication, :full_write],
%{
duration: System.monotonic_time() - start_time
}
)
error ->
Logger.debug("Replication chain aborted (#{inspect(error)})",
transaction_address: Base.encode16(address),
transaction_type: type
)
end

:ok
end
Expand Down Expand Up @@ -285,15 +292,25 @@ defmodule Archethic.Replication do
opts \\ []
)
when is_list(opts) do
:ok = TransactionChain.write_transaction(tx, :io)
ingest_transaction(tx, genesis_address, Keyword.put(opts, :io_transaction?, true))
case TransactionChain.write_transaction(tx, :io) do
:ok ->
ingest_transaction(tx, genesis_address, Keyword.put(opts, :io_transaction?, true))

Logger.info("Replication finished",
transaction_address: Base.encode16(address),
transaction_type: type
)
Logger.info("Replication finished",
transaction_address: Base.encode16(address),
transaction_type: type
)

PubSub.notify_new_transaction(address, type, timestamp)

error ->
Logger.debug("Replication IO aborted (#{inspect(error)})",
transaction_address: Base.encode16(address),
transaction_type: type
)
end

PubSub.notify_new_transaction(address, type, timestamp)
# TODO: Should not ingest if write_transaction failed
end

defp fetch_context(tx = %Transaction{}) do
Expand Down
14 changes: 8 additions & 6 deletions lib/archethic/transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ defmodule Archethic.TransactionChain do
Persist only one transaction
"""
@spec write_transaction(transaction :: Transaction.t(), storage_location :: DB.storage_type()) ::
:ok
:ok | {:error, :transaction_already_exist}
def write_transaction(
tx = %Transaction{
address: address,
Expand All @@ -1064,12 +1064,14 @@ defmodule Archethic.TransactionChain do
storage_type \\ :chain
) do
DB.write_transaction(tx, storage_type)
KOLedger.remove_transaction(address)
|> tap(fn _ ->
KOLedger.remove_transaction(address)

Logger.info("Transaction stored",
transaction_address: Base.encode16(address),
transaction_type: type
)
Logger.info("Transaction stored",
transaction_address: Base.encode16(address),
transaction_type: type
)
end)
end

@doc """
Expand Down
14 changes: 14 additions & 0 deletions test/archethic/db/embedded_impl_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ defmodule Archethic.DB.EmbeddedTest do
assert File.exists?(filename_chain)
assert !File.exists?(filename_io)
end

test "should return an error when transaction already exists in chain storage" do
tx1 = TransactionFactory.create_valid_transaction()
assert :ok == EmbeddedImpl.write_transaction(tx1)

assert {:error, :transaction_already_exists} == EmbeddedImpl.write_transaction(tx1)
end

test "should return an error when transaction already exists in io storage" do
tx1 = TransactionFactory.create_valid_transaction()
assert :ok == EmbeddedImpl.write_transaction(tx1, :io)

assert {:error, :transaction_already_exists} == EmbeddedImpl.write_transaction(tx1, :io)
end
end

describe "transaction_exists?/2" do
Expand Down

0 comments on commit 21303b3

Please sign in to comment.