From 84ee168eade1aa0c039f1f2696ed9e5d59a4442c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Fri, 16 Aug 2024 15:58:43 +0200 Subject: [PATCH] perf: use the Store struct to save states and checkpoint states in memory (#1253) --- .../beacon/beacon_node.ex | 15 +- .../beacon/pending_blocks.ex | 92 ++++++------ .../beacon/sync_blocks.ex | 8 +- .../fork_choice/fork_choice.ex | 139 ++++++++---------- .../fork_choice/handlers.ex | 54 +++---- .../fork_choice/head.ex | 9 +- .../logger/console_logger.ex | 10 +- .../p2p/blob_downloader.ex | 38 +++-- .../p2p/block_downloader.ex | 59 +++++--- .../p2p/gossip/attestation.ex | 5 + .../p2p/gossip/beacon_block.ex | 8 +- .../p2p/gossip/blob_sidecar.ex | 9 +- .../p2p/gossip/handler.ex | 4 +- .../p2p/gossip/operations_collector.ex | 9 +- lib/lambda_ethereum_consensus/p2p/requests.ex | 48 ------ .../store/checkpoint_states.ex | 2 + .../store/store_db.ex | 3 +- lib/libp2p_port.ex | 95 ++++++++---- lib/types/store.ex | 104 ++++++++++++- test/spec/runner_behaviour.ex | 8 + test/spec/runners/fork_choice.ex | 7 + test/spec/runners/sync.ex | 7 + test/spec/tasks/generate_spec_tests.ex | 1 + test/unit/beacon_api/beacon_api_v1_test.exs | 3 +- test/unit/libp2p_port_test.exs | 6 +- test/unit/p2p/requests_test.exs | 37 ----- 26 files changed, 440 insertions(+), 340 deletions(-) delete mode 100644 lib/lambda_ethereum_consensus/p2p/requests.ex delete mode 100644 test/unit/p2p/requests_test.exs diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index f939f597b..491ba2b07 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -18,7 +18,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do @impl true def init(_) do store = StoreSetup.setup!() - deposit_tree_snapshot = StoreSetup.get_deposit_snapshot!() LambdaEthereumConsensus.P2P.Metadata.init() @@ -26,20 +25,24 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do time = :os.system_time(:second) - ForkChoice.init_store(store, time) - - init_execution_chain(deposit_tree_snapshot, store.head_root) + store = ForkChoice.init_store(store, time) validators = Validator.Setup.init(store.head_slot, store.head_root) - libp2p_args = [genesis_time: store.genesis_time, validators: validators] ++ get_libp2p_args() + StoreSetup.get_deposit_snapshot!() + |> init_execution_chain(store.head_root) + + libp2p_args = + [genesis_time: store.genesis_time, validators: validators, store: store] ++ + get_libp2p_args() children = [ {LambdaEthereumConsensus.Libp2pPort, libp2p_args}, {Task.Supervisor, name: PruneStatesSupervisor}, {Task.Supervisor, name: PruneBlocksSupervisor}, - {Task.Supervisor, name: PruneBlobsSupervisor} + {Task.Supervisor, name: PruneBlobsSupervisor}, + {Task.Supervisor, name: StoreStatesSupervisor} ] Supervisor.init(children, strategy: :one_for_all) diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 7bb37ff83..6611dd5ec 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -7,14 +7,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do require Logger alias LambdaEthereumConsensus.ForkChoice - alias LambdaEthereumConsensus.P2P.BlockDownloader - alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P.BlobDownloader + alias LambdaEthereumConsensus.P2P.BlockDownloader alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks alias Types.BlockInfo alias Types.SignedBeaconBlock + alias Types.Store @type block_status :: :pending | :invalid | :download | :download_blobs | :unknown @type block_info :: @@ -36,8 +36,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do If blobs are missing, they will be requested. """ - @spec add_block(SignedBeaconBlock.t()) :: :ok - def add_block(signed_block) do + @spec add_block(Store.t(), SignedBeaconBlock.t()) :: Store.t() + def add_block(store, signed_block) do block_info = BlockInfo.from_block(signed_block) loaded_block = Blocks.get_block_info(block_info.root) @@ -47,14 +47,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do if Enum.empty?(missing_blobs) do Blocks.new_block_info(block_info) - process_block_and_check_children(block_info) + process_block_and_check_children(store, block_info) else - BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries) + BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries) block_info |> BlockInfo.change_status(:download_blobs) |> Blocks.new_block_info() + + store end + else + store end end @@ -63,17 +67,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do module after receiving a new block, but there are some other cases like at node startup, as there may be pending blocks from prior executions. """ - def process_blocks() do + def process_blocks(store) do case Blocks.get_blocks_with_status(:pending) do {:ok, blocks} -> blocks |> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end) - |> Enum.each(&process_block/1) + |> Enum.reduce(store, fn block_info, store -> + {store, _state} = process_block(store, block_info) + store + end) {:error, reason} -> Logger.error( "[Pending Blocks] Failed to get pending blocks to process. Reason: #{reason}" ) + + store end end @@ -85,13 +94,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do # is called to check if there's any children that can now be processed. This function # is only to be called when a new block is saved as pending, not when processing blocks # in batch, to avoid unneeded recursion. - defp process_block_and_check_children(block_info) do - if process_block(block_info) in [:transitioned, :invalid] do - process_blocks() + defp process_block_and_check_children(store, block_info) do + case process_block(store, block_info) do + {store, result} when result in [:transitioned, :invalid] -> process_blocks(store) + {store, _other} -> store end end - defp process_block(block_info) do + defp process_block(store, block_info) do if block_info.status != :pending do Logger.error("Called process block for a block that's not ready: #{block_info}") end @@ -105,9 +115,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do BlockDownloader.request_blocks_by_root( [parent_root], - fn result -> - process_downloaded_block(result) - end, + &process_downloaded_block/2, @download_retries ) @@ -116,65 +124,67 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do block_info.root ) - :download_pending + {store, :download_pending} %BlockInfo{status: :invalid} -> Blocks.change_status(block_info, :invalid) - :invalid + {store, :invalid} %BlockInfo{status: :transitioned} -> - case ForkChoice.on_block(block_info) do - :ok -> + case ForkChoice.on_block(store, block_info) do + {:ok, store} -> Blocks.change_status(block_info, :transitioned) - :transitioned + {store, :transitioned} - {:error, reason} -> + {:error, reason, store} -> Logger.error("[PendingBlocks] Saving block as invalid #{reason}", slot: block_info.signed_block.message.slot, root: block_info.root ) Blocks.change_status(block_info, :invalid) - :invalid + {store, :invalid} end _other -> - :ok + {store, :ok} end end - defp process_downloaded_block({:ok, [block]}) do - add_block(block) + defp process_downloaded_block(store, {:ok, [block]}) do + {:ok, add_block(store, block)} end - defp process_downloaded_block({:error, reason}) do - Logger.error("Error downloading block: #{inspect(reason)}") - + defp process_downloaded_block(store, {:error, reason}) do # We might want to declare a block invalid here. + Logger.error("Error downloading block: #{inspect(reason)}") + {:ok, store} end - defp process_blobs({:ok, blobs}), do: add_blobs(blobs) - - defp process_blobs({:error, reason}) do - Logger.error("Error downloading blobs: #{inspect(reason)}") + defp process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)} + defp process_blobs(store, {:error, reason}) do # We might want to declare a block invalid here. + Logger.error("Error downloading blobs: #{inspect(reason)}") + {:ok, store} end + def add_blob(store, blob), do: add_blobs(store, [blob]) + # To be used when a series of blobs are downloaded. Stores each blob. # If there are blocks that can be processed, does so immediately. - defp add_blobs(blobs) do + defp add_blobs(store, blobs) do blobs |> Enum.map(&BlobDb.store_blob/1) |> Enum.uniq() - |> Enum.each(fn root -> - with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do - # TODO: add a new missing blobs call if some blobs are still missing for a block. - if Enum.empty?(missing_blobs(block_info)) do - block_info - |> Blocks.change_status(:pending) - |> process_block_and_check_children() - end + |> Enum.reduce(store, fn root, store -> + with %BlockInfo{} = block_info <- Blocks.get_block_info(root), + [] <- missing_blobs(block_info) do + block_info + |> Blocks.change_status(:pending) + |> then(&process_block_and_check_children(store, &1)) + else + _ -> store end end) end diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index c4f22af1e..169993dcb 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -51,7 +51,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do BlockDownloader.request_blocks_by_range( first_slot, count, - &on_chunk_downloaded/1, + &on_chunk_downloaded/2, @retries ) @@ -61,11 +61,13 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do end end - defp on_chunk_downloaded({:ok, range, blocks}) do + defp on_chunk_downloaded(store, {:ok, range, blocks}) do Libp2pPort.notify_blocks_downloaded(range, blocks) + {:ok, store} end - defp on_chunk_downloaded({:error, range, reason}) do + defp on_chunk_downloaded(store, {:error, range, reason}) do Libp2pPort.notify_block_download_failed(range, reason) + {:ok, store} end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 7f2b19123..3bb4015df 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -15,19 +15,17 @@ defmodule LambdaEthereumConsensus.ForkChoice do alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.BlockDb alias LambdaEthereumConsensus.Store.Blocks - alias LambdaEthereumConsensus.Store.CheckpointStates alias LambdaEthereumConsensus.Store.StateDb alias LambdaEthereumConsensus.Store.StoreDb alias Types.Attestation alias Types.BlockInfo - alias Types.Checkpoint alias Types.Store ########################## ### Public API ########################## - @spec init_store(Store.t(), Types.uint64()) :: :ok | :error + @spec init_store(Store.t(), Types.uint64()) :: Store.t() def init_store(%Store{head_slot: head_slot, head_root: head_root} = store, time) do Logger.info("[Fork choice] Initialized store.", slot: head_slot) @@ -38,12 +36,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do Metrics.block_status(head_root, head_slot, :transitioned) - persist_store(store) + tap(store, &StoreDb.persist_store/1) end - @spec on_block(BlockInfo.t()) :: :ok | {:error, String.t()} - def on_block(%BlockInfo{} = block_info) do - store = fetch_store!() + @spec on_block(Store.t(), BlockInfo.t()) :: {:ok, Store.t()} | {:error, String.t(), Store.t()} + def on_block(store, %BlockInfo{} = block_info) do slot = block_info.signed_block.message.slot block_root = block_info.root @@ -58,63 +55,60 @@ defmodule LambdaEthereumConsensus.ForkChoice do case result do {:ok, new_store} -> + Logger.info("[Fork choice] Block processed. Recomputing head.") :telemetry.execute([:sync, :on_block], %{slot: slot}) - Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) :telemetry.span([:fork_choice, :recompute_head], %{}, fn -> {recompute_head(new_store), %{}} end) - - %Store{finalized_checkpoint: new_finalized_checkpoint} = new_store - - prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch) - - persist_store(new_store) + |> prune_old_states(last_finalized_checkpoint.epoch) + |> tap(fn store -> + StoreDb.persist_store(store) + Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) + end) + |> then(&{:ok, &1}) {:error, reason} -> Logger.error("[Fork choice] Failed to add block: #{reason}", slot: slot, root: block_root) - {:error, reason} + {:error, reason, store} end end - @spec on_attestation(Types.Attestation.t()) :: :ok - def on_attestation(%Attestation{} = attestation) do - state = fetch_store!() + @spec on_attestation(Store.t(), Types.Attestation.t()) :: Store.t() + def on_attestation(store, %Attestation{} = attestation) do id = attestation.signature |> Base.encode16() |> String.slice(0, 8) Logger.debug("[Fork choice] Adding attestation #{id} to the store") - state = - case Handlers.on_attestation(state, attestation, false) do - {:ok, new_state} -> new_state - _ -> state + store = + case Handlers.on_attestation(store, attestation, false) do + {:ok, new_store} -> new_store + _ -> store end - persist_store(state) + tap(store, &StoreDb.persist_store/1) end - @spec on_attester_slashing(Types.AttesterSlashing.t()) :: :ok - def on_attester_slashing(attester_slashing) do + @spec on_attester_slashing(Store.t(), Types.AttesterSlashing.t()) :: Store.t() + def on_attester_slashing(store, attester_slashing) do Logger.info("[Fork choice] Adding attester slashing to the store") - state = fetch_store!() - case Handlers.on_attester_slashing(state, attester_slashing) do - {:ok, new_state} -> - persist_store(new_state) + case Handlers.on_attester_slashing(store, attester_slashing) do + {:ok, new_store} -> + tap(new_store, &StoreDb.persist_store/1) _ -> Logger.error("[Fork choice] Failed to add attester slashing to the store") + store end end - @spec on_tick(Types.uint64()) :: :ok - def on_tick(time) do - store = fetch_store!() + @spec on_tick(Store.t(), Types.uint64()) :: Store.t() + def on_tick(store, time) do %Store{finalized_checkpoint: last_finalized_checkpoint} = store - new_store = Handlers.on_tick(store, time) - %Store{finalized_checkpoint: new_finalized_checkpoint} = new_store - prune_old_states(last_finalized_checkpoint.epoch, new_finalized_checkpoint.epoch) - persist_store(new_store) + Handlers.on_tick(store, time) + |> prune_old_states(last_finalized_checkpoint.epoch) + |> tap(&StoreDb.persist_store/1) end @spec get_current_chain_slot() :: Types.slot() @@ -175,8 +169,12 @@ defmodule LambdaEthereumConsensus.ForkChoice do ### Private Functions ########################## - defp prune_old_states(last_finalized_epoch, new_finalized_epoch) do + defp prune_old_states(store, last_finalized_epoch) do + new_finalized_epoch = store.finalized_checkpoint.epoch + if last_finalized_epoch < new_finalized_epoch do + Logger.info("Pruning states before slot #{new_finalized_epoch}") + new_finalized_slot = new_finalized_epoch * ChainSpec.get("SLOTS_PER_EPOCH") @@ -195,6 +193,8 @@ defmodule LambdaEthereumConsensus.ForkChoice do fn -> BlobDb.prune_old_blobs(new_finalized_slot) end ) end + + Store.prune(store) end def apply_handler(iter, state, handler) do @@ -216,26 +216,29 @@ defmodule LambdaEthereumConsensus.ForkChoice do attestations |> Enum.map(& &1.data.target) |> Enum.uniq() - |> Enum.flat_map(&fetch_checkpoint_state/1) - |> Map.new() + |> Enum.flat_map(fn ch -> fetch_checkpoint_state(store, ch) end) end) # Prefetch committees for all relevant epochs. Metrics.span_operation(:prefetch_committees, nil, nil, fn -> - Enum.each(states, fn {ch, state} -> Accessors.maybe_prefetch_committees(state, ch.epoch) end) + for {checkpoint, state} <- states do + Accessors.maybe_prefetch_committees(state, checkpoint.epoch) + end end) - with {:ok, new_store} <- apply_on_block(store, block_info), - {:ok, new_store} <- process_attestations(new_store, attestations, states), + new_store = update_in(store.checkpoint_states, fn cs -> Map.merge(cs, Map.new(states)) end) + + with {:ok, new_store} <- apply_on_block(new_store, block_info), + {:ok, new_store} <- process_attestations(new_store, attestations), {:ok, new_store} <- process_attester_slashings(new_store, attester_slashings) do {:ok, new_store} end end - def fetch_checkpoint_state(checkpoint) do - case CheckpointStates.get_checkpoint_state(checkpoint) do - {:ok, state} -> [{checkpoint, state}] - _other -> [] + def fetch_checkpoint_state(store, checkpoint) do + case Store.get_checkpoint_state(store, checkpoint) do + {_store, nil} -> [] + {_store, state} -> [{checkpoint, state}] end end @@ -249,18 +252,20 @@ defmodule LambdaEthereumConsensus.ForkChoice do end) end - defp process_attestations(store, attestations, states) do + defp process_attestations(store, attestations) do Metrics.span_operation(:attestations, nil, nil, fn -> apply_handler( attestations, store, - &Handlers.on_attestation(&1, &2, true, states) + &Handlers.on_attestation(&1, &2, true) ) end) end - @spec recompute_head(Store.t()) :: :ok - def recompute_head(store) do + # Recomputes the head in the store and sends the new head to others (libP2P, + # operations collector db, execution chain db). + @spec recompute_head(Store.t()) :: Store.t() + defp recompute_head(store) do {:ok, head_root} = Head.get_head(store) head_block = Blocks.get_block!(head_root) @@ -272,21 +277,13 @@ defmodule LambdaEthereumConsensus.ForkChoice do Libp2pPort.notify_new_head(slot, head_root) ExecutionChain.notify_new_block(slot, body.eth1_data, body.execution_payload) - update_fork_choice_data( - head_root, - slot, - store.justified_checkpoint, - store.finalized_checkpoint - ) - Logger.debug("[Fork choice] Updated fork choice cache", slot: slot) - :ok - end - - defp persist_store(store) do - StoreDb.persist_store(store) - Logger.debug("[Fork choice] Store persisted") + %{ + store + | head_root: head_root, + head_slot: slot + } end defp fetch_store!() do @@ -302,20 +299,4 @@ defmodule LambdaEthereumConsensus.ForkChoice do |> ChainSpec.get_fork_version_for_epoch() |> Misc.compute_fork_digest(genesis_validators_root) end - - @spec update_fork_choice_data(Types.root(), Types.slot(), Checkpoint.t(), Checkpoint.t()) :: - :ok - defp update_fork_choice_data(head_root, head_slot, justified, finalized) do - store = fetch_store!() - - new_store = %{ - store - | head_root: head_root, - head_slot: head_slot, - justified_checkpoint: justified, - finalized_checkpoint: finalized - } - - persist_store(new_store) - end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 1bd4d1709..275f8c7df 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -12,8 +12,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do alias LambdaEthereumConsensus.StateTransition.Predicates alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks - alias LambdaEthereumConsensus.Store.BlockStates - alias LambdaEthereumConsensus.Store.CheckpointStates + alias LambdaEthereumConsensus.Store.StateDb alias Types.Attestation alias Types.AttestationData alias Types.AttesterSlashing @@ -60,12 +59,13 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do %{epoch: finalized_epoch, root: finalized_root} = store.finalized_checkpoint finalized_slot = Misc.compute_start_slot_at_epoch(finalized_epoch) - base_state = BlockStates.get_state_info(block.parent_root) + base_state = Store.get_state(store, block.parent_root) cond do # Parent block must be known base_state |> is_nil() -> - {:error, "parent state not found in store"} + {:error, + "parent state (block root = #{Base.encode16(block.parent_root)}) not found in store"} # Blocks cannot be in the future. If they are, their # consideration must be delayed until they are in the past. @@ -115,26 +115,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do end end - def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block) do - with {:ok, target_state} <- CheckpointStates.get_checkpoint_state(attestation.data.target) do - on_attestation_with_state(store, attestation, is_from_block, target_state) - end - end - - def on_attestation(%Store{} = store, %Attestation{} = attestation, is_from_block, states) do - case Map.fetch(states, attestation.data.target) do - {:ok, target_state} -> - on_attestation_with_state(store, attestation, is_from_block, target_state) - - :error -> - if is_from_block do - {:ok, store} - else - {:error, "Checkpoint state not found for attestation."} - end - end - end - @doc """ Run ``on_attestation`` upon receiving a new ``attestation`` from either within a block or directly on the wire. @@ -143,20 +123,23 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do """ @spec on_attestation(Store.t(), Attestation.t(), boolean()) :: {:ok, Store.t()} | {:error, String.t()} - def on_attestation_with_state( + def on_attestation( %Store{} = store, %Attestation{} = attestation, - is_from_block, - target_state + is_from_block ) do with :ok <- check_attestation_valid(store, attestation, is_from_block), # Get state at the `target` to fully validate attestation + {new_store, target_state} <- Store.get_checkpoint_state(store, attestation.data.target), {:ok, indexed_attestation} <- Accessors.get_indexed_attestation(target_state, attestation), :ok <- check_valid_indexed_attestation(target_state, indexed_attestation) do # Update latest messages for attesting indices - update_latest_messages(store, indexed_attestation.attesting_indices, attestation) + update_latest_messages(new_store, indexed_attestation.attesting_indices, attestation) else + {%Store{} = _store, nil} -> + {:error, "Target state not found for the checkpoint while validating attestation"} + {:unknown_block, _} -> # TODO: this is just a patch, we should fetch blocks preemptively if is_from_block do @@ -189,7 +172,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do attestation_2: %IndexedAttestation{} = attestation_2 } ) do - state = BlockStates.get_state_info!(store.justified_checkpoint.root).beacon_state + state = Store.get_state!(store, store.justified_checkpoint.root).beacon_state cond do not Predicates.slashable_attestation_data?(attestation_1.data, attestation_2.data) -> @@ -244,15 +227,20 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do is_before_attesting_interval = time_into_slot < div(seconds_per_slot, intervals_per_slot) # Add new block and state to the store - BlockStates.store_state_info(new_state_info) + new_store = Store.store_state(store, new_state_info.block_root, new_state_info) + + Task.Supervisor.start_child( + StoreStatesSupervisor, + fn -> StateDb.store_state_info(new_state_info) end + ) - is_first_block = store.proposer_boost_root == <<0::256>> + is_first_block = new_store.proposer_boost_root == <<0::256>> # TODO: store block timeliness data? - is_timely = Store.get_current_slot(store) == block.slot and is_before_attesting_interval + is_timely = Store.get_current_slot(new_store) == block.slot and is_before_attesting_interval state = new_state_info.beacon_state - store + new_store |> Store.store_block_info(block_info) |> if_then_update( is_timely and is_first_block, diff --git a/lib/lambda_ethereum_consensus/fork_choice/head.ex b/lib/lambda_ethereum_consensus/fork_choice/head.ex index a4bc087b3..9114bc0e8 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/head.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/head.ex @@ -5,8 +5,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do alias LambdaEthereumConsensus.StateTransition.Accessors alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.Blocks - alias LambdaEthereumConsensus.Store.BlockStates - alias LambdaEthereumConsensus.Store.CheckpointStates + alias Types.BeaconState alias Types.Store @spec get_head(Store.t()) :: {:ok, Types.root()} | {:error, any} @@ -15,7 +14,9 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do blocks = get_filtered_block_tree(store) # Execute the LMD-GHOST fork choice head = store.justified_checkpoint.root - {:ok, justified_state} = CheckpointStates.get_checkpoint_state(store.justified_checkpoint) + + {_store, %BeaconState{} = justified_state} = + Store.get_checkpoint_state(store, store.justified_checkpoint) # PERF: return just the parent root and the block root in `get_filtered_block_tree` Stream.cycle([nil]) @@ -158,7 +159,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Head do store.unrealized_justifications[block_root] else # The block is not from a prior epoch, therefore the voting source is not pulled up - head_state = BlockStates.get_state_info!(block_root).beacon_state + head_state = Store.get_state!(store, block_root).beacon_state head_state.current_justified_checkpoint end end diff --git a/lib/lambda_ethereum_consensus/logger/console_logger.ex b/lib/lambda_ethereum_consensus/logger/console_logger.ex index 4e39032ad..7e74fd93b 100644 --- a/lib/lambda_ethereum_consensus/logger/console_logger.ex +++ b/lib/lambda_ethereum_consensus/logger/console_logger.ex @@ -9,9 +9,13 @@ defmodule ConsoleLogger do def format(level, message, timestamp, metadata) do formatted_metadata = format_metadata(metadata) - [format_level(level)] ++ - [Logger.Formatter.format(@pattern, level, message, timestamp, [])] ++ - [formatted_metadata] ++ ["\n"] + [ + format_level(level), + Logger.Formatter.format(@pattern, level, message, timestamp, []), + formatted_metadata, + "\n" + ] + |> IO.iodata_to_binary() rescue err -> inspect(err) diff --git a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex index 558855a86..4493a4fc0 100644 --- a/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/blob_downloader.ex @@ -8,13 +8,15 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do alias LambdaEthereumConsensus.Metrics alias LambdaEthereumConsensus.P2P alias LambdaEthereumConsensus.P2P.ReqResp + alias LambdaEthereumConsensus.Store alias Types.BlobSidecar + alias Types.Store @blobs_by_range_protocol_id "/eth2/beacon_chain/req/blob_sidecars_by_range/1/ssz_snappy" @blobs_by_root_protocol_id "/eth2/beacon_chain/req/blob_sidecars_by_root/1/ssz_snappy" - @type on_blobs :: ({:ok, [BlobSidecar.t()]} | {:error, any()} -> :ok) - @type on_blob :: ({:ok, BlobSidecar.t()} | {:error, any()} -> :ok) + @type on_blobs :: (Store.t(), {:ok, [BlobSidecar.t()]} | {:error, any()} -> :ok) + @type on_blob :: (Store.t(), {:ok, BlobSidecar.t()} | {:error, any()} -> :ok) # Requests to peers might fail for various reasons, # for example they might not support the protocol or might not reply @@ -38,22 +40,23 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do %Types.BeaconBlocksByRangeRequest{start_slot: slot, count: count} |> ReqResp.encode_request() - Libp2pPort.send_async_request(peer_id, @blobs_by_range_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blobs_by_range_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blob_sidecars_by_range", fn -> - handle_blobs_by_range_response(response, peer_id, count, slot, retries, on_blobs) + handle_blobs_by_range_response(store, response, peer_id, count, slot, retries, on_blobs) end ) end) end - defp handle_blobs_by_range_response(response, peer_id, count, slot, retries, on_blobs) do + defp handle_blobs_by_range_response(store, response, peer_id, count, slot, retries, on_blobs) do with {:ok, response_message} <- response, {:ok, blobs} <- ReqResp.decode_response(response_message, BlobSidecar), :ok <- verify_batch(blobs, slot, count) do - on_blobs.({:ok, blobs}) + on_blobs.(store, {:ok, blobs}) else {:error, reason} -> P2P.Peerbook.penalize_peer(peer_id) @@ -61,8 +64,9 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do if retries > 0 do Logger.debug("Retrying request for #{count} blobs", slot: slot) request_blobs_by_range(slot, count, on_blobs, retries - 1) + {:ok, store} else - on_blobs.({:error, reason}) + on_blobs.(store, {:error, reason}) end end end @@ -71,10 +75,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do def request_blob_by_root(identifier, on_blob, retries \\ @default_retries) do request_blobs_by_root( [identifier], - fn - {:ok, [blob]} -> on_blob.({:ok, blob}) - other -> on_blob.(other) - end, + fn store, response -> on_blob.(store, flatten_response(response)) end, retries ) end @@ -91,19 +92,20 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do request = ReqResp.encode_request({identifiers, TypeAliases.blob_sidecars_by_root_request()}) - Libp2pPort.send_async_request(peer_id, @blobs_by_root_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blobs_by_root_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blob_sidecars_by_root", - fn -> handle_blobs_by_root(response, peer_id, identifiers, retries, on_blobs) end + fn -> handle_blobs_by_root(store, response, peer_id, identifiers, retries, on_blobs) end ) end) end - def handle_blobs_by_root(response, peer_id, identifiers, retries, on_blobs) do + def handle_blobs_by_root(store, response, peer_id, identifiers, retries, on_blobs) do with {:ok, response_message} <- response, {:ok, blobs} <- ReqResp.decode_response(response_message, BlobSidecar) do - on_blobs.({:ok, blobs}) + on_blobs.(store, {:ok, blobs}) else {:error, reason} -> P2P.Peerbook.penalize_peer(peer_id) @@ -111,8 +113,9 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do if retries > 0 do Logger.debug("Retrying request for blobs.") request_blobs_by_root(identifiers, on_blobs, retries - 1) + {:ok, store} else - on_blobs.({:error, reason}) + on_blobs.(store, {:error, reason}) end end end @@ -139,4 +142,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do {:error, "blob outside requested slot range"} end end + + defp flatten_response({:ok, [blob]}), do: {:ok, blob} + defp flatten_response(other), do: other end diff --git a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex index 06e97fa9b..6522fa84a 100644 --- a/lib/lambda_ethereum_consensus/p2p/block_downloader.ex +++ b/lib/lambda_ethereum_consensus/p2p/block_downloader.ex @@ -9,6 +9,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do alias LambdaEthereumConsensus.P2P alias LambdaEthereumConsensus.P2P.ReqResp alias Types.SignedBeaconBlock + alias Types.Store @blocks_by_range_protocol_id "/eth2/beacon_chain/req/beacon_blocks_by_range/2/ssz_snappy" @blocks_by_root_protocol_id "/eth2/beacon_chain/req/beacon_blocks_by_root/2/ssz_snappy" @@ -21,7 +22,8 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do @type range :: {Types.slot(), Types.slot()} @type download_result :: {:ok, [SignedBeaconBlock.t()]} | {:error, any()} @type on_blocks :: - ({:ok, range(), [SignedBeaconBlock.t()]} | {:error, range(), any()} -> term()) + (Store.t(), {:ok, range(), [SignedBeaconBlock.t()]} | {:error, range(), any()} -> + term()) @doc """ Requests a series of blocks in batch, and synchronously (the caller will block waiting for the @@ -41,7 +43,13 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do def request_blocks_by_range_sync(slot, count, retries) do pid = self() - request_blocks_by_range(slot, count, fn result -> send(pid, result) end, retries) + + request_blocks_by_range( + slot, + count, + fn store, result -> tap(store, send(pid, result)) end, + retries + ) receive do result -> result @@ -65,24 +73,33 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do %Types.BeaconBlocksByRangeRequest{start_slot: slot, count: count} |> ReqResp.encode_request() - Libp2pPort.send_async_request(peer_id, @blocks_by_range_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blocks_by_range_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blocks_by_range", fn -> - handle_blocks_by_range_response(response, slot, count, retries, peer_id, on_blocks) + handle_blocks_by_range_response( + store, + response, + slot, + count, + retries, + peer_id, + on_blocks + ) end ) end) end - defp handle_blocks_by_range_response(response, slot, count, retries, peer_id, on_blocks) do + defp handle_blocks_by_range_response(store, response, slot, count, retries, peer_id, on_blocks) do with {:ok, response_message} <- response, {:ok, blocks} <- ReqResp.decode_response(response_message, SignedBeaconBlock), :ok <- verify_batch(blocks, slot, count) do tags = %{result: "success", type: "by_slot", reason: "success"} :telemetry.execute([:network, :request], %{blocks: count}, tags) - on_blocks.({:ok, {slot, slot + count - 1}, blocks}) + on_blocks.(store, {:ok, {slot, slot + count - 1}, blocks}) else {:error, reason} -> tags = %{type: "by_slot", reason: parse_reason(reason)} @@ -92,34 +109,31 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "retry")) Logger.debug("Retrying request for #{count} blocks", slot: slot) request_blocks_by_range(slot, count, on_blocks, retries - 1) + {:ok, store} else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) # TODO: Add block range that failed in the reason - on_blocks.({:error, {slot, slot + count - 1}, reason}) - {:error, reason} + on_blocks.(store, {:error, {slot, slot + count - 1}, reason}) end end end @spec request_block_by_root( Types.root(), - ({:ok, SignedBeaconBlock.t()} | {:error, binary()} -> :ok), + (Store.t(), {:ok, SignedBeaconBlock.t()} | {:error, binary()} -> :ok), integer() ) :: :ok def request_block_by_root(root, on_block, retries \\ @default_retries) do request_blocks_by_root( [root], - fn - {:ok, [block]} -> on_block.({:ok, block}) - other -> on_block.(other) - end, + fn store, response -> on_block.(store, flatten_response(response)) end, retries ) end @spec request_blocks_by_root( [Types.root()], - ({:ok, [SignedBeaconBlock.t()]} | {:error, binary()} -> :ok), + (Store.t(), {:ok, [SignedBeaconBlock.t()]} | {:error, binary()} -> :ok), integer() ) :: :ok def request_blocks_by_root(roots, on_blocks, retries \\ @default_retries) @@ -133,21 +147,24 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do request = ReqResp.encode_request({roots, TypeAliases.beacon_blocks_by_root_request()}) - Libp2pPort.send_async_request(peer_id, @blocks_by_root_protocol_id, request, fn response -> + Libp2pPort.send_async_request(peer_id, @blocks_by_root_protocol_id, request, fn store, + response -> Metrics.handler_span( "response_handler", "blocks_by_root", - fn -> handle_blocks_by_root_response(response, roots, on_blocks, peer_id, retries) end + fn -> + handle_blocks_by_root_response(store, response, roots, on_blocks, peer_id, retries) + end ) end) end - defp handle_blocks_by_root_response(response, roots, on_blocks, peer_id, retries) do + defp handle_blocks_by_root_response(store, response, roots, on_blocks, peer_id, retries) do with {:ok, response_message} <- response, {:ok, blocks} <- ReqResp.decode_response(response_message, SignedBeaconBlock) do tags = %{result: "success", type: "by_root", reason: "success"} :telemetry.execute([:network, :request], %{blocks: length(roots)}, tags) - on_blocks.({:ok, blocks}) + on_blocks.(store, {:ok, blocks}) else {:error, reason} -> tags = %{type: "by_root", reason: parse_reason(reason)} @@ -158,9 +175,10 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do pretty_roots = Enum.map_join(roots, ", ", &Base.encode16/1) Logger.debug("Retrying request for blocks with roots #{pretty_roots}") request_blocks_by_root(roots, on_blocks, retries - 1) + {:ok, store} else :telemetry.execute([:network, :request], %{blocks: 0}, Map.put(tags, :result, "error")) - on_blocks.({:error, reason}) + on_blocks.(store, {:error, reason}) end end end @@ -194,4 +212,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do {:error, "block outside requested slot range"} end end + + defp flatten_response({:ok, [block]}), do: {:ok, block} + defp flatten_response(other), do: other end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex index 11ea44353..5e6943a60 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex @@ -23,6 +23,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do end @impl true + def handle_gossip_message(store, topic, msg_id, message) do + handle_gossip_message(topic, msg_id, message) + store + end + def handle_gossip_message(topic, msg_id, message) do subnet_id = extract_subnet_id(topic) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex index b09efccf7..a1accbab1 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -16,7 +16,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do ########################## @impl true - def handle_gossip_message(_topic, msg_id, message) do + def handle_gossip_message(store, _topic, msg_id, message) do slot = ForkChoice.get_current_chain_slot() with {:ok, uncompressed} <- :snappyer.decompress(message), @@ -24,18 +24,18 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do :ok <- validate(signed_block, slot) do Logger.info("[Gossip] Block received, block.slot: #{signed_block.message.slot}.") Libp2pPort.validate_message(msg_id, :accept) - PendingBlocks.add_block(signed_block) + PendingBlocks.add_block(store, signed_block) else {:ignore, reason} -> Logger.warning("[Gossip] Block ignored, reason: #{inspect(reason)}.") Libp2pPort.validate_message(msg_id, :ignore) + store {:error, reason} -> Logger.warning("[Gossip] Block rejected, reason: #{inspect(reason)}.") Libp2pPort.validate_message(msg_id, :reject) + store end - - :ok end @spec subscribe_to_topic() :: :ok | :error diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex index 439169bd6..eed5da8c8 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/blob_sidecar.ex @@ -2,27 +2,28 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BlobSideCar do @moduledoc """ This module handles blob sidecar gossipsub topics. """ + alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.P2P.Gossip.Handler - alias LambdaEthereumConsensus.Store.BlobDb require Logger @behaviour Handler - @impl true - def handle_gossip_message(_topic, msg_id, message) do + @impl Handler + def handle_gossip_message(store, _topic, msg_id, message) do with {:ok, uncompressed} <- :snappyer.decompress(message), {:ok, %Types.BlobSidecar{index: blob_index} = blob} <- Ssz.from_ssz(uncompressed, Types.BlobSidecar) do Logger.debug("[Gossip] Blob sidecar received, with index #{blob_index}") - BlobDb.store_blob(blob) Libp2pPort.validate_message(msg_id, :accept) + PendingBlocks.add_blob(store, blob) else {:error, reason} -> Logger.warning("[Gossip] Blob rejected, reason: #{inspect(reason)}") Libp2pPort.validate_message(msg_id, :reject) + store end end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex index 2dd3e31a4..9ecadb89e 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/handler.ex @@ -2,6 +2,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Handler do @moduledoc """ Gossip handler behaviour """ + alias Types.Store - @callback handle_gossip_message(binary(), binary(), iodata()) :: :ok | {:error, any} + @callback handle_gossip_message(Store.t(), binary(), binary(), iodata()) :: + Store.t() | {:error, any} end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 46d0578e8..bbda6d620 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -143,6 +143,11 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end @impl true + def handle_gossip_message(store, topic, msg_id, message) do + handle_gossip_message(topic, msg_id, message) + store + end + def handle_gossip_message( <<_::binary-size(15)>> <> "beacon_aggregate_and_proof" <> _, _msg_id, @@ -168,7 +173,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "voluntary_exit" <> _, _msg_id, @@ -181,7 +185,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "proposer_slashing" <> _, _msg_id, @@ -194,7 +197,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "attester_slashing" <> _, _msg_id, @@ -207,7 +209,6 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do end end - @impl true def handle_gossip_message( <<_::binary-size(15)>> <> "bls_to_execution_change" <> _, _msg_id, diff --git a/lib/lambda_ethereum_consensus/p2p/requests.ex b/lib/lambda_ethereum_consensus/p2p/requests.ex deleted file mode 100644 index 4e3138eb9..000000000 --- a/lib/lambda_ethereum_consensus/p2p/requests.ex +++ /dev/null @@ -1,48 +0,0 @@ -defmodule LambdaEthereumConsensus.P2p.Requests do - @moduledoc """ - Uses uuids to identify requests and their handlers. Saves the handler in the struct until a - response is available and then handles appropriately. - """ - @type id :: binary - @type handler :: (term() -> term()) - @type requests :: %{id => handler} - - @doc """ - Creates a requests object that will hold response handlers. - """ - @spec new() :: requests() - def new(), do: %{} - - @doc """ - Adds a handler for a request. - - Returns a tuple {requests, request_id}, where: - - Requests is the modified requests object with the added handler. - - The id for the handler for that request. This will be used later when calling handle_response/3. - """ - @spec add_response_handler(requests(), handler()) :: {requests(), id()} - def add_response_handler(requests, handler) do - id = UUID.uuid4() - {Map.put(requests, id, handler), id} - end - - @doc """ - Handles a request using handler_id. The handler will be popped from the - requests object. - - Returns a {status, requests} tuple where: - - status is :ok if it was handled or :unhandled if the id didn't correspond to a saved handler. - - requests is the modified requests object with the handler removed. - """ - @spec handle_response(requests(), term(), id()) :: {:ok | :unhandled, requests()} - def handle_response(requests, response, handler_id) do - case Map.fetch(requests, handler_id) do - {:ok, handler} -> - handler.(response) - {:ok, Map.delete(requests, handler_id)} - - :error -> - {:unhandled, requests} - end - end -end diff --git a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex index eca79aeae..2ae9a7e4d 100644 --- a/lib/lambda_ethereum_consensus/store/checkpoint_states.ex +++ b/lib/lambda_ethereum_consensus/store/checkpoint_states.ex @@ -51,6 +51,8 @@ defmodule LambdaEthereumConsensus.Store.CheckpointStates do @doc """ Calculate the state for a checkpoint without interacting with the db. + + TODO (#1278): use Store.get_target_checkpoint_state instead. """ @spec compute_target_checkpoint_state(Types.epoch(), Types.root()) :: {:ok, BeaconState.t()} | {:error, String.t()} diff --git a/lib/lambda_ethereum_consensus/store/store_db.ex b/lib/lambda_ethereum_consensus/store/store_db.ex index f439b9452..16886cac5 100644 --- a/lib/lambda_ethereum_consensus/store/store_db.ex +++ b/lib/lambda_ethereum_consensus/store/store_db.ex @@ -3,6 +3,7 @@ defmodule LambdaEthereumConsensus.Store.StoreDb do Beacon node store storage. """ alias LambdaEthereumConsensus.Store.Db + alias Types.Store @store_prefix "store" @snapshot_prefix "snapshot" @@ -17,7 +18,7 @@ defmodule LambdaEthereumConsensus.Store.StoreDb do @spec persist_store(Types.Store.t()) :: :ok def persist_store(%Types.Store{} = store) do :telemetry.span([:db, :latency], %{}, fn -> - {put(@store_prefix, store), %{module: "fork_choice", action: "persist"}} + {put(@store_prefix, Store.remove_cache(store)), %{module: "fork_choice", action: "persist"}} end) end diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index afc7d12dd..48fb52651 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -20,7 +20,6 @@ defmodule LambdaEthereumConsensus.Libp2pPort do alias LambdaEthereumConsensus.P2P.Gossip.OperationsCollector alias LambdaEthereumConsensus.P2P.IncomingRequestsHandler alias LambdaEthereumConsensus.P2P.Peerbook - alias LambdaEthereumConsensus.P2p.Requests alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Utils.BitVector alias LambdaEthereumConsensus.Validator @@ -46,6 +45,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do alias Libp2pProto.Tracer alias Libp2pProto.ValidateMessage alias Types.EnrForkId + alias Types.Store require Logger @@ -182,7 +182,10 @@ defmodule LambdaEthereumConsensus.Libp2pPort do GenServer.cast( pid, {:send_request, peer_id, protocol_id, message, - fn response -> send(from, {:response, response}) end} + fn store, response -> + send(from, {:response, response}) + {:ok, store} + end} ) receive_response() @@ -391,6 +394,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do {validators, args} = Keyword.pop(args, :validators, %{}) {join_init_topics, args} = Keyword.pop(args, :join_init_topics, false) {enable_request_handlers, args} = Keyword.pop(args, :enable_request_handlers, false) + {store, args} = Keyword.pop!(args, :store) port = Port.open({:spawn, @port_name}, [:binary, {:packet, 4}, :exit_status]) @@ -420,7 +424,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do slot_data: nil, port: port, subscribers: %{}, - requests: Requests.new(), + requests: %{}, + store: store, syncing: true }, {:continue, :check_pending_blocks}} end @@ -430,8 +435,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do # call is a noop. @impl GenServer def handle_continue(:check_pending_blocks, state) do - PendingBlocks.process_blocks() - {:noreply, state} + {:noreply, update_in(state.store, &PendingBlocks.process_blocks/1)} end @impl GenServer @@ -452,7 +456,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do port: port } = state ) do - {new_requests, handler_id} = Requests.add_response_handler(requests, handler) + {new_requests, handler_id} = add_response_handler(requests, handler) send_request = %SendRequest{ id: peer_id, @@ -476,10 +480,15 @@ defmodule LambdaEthereumConsensus.Libp2pPort do "[Optimistic Sync] Range #{first_slot} - #{last_slot} downloaded successfully, with #{n_blocks} blocks and #{missing} missing." ) - Enum.each(blocks, &PendingBlocks.add_block/1) + new_store = + Enum.reduce(blocks, state.store, fn block, store -> + PendingBlocks.add_block(store, block) + end) new_state = - Map.update!(state, :blocks_remaining, fn n -> n - n_blocks - missing end) + state + |> Map.put(:store, new_store) + |> Map.update!(:blocks_remaining, fn n -> n - n_blocks - missing end) |> subscribe_if_no_blocks() {:noreply, new_state} @@ -592,17 +601,19 @@ defmodule LambdaEthereumConsensus.Libp2pPort do direction: "->elixir" }) - case Map.fetch(subscribers, gs.topic) do - {:ok, module} -> - Metrics.handler_span("gossip_handler", gs.topic, fn -> - module.handle_gossip_message(gs.topic, gs.msg_id, gs.message) - end) + new_store = + case Map.fetch(subscribers, gs.topic) do + {:ok, module} -> + Metrics.handler_span("gossip_handler", gs.topic, fn -> + module.handle_gossip_message(state.store, gs.topic, gs.msg_id, gs.message) + end) - :error -> - Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.") - end + :error -> + Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.") + state.store + end - state + Map.put(state, :store, new_store) end defp handle_notification( @@ -639,22 +650,14 @@ defmodule LambdaEthereumConsensus.Libp2pPort do state end - defp handle_notification(%Response{} = response, %{requests: requests} = state) do + defp handle_notification(%Response{} = response, %{requests: requests, store: store} = state) do :telemetry.execute([:port, :message], %{}, %{ function: "response", direction: "->elixir" }) - success = if response.success, do: :ok, else: :error - - {result, new_requests} = - Requests.handle_response(requests, {success, response.message}, response.id) - - if result == :unhandled do - Logger.error("Unhandled response with id: #{response.id}. Message: #{response.message}") - end - - state |> Map.put(:requests, new_requests) + {new_requests, new_store} = handle_response(requests, store, response) + state |> Map.merge(%{requests: new_requests, store: new_store}) end defp handle_notification(%Result{from: "", result: result}, state) do @@ -790,8 +793,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do defp on_tick(time, %{genesis_time: genesis_time, slot_data: slot_data} = state) do # TODO: we probably want to remove this (ForkChoice.on_tick) from here, but we keep it # here to have this serialized with respect to the other fork choice store modifications. - - ForkChoice.on_tick(time) + new_store = ForkChoice.on_tick(state.store, time) new_slot_data = compute_slot(genesis_time, time) @@ -807,7 +809,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do maybe_log_new_slot(slot_data, new_slot_data) - updated_state + updated_state |> Map.put(:store, new_store) end defp schedule_next_tick() do @@ -834,6 +836,37 @@ defmodule LambdaEthereumConsensus.Libp2pPort do {slot, slot_third} end + defp add_response_handler(requests, handler) do + id = UUID.uuid4() + {Map.put(requests, id, handler), id} + end + + # Handles a request using handler_id. The handler will be popped from the + # requests map. + # + # Returns a {status, requests} tuple where: + # - status is :ok if it was handled or :unhandled if the id didn't correspond to a saved handler. + # - requests is the modified requests object with the handler removed. + defp handle_response(requests, store, response) do + case Map.pop(requests, response.id) do + {nil, new_requests} -> + Logger.error("Unhandled response with id: #{response.id}. Message: #{response.message}") + {new_requests, store} + + {handler, new_requests} -> + success = if response.success, do: :ok, else: :error + + case handler.(store, {success, response.message}) do + {:ok, %Store{} = new_store} -> + {new_requests, new_store} + + {:error, reason} -> + Logger.warning("Handling response failed with reason: #{reason}") + {new_requests, store} + end + end + end + defp maybe_log_new_slot({slot, _third}, {slot, _another_third}), do: :ok defp maybe_log_new_slot({_prev_slot, _thrid}, {slot, :first_third}) do diff --git a/lib/types/store.ex b/lib/types/store.ex index 59f729b41..736684034 100644 --- a/lib/types/store.ex +++ b/lib/types/store.ex @@ -5,6 +5,7 @@ defmodule Types.Store do alias LambdaEthereumConsensus.ForkChoice.Head alias LambdaEthereumConsensus.ForkChoice.Simple.Tree + alias LambdaEthereumConsensus.StateTransition alias LambdaEthereumConsensus.StateTransition.Accessors alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Store.Blocks @@ -31,7 +32,13 @@ defmodule Types.Store do :head_root, :head_slot, # Stores block data on the current fork tree (~last two epochs) - :tree_cache + :tree_cache, + + ### Everything under this can be thought as cache for the db. + # States indexed by block root. + :states, + # States indexed by checkpoint. Sometimes necessary because of empty slots. + :checkpoint_states ] @type t :: %__MODULE__{ @@ -48,7 +55,9 @@ defmodule Types.Store do unrealized_justifications: %{Types.root() => Checkpoint.t()}, head_root: Types.root() | nil, head_slot: Types.slot() | nil, - tree_cache: Tree.t() + tree_cache: Tree.t(), + states: %{Types.root() => StateInfo.t()}, + checkpoint_states: %{Types.Checkpoint.t() => BeaconState.t()} } @spec get_forkchoice_store(BeaconState.t(), SignedBeaconBlock.t()) :: @@ -88,9 +97,12 @@ defmodule Types.Store do unrealized_justifications: %{anchor_block_root => anchor_checkpoint}, head_root: nil, head_slot: nil, - tree_cache: Tree.new(anchor_block_root) + tree_cache: Tree.new(anchor_block_root), + states: %{}, + checkpoint_states: %{} } |> store_block_info(block_info) + |> store_state(block_info.root, state_info) |> update_head_info() |> then(&{:ok, &1}) else @@ -149,6 +161,71 @@ defmodule Types.Store do safe_block.body.execution_payload.block_hash end + @doc """ + Removes everything prior to the last finalized slot, specifically checkpoint states + and states by root. + """ + def prune(%__MODULE__{} = store) do + new_finalized_slot = + store.finalized_checkpoint.epoch * ChainSpec.get("SLOTS_PER_EPOCH") + + store + |> prune_checkpoint_states(new_finalized_slot) + |> prune_states(new_finalized_slot) + end + + @doc """ + Gets a StatInfo given a block root. Defaults to the DB if not present in the store. + """ + def get_state(store, root) when is_binary(root) do + with nil <- Map.get(store.states, root) do + BlockStates.get_state_info(root) + end + end + + def get_state!(store, root) do + %StateInfo{} = get_state(store, root) + end + + def store_state(store, block_root, state) do + update_in(store.states, fn states -> Map.put(states, block_root, state) end) + end + + @spec get_checkpoint_state(t(), Types.Checkpoint.t()) :: {t(), BeaconState.t() | nil} + @doc """ + Gets a State given a checkpoint. If there is no state for that checkpoint in the store + it will try to compute it. + + Computing the state means: + 1. Getting the state for the checkpoint's root. + 2. If the state is the one requested, it is returned. + 3. If not, that means that there are empty slots, so slots are processed. + + Returns a {store, state} or {store, nil}. The store may be updated if the state is calculated. + """ + def get_checkpoint_state(store, %Checkpoint{} = checkpoint) do + case Map.get(store.checkpoint_states, checkpoint) do + nil -> compute_checkpoint_state(store, checkpoint) + state -> {store, state} + end + end + + def remove_cache(%__MODULE__{} = store) do + store |> Map.put(:states, %{}) |> Map.put(:checkpoint_states, %{}) + end + + defp prune_checkpoint_states(store, slot) do + update_in(store.checkpoint_states, fn checkpoint_states -> + Map.reject(checkpoint_states, fn {_checkpoint, state} -> state.slot < slot end) + end) + end + + defp prune_states(store, slot) do + update_in(store.states, fn states -> + Map.reject(states, fn {_root, %StateInfo{beacon_state: state}} -> state.slot < slot end) + end) + end + @spec get_safe_beacon_block_root(t()) :: Types.root() defp get_safe_beacon_block_root(%__MODULE__{} = store) do store.finalized_checkpoint.root @@ -170,4 +247,25 @@ defmodule Types.Store do %{slot: head_slot} = Blocks.get_block!(head_root) %{store | head_root: head_root, head_slot: head_slot} end + + defp compute_checkpoint_state(store, checkpoint) do + target_slot = Misc.compute_start_slot_at_epoch(checkpoint.epoch) + + case get_state(store, checkpoint.root) do + nil -> + {store, nil} + + %StateInfo{beacon_state: state} -> + if state.slot < target_slot do + # The only way this can fail is if state.slot < target_slot, which is false by + # construction. + {:ok, new_state} = StateTransition.process_slots(state, target_slot) + + {update_in(store.checkpoint_states, fn s -> Map.put(s, checkpoint, new_state) end), + new_state} + else + {store, state} + end + end + end end diff --git a/test/spec/runner_behaviour.ex b/test/spec/runner_behaviour.ex index f959ca929..654fa3d53 100644 --- a/test/spec/runner_behaviour.ex +++ b/test/spec/runner_behaviour.ex @@ -11,6 +11,9 @@ defmodule TestRunner do @behaviour TestRunner def skip?(_testcase), do: false defoverridable skip?: 1 + + def setup(), do: :ok + defoverridable setup: 0 end end @@ -24,4 +27,9 @@ defmodule TestRunner do if the test case passes. To ignore test cases use `skip?/1`. """ @callback run_test_case(testcase :: SpecTestCase.t()) :: any + + @doc """ + Called in an ExUnit.setup manner, before each test case, to perform setup specific to the runner. + """ + @callback setup() :: :ok end diff --git a/test/spec/runners/fork_choice.ex b/test/spec/runners/fork_choice.ex index 2d6983629..9a60e38c4 100644 --- a/test/spec/runners/fork_choice.ex +++ b/test/spec/runners/fork_choice.ex @@ -16,6 +16,13 @@ defmodule ForkChoiceTestRunner do alias Types.SignedBeaconBlock alias Types.Store + @impl TestRunner + def setup() do + # Start this supervisor, necessary for post state tasks. + start_link_supervised!({Task.Supervisor, name: StoreStatesSupervisor}) + :ok + end + @impl TestRunner def skip?(%SpecTestCase{fork: "capella"}), do: false def skip?(%SpecTestCase{fork: "deneb"}), do: false diff --git a/test/spec/runners/sync.ex b/test/spec/runners/sync.ex index d656dee52..9e25ed781 100644 --- a/test/spec/runners/sync.ex +++ b/test/spec/runners/sync.ex @@ -12,6 +12,13 @@ defmodule SyncTestRunner do # "from_syncing_to_invalid" ] + @impl TestRunner + def setup() do + # Start this supervisor, necessary for post state tasks. + start_link_supervised!({Task.Supervisor, name: StoreStatesSupervisor}) + :ok + end + @impl TestRunner def skip?(%SpecTestCase{fork: "capella"} = testcase) do Enum.member?(@disabled_cases, testcase.case) diff --git a/test/spec/tasks/generate_spec_tests.ex b/test/spec/tasks/generate_spec_tests.ex index f20d5ffce..5224be3c5 100644 --- a/test/spec/tasks/generate_spec_tests.ex +++ b/test/spec/tasks/generate_spec_tests.ex @@ -80,6 +80,7 @@ defmodule Mix.Tasks.GenerateSpecTests do start_link_supervised!(LambdaEthereumConsensus.Store.Blocks) start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates) + #{runner_module}.setup() :ok end """ diff --git a/test/unit/beacon_api/beacon_api_v1_test.exs b/test/unit/beacon_api/beacon_api_v1_test.exs index 91a4ec522..e3b5d9a40 100644 --- a/test/unit/beacon_api/beacon_api_v1_test.exs +++ b/test/unit/beacon_api/beacon_api_v1_test.exs @@ -10,6 +10,7 @@ defmodule Unit.BeaconApiTest.V1 do alias LambdaEthereumConsensus.Store.Db alias LambdaEthereumConsensus.Store.StoreDb alias Types.BlockInfo + alias Types.Store @moduletag :beacon_api_case @moduletag :tmp_dir @@ -159,7 +160,7 @@ defmodule Unit.BeaconApiTest.V1 do alias LambdaEthereumConsensus.P2P.Metadata patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end) - start_link_supervised!({Libp2pPort, genesis_time: 42}) + start_link_supervised!({Libp2pPort, genesis_time: 42, store: %Store{}}) Metadata.init() identity = Libp2pPort.get_node_identity() metadata = Metadata.get_metadata() diff --git a/test/unit/libp2p_port_test.exs b/test/unit/libp2p_port_test.exs index 82fb5bc52..e039f1c5e 100644 --- a/test/unit/libp2p_port_test.exs +++ b/test/unit/libp2p_port_test.exs @@ -7,6 +7,7 @@ defmodule Unit.Libp2pPortTest do alias LambdaEthereumConsensus.P2P.Gossip.Handler alias LambdaEthereumConsensus.P2P.Metadata alias LambdaEthereumConsensus.P2P.ReqResp + alias Types.Store doctest Libp2pPort @@ -17,7 +18,8 @@ defmodule Unit.Libp2pPortTest do end defp start_port(name \\ Libp2pPort, init_args \\ []) do - start_link_supervised!({Libp2pPort, [opts: [name: name], genesis_time: 42] ++ init_args}, + start_link_supervised!( + {Libp2pPort, [opts: [name: name], store: %Store{}, genesis_time: 42] ++ init_args}, id: name ) end @@ -102,7 +104,7 @@ defmodule Unit.Libp2pPortTest do end @behaviour Handler - def handle_gossip_message(topic, msg_id, message) do + def handle_gossip_message(_store, topic, msg_id, message) do # Decode the PID from the message and send a notification. send(:erlang.binary_to_term(message), {:gossipsub, {topic, msg_id, message}}) end diff --git a/test/unit/p2p/requests_test.exs b/test/unit/p2p/requests_test.exs deleted file mode 100644 index 9e7274add..000000000 --- a/test/unit/p2p/requests_test.exs +++ /dev/null @@ -1,37 +0,0 @@ -defmodule Unit.P2p.RequestsTest do - use ExUnit.Case - - alias LambdaEthereumConsensus.P2p.Requests - - test "An empty requests object shouldn't handle a request" do - requests = Requests.new() - - assert {:unhandled, requests} == - Requests.handle_response(requests, "some response", "fake id") - end - - test "A requests object should handler a request only once" do - requests = Requests.new() - pid = self() - - {requests_2, handler_id} = - Requests.add_response_handler( - requests, - fn response -> send(pid, response) end - ) - - {:ok, requests_3} = Requests.handle_response(requests_2, "some response", handler_id) - - response = - receive do - response -> response - end - - assert response == "some response" - - assert requests_3 == requests - - assert {:unhandled, requests_3} == - Requests.handle_response(requests, "some response", handler_id) - end -end