From 5ede9014e6ffd6d932b613e3f8b0045ab59f7b31 Mon Sep 17 00:00:00 2001 From: Mac L Date: Thu, 5 Dec 2024 00:14:39 +1100 Subject: [PATCH 1/2] First try --- beacon_node/http_api/src/lib.rs | 19 +++++++++++ beacon_node/store/src/errors.rs | 1 + beacon_node/store/src/hot_cold_store.rs | 44 +++++++++++++++++++++++++ 3 files changed, 64 insertions(+) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fe05f55a01a..5a4c88554bb 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4395,6 +4395,24 @@ pub fn serve( }, ); + // POST lighthouse/database/import_blobs + let post_lighthouse_database_import_blobs = database_path + .and(warp::path("import_blobs")) + .and(warp::path::end()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |blobs, task_spawner: TaskSpawner, chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + match chain.store.import_historical_blobs(blobs) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) @@ -4752,6 +4770,7 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) + .uor(post_lighthouse_database_import_blobs) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 6bb4edee6b2..130a90f26c7 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -59,6 +59,7 @@ pub enum Error { AddPayloadLogicError, InvalidKey, InvalidBytes, + InvalidBlobImport(String), InconsistentFork(InconsistentFork), Hdiff(hdiff::Error), CacheBuildError(EpochCacheError), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4942b148810..001cd5de3e9 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -43,6 +43,8 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec use types::*; use zstd::{Decoder, Encoder}; +const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000; + /// On-disk database that stores finalized states efficiently. /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores @@ -866,6 +868,48 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// Import historical blobs. + pub fn import_historical_blobs( + &self, + historical_blobs: Vec<(Hash256, BlobSidecarList)>, + ) -> Result<(), Error> { + if historical_blobs.is_empty() { + return Ok(()); + } + + let mut total_imported = 0; + + for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) { + let mut ops = Vec::with_capacity(chunk.len()); + + for (block_root, blobs) in chunk { + // Verify block exists. + if !self.block_exists(block_root)? { + warn!( + self.log, + "Skipping import of blobs; block root does not exist."; + "block_root" => ?block_root, + "num_blobs" => blobs.len(), + ); + continue; + } + + self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops); + total_imported += blobs.len(); + } + + self.blobs_db.do_atomically(ops)?; + } + + debug!( + self.log, + "Imported historical blobs."; + "total_imported" => total_imported, + ); + + Ok(()) + } + pub fn blobs_as_kv_store_ops( &self, key: &Hash256, From cda4d2050b656b83595072692aaf50dfd5590af2 Mon Sep 17 00:00:00 2001 From: Mac L Date: Fri, 6 Dec 2024 17:48:52 +1100 Subject: [PATCH 2/2] Additional checks and support SSZ --- beacon_node/http_api/src/lib.rs | 75 ++++++++++++++++++++++--- beacon_node/store/src/hot_cold_store.rs | 65 +++++++++++++++------ common/eth2/src/lighthouse.rs | 17 ++++++ common/eth2/src/types.rs | 5 ++ 4 files changed, 137 insertions(+), 25 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 5a4c88554bb..0ab0d22a7db 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -35,9 +35,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; use crate::version::fork_versioned_response; use beacon_chain::{ - attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, - validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, - BeaconChainTypes, WhenSlotSkipped, + attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list, + observed_operations::ObservationOutcome, validator_monitor::timestamp_now, + AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + WhenSlotSkipped, }; use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; pub use block_id::BlockId; @@ -62,7 +63,7 @@ pub use publish_blocks::{ use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; -use ssz::Encode; +use ssz::{Decode, Encode}; pub use state_id::StateId; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -82,9 +83,9 @@ use tokio_stream::{ }; use types::{ fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, - ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, - SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + AttesterSlashing, BeaconStateError, BlobSidecarList, CommitteeCache, ConfigAndPreset, Epoch, + EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, + RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; @@ -4399,13 +4400,68 @@ pub fn serve( let post_lighthouse_database_import_blobs = database_path .and(warp::path("import_blobs")) .and(warp::path::end()) + .and(warp::query::()) .and(warp_utils::json::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |blobs, task_spawner: TaskSpawner, chain: Arc>| { + |query: api_types::ImportBlobsQuery, + blob_lists: Vec>, + task_spawner: TaskSpawner, + chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { - match chain.store.import_historical_blobs(blobs) { + if query.verify { + for blob_list in &blob_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + } + } + + match chain.store.import_blobs_batch(blob_lists) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + + // POST lighthouse/database/import_blobs_ssz + let post_lighthouse_database_import_blobs_ssz = database_path + .and(warp::path("import_blobs_ssz")) + .and(warp::path::end()) + .and(warp::query::()) + .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ImportBlobsQuery, + body: Bytes, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let blob_lists = Vec::>::from_ssz_bytes(&body) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; + + if query.verify { + for blob_list in &blob_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + } + } + + match chain.store.import_blobs_batch(blob_lists) { Ok(()) => Ok(()), Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), } @@ -4771,6 +4827,7 @@ pub fn serve( .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) .uor(post_lighthouse_database_import_blobs) + .uor(post_lighthouse_database_import_blobs_ssz) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 001cd5de3e9..fe8836f43db 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -43,8 +43,6 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec use types::*; use zstd::{Decoder, Encoder}; -const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000; - /// On-disk database that stores finalized states efficiently. /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores @@ -868,10 +866,15 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Import historical blobs. - pub fn import_historical_blobs( + /// Import a batch of blobs. + /// Implements the following checks: + /// - Checks that `block_root` is consistent across each `BlobSidecarList`. + /// - Checks that `block_root` exists in the database. + /// - Checks if a `BlobSidecarList` is already stored for that `block_root`. + /// If it is, ensure it matches the `BlobSidecarList` we are attempting to store. + pub fn import_blobs_batch( &self, - historical_blobs: Vec<(Hash256, BlobSidecarList)>, + historical_blobs: Vec>, ) -> Result<(), Error> { if historical_blobs.is_empty() { return Ok(()); @@ -879,28 +882,58 @@ impl, Cold: ItemStore> HotColdDB let mut total_imported = 0; - for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) { - let mut ops = Vec::with_capacity(chunk.len()); + let mut ops = vec![]; - for (block_root, blobs) in chunk { - // Verify block exists. - if !self.block_exists(block_root)? { - warn!( + for blob_list in historical_blobs { + // Ensure all block_roots in the blob list are the same. + let block_root = { + let first_block_root = blob_list[0].block_root(); + if !blob_list + .iter() + .all(|blob| blob.block_root() == first_block_root) + { + return Err(Error::InvalidBlobImport( + "Inconsistent block roots".to_string(), + )); + } + first_block_root + }; + + // Verify block exists. + if !self.block_exists(&block_root)? { + warn!( + self.log, + "Aborting blob import; block root does not exist."; + "block_root" => ?block_root, + "num_blobs" => blob_list.len(), + ); + return Err(Error::InvalidBlobImport("Missing block root".to_string())); + } + + // Check if a blob_list is already stored for this block root. + if let Some(existing_blob_list) = self.get_blobs(&block_root)? { + if existing_blob_list == blob_list { + debug!( self.log, - "Skipping import of blobs; block root does not exist."; + "Skipping blob import as identical blob exists"; "block_root" => ?block_root, - "num_blobs" => blobs.len(), + "num_blobs" => blob_list.len(), ); continue; } - self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops); - total_imported += blobs.len(); + return Err(Error::InvalidBlobImport(format!( + "Conflicting blobs exist for block root {:?}", + block_root + ))); } - self.blobs_db.do_atomically(ops)?; + self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops); + total_imported += blob_list.len(); } + self.blobs_db.do_atomically(ops)?; + debug!( self.log, "Imported historical blobs."; diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 66dd5d779bd..a1bfe86d909 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -13,6 +13,7 @@ use crate::{ }, BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot, }; +use bytes::Bytes; use proto_array::core::ProtoArray; use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; @@ -528,6 +529,22 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &()).await } + /// `POST lighthouse/database/import_blobs_ssz` + pub async fn post_lighthouse_database_import_blobs_ssz( + &self, + blobs: Bytes, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("database") + .push("import_blobs_ssz"); + + self.post_with_response(path, &blobs).await + } + /* Analysis endpoints. */ diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c187399ebd7..39847a95fa7 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -803,6 +803,11 @@ pub struct LightClientUpdateResponseChunk { pub payload: Vec, } +#[derive(Clone, Serialize, Deserialize)] +pub struct ImportBlobsQuery { + pub verify: bool, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct BeaconCommitteeSubscription { #[serde(with = "serde_utils::quoted_u64")]