From ac9f890f514b3bc05e6b33719bab9bc4bfcc99c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 5 Aug 2024 20:14:25 +0200 Subject: [PATCH 01/16] WIP: using store for states and checkpoint states. Store on all fork choice handlers. --- .../beacon/pending_blocks.ex | 22 ++-- .../fork_choice/fork_choice.ex | 83 ++++++--------- .../fork_choice/handlers.ex | 48 +++------ .../fork_choice/head.ex | 6 +- .../store/checkpoint_states.ex | 2 + .../store/store_db.ex | 2 +- lib/types/store.ex | 100 +++++++++++++++++- 7 files changed, 161 insertions(+), 102 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 7bb37ff83..9a3ae5bf0 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -37,7 +37,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do If blobs are missing, they will be requested. """ @spec add_block(SignedBeaconBlock.t()) :: :ok - def add_block(signed_block) do + def add_block(store, signed_block) do block_info = BlockInfo.from_block(signed_block) loaded_block = Blocks.get_block_info(block_info.root) @@ -47,14 +47,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do if Enum.empty?(missing_blobs) do Blocks.new_block_info(block_info) - process_block_and_check_children(block_info) + process_block_and_check_children(store, block_info) else BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries) block_info |> BlockInfo.change_status(:download_blobs) |> Blocks.new_block_info() + + store end + else + store end end @@ -63,12 +67,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do module after receiving a new block, but there are some other cases like at node startup, as there may be pending blocks from prior executions. """ - def process_blocks() do + def process_blocks(store) do case Blocks.get_blocks_with_status(:pending) do {:ok, blocks} -> blocks |> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end) - |> Enum.each(&process_block/1) + |> Enum.each(&process_block(store, &1)) {:error, reason} -> Logger.error( @@ -85,13 +89,13 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do # is called to check if there's any children that can now be processed. This function # is only to be called when a new block is saved as pending, not when processing blocks # in batch, to avoid unneeded recursion. - defp process_block_and_check_children(block_info) do - if process_block(block_info) in [:transitioned, :invalid] do - process_blocks() + defp process_block_and_check_children(store, block_info) do + if process_block(store, block_info) in [:transitioned, :invalid] do + process_blocks(store) end end - defp process_block(block_info) do + defp process_block(store, block_info) do if block_info.status != :pending do Logger.error("Called process block for a block that's not ready: #{block_info}") end @@ -123,7 +127,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do :invalid %BlockInfo{status: :transitioned} -> - case ForkChoice.on_block(block_info) do + case ForkChoice.on_block(store, block_info) do :ok -> Blocks.change_status(block_info, :transitioned) :transitioned diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 32cd09264..ecb7e9681 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -15,7 +15,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.BlockDb alias LambdaEthereumConsensus.Store.Blocks - alias LambdaEthereumConsensus.Store.CheckpointStates alias LambdaEthereumConsensus.Store.StateDb alias LambdaEthereumConsensus.Store.StoreDb alias LambdaEthereumConsensus.Validator.ValidatorManager @@ -39,12 +38,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do Metrics.block_status(head_root, head_slot, :transitioned) - persist_store(store) + StoreDb.persist_store(store) end - @spec on_block(BlockInfo.t()) :: :ok | {:error, String.t()} - def on_block(%BlockInfo{} = block_info) do - store = fetch_store!() + @spec on_block(Store.t(), BlockInfo.t()) :: :ok | {:error, String.t()} + def on_block(store, %BlockInfo{} = block_info) do slot = block_info.signed_block.message.slot block_root = block_info.root @@ -66,11 +64,9 @@ defmodule LambdaEthereumConsensus.ForkChoice do {recompute_head(new_store), %{}} end) - %Store{finalized_checkpoint: new_finalized_checkpoint} = new_store - - prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch) - - persist_store(new_store) + new_store + |> prune_old_states(last_finalized_checkpoint.epoch) + |> tap(&Store.persist/1) {:error, reason} -> Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root) @@ -78,44 +74,41 @@ defmodule LambdaEthereumConsensus.ForkChoice do end end - @spec on_attestation(Types.Attestation.t()) :: :ok - def on_attestation(%Attestation{} = attestation) do - state = fetch_store!() + @spec on_attestation(Store.t(), Types.Attestation.t()) :: Store.t() + def on_attestation(store, %Attestation{} = attestation) do id = attestation.signature |> Base.encode16() |> String.slice(0, 8) Logger.debug("[Fork choice] Adding attestation #{id} to the store") - state = - case Handlers.on_attestation(state, attestation, false) do - {:ok, new_state} -> new_state - _ -> state + store = + case Handlers.on_attestation(store, attestation, false) do + {:ok, new_store} -> new_store + _ -> store end - persist_store(state) + tap(store, &StoreDb.persist_store/1) end - @spec on_attester_slashing(Types.AttesterSlashing.t()) :: :ok - def on_attester_slashing(attester_slashing) do + @spec on_attester_slashing(Store.t(), Types.AttesterSlashing.t()) :: Store.t() + def on_attester_slashing(store, attester_slashing) do Logger.info("[Fork choice] Adding attester slashing to the store") - state = fetch_store!() - case Handlers.on_attester_slashing(state, attester_slashing) do - {:ok, new_state} -> - persist_store(new_state) + case Handlers.on_attester_slashing(store, attester_slashing) do + {:ok, new_store} -> + tap(new_store, &StoreDb.persist_store/1) _ -> Logger.error("[Fork choice] Failed to add attester slashing to the store") + store end end - @spec on_tick(Types.uint64()) :: :ok - def on_tick(time) do - store = fetch_store!() + @spec on_tick(Store.t(), Types.uint64()) :: Store.t() + def on_tick(store, time) do %Store{finalized_checkpoint: last_finalized_checkpoint} = store - new_store = Handlers.on_tick(store, time) - %Store{finalized_checkpoint: new_finalized_checkpoint} = new_store - prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch) - persist_store(new_store) + Handlers.on_tick(store, time) + |> prune_old_states(last_finalized_checkpoint.epoch) + |> tap(&StoreDb.persist_store/1) end @spec get_current_chain_slot() :: Types.slot() @@ -176,7 +169,9 @@ defmodule LambdaEthereumConsensus.ForkChoice do ### Private Functions ########################## - defp prune_old_states(last_finalized_epoch, new_finalized_epoch) do + defp prune_old_states(store, last_finalized_epoch) do + new_finalized_epoch = store.finalized_checkpoint.epoch + if last_finalized_epoch < new_finalized_epoch do new_finalized_slot = new_finalized_epoch * ChainSpec.get("SLOTS_PER_EPOCH") @@ -196,6 +191,8 @@ defmodule LambdaEthereumConsensus.ForkChoice do fn -> BlobDb.prune_old_blobs(new_finalized_slot) end ) end + + Store.prune(store) end def apply_handler(iter, state, handler) do @@ -233,24 +230,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do apply_handler( attestations, store, - &Handlers.on_attestation(&1, &2, true, prefetch_states(attestations)) + &Handlers.on_attestation(&1, &2, true) ) end) end - defp prefetch_states(attestations) do - attestations - |> Enum.map(& &1.data.target) - |> Enum.uniq() - |> Enum.flat_map(fn ch -> - case CheckpointStates.get_checkpoint_state(ch) do - {:ok, state} -> [{ch, state}] - _other -> [] - end - end) - |> Map.new() - end - @spec recompute_head(Store.t()) :: :ok def recompute_head(store) do {:ok, head_root} = Head.get_head(store) @@ -276,11 +260,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do :ok end - defp persist_store(store) do - StoreDb.persist_store(store) - Logger.debug("[Fork choice] Store persisted") - end - defp fetch_store!() do {:ok, store} = StoreDb.fetch_store() store @@ -308,6 +287,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do finalized_checkpoint: finalized } - persist_store(new_store) + StoreDb.persist_store(new_store) end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 1bd4d1709..3980ed2d2 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -12,8 +12,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do alias LambdaEthereumConsensus.StateTransition.Predicates alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks - alias LambdaEthereumConsensus.Store.BlockStates - alias LambdaEthereumConsensus.Store.CheckpointStates + alias Types.Attestation alias Types.AttestationData alias Types.AttesterSlashing @@ -60,7 +59,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do %{epoch: finalized_epoch, root: finalized_root} = store.finalized_checkpoint finalized_slot = Misc.compute_start_slot_at_epoch(finalized_epoch) - base_state = BlockStates.get_state_info(block.parent_root) + base_state = Store.get_state(store, block.parent_root) cond do # Parent block must be known @@ -115,26 +114,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do end end - def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block) do - with {:ok, target_state} <- CheckpointStates.get_checkpoint_state(attestation.data.target) do - on_attestation_with_state(store, attestation, is_from_block, target_state) - end - end - - def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block, states) do - case Map.fetch(states, attestation.data.target) do - {:ok, target_state} -> - on_attestation_with_state(store, attestation, is_from_block, target_state) - - :error -> - if is_from_block do - {:ok, store} - else - {:error, "Checkpoint state not found for attestation."} - end - end - end - @doc """ Run ``on_attestation`` upon receiving a new ``attestation`` from either within a block or directly on the wire. @@ -143,20 +122,24 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do """ @spec on_attestation(Store.t(), Attestation.t(), boolean()) :: {:ok, Store.t()} | {:error, String.t()} - def on_attestation_with_state( + def on_attestation( %Store{} = store, %Attestation{} = attestation, - is_from_block, - target_state + is_from_block ) do with :ok <- check_attestation_valid(store, attestation, is_from_block), # Get state at the `target` to fully validate attestation + {new_store, %StateInfo{beacon_state: target_state}} <- + Store.get_checkpoint_state(store, attestation.data.target), {:ok, indexed_attestation} <- Accessors.get_indexed_attestation(target_state, attestation), :ok <- check_valid_indexed_attestation(target_state, indexed_attestation) do # Update latest messages for attesting indices - update_latest_messages(store, indexed_attestation.attesting_indices, attestation) + update_latest_messages(new_store, indexed_attestation.attesting_indices, attestation) else + {%Store{} = _store, nil} -> + {:error, "Target state not found for the checkpoint while validating attestation"} + {:unknown_block, _} -> # TODO: this is just a patch, we should fetch blocks preemptively if is_from_block do @@ -189,7 +172,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do attestation_2: %IndexedAttestation{} = attestation_2 } ) do - state = BlockStates.get_state_info!(store.justified_checkpoint.root).beacon_state + state = Store.get_state!(store, store.justified_checkpoint.root).beacon_state cond do not Predicates.slashable_attestation_data?(attestation_1.data, attestation_2.data) -> @@ -244,15 +227,14 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do is_before_attesting_interval = time_into_slot < div(seconds_per_slot, intervals_per_slot) # Add new block and state to the store - BlockStates.store_state_info(new_state_info) - - is_first_block = store.proposer_boost_root == <<0::256>> + new_store = Store.store_state(store, new_state_info.block_root, new_state_info) + is_first_block = new_store.proposer_boost_root == <<0::256>> # TODO: store block timeliness data? - is_timely = Store.get_current_slot(store) == block.slot and is_before_attesting_interval + is_timely = Store.get_current_slot(new_store) == block.slot and is_before_attesting_interval state = new_state_info.beacon_state - store + new_store |> Store.store_block_info(block_info) |> if_then_update( is_timely and is_first_block, diff --git a/lib/lambda_ethereum_consensus/fork_choice/head.ex b/lib/lambda_ethereum_consensus/fork_choice/head.ex index a4bc087b3..9fd63cf2d 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/head.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/head.ex @@ -5,8 +5,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do alias LambdaEthereumConsensus.StateTransition.Accessors alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.Blocks - alias LambdaEthereumConsensus.Store.BlockStates - alias LambdaEthereumConsensus.Store.CheckpointStates alias Types.Store @spec get_head(Store.t()) :: {:ok, Types.root()} | {:error, any} @@ -15,7 +13,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do blocks = get_filtered_block_tree(store) # Execute the LMD-GHOST fork choice head = store.justified_checkpoint.root - {:ok, justified_state} = CheckpointStates.get_checkpoint_state(store.justified_checkpoint) + {:ok, justified_state} = Store.get_checkpoint_state(store, store.justified_checkpoint) # PERF: return just the parent root and the block root in `get_filtered_block_tree` Stream.cycle([nil]) @@ -158,7 +156,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do store.unrealized_justifications[block_root] else # The block is not from a prior epoch, therefore the voting source is not pulled up - head_state = BlockStates.get_state_info!(block_root).beacon_state + head_state = Store.get_state!(store, block_root).beacon_state head_state.current_justified_checkpoint end end diff --git a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex index eca79aeae..c7e4f4971 100644 --- a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex +++ b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex @@ -51,6 +51,8 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do @doc """ Calculate the state for a checkpoint without interacting with the db. + + DEPRECATED. """ @spec compute_target_checkpoint_state(Types.epoch(), Types.root()) :: {:ok, BeaconState.t()} | {:error, String.t()} diff --git a/lib/lambda_ethereum_consensus/store/store_db.ex b/lib/lambda_ethereum_consensus/store/store_db.ex index f439b9452..f5c9dd06b 100644 --- a/lib/lambda_ethereum_consensus/store/store_db.ex +++ b/lib/lambda_ethereum_consensus/store/store_db.ex @@ -17,7 +17,7 @@ defmodule LambdaEthereumConsensus.Store.StoreDb do @spec persist_store(Types.Store.t()) :: :ok def persist_store(%Types.Store{} = store) do :telemetry.span([:db, :latency], %{}, fn -> - {put(@store_prefix, store), %{module: "fork_choice", action: "persist"}} + {put(@store_prefix, Store.remove_cache(store)), %{module: "fork_choice", action: "persist"}} end) end diff --git a/lib/types/store.ex b/lib/types/store.ex index 59f729b41..83baa3dd4 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -5,11 +5,13 @@ defmodule Types.Store do alias LambdaEthereumConsensus.ForkChoice.Head alias LambdaEthereumConsensus.ForkChoice.Simple.Tree + alias LambdaEthereumConsensus.StateTransition alias LambdaEthereumConsensus.StateTransition.Accessors alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.BlockStates alias LambdaEthereumConsensus.Store.CheckpointStates + alias LambdaEthereumConsensus.Store.StoreDb alias Types.BeaconBlock alias Types.BeaconState alias Types.BlockInfo @@ -31,7 +33,13 @@ defmodule Types.Store do :head_root, :head_slot, # Stores block data on the current fork tree (~last two epochs) - :tree_cache + :tree_cache, + + ### Everything under this can be thought as cache for the db. + # States indexed by block root. + :states, + # States indexed by checkpoint. Sometimes necessary because of empty slots. + :checkpoint_states ] @type t :: %__MODULE__{ @@ -48,7 +56,9 @@ defmodule Types.Store do unrealized_justifications: %{Types.root() => Checkpoint.t()}, head_root: Types.root() | nil, head_slot: Types.slot() | nil, - tree_cache: Tree.t() + tree_cache: Tree.t(), + states: %{Types.root() => StateInfo.t()}, + checkpoint_states: %{Types.Checkpoint.t() => StateInfo.t()} } @spec get_forkchoice_store(BeaconState.t(), SignedBeaconBlock.t()) :: @@ -88,7 +98,9 @@ defmodule Types.Store do unrealized_justifications: %{anchor_block_root => anchor_checkpoint}, head_root: nil, head_slot: nil, - tree_cache: Tree.new(anchor_block_root) + tree_cache: Tree.new(anchor_block_root), + states: %{}, + checkpoint_states: %{} } |> store_block_info(block_info) |> update_head_info() @@ -149,6 +161,71 @@ defmodule Types.Store do safe_block.body.execution_payload.block_hash end + @doc """ + Removes everything prior to the last finalized slot, specifically checkpoint states + and states by root. + """ + def prune(%Store{} = store) do + new_finalized_slot = + store.finalized_checkpoint.epoch * ChainSpec.get("SLOTS_PER_EPOCH") + + store + |> prune_checkpoint_states(slot) + |> prune_states(slot) + end + + @doc """ + Gets a StatInfo given a block root. Defaults to the DB if not present in the store. + """ + def get_state(store, root) when is_binary(root) do + with nil <- Map.get(store.states, root) do + BlockStates.get_state_info(root) + end + end + + def get_state!(store, root) do + %StateInfo{} = get_state(store, root) + end + + def store_state(store, block_root, state) do + # TODO: Add a task to save the block async. + put_in(store, [:states, block_root], state) + end + + @doc """ + Gets a State given a checkpoint. If there is no state for that checkpoint in the store + it will try to compute it. + + Computing the state means: + 1. Getting the state for the checkpoint's root. + 2. If the state is the one requested, it is returned. + 3. If not, that means that there are empty slots, so slots are processed. + + Returns a {store, state} or {store, nil}. The store may be updated if the state is calculated. + """ + def get_checkpoint_state(store, %Checkpoint{} = checkpoint) do + case Map.get(store.checkpoint_states, checkpoint) do + nil -> compute_checkpoint_state(store, checkpoint) + state -> {store, state} + end + end + + def remove_cache(%Store{} = store) do + store |> Map.delete(:states) |> Map.delete(:checkpoint_states) + end + + defp prune_checkpoint_states(store, slot) do + update_in(store.checkpoint_states, fn checkpoint_states -> + Map.filter(checkpoint_states, fn {_checkpoint, state} -> state.slot < slot end) + end) + end + + defp prune_states(store, slot) do + update_in(store.states, fn states -> + Map.filter(states, fn {_root, state} -> state.slot < slot end) + end) + end + @spec get_safe_beacon_block_root(t()) :: Types.root() defp get_safe_beacon_block_root(%__MODULE__{} = store) do store.finalized_checkpoint.root @@ -170,4 +247,21 @@ defmodule Types.Store do %{slot: head_slot} = Blocks.get_block!(head_root) %{store | head_root: head_root, head_slot: head_slot} end + + defp compute_checkpoint_state(store, checkpoint) do + target_slot = Misc.compute_start_slot_at_epoch(checkpoint.epoch) + + case get_state(store, checkpoint.root) do + nil -> + {store, nil} + + %StateInfo{beacon_state: state} -> + if state.slot < target_slot do + new_state = StateTransition.process_slots(state, target_slot) + {put_in(store, [:checkpoint_states, checkpoint], new_state), new_state} + else + {store, state} + end + end + end end From 42860fd62f5a3f29351dd4f30a5899ee98238a21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Tue, 6 Aug 2024 19:23:38 +0200 Subject: [PATCH 02/16] add store everywhere --- .../beacon/beacon_node.ex | 6 +- .../beacon/pending_blocks.ex | 72 ++++++++++--------- .../beacon/sync_blocks.ex | 6 +- .../fork_choice/fork_choice.ex | 50 +++++-------- .../fork_choice/handlers.ex | 3 +- .../fork_choice/head.ex | 5 +- .../p2p/blob_downloader.ex | 36 +++++----- .../p2p/block_downloader.ex | 56 ++++++++++----- .../p2p/gossip/attestation.ex | 5 ++ .../p2p/gossip/beacon_block.ex | 8 +-- .../p2p/gossip/blob_sidecar.ex | 5 ++ .../p2p/gossip/handler.ex | 4 +- .../p2p/gossip/operations_collector.ex | 9 +-- lib/lambda_ethereum_consensus/p2p/requests.ex | 8 ++- .../store/store_db.ex | 1 + lib/libp2p_port.ex | 45 +++++++----- lib/types/store.ex | 15 ++-- test/unit/libp2p_port_test.exs | 2 +- 18 files changed, 190 insertions(+), 146 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index f939f597b..4726677d9 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -26,13 +26,15 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do time = :os.system_time(:second) - ForkChoice.init_store(store, time) + store = ForkChoice.init_store(store, time) init_execution_chain(deposit_tree_snapshot, store.head_root) validators = Validator.Setup.init(store.head_slot, store.head_root) - libp2p_args = [genesis_time: store.genesis_time, validators: validators] ++ get_libp2p_args() + libp2p_args = + [genesis_time: store.genesis_time, validators: validators, store: store] ++ + get_libp2p_args() children = [ diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 9a3ae5bf0..c99927749 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -15,6 +15,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do alias LambdaEthereumConsensus.Store.Blocks alias Types.BlockInfo alias Types.SignedBeaconBlock + alias Types.Store @type block_status :: :pending | :invalid | :download | :download_blobs | :unknown @type block_info :: @@ -36,7 +37,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do If blobs are missing, they will be requested. """ - @spec add_block(SignedBeaconBlock.t()) :: :ok + @spec add_block(Store.t(), SignedBeaconBlock.t()) :: Store.t() def add_block(store, signed_block) do block_info = BlockInfo.from_block(signed_block) loaded_block = Blocks.get_block_info(block_info.root) @@ -49,7 +50,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do Blocks.new_block_info(block_info) process_block_and_check_children(store, block_info) else - BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries) + BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries) block_info |> BlockInfo.change_status(:download_blobs) @@ -72,12 +73,17 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do {:ok, blocks} -> blocks |> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end) - |> Enum.each(&process_block(store, &1)) + |> Enum.reduce(store, fn block_info, store -> + {store, _state} = process_block(store, block_info) + store + end) {:error, reason} -> Logger.error( "[Pending Blocks] Failed to get pending blocks to process. Reason: #{reason}" ) + + store end end @@ -90,8 +96,9 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do # is only to be called when a new block is saved as pending, not when processing blocks # in batch, to avoid unneeded recursion. defp process_block_and_check_children(store, block_info) do - if process_block(store, block_info) in [:transitioned, :invalid] do - process_blocks(store) + case process_block(store, block_info) do + {store, result} when result in [:transitioned, :invalid] -> process_blocks(store) + {store, _other} -> store end end @@ -109,9 +116,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do BlockDownloader.request_blocks_by_root( [parent_root], - fn result -> - process_downloaded_block(result) - end, + &process_downloaded_block/2, @download_retries ) @@ -120,65 +125,66 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do block_info.root ) - :download_pending + {store, :download_pending} %BlockInfo{status: :invalid} -> Blocks.change_status(block_info, :invalid) - :invalid + {store, :invalid} %BlockInfo{status: :transitioned} -> case ForkChoice.on_block(store, block_info) do - :ok -> + {:ok, store} -> Blocks.change_status(block_info, :transitioned) - :transitioned + {store, :transitioned} - {:error, reason} -> + {:error, reason, store} -> Logger.error("[PendingBlocks] Saving block as invalid #{reason}", slot: block_info.signed_block.message.slot, root: block_info.root ) Blocks.change_status(block_info, :invalid) - :invalid + {store, :invalid} end _other -> - :ok + {store, :ok} end end - defp process_downloaded_block({:ok, [block]}) do - add_block(block) + defp process_downloaded_block(store, {:ok, [block]}) do + # TODO: add store. + add_block(store, block) end - defp process_downloaded_block({:error, reason}) do - Logger.error("Error downloading block: #{inspect(reason)}") - + defp process_downloaded_block(store, {:error, reason}) do # We might want to declare a block invalid here. + Logger.error("Error downloading block: #{inspect(reason)}") + store end - defp process_blobs({:ok, blobs}), do: add_blobs(blobs) - - defp process_blobs({:error, reason}) do - Logger.error("Error downloading blobs: #{inspect(reason)}") + defp process_blobs(store, {:ok, blobs}), do: add_blobs(store, blobs) + defp process_blobs(store, {:error, reason}) do # We might want to declare a block invalid here. + Logger.error("Error downloading blobs: #{inspect(reason)}") + store end # To be used when a series of blobs are downloaded. Stores each blob. # If there are blocks that can be processed, does so immediately. - defp add_blobs(blobs) do + defp add_blobs(store, blobs) do blobs |> Enum.map(&BlobDb.store_blob/1) |> Enum.uniq() - |> Enum.each(fn root -> - with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do - # TODO: add a new missing blobs call if some blobs are still missing for a block. - if Enum.empty?(missing_blobs(block_info)) do - block_info - |> Blocks.change_status(:pending) - |> process_block_and_check_children() - end + |> Enum.reduce(store, fn root, store -> + with %BlockInfo{} = block_info <- Blocks.get_block_info(root), + [] <- missing_blobs(block_info) do + block_info + |> Blocks.change_status(:pending) + |> then(&process_block_and_check_children(store, &1)) + else + _ -> store end end) end diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index c4f22af1e..47dbf824e 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -51,7 +51,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do BlockDownloader.request_blocks_by_range( first_slot, count, - &on_chunk_downloaded/1, + &on_chunk_downloaded/2, @retries ) @@ -61,11 +61,11 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do end end - defp on_chunk_downloaded({:ok, range, blocks}) do + defp on_chunk_downloaded(_store, {:ok, range, blocks}) do Libp2pPort.notify_blocks_downloaded(range, blocks) end - defp on_chunk_downloaded({:error, range, reason}) do + defp on_chunk_downloaded(_store, {:error, range, reason}) do Libp2pPort.notify_block_download_failed(range, reason) end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 07c25add7..92deee780 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -18,14 +18,13 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias LambdaEthereumConsensus.Store.StoreDb alias Types.Attestation alias Types.BlockInfo - alias Types.Checkpoint alias Types.Store ########################## ### Public API ########################## - @spec init_store(Store.t(), Types.uint64()) :: :ok | :error + @spec init_store(Store.t(), Types.uint64()) :: Store.t() def init_store(%Store{head_slot: head_slot, head_root: head_root} = store, time) do Logger.info("[Fork choice] Initialized store.", slot: head_slot) @@ -36,10 +35,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do Metrics.block_status(head_root, head_slot, :transitioned) - StoreDb.persist_store(store) + tap(store, &StoreDb.persist_store/1) end - @spec on_block(Store.t(), BlockInfo.t()) :: :ok | {:error, String.t()} + @spec on_block(Store.t(), BlockInfo.t()) :: {:ok, Store.t()} | {:error, String.t(), Store.t()} def on_block(store, %BlockInfo{} = block_info) do slot = block_info.signed_block.message.slot block_root = block_info.root @@ -56,19 +55,19 @@ defmodule LambdaEthereumConsensus.ForkChoice do case result do {:ok, new_store} -> :telemetry.execute([:sync, :on_block], %{slot: slot}) + # TODO: move this to the tap after persisting. Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) :telemetry.span([:fork_choice, :recompute_head], %{}, fn -> {recompute_head(new_store), %{}} end) - - new_store |> prune_old_states(last_finalized_checkpoint.epoch) - |> tap(&Store.persist/1) + |> tap(&StoreDb.persist_store/1) + |> then(&{:ok, &1}) {:error, reason} -> Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root) - {:error, reason} + {:error, reason, store} end end @@ -233,8 +232,10 @@ defmodule LambdaEthereumConsensus.ForkChoice do end) end - @spec recompute_head(Store.t()) :: :ok - def recompute_head(store) do + # Recomputes the head in the store and sends the new head to others (libP2P, + # operations collector db, execution chain db). + @spec recompute_head(Store.t()) :: Store.t() + defp recompute_head(store) do {:ok, head_root} = Head.get_head(store) head_block = Blocks.get_block!(head_root) @@ -246,16 +247,13 @@ defmodule LambdaEthereumConsensus.ForkChoice do Libp2pPort.notify_new_head(slot, head_root) ExecutionChain.notify_new_block(slot, body.eth1_data, body.execution_payload) - update_fork_choice_data( - head_root, - slot, - store.justified_checkpoint, - store.finalized_checkpoint - ) - Logger.debug("[Fork choice] Updated fork choice cache", slot: slot) - :ok + %{ + store + | head_root: head_root, + head_slot: slot + } end defp fetch_store!() do @@ -271,20 +269,4 @@ defmodule LambdaEthereumConsensus.ForkChoice do |> ChainSpec.get_fork_version_for_epoch() |> Misc.compute_fork_digest(genesis_validators_root) end - - @spec update_fork_choice_data(Types.root(), Types.slot(), Checkpoint.t(), Checkpoint.t()) :: - :ok - defp update_fork_choice_data(head_root, head_slot, justified, finalized) do - store = fetch_store!() - - new_store = %{ - store - | head_root: head_root, - head_slot: head_slot, - justified_checkpoint: justified, - finalized_checkpoint: finalized - } - - StoreDb.persist_store(new_store) - end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 3980ed2d2..7cb07c6b3 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -129,8 +129,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do ) do with :ok <- check_attestation_valid(store, attestation, is_from_block), # Get state at the `target` to fully validate attestation - {new_store, %StateInfo{beacon_state: target_state}} <- - Store.get_checkpoint_state(store, attestation.data.target), + {new_store, target_state} <- Store.get_checkpoint_state(store, attestation.data.target), {:ok, indexed_attestation} <- Accessors.get_indexed_attestation(target_state, attestation), :ok <- check_valid_indexed_attestation(target_state, indexed_attestation) do diff --git a/lib/lambda_ethereum_consensus/fork_choice/head.ex b/lib/lambda_ethereum_consensus/fork_choice/head.ex index 9fd63cf2d..9114bc0e8 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/head.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/head.ex @@ -5,6 +5,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do alias LambdaEthereumConsensus.StateTransition.Accessors alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.Blocks + alias Types.BeaconState alias Types.Store @spec get_head(Store.t()) :: {:ok, Types.root()} | {:error, any} @@ -13,7 +14,9 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do blocks = get_filtered_block_tree(store) # Execute the LMD-GHOST fork choice head = store.justified_checkpoint.root - {:ok, justified_state} = Store.get_checkpoint_state(store, store.justified_checkpoint) + + {_store, %BeaconState{} = justified_state} = + Store.get_checkpoint_state(store, store.justified_checkpoint) # PERF: return just the parent root and the block root in `get_filtered_block_tree` Stream.cycle([nil]) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 558855a86..62f2a61a0 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -8,13 +8,15 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P alias LambdaEthereumConsensus.P2P.ReqResp + alias LambdaEthereumConsensus.Store alias Types.BlobSidecar + alias Types.Store @blobs_by_range_protocol_id "/eth2/beacon_chain/req/blob_sidecars_by_range/1/ssz_snappy" @blobs_by_root_protocol_id "/eth2/beacon_chain/req/blob_sidecars_by_root/1/ssz_snappy" - @type on_blobs :: ({:ok, [BlobSidecar.t()]} | {:error, any()} -> :ok) - @type on_blob :: ({:ok, BlobSidecar.t()} | {:error, any()} -> :ok) + @type on_blobs :: (Store.t(), {:ok, [BlobSidecar.t()]} | {:error, any()} -> :ok) + @type on_blob :: (Store.t(), {:ok, BlobSidecar.t()} | {:error, any()} -> :ok) # Requests to peers might fail for various reasons, # for example they might not support the protocol or might not reply @@ -38,22 +40,23 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do %Types.BeaconBlocksByRangeRequest{start_slot: slot, count: count} |> ReqResp.encode_request() - Libp2pPort.send_async_request(peer_id, @blobs_by_range_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blobs_by_range_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blob_sidecars_by_range", fn -> - handle_blobs_by_range_response(response, peer_id, count, slot, retries, on_blobs) + handle_blobs_by_range_response(store, response, peer_id, count, slot, retries, on_blobs) end ) end) end - defp handle_blobs_by_range_response(response, peer_id, count, slot, retries, on_blobs) do + defp handle_blobs_by_range_response(store, response, peer_id, count, slot, retries, on_blobs) do with {:ok, response_message} <- response, {:ok, blobs} <- ReqResp.decode_response(response_message, BlobSidecar), :ok <- verify_batch(blobs, slot, count) do - on_blobs.({:ok, blobs}) + on_blobs.(store, {:ok, blobs}) else {:error, reason} -> P2P.Peerbook.penalize_peer(peer_id) @@ -62,7 +65,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do Logger.debug("Retrying request for #{count} blobs", slot: slot) request_blobs_by_range(slot, count, on_blobs, retries - 1) else - on_blobs.({:error, reason}) + on_blobs.(store, {:error, reason}) end end end @@ -71,10 +74,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do def request_blob_by_root(identifier, on_blob, retries \\ @default_retries) do request_blobs_by_root( [identifier], - fn - {:ok, [blob]} -> on_blob.({:ok, blob}) - other -> on_blob.(other) - end, + fn store, response -> on_blob.(store, flatten_response(response)) end, retries ) end @@ -91,19 +91,20 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do request = ReqResp.encode_request({identifiers, TypeAliases.blob_sidecars_by_root_request()}) - Libp2pPort.send_async_request(peer_id, @blobs_by_root_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blobs_by_root_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blob_sidecars_by_root", - fn -> handle_blobs_by_root(response, peer_id, identifiers, retries, on_blobs) end + fn -> handle_blobs_by_root(store, response, peer_id, identifiers, retries, on_blobs) end ) end) end - def handle_blobs_by_root(response, peer_id, identifiers, retries, on_blobs) do + def handle_blobs_by_root(store, response, peer_id, identifiers, retries, on_blobs) do with {:ok, response_message} <- response, {:ok, blobs} <- ReqResp.decode_response(response_message, BlobSidecar) do - on_blobs.({:ok, blobs}) + on_blobs.(store, {:ok, blobs}) else {:error, reason} -> P2P.Peerbook.penalize_peer(peer_id) @@ -112,7 +113,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do Logger.debug("Retrying request for blobs.") request_blobs_by_root(identifiers, on_blobs, retries - 1) else - on_blobs.({:error, reason}) + on_blobs.(store, {:error, reason}) end end end @@ -139,4 +140,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do {:error, "blob outside requested slot range"} end end + + defp flatten_response({:ok, [blob]}), do: {:ok, blob} + defp flatten_response(other), do: other end diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 06e97fa9b..f66377fd1 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -9,6 +9,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do alias LambdaEthereumConsensus.P2P alias LambdaEthereumConsensus.P2P.ReqResp alias Types.SignedBeaconBlock + alias Types.Store @blocks_by_range_protocol_id "/eth2/beacon_chain/req/beacon_blocks_by_range/2/ssz_snappy" @blocks_by_root_protocol_id "/eth2/beacon_chain/req/beacon_blocks_by_root/2/ssz_snappy" @@ -21,7 +22,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do @type range :: {Types.slot(), Types.slot()} @type download_result :: {:ok, [SignedBeaconBlock.t()]} | {:error, any()} @type on_blocks :: - ({:ok, range(), [SignedBeaconBlock.t()]} | {:error, range(), any()} -> term()) + (Store.t(), {:ok, range(), [SignedBeaconBlock.t()]} | {:error, range(), any()} -> + term()) @doc """ Requests a series of blocks in batch, and synchronously (the caller will block waiting for the @@ -41,7 +43,13 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do def request_blocks_by_range_sync(slot, count, retries) do pid = self() - request_blocks_by_range(slot, count, fn result -> send(pid, result) end, retries) + + request_blocks_by_range( + slot, + count, + fn store, result -> tap(store, send(pid, result)) end, + retries + ) receive do result -> result @@ -65,24 +73,33 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do %Types.BeaconBlocksByRangeRequest{start_slot: slot, count: count} |> ReqResp.encode_request() - Libp2pPort.send_async_request(peer_id, @blocks_by_range_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blocks_by_range_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blocks_by_range", fn -> - handle_blocks_by_range_response(response, slot, count, retries, peer_id, on_blocks) + handle_blocks_by_range_response( + store, + response, + slot, + count, + retries, + peer_id, + on_blocks + ) end ) end) end - defp handle_blocks_by_range_response(response, slot, count, retries, peer_id, on_blocks) do + defp handle_blocks_by_range_response(store, response, slot, count, retries, peer_id, on_blocks) do with {:ok, response_message} <- response, {:ok, blocks} <- ReqResp.decode_response(response_message, SignedBeaconBlock), :ok <- verify_batch(blocks, slot, count) do tags = %{result: "success", type: "by_slot", reason: "success"} :telemetry.execute([:network, :request], %{blocks: count}, tags) - on_blocks.({:ok, {slot, slot + count - 1}, blocks}) + on_blocks.(store, {:ok, {slot, slot + count - 1}, blocks}) else {:error, reason} -> tags = %{type: "by_slot", reason: parse_reason(reason)} @@ -95,7 +112,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) # TODO: Add block range that failed in the reason - on_blocks.({:error, {slot, slot + count - 1}, reason}) + on_blocks.(store, {:error, {slot, slot + count - 1}, reason}) {:error, reason} end end @@ -103,23 +120,20 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do @spec request_block_by_root( Types.root(), - ({:ok, SignedBeaconBlock.t()} | {:error, binary()} -> :ok), + (Store.t(), {:ok, SignedBeaconBlock.t()} | {:error, binary()} -> :ok), integer() ) :: :ok def request_block_by_root(root, on_block, retries \\ @default_retries) do request_blocks_by_root( [root], - fn - {:ok, [block]} -> on_block.({:ok, block}) - other -> on_block.(other) - end, + fn store, response -> on_block.(store, flatten_response(response)) end, retries ) end @spec request_blocks_by_root( [Types.root()], - ({:ok, [SignedBeaconBlock.t()]} | {:error, binary()} -> :ok), + (Store.t(), {:ok, [SignedBeaconBlock.t()]} | {:error, binary()} -> :ok), integer() ) :: :ok def request_blocks_by_root(roots, on_blocks, retries \\ @default_retries) @@ -133,21 +147,24 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do request = ReqResp.encode_request({roots, TypeAliases.beacon_blocks_by_root_request()}) - Libp2pPort.send_async_request(peer_id, @blocks_by_root_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blocks_by_root_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blocks_by_root", - fn -> handle_blocks_by_root_response(response, roots, on_blocks, peer_id, retries) end + fn -> + handle_blocks_by_root_response(store, response, roots, on_blocks, peer_id, retries) + end ) end) end - defp handle_blocks_by_root_response(response, roots, on_blocks, peer_id, retries) do + defp handle_blocks_by_root_response(store, response, roots, on_blocks, peer_id, retries) do with {:ok, response_message} <- response, {:ok, blocks} <- ReqResp.decode_response(response_message, SignedBeaconBlock) do tags = %{result: "success", type: "by_root", reason: "success"} :telemetry.execute([:network, :request], %{blocks: length(roots)}, tags) - on_blocks.({:ok, blocks}) + on_blocks.(store, {:ok, blocks}) else {:error, reason} -> tags = %{type: "by_root", reason: parse_reason(reason)} @@ -160,7 +177,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do request_blocks_by_root(roots, on_blocks, retries - 1) else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) - on_blocks.({:error, reason}) + on_blocks.(store, {:error, reason}) end end end @@ -194,4 +211,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do {:error, "block outside requested slot range"} end end + + defp flatten_response({:ok, [block]}), do: {:ok, block} + defp flatten_response(other), do: other end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex index 11ea44353..5e6943a60 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex @@ -23,6 +23,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do end @impl true + def handle_gossip_message(store, topic, msg_id, message) do + handle_gossip_message(topic, msg_id, message) + store + end + def handle_gossip_message(topic, msg_id, message) do subnet_id = extract_subnet_id(topic) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex index b09efccf7..a1accbab1 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -16,7 +16,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do ########################## @impl true - def handle_gossip_message(_topic, msg_id, message) do + def handle_gossip_message(store, _topic, msg_id, message) do slot = ForkChoice.get_current_chain_slot() with {:ok, uncompressed} <- :snappyer.decompress(message), @@ -24,18 +24,18 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do :ok <- validate(signed_block, slot) do Logger.info("[Gossip] Block received, block.slot: #{signed_block.message.slot}.") Libp2pPort.validate_message(msg_id, :accept) - PendingBlocks.add_block(signed_block) + PendingBlocks.add_block(store, signed_block) else {:ignore, reason} -> Logger.warning("[Gossip] Block ignored, reason: #{inspect(reason)}.") Libp2pPort.validate_message(msg_id, :ignore) + store {:error, reason} -> Logger.warning("[Gossip] Block rejected, reason: #{inspect(reason)}.") Libp2pPort.validate_message(msg_id, :reject) + store end - - :ok end @spec subscribe_to_topic() :: :ok | :error diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex index 439169bd6..ac8568f39 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex @@ -12,6 +12,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do @behaviour Handler @impl true + def handle_gossip_message(store, topic, msg_id, message) do + handle_gossip_message(topic, msg_id, message) + store + end + def handle_gossip_message(_topic, msg_id, message) do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.BlobSidecar{index: blob_index} = blob} <- diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex index 2dd3e31a4..50c4bcbac 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex @@ -2,6 +2,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do @moduledoc """ Gossip handler behaviour """ + alias Types.Store - @callback handle_gossip_message(binary(), binary(), iodata()) :: :ok | {:error, any} + @callback handle_gossip_message(Store.t(), binary(), binary(), iodata()) :: + {:ok, Store.t()} | {:error, any} end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 46d0578e8..bbda6d620 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -143,6 +143,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end @impl true + def handle_gossip_message(store, topic, msg_id, message) do + handle_gossip_message(topic, msg_id, message) + store + end + def handle_gossip_message( <<_::binary-size(15)>> <> "beacon_aggregate_and_proof" <> _, _msg_id, @@ -168,7 +173,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "voluntary_exit" <> _, _msg_id, @@ -181,7 +185,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "proposer_slashing" <> _, _msg_id, @@ -194,7 +197,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "attester_slashing" <> _, _msg_id, @@ -207,7 +209,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "bls_to_execution_change" <> _, _msg_id, diff --git a/lib/lambda_ethereum_consensus/p2p/requests.ex b/lib/lambda_ethereum_consensus/p2p/requests.ex index 4e3138eb9..31af48021 100644 --- a/lib/lambda_ethereum_consensus/p2p/requests.ex +++ b/lib/lambda_ethereum_consensus/p2p/requests.ex @@ -3,6 +3,8 @@ defmodule LambdaEthereumConsensus.P2p.Requests do Uses uuids to identify requests and their handlers. Saves the handler in the struct until a response is available and then handles appropriately. """ + alias Types.Store + @type id :: binary @type handler :: (term() -> term()) @type requests :: %{id => handler} @@ -34,11 +36,11 @@ defmodule LambdaEthereumConsensus.P2p.Requests do - status is :ok if it was handled or :unhandled if the id didn't correspond to a saved handler. - requests is the modified requests object with the handler removed. """ - @spec handle_response(requests(), term(), id()) :: {:ok | :unhandled, requests()} - def handle_response(requests, response, handler_id) do + @spec handle_response(requests(), Store.t(), term(), id()) :: {:ok | :unhandled, requests()} + def handle_response(requests, store, response, handler_id) do case Map.fetch(requests, handler_id) do {:ok, handler} -> - handler.(response) + handler.(store, response) {:ok, Map.delete(requests, handler_id)} :error -> diff --git a/lib/lambda_ethereum_consensus/store/store_db.ex b/lib/lambda_ethereum_consensus/store/store_db.ex index f5c9dd06b..16886cac5 100644 --- a/lib/lambda_ethereum_consensus/store/store_db.ex +++ b/lib/lambda_ethereum_consensus/store/store_db.ex @@ -3,6 +3,7 @@ defmodule LambdaEthereumConsensus.Store.StoreDb do Beacon node store storage. """ alias LambdaEthereumConsensus.Store.Db + alias Types.Store @store_prefix "store" @snapshot_prefix "snapshot" diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 47f86eaa8..2a4a51e39 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -378,10 +378,12 @@ defmodule LambdaEthereumConsensus.Libp2pPort do @impl GenServer def init(args) do + IO.puts("empieza") {genesis_time, args} = Keyword.pop!(args, :genesis_time) {validators, args} = Keyword.pop(args, :validators, %{}) {join_init_topics, args} = Keyword.pop(args, :join_init_topics, false) {enable_request_handlers, args} = Keyword.pop(args, :enable_request_handlers, false) + {store, args} = Keyword.pop!(args, :store) port = Port.open({:spawn, @port_name}, [:binary, {:packet, 4}, :exit_status]) @@ -404,6 +406,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do schedule_next_tick() + IO.puts("termina") + {:ok, %{ genesis_time: genesis_time, @@ -412,6 +416,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do port: port, subscribers: %{}, requests: Requests.new(), + store: store, syncing: true }, {:continue, :check_pending_blocks}} end @@ -421,8 +426,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do # call is a noop. @impl GenServer def handle_continue(:check_pending_blocks, state) do - PendingBlocks.process_blocks() - {:noreply, state} + {:noreply, update_in(state.store, &PendingBlocks.process_blocks/1)} end @impl GenServer @@ -467,10 +471,15 @@ defmodule LambdaEthereumConsensus.Libp2pPort do "[Optimistic Sync] Range #{first_slot} - #{last_slot} downloaded successfully, with #{n_blocks} blocks and #{missing} missing." ) - Enum.each(blocks, &PendingBlocks.add_block/1) + new_store = + Enum.reduce(blocks, state.store, fn block, store -> + PendingBlocks.add_block(store, block) + end) new_state = - Map.update!(state, :blocks_remaining, fn n -> n - n_blocks - missing end) + state + |> Map.put(:store, new_store) + |> Map.update!(:blocks_remaining, fn n -> n - n_blocks - missing end) |> subscribe_if_no_blocks() {:noreply, new_state} @@ -540,17 +549,19 @@ defmodule LambdaEthereumConsensus.Libp2pPort do direction: "->elixir" }) - case Map.fetch(subscribers, gs.topic) do - {:ok, module} -> - Metrics.handler_span("gossip_handler", gs.topic, fn -> - module.handle_gossip_message(gs.topic, gs.msg_id, gs.message) - end) + new_store = + case Map.fetch(subscribers, gs.topic) do + {:ok, module} -> + Metrics.handler_span("gossip_handler", gs.topic, fn -> + module.handle_gossip_message(state.store, gs.topic, gs.msg_id, gs.message) + end) - :error -> - Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.") - end + :error -> + Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.") + state.store + end - state + Map.put(state, :store, new_store) end defp handle_notification( @@ -587,7 +598,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do state end - defp handle_notification(%Response{} = response, %{requests: requests} = state) do + defp handle_notification(%Response{} = response, %{requests: requests, store: store} = state) do :telemetry.execute([:port, :message], %{}, %{ function: "response", direction: "->elixir" @@ -596,7 +607,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do success = if response.success, do: :ok, else: :error {result, new_requests} = - Requests.handle_response(requests, {success, response.message}, response.id) + Requests.handle_response(requests, store, {success, response.message}, response.id) if result == :unhandled do Logger.error("Unhandled response with id: #{response.id}. Message: #{response.message}") @@ -739,7 +750,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do # TODO: we probably want to remove this (ForkChoice.on_tick) from here, but we keep it # here to have this serialized with respect to the other fork choice store modifications. - ForkChoice.on_tick(time) + new_store = ForkChoice.on_tick(state.store, time) new_slot_data = compute_slot(genesis_time, time) @@ -755,7 +766,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do maybe_log_new_slot(slot_data, new_slot_data) - updated_state + updated_state |> Map.put(:store, new_store) end defp schedule_next_tick() do diff --git a/lib/types/store.ex b/lib/types/store.ex index 83baa3dd4..fbca387c3 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -11,7 +11,6 @@ defmodule Types.Store do alias LambdaEthereumConsensus.Store.Blocks alias LambdaEthereumConsensus.Store.BlockStates alias LambdaEthereumConsensus.Store.CheckpointStates - alias LambdaEthereumConsensus.Store.StoreDb alias Types.BeaconBlock alias Types.BeaconState alias Types.BlockInfo @@ -58,7 +57,7 @@ defmodule Types.Store do head_slot: Types.slot() | nil, tree_cache: Tree.t(), states: %{Types.root() => StateInfo.t()}, - checkpoint_states: %{Types.Checkpoint.t() => StateInfo.t()} + checkpoint_states: %{Types.Checkpoint.t() => BeaconState.t()} } @spec get_forkchoice_store(BeaconState.t(), SignedBeaconBlock.t()) :: @@ -103,6 +102,7 @@ defmodule Types.Store do checkpoint_states: %{} } |> store_block_info(block_info) + |> store_state(block_info.root, state_info) |> update_head_info() |> then(&{:ok, &1}) else @@ -165,13 +165,13 @@ defmodule Types.Store do Removes everything prior to the last finalized slot, specifically checkpoint states and states by root. """ - def prune(%Store{} = store) do + def prune(%__MODULE__{} = store) do new_finalized_slot = store.finalized_checkpoint.epoch * ChainSpec.get("SLOTS_PER_EPOCH") store - |> prune_checkpoint_states(slot) - |> prune_states(slot) + |> prune_checkpoint_states(new_finalized_slot) + |> prune_states(new_finalized_slot) end @doc """ @@ -189,9 +189,10 @@ defmodule Types.Store do def store_state(store, block_root, state) do # TODO: Add a task to save the block async. - put_in(store, [:states, block_root], state) + update_in(store.states, fn states -> Map.put(states, block_root, state) end) end + @spec get_checkpoint_state(t(), Types.Checkpoint.t()) :: {t(), BeaconState.t()} @doc """ Gets a State given a checkpoint. If there is no state for that checkpoint in the store it will try to compute it. @@ -210,7 +211,7 @@ defmodule Types.Store do end end - def remove_cache(%Store{} = store) do + def remove_cache(%__MODULE__{} = store) do store |> Map.delete(:states) |> Map.delete(:checkpoint_states) end diff --git a/test/unit/libp2p_port_test.exs b/test/unit/libp2p_port_test.exs index 82fb5bc52..4303ed035 100644 --- a/test/unit/libp2p_port_test.exs +++ b/test/unit/libp2p_port_test.exs @@ -102,7 +102,7 @@ defmodule Unit.Libp2pPortTest do end @behaviour Handler - def handle_gossip_message(topic, msg_id, message) do + def handle_gossip_message(_store, topic, msg_id, message) do # Decode the PID from the message and send a notification. send(:erlang.binary_to_term(message), {:gossipsub, {topic, msg_id, message}}) end From 994a8f5ef732002603cce473c8ff32975e4f0df3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 8 Aug 2024 12:18:36 +0200 Subject: [PATCH 03/16] fix type and fix spec tests --- lib/types/store.ex | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/types/store.ex b/lib/types/store.ex index fbca387c3..2e46ed739 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -258,8 +258,12 @@ defmodule Types.Store do %StateInfo{beacon_state: state} -> if state.slot < target_slot do - new_state = StateTransition.process_slots(state, target_slot) - {put_in(store, [:checkpoint_states, checkpoint], new_state), new_state} + # The only way this can fail is if target slot > state.slot, which is false by + # construction. + {:ok, new_state} = StateTransition.process_slots(state, target_slot) + + {update_in(store.checkpoint_states, fn s -> Map.put(s, checkpoint, new_state) end), + new_state} else {store, state} end From cfb3983cb8b96bafaab635e2ef5da605c0df0d1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 8 Aug 2024 13:22:49 +0200 Subject: [PATCH 04/16] fix sync requests --- lib/lambda_ethereum_consensus/beacon/beacon_node.ex | 6 +++--- lib/libp2p_port.ex | 5 +---- test/unit/libp2p_port_test.exs | 4 +++- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index 4726677d9..0706672a2 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -18,7 +18,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do @impl true def init(_) do store = StoreSetup.setup!() - deposit_tree_snapshot = StoreSetup.get_deposit_snapshot!() LambdaEthereumConsensus.P2P.Metadata.init() @@ -28,10 +27,11 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do store = ForkChoice.init_store(store, time) - init_execution_chain(deposit_tree_snapshot, store.head_root) - validators = Validator.Setup.init(store.head_slot, store.head_root) + StoreSetup.get_deposit_snapshot!() + |> init_execution_chain(store.head_root) + libp2p_args = [genesis_time: store.genesis_time, validators: validators, store: store] ++ get_libp2p_args() diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 2a4a51e39..24875e801 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -182,7 +182,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do GenServer.cast( pid, {:send_request, peer_id, protocol_id, message, - fn response -> send(from, {:response, response}) end} + fn _store, response -> send(from, {:response, response}) end} ) receive_response() @@ -378,7 +378,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do @impl GenServer def init(args) do - IO.puts("empieza") {genesis_time, args} = Keyword.pop!(args, :genesis_time) {validators, args} = Keyword.pop(args, :validators, %{}) {join_init_topics, args} = Keyword.pop(args, :join_init_topics, false) @@ -406,8 +405,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do schedule_next_tick() - IO.puts("termina") - {:ok, %{ genesis_time: genesis_time, diff --git a/test/unit/libp2p_port_test.exs b/test/unit/libp2p_port_test.exs index 4303ed035..e039f1c5e 100644 --- a/test/unit/libp2p_port_test.exs +++ b/test/unit/libp2p_port_test.exs @@ -7,6 +7,7 @@ defmodule Unit.Libp2pPortTest do alias LambdaEthereumConsensus.P2P.Gossip.Handler alias LambdaEthereumConsensus.P2P.Metadata alias LambdaEthereumConsensus.P2P.ReqResp + alias Types.Store doctest Libp2pPort @@ -17,7 +18,8 @@ defmodule Unit.Libp2pPortTest do end defp start_port(name \\ Libp2pPort, init_args \\ []) do - start_link_supervised!({Libp2pPort, [opts: [name: name], genesis_time: 42] ++ init_args}, + start_link_supervised!( + {Libp2pPort, [opts: [name: name], store: %Store{}, genesis_time: 42] ++ init_args}, id: name ) end From e182e8d1c3ce18ed5eee0bacb8174755f855193e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Thu, 8 Aug 2024 15:18:46 +0200 Subject: [PATCH 05/16] fix problem in libp2p port --- lib/libp2p_port.ex | 1 - lib/types/store.ex | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 24875e801..f1e7348c7 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -746,7 +746,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do defp on_tick(time, %{genesis_time: genesis_time, slot_data: slot_data} = state) do # TODO: we probably want to remove this (ForkChoice.on_tick) from here, but we keep it # here to have this serialized with respect to the other fork choice store modifications. - new_store = ForkChoice.on_tick(state.store, time) new_slot_data = compute_slot(genesis_time, time) diff --git a/lib/types/store.ex b/lib/types/store.ex index 91061de64..a0cf0d49f 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -212,18 +212,18 @@ defmodule Types.Store do end def remove_cache(%__MODULE__{} = store) do - store |> Map.delete(:states) |> Map.delete(:checkpoint_states) + store |> Map.put(:states, %{}) |> Map.put(:checkpoint_states, %{}) end defp prune_checkpoint_states(store, slot) do update_in(store.checkpoint_states, fn checkpoint_states -> - Map.filter(checkpoint_states, fn {_checkpoint, state} -> state.slot < slot end) + Map.reject(checkpoint_states, fn {_checkpoint, state} -> state.slot < slot end) end) end defp prune_states(store, slot) do update_in(store.states, fn states -> - Map.filter(states, fn {_root, state} -> state.slot < slot end) + Map.reject(states, fn {_root, %StateInfo{beacon_state: state}} -> state.slot < slot end) end) end From 0bd2b598d5fd6d7f2899b1fcadea3be948647f5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 9 Aug 2024 10:57:15 +0200 Subject: [PATCH 06/16] fix store modification in response handling --- lib/lambda_ethereum_consensus/beacon/pending_blocks.ex | 3 +-- lib/lambda_ethereum_consensus/beacon/sync_blocks.ex | 6 ++++-- .../fork_choice/fork_choice.ex | 3 ++- lib/lambda_ethereum_consensus/fork_choice/handlers.ex | 3 ++- lib/lambda_ethereum_consensus/logger/console_logger.ex | 10 +++++++--- lib/lambda_ethereum_consensus/p2p/blob_downloader.ex | 2 ++ lib/lambda_ethereum_consensus/p2p/block_downloader.ex | 4 +++- lib/lambda_ethereum_consensus/p2p/requests.ex | 9 +++++---- lib/libp2p_port.ex | 5 +++-- 9 files changed, 29 insertions(+), 16 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index c99927749..2ccdd3fca 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -7,10 +7,9 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do require Logger alias LambdaEthereumConsensus.ForkChoice - alias LambdaEthereumConsensus.P2P.BlockDownloader - alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.BlobDownloader + alias LambdaEthereumConsensus.P2P.BlockDownloader alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks alias Types.BlockInfo diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index 47dbf824e..be1100f95 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -61,11 +61,13 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do end end - defp on_chunk_downloaded(_store, {:ok, range, blocks}) do + defp on_chunk_downloaded(store, {:ok, range, blocks}) do Libp2pPort.notify_blocks_downloaded(range, blocks) + store end - defp on_chunk_downloaded(_store, {:error, range, reason}) do + defp on_chunk_downloaded(store, {:error, range, reason}) do Libp2pPort.notify_block_download_failed(range, reason) + store end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 5b8a236f8..2b5fde893 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -64,7 +64,6 @@ defmodule LambdaEthereumConsensus.ForkChoice do end) |> prune_old_states(last_finalized_checkpoint.epoch) |> tap(&StoreDb.persist_store/1) - |> then(&{:ok, &1}) {:error, reason} -> Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root) @@ -171,6 +170,8 @@ defmodule LambdaEthereumConsensus.ForkChoice do new_finalized_epoch = store.finalized_checkpoint.epoch if last_finalized_epoch < new_finalized_epoch do + Logger.info("Pruning states before slot #{new_finalized_epoch}") + new_finalized_slot = new_finalized_epoch * ChainSpec.get("SLOTS_PER_EPOCH") diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 7cb07c6b3..1462b2b7e 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -64,7 +64,8 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do cond do # Parent block must be known base_state |> is_nil() -> - {:error, "parent state not found in store"} + {:error, + "parent state (block root = #{Base.encode16(block.parent_root)}) not found in store"} # Blocks cannot be in the future. If they are, their # consideration must be delayed until they are in the past. diff --git a/lib/lambda_ethereum_consensus/logger/console_logger.ex b/lib/lambda_ethereum_consensus/logger/console_logger.ex index 4e39032ad..7e74fd93b 100644 --- a/lib/lambda_ethereum_consensus/logger/console_logger.ex +++ b/lib/lambda_ethereum_consensus/logger/console_logger.ex @@ -9,9 +9,13 @@ defmodule ConsoleLogger do def format(level, message, timestamp, metadata) do formatted_metadata = format_metadata(metadata) - [format_level(level)] ++ - [Logger.Formatter.format(@pattern, level, message, timestamp, [])] ++ - [formatted_metadata] ++ ["\n"] + [ + format_level(level), + Logger.Formatter.format(@pattern, level, message, timestamp, []), + formatted_metadata, + "\n" + ] + |> IO.iodata_to_binary() rescue err -> inspect(err) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 62f2a61a0..f94d5ddbd 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -64,6 +64,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do if retries > 0 do Logger.debug("Retrying request for #{count} blobs", slot: slot) request_blobs_by_range(slot, count, on_blobs, retries - 1) + store else on_blobs.(store, {:error, reason}) end @@ -112,6 +113,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do if retries > 0 do Logger.debug("Retrying request for blobs.") request_blobs_by_root(identifiers, on_blobs, retries - 1) + store else on_blobs.(store, {:error, reason}) end diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index f66377fd1..7ad187311 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -109,11 +109,12 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) Logger.debug("Retrying request for #{count} blocks", slot: slot) request_blocks_by_range(slot, count, on_blocks, retries - 1) + store else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) # TODO: Add block range that failed in the reason on_blocks.(store, {:error, {slot, slot + count - 1}, reason}) - {:error, reason} + store end end end @@ -175,6 +176,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) Logger.debug("Retrying request for blocks with roots #{pretty_roots}") request_blocks_by_root(roots, on_blocks, retries - 1) + store else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) on_blocks.(store, {:error, reason}) diff --git a/lib/lambda_ethereum_consensus/p2p/requests.ex b/lib/lambda_ethereum_consensus/p2p/requests.ex index 31af48021..dfffb590e 100644 --- a/lib/lambda_ethereum_consensus/p2p/requests.ex +++ b/lib/lambda_ethereum_consensus/p2p/requests.ex @@ -36,15 +36,16 @@ defmodule LambdaEthereumConsensus.P2p.Requests do - status is :ok if it was handled or :unhandled if the id didn't correspond to a saved handler. - requests is the modified requests object with the handler removed. """ - @spec handle_response(requests(), Store.t(), term(), id()) :: {:ok | :unhandled, requests()} + @spec handle_response(requests(), Store.t(), term(), id()) :: + {:ok | :unhandled, requests(), Store.t()} def handle_response(requests, store, response, handler_id) do case Map.fetch(requests, handler_id) do {:ok, handler} -> - handler.(store, response) - {:ok, Map.delete(requests, handler_id)} + %Store{} = new_store = handler.(store, response) + {:ok, Map.delete(requests, handler_id), new_store} :error -> - {:unhandled, requests} + {:unhandled, requests, store} end end end diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index f1e7348c7..53a05e23f 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -14,6 +14,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.Beacon.SyncBlocks alias LambdaEthereumConsensus.ForkChoice + alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock alias LambdaEthereumConsensus.P2P.Gossip.BlobSideCar @@ -603,14 +604,14 @@ defmodule LambdaEthereumConsensus.Libp2pPort do success = if response.success, do: :ok, else: :error - {result, new_requests} = + {result, new_requests, new_store} = Requests.handle_response(requests, store, {success, response.message}, response.id) if result == :unhandled do Logger.error("Unhandled response with id: #{response.id}. Message: #{response.message}") end - state |> Map.put(:requests, new_requests) + state |> Map.put(:requests, new_requests) |> Map.put(:store, new_store) end defp handle_notification(%Result{from: "", result: result}, state) do From 5e0d6867b973687b805b368c2e569269064c045d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 9 Aug 2024 15:16:14 +0200 Subject: [PATCH 07/16] fix response values --- .../beacon/pending_blocks.ex | 8 +-- .../beacon/sync_blocks.ex | 4 +- .../fork_choice/fork_choice.ex | 1 + .../p2p/blob_downloader.ex | 4 +- .../p2p/block_downloader.ex | 5 +- lib/lambda_ethereum_consensus/p2p/requests.ex | 51 ------------------- lib/libp2p_port.ex | 50 +++++++++++++----- 7 files changed, 47 insertions(+), 76 deletions(-) delete mode 100644 lib/lambda_ethereum_consensus/p2p/requests.ex diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 2ccdd3fca..673234b0a 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -153,21 +153,21 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do defp process_downloaded_block(store, {:ok, [block]}) do # TODO: add store. - add_block(store, block) + {:ok, add_block(store, block)} end defp process_downloaded_block(store, {:error, reason}) do # We might want to declare a block invalid here. Logger.error("Error downloading block: #{inspect(reason)}") - store + {:ok, store} end - defp process_blobs(store, {:ok, blobs}), do: add_blobs(store, blobs) + defp process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)} defp process_blobs(store, {:error, reason}) do # We might want to declare a block invalid here. Logger.error("Error downloading blobs: #{inspect(reason)}") - store + {:ok, store} end # To be used when a series of blobs are downloaded. Stores each blob. diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index be1100f95..169993dcb 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -63,11 +63,11 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do defp on_chunk_downloaded(store, {:ok, range, blocks}) do Libp2pPort.notify_blocks_downloaded(range, blocks) - store + {:ok, store} end defp on_chunk_downloaded(store, {:error, range, reason}) do Libp2pPort.notify_block_download_failed(range, reason) - store + {:ok, store} end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 2b5fde893..288bc3161 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -64,6 +64,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do end) |> prune_old_states(last_finalized_checkpoint.epoch) |> tap(&StoreDb.persist_store/1) + |> then(&{:ok, &1}) {:error, reason} -> Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index f94d5ddbd..4493a4fc0 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -64,7 +64,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do if retries > 0 do Logger.debug("Retrying request for #{count} blobs", slot: slot) request_blobs_by_range(slot, count, on_blobs, retries - 1) - store + {:ok, store} else on_blobs.(store, {:error, reason}) end @@ -113,7 +113,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do if retries > 0 do Logger.debug("Retrying request for blobs.") request_blobs_by_root(identifiers, on_blobs, retries - 1) - store + {:ok, store} else on_blobs.(store, {:error, reason}) end diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 7ad187311..6522fa84a 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -109,12 +109,11 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) Logger.debug("Retrying request for #{count} blocks", slot: slot) request_blocks_by_range(slot, count, on_blocks, retries - 1) - store + {:ok, store} else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) # TODO: Add block range that failed in the reason on_blocks.(store, {:error, {slot, slot + count - 1}, reason}) - store end end end @@ -176,7 +175,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) Logger.debug("Retrying request for blocks with roots #{pretty_roots}") request_blocks_by_root(roots, on_blocks, retries - 1) - store + {:ok, store} else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) on_blocks.(store, {:error, reason}) diff --git a/lib/lambda_ethereum_consensus/p2p/requests.ex b/lib/lambda_ethereum_consensus/p2p/requests.ex deleted file mode 100644 index dfffb590e..000000000 --- a/lib/lambda_ethereum_consensus/p2p/requests.ex +++ /dev/null @@ -1,51 +0,0 @@ -defmodule LambdaEthereumConsensus.P2p.Requests do - @moduledoc """ - Uses uuids to identify requests and their handlers. Saves the handler in the struct until a - response is available and then handles appropriately. - """ - alias Types.Store - - @type id :: binary - @type handler :: (term() -> term()) - @type requests :: %{id => handler} - - @doc """ - Creates a requests object that will hold response handlers. - """ - @spec new() :: requests() - def new(), do: %{} - - @doc """ - Adds a handler for a request. - - Returns a tuple {requests, request_id}, where: - - Requests is the modified requests object with the added handler. - - The id for the handler for that request. This will be used later when calling handle_response/3. - """ - @spec add_response_handler(requests(), handler()) :: {requests(), id()} - def add_response_handler(requests, handler) do - id = UUID.uuid4() - {Map.put(requests, id, handler), id} - end - - @doc """ - Handles a request using handler_id. The handler will be popped from the - requests object. - - Returns a {status, requests} tuple where: - - status is :ok if it was handled or :unhandled if the id didn't correspond to a saved handler. - - requests is the modified requests object with the handler removed. - """ - @spec handle_response(requests(), Store.t(), term(), id()) :: - {:ok | :unhandled, requests(), Store.t()} - def handle_response(requests, store, response, handler_id) do - case Map.fetch(requests, handler_id) do - {:ok, handler} -> - %Store{} = new_store = handler.(store, response) - {:ok, Map.delete(requests, handler_id), new_store} - - :error -> - {:unhandled, requests, store} - end - end -end diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 53a05e23f..0cf1a19d1 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -14,14 +14,12 @@ defmodule LambdaEthereumConsensus.Libp2pPort do alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.Beacon.SyncBlocks alias LambdaEthereumConsensus.ForkChoice - alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.Gossip.BeaconBlock alias LambdaEthereumConsensus.P2P.Gossip.BlobSideCar alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.P2P.IncomingRequestsHandler alias LambdaEthereumConsensus.P2P.Peerbook - alias LambdaEthereumConsensus.P2p.Requests alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Utils.BitVector alias LambdaEthereumConsensus.Validator @@ -47,6 +45,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do alias Libp2pProto.Tracer alias Libp2pProto.ValidateMessage alias Types.EnrForkId + alias Types.Store require Logger @@ -413,7 +412,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do slot_data: nil, port: port, subscribers: %{}, - requests: Requests.new(), + requests: %{}, store: store, syncing: true }, {:continue, :check_pending_blocks}} @@ -445,7 +444,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do port: port } = state ) do - {new_requests, handler_id} = Requests.add_response_handler(requests, handler) + {new_requests, handler_id} = add_response_handler(requests, handler) send_request = %SendRequest{ id: peer_id, @@ -602,16 +601,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do direction: "->elixir" }) - success = if response.success, do: :ok, else: :error - - {result, new_requests, new_store} = - Requests.handle_response(requests, store, {success, response.message}, response.id) - - if result == :unhandled do - Logger.error("Unhandled response with id: #{response.id}. Message: #{response.message}") - end - - state |> Map.put(:requests, new_requests) |> Map.put(:store, new_store) + {new_requests, new_store} = handle_response(requests, store, response) + state |> Map.merge(%{requests: new_requests, store: new_store}) end defp handle_notification(%Result{from: "", result: result}, state) do @@ -790,6 +781,37 @@ defmodule LambdaEthereumConsensus.Libp2pPort do {slot, slot_third} end + defp add_response_handler(requests, handler) do + id = UUID.uuid4() + {Map.put(requests, id, handler), id} + end + + # Handles a request using handler_id. The handler will be popped from the + # requests map. + # + # Returns a {status, requests} tuple where: + # - status is :ok if it was handled or :unhandled if the id didn't correspond to a saved handler. + # - requests is the modified requests object with the handler removed. + defp handle_response(requests, store, response) do + case Map.pop(requests, response.id) do + {nil, new_requests} -> + Logger.error("Unhandled response with id: #{response.id}. Message: #{response.message}") + {new_requests, store} + + {handler, new_requests} -> + success = if response.success, do: :ok, else: :error + + case handler.(store, {success, response.message}) do + {:ok, %Store{} = new_store} -> + {new_requests, new_store} + + {:error, reason} -> + Logger.warning("Handling response failed with reason: #{reason}") + {new_requests, store} + end + end + end + defp maybe_log_new_slot({slot, _third}, {slot, _another_third}), do: :ok defp maybe_log_new_slot({_prev_slot, _thrid}, {slot, :first_third}) do From 2bbea20b4fe4e5a0af4da29a2b0bcb6860c599bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 9 Aug 2024 17:10:56 +0200 Subject: [PATCH 08/16] fix sync requests too --- lib/libp2p_port.ex | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 0cf1a19d1..6351513b6 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -182,7 +182,10 @@ defmodule LambdaEthereumConsensus.Libp2pPort do GenServer.cast( pid, {:send_request, peer_id, protocol_id, message, - fn _store, response -> send(from, {:response, response}) end} + fn store, response -> + send(from, {:response, response}) + {:ok, store} + end} ) receive_response() From 0196a21b0f0df86f252c4e5343f8919bc64a84b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 9 Aug 2024 17:24:41 +0200 Subject: [PATCH 09/16] remove request tests. They were moved to libp2p --- test/unit/p2p/requests_test.exs | 37 --------------------------------- 1 file changed, 37 deletions(-) delete mode 100644 test/unit/p2p/requests_test.exs diff --git a/test/unit/p2p/requests_test.exs b/test/unit/p2p/requests_test.exs deleted file mode 100644 index 9e7274add..000000000 --- a/test/unit/p2p/requests_test.exs +++ /dev/null @@ -1,37 +0,0 @@ -defmodule Unit.P2p.RequestsTest do - use ExUnit.Case - - alias LambdaEthereumConsensus.P2p.Requests - - test "An empty requests object shouldn't handle a request" do - requests = Requests.new() - - assert {:unhandled, requests} == - Requests.handle_response(requests, "some response", "fake id") - end - - test "A requests object should handler a request only once" do - requests = Requests.new() - pid = self() - - {requests_2, handler_id} = - Requests.add_response_handler( - requests, - fn response -> send(pid, response) end - ) - - {:ok, requests_3} = Requests.handle_response(requests_2, "some response", handler_id) - - response = - receive do - response -> response - end - - assert response == "some response" - - assert requests_3 == requests - - assert {:unhandled, requests_3} == - Requests.handle_response(requests, "some response", handler_id) - end -end From adb698c8fb1379a41da4dc682b9973c2c9c4d77f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 9 Aug 2024 17:46:24 +0200 Subject: [PATCH 10/16] fix unit tests --- test/unit/beacon_api/beacon_api_v1_test.exs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/unit/beacon_api/beacon_api_v1_test.exs b/test/unit/beacon_api/beacon_api_v1_test.exs index 91a4ec522..e3b5d9a40 100644 --- a/test/unit/beacon_api/beacon_api_v1_test.exs +++ b/test/unit/beacon_api/beacon_api_v1_test.exs @@ -10,6 +10,7 @@ defmodule Unit.BeaconApiTest.V1 do alias LambdaEthereumConsensus.Store.Db alias LambdaEthereumConsensus.Store.StoreDb alias Types.BlockInfo + alias Types.Store @moduletag :beacon_api_case @moduletag :tmp_dir @@ -159,7 +160,7 @@ defmodule Unit.BeaconApiTest.V1 do alias LambdaEthereumConsensus.P2P.Metadata patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end) - start_link_supervised!({Libp2pPort, genesis_time: 42}) + start_link_supervised!({Libp2pPort, genesis_time: 42, store: %Store{}}) Metadata.init() identity = Libp2pPort.get_node_identity() metadata = Metadata.get_metadata() From 2e97df3a97da5361c5a71cbe5b57a19f03a3f149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 9 Aug 2024 18:16:10 +0200 Subject: [PATCH 11/16] add async state store in db --- lib/lambda_ethereum_consensus/fork_choice/handlers.ex | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 1462b2b7e..769804178 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -12,7 +12,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do alias LambdaEthereumConsensus.StateTransition.Predicates alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks - + alias LambdaEthereumConsensus.Store.StateDb alias Types.Attestation alias Types.AttestationData alias Types.AttesterSlashing @@ -228,6 +228,12 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do # Add new block and state to the store new_store = Store.store_state(store, new_state_info.block_root, new_state_info) + + Task.Supervisor.start_child( + PruneStatesSupervisor, + fn -> StateDb.store_state_info(new_state_info) end + ) + is_first_block = new_store.proposer_boost_root == <<0::256>> # TODO: store block timeliness data? is_timely = Store.get_current_slot(new_store) == block.slot and is_before_attesting_interval From b8792d84916911cee73a90478d81e28662749649 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 12 Aug 2024 19:00:16 +0200 Subject: [PATCH 12/16] add per-runner setup capabilities. Fix fork choice spec tests --- test/spec/runner_behaviour.ex | 8 ++++++++ test/spec/runners/fork_choice.ex | 7 +++++++ test/spec/tasks/generate_spec_tests.ex | 1 + 3 files changed, 16 insertions(+) diff --git a/test/spec/runner_behaviour.ex b/test/spec/runner_behaviour.ex index f959ca929..654fa3d53 100644 --- a/test/spec/runner_behaviour.ex +++ b/test/spec/runner_behaviour.ex @@ -11,6 +11,9 @@ defmodule TestRunner do @behaviour TestRunner def skip?(_testcase), do: false defoverridable skip?: 1 + + def setup(), do: :ok + defoverridable setup: 0 end end @@ -24,4 +27,9 @@ defmodule TestRunner do if the test case passes. To ignore test cases use `skip?/1`. """ @callback run_test_case(testcase :: SpecTestCase.t()) :: any + + @doc """ + Called in an ExUnit.setup manner, before each test case, to perform setup specific to the runner. + """ + @callback setup() :: :ok end diff --git a/test/spec/runners/fork_choice.ex b/test/spec/runners/fork_choice.ex index 2d6983629..f29ed9f27 100644 --- a/test/spec/runners/fork_choice.ex +++ b/test/spec/runners/fork_choice.ex @@ -16,6 +16,13 @@ defmodule ForkChoiceTestRunner do alias Types.SignedBeaconBlock alias Types.Store + @impl TestRunner + def setup() do + # Start this supervisor, necessary for post state tasks. + start_link_supervised!({Task.Supervisor, name: PruneStatesSupervisor}) + :ok + end + @impl TestRunner def skip?(%SpecTestCase{fork: "capella"}), do: false def skip?(%SpecTestCase{fork: "deneb"}), do: false diff --git a/test/spec/tasks/generate_spec_tests.ex b/test/spec/tasks/generate_spec_tests.ex index f20d5ffce..5224be3c5 100644 --- a/test/spec/tasks/generate_spec_tests.ex +++ b/test/spec/tasks/generate_spec_tests.ex @@ -80,6 +80,7 @@ defmodule Mix.Tasks.GenerateSpecTests do start_link_supervised!(LambdaEthereumConsensus.Store.Blocks) start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates) + #{runner_module}.setup() :ok end """ From f57dbade0a1ddadbcb4d11b6ffe92242ecf25b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 12 Aug 2024 19:06:42 +0200 Subject: [PATCH 13/16] same but sync --- test/spec/runners/sync.ex | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/spec/runners/sync.ex b/test/spec/runners/sync.ex index d656dee52..e1df1e80d 100644 --- a/test/spec/runners/sync.ex +++ b/test/spec/runners/sync.ex @@ -12,6 +12,13 @@ defmodule SyncTestRunner do # "from_syncing_to_invalid" ] + @impl TestRunner + def setup() do + # Start this supervisor, necessary for post state tasks. + start_link_supervised!({Task.Supervisor, name: PruneStatesSupervisor}) + :ok + end + @impl TestRunner def skip?(%SpecTestCase{fork: "capella"} = testcase) do Enum.member?(@disabled_cases, testcase.case) From aeb4210cfe74945bea5ff34a622cbc5cbfb09d4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Mon, 12 Aug 2024 19:37:26 +0200 Subject: [PATCH 14/16] Add recompute head log and remove some todos --- lib/lambda_ethereum_consensus/beacon/pending_blocks.ex | 1 - lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex | 8 +++++--- lib/types/store.ex | 1 - 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 673234b0a..984055fb9 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -152,7 +152,6 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do end defp process_downloaded_block(store, {:ok, [block]}) do - # TODO: add store. {:ok, add_block(store, block)} end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 288bc3161..3bb4015df 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -55,15 +55,17 @@ defmodule LambdaEthereumConsensus.ForkChoice do case result do {:ok, new_store} -> + Logger.info("[Fork choice] Block processed. Recomputing head.") :telemetry.execute([:sync, :on_block], %{slot: slot}) - # TODO: move this to the tap after persisting. - Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) :telemetry.span([:fork_choice, :recompute_head], %{}, fn -> {recompute_head(new_store), %{}} end) |> prune_old_states(last_finalized_checkpoint.epoch) - |> tap(&StoreDb.persist_store/1) + |> tap(fn store -> + StoreDb.persist_store(store) + Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) + end) |> then(&{:ok, &1}) {:error, reason} -> diff --git a/lib/types/store.ex b/lib/types/store.ex index a0cf0d49f..152331039 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -188,7 +188,6 @@ defmodule Types.Store do end def store_state(store, block_root, state) do - # TODO: Add a task to save the block async. update_in(store.states, fn states -> Map.put(states, block_root, state) end) end From b6a5d94199fd4d25fb78c21bb9e1d955db8b557e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 16 Aug 2024 14:39:56 +0200 Subject: [PATCH 15/16] address comments --- .../beacon/beacon_node.ex | 3 ++- .../beacon/pending_blocks.ex | 2 ++ .../fork_choice/handlers.ex | 2 +- .../p2p/gossip/blob_sidecar.ex | 14 +++++--------- .../p2p/gossip/handler.ex | 2 +- .../store/checkpoint_states.ex | 1 + lib/types/store.ex | 2 +- test/spec/runners/fork_choice.ex | 2 +- test/spec/runners/sync.ex | 2 +- 9 files changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index 0706672a2..491ba2b07 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -41,7 +41,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do {LambdaEthereumConsensus.Libp2pPort, libp2p_args}, {Task.Supervisor, name: PruneStatesSupervisor}, {Task.Supervisor, name: PruneBlocksSupervisor}, - {Task.Supervisor, name: PruneBlobsSupervisor} + {Task.Supervisor, name: PruneBlobsSupervisor}, + {Task.Supervisor, name: StoreStatesSupervisor} ] Supervisor.init(children, strategy: :one_for_all) diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 984055fb9..6611dd5ec 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -169,6 +169,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do {:ok, store} end + def add_blob(store, blob), do: add_blobs(store, [blob]) + # To be used when a series of blobs are downloaded. Stores each blob. # If there are blocks that can be processed, does so immediately. defp add_blobs(store, blobs) do diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 769804178..275f8c7df 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -230,7 +230,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do new_store = Store.store_state(store, new_state_info.block_root, new_state_info) Task.Supervisor.start_child( - PruneStatesSupervisor, + StoreStatesSupervisor, fn -> StateDb.store_state_info(new_state_info) end ) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex index ac8568f39..eed5da8c8 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex @@ -2,32 +2,28 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do @moduledoc """ This module handles blob sidecar gossipsub topics. """ + alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.P2P.Gossip.Handler - alias LambdaEthereumConsensus.Store.BlobDb require Logger @behaviour Handler - @impl true - def handle_gossip_message(store, topic, msg_id, message) do - handle_gossip_message(topic, msg_id, message) - store - end - - def handle_gossip_message(_topic, msg_id, message) do + @impl Handler + def handle_gossip_message(store, _topic, msg_id, message) do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.BlobSidecar{index: blob_index} = blob} <- Ssz.from_ssz(uncompressed, Types.BlobSidecar) do Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}") - BlobDb.store_blob(blob) Libp2pPort.validate_message(msg_id, :accept) + PendingBlocks.add_blob(store, blob) else {:error, reason} -> Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}") Libp2pPort.validate_message(msg_id, :reject) + store end end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex index 50c4bcbac..9ecadb89e 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex @@ -5,5 +5,5 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do alias Types.Store @callback handle_gossip_message(Store.t(), binary(), binary(), iodata()) :: - {:ok, Store.t()} | {:error, any} + Store.t() | {:error, any} end diff --git a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex index c7e4f4971..4dab375e5 100644 --- a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex +++ b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex @@ -54,6 +54,7 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do DEPRECATED. """ + @deprecated "Use Store.get_checkpoint_state/2 instead" @spec compute_target_checkpoint_state(Types.epoch(), Types.root()) :: {:ok, BeaconState.t()} | {:error, String.t()} def compute_target_checkpoint_state(target_epoch, target_root) do diff --git a/lib/types/store.ex b/lib/types/store.ex index 152331039..736684034 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -257,7 +257,7 @@ defmodule Types.Store do %StateInfo{beacon_state: state} -> if state.slot < target_slot do - # The only way this can fail is if target slot > state.slot, which is false by + # The only way this can fail is if state.slot < target_slot, which is false by # construction. {:ok, new_state} = StateTransition.process_slots(state, target_slot) diff --git a/test/spec/runners/fork_choice.ex b/test/spec/runners/fork_choice.ex index f29ed9f27..9a60e38c4 100644 --- a/test/spec/runners/fork_choice.ex +++ b/test/spec/runners/fork_choice.ex @@ -19,7 +19,7 @@ defmodule ForkChoiceTestRunner do @impl TestRunner def setup() do # Start this supervisor, necessary for post state tasks. - start_link_supervised!({Task.Supervisor, name: PruneStatesSupervisor}) + start_link_supervised!({Task.Supervisor, name: StoreStatesSupervisor}) :ok end diff --git a/test/spec/runners/sync.ex b/test/spec/runners/sync.ex index e1df1e80d..9e25ed781 100644 --- a/test/spec/runners/sync.ex +++ b/test/spec/runners/sync.ex @@ -15,7 +15,7 @@ defmodule SyncTestRunner do @impl TestRunner def setup() do # Start this supervisor, necessary for post state tasks. - start_link_supervised!({Task.Supervisor, name: PruneStatesSupervisor}) + start_link_supervised!({Task.Supervisor, name: StoreStatesSupervisor}) :ok end From 10cfc23943ea4c4f7137567fe8e67f9106a70e1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 16 Aug 2024 14:49:56 +0200 Subject: [PATCH 16/16] remove deprecated --- lib/lambda_ethereum_consensus/store/checkpoint_states.ex | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex index 4dab375e5..2ae9a7e4d 100644 --- a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex +++ b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex @@ -52,9 +52,8 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do @doc """ Calculate the state for a checkpoint without interacting with the db. - DEPRECATED. + TODO (#1278): use Store.get_target_checkpoint_state instead. """ - @deprecated "Use Store.get_checkpoint_state/2 instead" @spec compute_target_checkpoint_state(Types.epoch(), Types.root()) :: {:ok, BeaconState.t()} | {:error, String.t()} def compute_target_checkpoint_state(target_epoch, target_root) do