Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: use the Store struct to save states and checkpoint states in memory #1253

Merged
merged 19 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,31 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
@impl true
def init(_) do
store = StoreSetup.setup!()
deposit_tree_snapshot = StoreSetup.get_deposit_snapshot!()

LambdaEthereumConsensus.P2P.Metadata.init()

Cache.initialize_cache()

time = :os.system_time(:second)

ForkChoice.init_store(store, time)

init_execution_chain(deposit_tree_snapshot, store.head_root)
store = ForkChoice.init_store(store, time)

validators = Validator.Setup.init(store.head_slot, store.head_root)

libp2p_args = [genesis_time: store.genesis_time, validators: validators] ++ get_libp2p_args()
StoreSetup.get_deposit_snapshot!()
|> init_execution_chain(store.head_root)

libp2p_args =
[genesis_time: store.genesis_time, validators: validators, store: store] ++
get_libp2p_args()

children =
[
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor},
{Task.Supervisor, name: PruneBlobsSupervisor}
{Task.Supervisor, name: PruneBlobsSupervisor},
{Task.Supervisor, name: StoreStatesSupervisor}
]

Supervisor.init(children, strategy: :one_for_all)
Expand Down
92 changes: 51 additions & 41 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
require Logger

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.BlockDownloader

alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P.BlobDownloader
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias Types.BlockInfo
alias Types.SignedBeaconBlock
alias Types.Store

@type block_status :: :pending | :invalid | :download | :download_blobs | :unknown
@type block_info ::
Expand All @@ -36,8 +36,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
If blobs are missing, they will be requested.
"""
@spec add_block(SignedBeaconBlock.t()) :: :ok
def add_block(signed_block) do
@spec add_block(Store.t(), SignedBeaconBlock.t()) :: Store.t()
def add_block(store, signed_block) do
block_info = BlockInfo.from_block(signed_block)
loaded_block = Blocks.get_block_info(block_info.root)

Expand All @@ -47,14 +47,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

if Enum.empty?(missing_blobs) do
Blocks.new_block_info(block_info)
process_block_and_check_children(block_info)
process_block_and_check_children(store, block_info)
else
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, @download_retries)
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/2, @download_retries)

block_info
|> BlockInfo.change_status(:download_blobs)
|> Blocks.new_block_info()

store
end
else
store
end
end

Expand All @@ -63,17 +67,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
module after receiving a new block, but there are some other cases like at node startup, as there
may be pending blocks from prior executions.
"""
def process_blocks() do
def process_blocks(store) do
case Blocks.get_blocks_with_status(:pending) do
{:ok, blocks} ->
blocks
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
|> Enum.each(&process_block/1)
|> Enum.reduce(store, fn block_info, store ->
{store, _state} = process_block(store, block_info)
store
end)

{:error, reason} ->
Logger.error(
"[Pending Blocks] Failed to get pending blocks to process. Reason: #{reason}"
)

store
end
end

Expand All @@ -85,13 +94,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
# is called to check if there's any children that can now be processed. This function
# is only to be called when a new block is saved as pending, not when processing blocks
# in batch, to avoid unneeded recursion.
defp process_block_and_check_children(block_info) do
if process_block(block_info) in [:transitioned, :invalid] do
process_blocks()
defp process_block_and_check_children(store, block_info) do
case process_block(store, block_info) do
{store, result} when result in [:transitioned, :invalid] -> process_blocks(store)
{store, _other} -> store
end
end

defp process_block(block_info) do
defp process_block(store, block_info) do
if block_info.status != :pending do
Logger.error("Called process block for a block that's not ready: #{block_info}")
end
Expand All @@ -105,9 +115,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do

BlockDownloader.request_blocks_by_root(
[parent_root],
fn result ->
process_downloaded_block(result)
end,
&process_downloaded_block/2,
@download_retries
)

Expand All @@ -116,65 +124,67 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
block_info.root
)

:download_pending
{store, :download_pending}

%BlockInfo{status: :invalid} ->
Blocks.change_status(block_info, :invalid)
:invalid
{store, :invalid}

%BlockInfo{status: :transitioned} ->
case ForkChoice.on_block(block_info) do
:ok ->
case ForkChoice.on_block(store, block_info) do
{:ok, store} ->
Blocks.change_status(block_info, :transitioned)
:transitioned
{store, :transitioned}

{:error, reason} ->
{:error, reason, store} ->
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
slot: block_info.signed_block.message.slot,
root: block_info.root
)

Blocks.change_status(block_info, :invalid)
:invalid
{store, :invalid}
end

_other ->
:ok
{store, :ok}
end
end

defp process_downloaded_block({:ok, [block]}) do
add_block(block)
defp process_downloaded_block(store, {:ok, [block]}) do
{:ok, add_block(store, block)}
end

defp process_downloaded_block({:error, reason}) do
Logger.error("Error downloading block: #{inspect(reason)}")

defp process_downloaded_block(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("Error downloading block: #{inspect(reason)}")
{:ok, store}
end

defp process_blobs({:ok, blobs}), do: add_blobs(blobs)

defp process_blobs({:error, reason}) do
Logger.error("Error downloading blobs: #{inspect(reason)}")
defp process_blobs(store, {:ok, blobs}), do: {:ok, add_blobs(store, blobs)}

defp process_blobs(store, {:error, reason}) do
# We might want to declare a block invalid here.
Logger.error("Error downloading blobs: #{inspect(reason)}")
{:ok, store}
end

def add_blob(store, blob), do: add_blobs(store, [blob])

# To be used when a series of blobs are downloaded. Stores each blob.
# If there are blocks that can be processed, does so immediately.
defp add_blobs(blobs) do
defp add_blobs(store, blobs) do
blobs
|> Enum.map(&BlobDb.store_blob/1)
|> Enum.uniq()
|> Enum.each(fn root ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do
# TODO: add a new missing blobs call if some blobs are still missing for a block.
if Enum.empty?(missing_blobs(block_info)) do
block_info
|> Blocks.change_status(:pending)
|> process_block_and_check_children()
end
|> Enum.reduce(store, fn root, store ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root),
[] <- missing_blobs(block_info) do
block_info
|> Blocks.change_status(:pending)
|> then(&process_block_and_check_children(store, &1))
else
_ -> store
end
end)
end
Expand Down
8 changes: 5 additions & 3 deletions lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
BlockDownloader.request_blocks_by_range(
first_slot,
count,
&on_chunk_downloaded/1,
&on_chunk_downloaded/2,
@retries
)

Expand All @@ -61,11 +61,13 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
end
end

defp on_chunk_downloaded({:ok, range, blocks}) do
defp on_chunk_downloaded(store, {:ok, range, blocks}) do
Libp2pPort.notify_blocks_downloaded(range, blocks)
{:ok, store}
end

defp on_chunk_downloaded({:error, range, reason}) do
defp on_chunk_downloaded(store, {:error, range, reason}) do
Libp2pPort.notify_block_download_failed(range, reason)
{:ok, store}
end
end
Loading
Loading