diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fe05f55a01a..0ab0d22a7db 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -35,9 +35,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3}; use crate::version::fork_versioned_response; use beacon_chain::{ - attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome, - validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError, - BeaconChainTypes, WhenSlotSkipped, + attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list, + observed_operations::ObservationOutcome, validator_monitor::timestamp_now, + AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes, + WhenSlotSkipped, }; use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend}; pub use block_id::BlockId; @@ -62,7 +63,7 @@ pub use publish_blocks::{ use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; -use ssz::Encode; +use ssz::{Decode, Encode}; pub use state_id::StateId; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -82,9 +83,9 @@ use tokio_stream::{ }; use types::{ fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, - ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch, - SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + AttesterSlashing, BeaconStateError, BlobSidecarList, CommitteeCache, ConfigAndPreset, Epoch, + EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, + RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; @@ -4395,6 +4396,79 @@ pub fn serve( }, ); + // POST lighthouse/database/import_blobs + let post_lighthouse_database_import_blobs = database_path + .and(warp::path("import_blobs")) + .and(warp::path::end()) + .and(warp::query::()) + .and(warp_utils::json::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ImportBlobsQuery, + blob_lists: Vec>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + if query.verify { + for blob_list in &blob_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + } + } + + match chain.store.import_blobs_batch(blob_lists) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + + // POST lighthouse/database/import_blobs_ssz + let post_lighthouse_database_import_blobs_ssz = database_path + .and(warp::path("import_blobs_ssz")) + .and(warp::path::end()) + .and(warp::query::()) + .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |query: api_types::ImportBlobsQuery, + body: Bytes, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.blocking_json_task(Priority::P1, move || { + let blob_lists = Vec::>::from_ssz_bytes(&body) + .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?; + + if query.verify { + for blob_list in &blob_lists { + match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) { + Ok(()) => (), + Err(e) => { + return Err(warp_utils::reject::custom_server_error(format!( + "{e:?}" + ))) + } + } + } + } + + match chain.store.import_blobs_batch(blob_lists) { + Ok(()) => Ok(()), + Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))), + } + }) + }, + ); + // GET lighthouse/analysis/block_rewards let get_lighthouse_block_rewards = warp::path("lighthouse") .and(warp::path("analysis")) @@ -4752,6 +4826,8 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) + .uor(post_lighthouse_database_import_blobs) + .uor(post_lighthouse_database_import_blobs_ssz) .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 6bb4edee6b2..130a90f26c7 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -59,6 +59,7 @@ pub enum Error { AddPayloadLogicError, InvalidKey, InvalidBytes, + InvalidBlobImport(String), InconsistentFork(InconsistentFork), Hdiff(hdiff::Error), CacheBuildError(EpochCacheError), diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4942b148810..fe8836f43db 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -866,6 +866,83 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + /// Import a batch of blobs. + /// Implements the following checks: + /// - Checks that `block_root` is consistent across each `BlobSidecarList`. + /// - Checks that `block_root` exists in the database. + /// - Checks if a `BlobSidecarList` is already stored for that `block_root`. + /// If it is, ensure it matches the `BlobSidecarList` we are attempting to store. + pub fn import_blobs_batch( + &self, + historical_blobs: Vec>, + ) -> Result<(), Error> { + if historical_blobs.is_empty() { + return Ok(()); + } + + let mut total_imported = 0; + + let mut ops = vec![]; + + for blob_list in historical_blobs { + // Ensure all block_roots in the blob list are the same. + let block_root = { + let first_block_root = blob_list[0].block_root(); + if !blob_list + .iter() + .all(|blob| blob.block_root() == first_block_root) + { + return Err(Error::InvalidBlobImport( + "Inconsistent block roots".to_string(), + )); + } + first_block_root + }; + + // Verify block exists. + if !self.block_exists(&block_root)? { + warn!( + self.log, + "Aborting blob import; block root does not exist."; + "block_root" => ?block_root, + "num_blobs" => blob_list.len(), + ); + return Err(Error::InvalidBlobImport("Missing block root".to_string())); + } + + // Check if a blob_list is already stored for this block root. + if let Some(existing_blob_list) = self.get_blobs(&block_root)? { + if existing_blob_list == blob_list { + debug!( + self.log, + "Skipping blob import as identical blob exists"; + "block_root" => ?block_root, + "num_blobs" => blob_list.len(), + ); + continue; + } + + return Err(Error::InvalidBlobImport(format!( + "Conflicting blobs exist for block root {:?}", + block_root + ))); + } + + self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops); + total_imported += blob_list.len(); + } + + self.blobs_db.do_atomically(ops)?; + + debug!( + self.log, + "Imported historical blobs."; + "total_imported" => total_imported, + ); + + Ok(()) + } + pub fn blobs_as_kv_store_ops( &self, key: &Hash256, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 66dd5d779bd..a1bfe86d909 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -13,6 +13,7 @@ use crate::{ }, BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot, }; +use bytes::Bytes; use proto_array::core::ProtoArray; use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; @@ -528,6 +529,22 @@ impl BeaconNodeHttpClient { self.post_with_response(path, &()).await } + /// `POST lighthouse/database/import_blobs_ssz` + pub async fn post_lighthouse_database_import_blobs_ssz( + &self, + blobs: Bytes, + ) -> Result { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("lighthouse") + .push("database") + .push("import_blobs_ssz"); + + self.post_with_response(path, &blobs).await + } + /* Analysis endpoints. */ diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index c187399ebd7..39847a95fa7 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -803,6 +803,11 @@ pub struct LightClientUpdateResponseChunk { pub payload: Vec, } +#[derive(Clone, Serialize, Deserialize)] +pub struct ImportBlobsQuery { + pub verify: bool, +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub struct BeaconCommitteeSubscription { #[serde(with = "serde_utils::quoted_u64")]