Skip to content

Commit

Permalink
perf: use the Store struct to save states and checkpoint states in me…
Browse files Browse the repository at this point in the history
…mory (#1253)
  • Loading branch information
Arkenan authored Aug 16, 2024
1 parent f0f8111 commit 84ee168
Show file tree
Hide file tree
Showing 26 changed files with 440 additions and 340 deletions.
15 changes: 9 additions & 6 deletions lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
@impl true
def init(_) do
store = StoreSetup.setup!()
deposit_tree_snapshot = StoreSetup.get_deposit_snapshot!()

LambdaEthereumConsensus.P2P.Metadata.init()

Cache.initialize_cache()

time = :os.system_time(:second)

ForkChoice.init_store(store, time)

init_execution_chain(deposit_tree_snapshot, store.head_root)
store = ForkChoice.init_store(store, time)

validators = Validator.Setup.init(store.head_slot, store.head_root)

libp2p_args = [genesis_time: store.genesis_time, validators: validators] ++ get_libp2p_args()
StoreSetup.get_deposit_snapshot!()
|> init_execution_chain(store.head_root)

libp2p_args =
[genesis_time: store.genesis_time, validators: validators, store: store] ++
get_libp2p_args()

children =
[
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor},
{Task.Supervisor, name: PruneBlobsSupervisor}
{Task.Supervisor, name: PruneBlobsSupervisor},
{Task.Supervisor, name: StoreStatesSupervisor}
]

Supervisor.init(children, strategy: :one_for_all)
Expand Down
92 changes: 51 additions & 41 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
require Logger

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.BlockDownloader

alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P.BlobDownloader
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias Types.BlockInfo
alias Types.SignedBeaconBlock
alias Types.Store

@type block_status :: :pending | :invalid | :download | :download_blobs | :unknown
@type block_info ::
Expand All @@ -36,8 +36,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
If blobs are missing, they will be requested.
"""
@spec add_block(SignedBeaconBlock.t()) :: :ok
def add_block(signed_block) do
@spec add_block(Store.t(), SignedBeaconBlock.t()) :: Store.t()
def add_block(store, signed_block) do
block_info = BlockInfo.from_block(signed_block)
loaded_block = Blocks.get_block_info(block_info.root)

Expand All @@ -47,14 +47,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

if Enum.empty?(missing_blobs) do
Blocks.new_block_info(block_info)
process_block_and_check_children(block_info)
process_block_and_check_children(store, block_info)
else
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries)
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries)

block_info
|> BlockInfo.change_status(:download_blobs)
|> Blocks.new_block_info()

store
end
else
store
end
end

Expand All @@ -63,17 +67,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
module after receiving a new block, but there are some other cases like at node startup, as there
may be pending blocks from prior executions.
"""
def process_blocks() do
def process_blocks(store) do
case Blocks.get_blocks_with_status(:pending) do
{:ok, blocks} ->
blocks
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
|> Enum.each(&process_block/1)
|> Enum.reduce(store, fn block_info, store ->
{store, _state} = process_block(store, block_info)
store
end)

{:error, reason} ->
Logger.error(
"[Pending Blocks] Failed to get pending blocks to process. Reason: #{reason}"
)

store
end
end

Expand All @@ -85,13 +94,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
# is called to check if there's any children that can now be processed. This function
# is only to be called when a new block is saved as pending, not when processing blocks
# in batch, to avoid unneeded recursion.
defp process_block_and_check_children(block_info) do
if process_block(block_info) in [:transitioned, :invalid] do
process_blocks()
defp process_block_and_check_children(store, block_info) do
case process_block(store, block_info) do
{store, result} when result in [:transitioned, :invalid] -> process_blocks(store)
{store, _other} -> store
end
end

defp process_block(block_info) do
defp process_block(store, block_info) do
if block_info.status != :pending do
Logger.error("Called process block for a block that's not ready: #{block_info}")
end
Expand All @@ -105,9 +115,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

BlockDownloader.request_blocks_by_root(
[parent_root],
fn result ->
process_downloaded_block(result)
end,
&process_downloaded_block/2,
@download_retries
)

Expand All @@ -116,65 +124,67 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
block_info.root
)

:download_pending
{store, :download_pending}

%BlockInfo{status: :invalid} ->
Blocks.change_status(block_info, :invalid)
:invalid
{store, :invalid}

%BlockInfo{status: :transitioned} ->
case ForkChoice.on_block(block_info) do
:ok ->
case ForkChoice.on_block(store, block_info) do
{:ok, store} ->
Blocks.change_status(block_info, :transitioned)
:transitioned
{store, :transitioned}

{:error, reason} ->
{:error, reason, store} ->
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
slot: block_info.signed_block.message.slot,
root: block_info.root
)

Blocks.change_status(block_info, :invalid)
:invalid
{store, :invalid}
end

_other ->
:ok
{store, :ok}
end
end

defp process_downloaded_block({:ok, [block]}) do
add_block(block)
defp process_downloaded_block(store, {:ok, [block]}) do
{:ok, add_block(store, block)}
end

defp process_downloaded_block({:error, reason}) do
Logger.error("Error downloading block: #{inspect(reason)}")

defp process_downloaded_block(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("Error downloading block: #{inspect(reason)}")
{:ok, store}
end

defp process_blobs({:ok, blobs}), do: add_blobs(blobs)

defp process_blobs({:error, reason}) do
Logger.error("Error downloading blobs: #{inspect(reason)}")
defp process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}

defp process_blobs(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("Error downloading blobs: #{inspect(reason)}")
{:ok, store}
end

def add_blob(store, blob), do: add_blobs(store, [blob])

# To be used when a series of blobs are downloaded. Stores each blob.
# If there are blocks that can be processed, does so immediately.
defp add_blobs(blobs) do
defp add_blobs(store, blobs) do
blobs
|> Enum.map(&BlobDb.store_blob/1)
|> Enum.uniq()
|> Enum.each(fn root ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do
# TODO: add a new missing blobs call if some blobs are still missing for a block.
if Enum.empty?(missing_blobs(block_info)) do
block_info
|> Blocks.change_status(:pending)
|> process_block_and_check_children()
end
|> Enum.reduce(store, fn root, store ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root),
[] <- missing_blobs(block_info) do
block_info
|> Blocks.change_status(:pending)
|> then(&process_block_and_check_children(store, &1))
else
_ -> store
end
end)
end
Expand Down
8 changes: 5 additions & 3 deletions lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
BlockDownloader.request_blocks_by_range(
first_slot,
count,
&on_chunk_downloaded/1,
&on_chunk_downloaded/2,
@retries
)

Expand All @@ -61,11 +61,13 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
end
end

defp on_chunk_downloaded({:ok, range, blocks}) do
defp on_chunk_downloaded(store, {:ok, range, blocks}) do
Libp2pPort.notify_blocks_downloaded(range, blocks)
{:ok, store}
end

defp on_chunk_downloaded({:error, range, reason}) do
defp on_chunk_downloaded(store, {:error, range, reason}) do
Libp2pPort.notify_block_download_failed(range, reason)
{:ok, store}
end
end
Loading

0 comments on commit 84ee168

Please sign in to comment.