Allow importing of historical blobs via HTTP API #6656

Open · wants to merge 2 commits into base: unstable
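This PR adds two `lighthouse/database` endpoints for importing historical blobs: `import_blobs` (JSON body) and `import_blobs_ssz` (SSZ body). Both take a `verify` query parameter that runs KZG verification over each `BlobSidecarList` before anything is written to the store. As a rough caller sketch (not part of the diff — the port, file name, and use of `reqwest` are illustrative assumptions; only the route path and `?verify` parameter come from the changes below), the SSZ variant might be exercised like this:

// Hypothetical caller: POST an SSZ-encoded `Vec<BlobSidecarList<E>>` (e.g.
// exported from another node) and ask the receiving node to KZG-verify the
// blobs before importing them into its blobs database.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed to contain an SSZ-encoded `Vec<BlobSidecarList<E>>`.
    let body = std::fs::read("blobs.ssz")?;

    let response = reqwest::blocking::Client::new()
        .post("http://localhost:5052/lighthouse/database/import_blobs_ssz")
        .query(&[("verify", "true")])
        .header("Content-Type", "application/octet-stream")
        .body(body)
        .send()?;

    println!("import status: {}", response.status());
    Ok(())
}
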
90 changes: 83 additions & 7 deletions beacon_node/http_api/src/lib.rs
@@ -35,9 +35,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use beacon_chain::{
-attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
-validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
-BeaconChainTypes, WhenSlotSkipped,
+attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list,
+observed_operations::ObservationOutcome, validator_monitor::timestamp_now,
+AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
+WhenSlotSkipped,
};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
pub use block_id::BlockId;
@@ -62,7 +63,7 @@ pub use publish_blocks::{
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
-use ssz::Encode;
+use ssz::{Decode, Encode};
pub use state_id::StateId;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -82,9 +83,9 @@ use tokio_stream::{
};
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
-AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
-ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
-SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
+AttesterSlashing, BeaconStateError, BlobSidecarList, CommitteeCache, ConfigAndPreset, Epoch,
+EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
+RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
};
@@ -4395,6 +4396,79 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST lighthouse/database/import_blobs
let post_lighthouse_database_import_blobs = database_path
.and(warp::path("import_blobs"))
.and(warp::path::end())
.and(warp::query::<api_types::ImportBlobsQuery>())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::ImportBlobsQuery,
blob_lists: Vec<BlobSidecarList<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
if query.verify {
for blob_list in &blob_lists {
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
Ok(()) => (),
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"{e:?}"
)))
}
}
}
}

match chain.store.import_blobs_batch(blob_lists) {
Ok(()) => Ok(()),
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
}
})
},
);

// POST lighthouse/database/import_blobs_ssz
let post_lighthouse_database_import_blobs_ssz = database_path
.and(warp::path("import_blobs_ssz"))
.and(warp::path::end())
.and(warp::query::<api_types::ImportBlobsQuery>())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::ImportBlobsQuery,
body: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let blob_lists = Vec::<BlobSidecarList<T::EthSpec>>::from_ssz_bytes(&body)
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;

if query.verify {
for blob_list in &blob_lists {
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
Ok(()) => (),
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"{e:?}"
)))
}
}
}
}

match chain.store.import_blobs_batch(blob_lists) {
Ok(()) => Ok(()),
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
}
})
},
);

// GET lighthouse/analysis/block_rewards
let get_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
@@ -4752,6 +4826,8 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_validator_liveness_epoch)
.uor(post_lighthouse_liveness)
.uor(post_lighthouse_database_reconstruct)
.uor(post_lighthouse_database_import_blobs)
.uor(post_lighthouse_database_import_blobs_ssz)
.uor(post_lighthouse_block_rewards)
.uor(post_lighthouse_ui_validator_metrics)
.uor(post_lighthouse_ui_validator_info)
1 change: 1 addition & 0 deletions beacon_node/store/src/errors.rs
@@ -59,6 +59,7 @@ pub enum Error {
AddPayloadLogicError,
InvalidKey,
InvalidBytes,
InvalidBlobImport(String),
InconsistentFork(InconsistentFork),
Hdiff(hdiff::Error),
CacheBuildError(EpochCacheError),
77 changes: 77 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
@@ -866,6 +866,83 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}

/// Import a batch of blobs.
/// Implements the following checks:
/// - Checks that `block_root` is consistent across each `BlobSidecarList`.
/// - Checks that `block_root` exists in the database.
/// - Checks if a `BlobSidecarList` is already stored for that `block_root`.
/// If it is, ensure it matches the `BlobSidecarList` we are attempting to store.
pub fn import_blobs_batch(
&self,
historical_blobs: Vec<BlobSidecarList<E>>,
) -> Result<(), Error> {
if historical_blobs.is_empty() {
return Ok(());
}

let mut total_imported = 0;

let mut ops = vec![];

for blob_list in historical_blobs {
// Ensure all block_roots in the blob list are the same.
let block_root = {
let first_block_root = blob_list[0].block_root();
if !blob_list
.iter()
.all(|blob| blob.block_root() == first_block_root)
{
return Err(Error::InvalidBlobImport(
"Inconsistent block roots".to_string(),
));
}
first_block_root
};

// Verify block exists.
if !self.block_exists(&block_root)? {
warn!(
self.log,
"Aborting blob import; block root does not exist.";
"block_root" => ?block_root,
"num_blobs" => blob_list.len(),
);
return Err(Error::InvalidBlobImport("Missing block root".to_string()));
}

// Check if a blob_list is already stored for this block root.
if let Some(existing_blob_list) = self.get_blobs(&block_root)? {
if existing_blob_list == blob_list {
debug!(
self.log,
"Skipping blob import as identical blob exists";
"block_root" => ?block_root,
"num_blobs" => blob_list.len(),
);
continue;
}

return Err(Error::InvalidBlobImport(format!(
"Conflicting blobs exist for block root {:?}",
block_root
)));
}

self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops);
total_imported += blob_list.len();
}

self.blobs_db.do_atomically(ops)?;

debug!(
self.log,
"Imported historical blobs.";
"total_imported" => total_imported,
);

Ok(())
}

pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
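For reviewers, a rough call-site sketch of the new store method (not part of the diff — the wrapper function and generics are illustrative; only `import_blobs_batch` and the new `InvalidBlobImport` error variant come from this PR):

use store::{Error as StoreError, HotColdDB, ItemStore};
use types::{BlobSidecarList, EthSpec};

// Hypothetical caller: import a batch and separate the validation failures
// introduced above (inconsistent block roots, unknown block roots, blobs that
// conflict with data already on disk) from other database errors.
fn import_and_report<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
    store: &HotColdDB<E, Hot, Cold>,
    blob_lists: Vec<BlobSidecarList<E>>,
) -> Result<(), StoreError> {
    match store.import_blobs_batch(blob_lists) {
        Ok(()) => Ok(()),
        Err(StoreError::InvalidBlobImport(reason)) => {
            eprintln!("blob import rejected: {reason}");
            Err(StoreError::InvalidBlobImport(reason))
        }
        Err(e) => Err(e),
    }
}
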
17 changes: 17 additions & 0 deletions common/eth2/src/lighthouse.rs
@@ -13,6 +13,7 @@ use crate::{
},
BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot,
};
use bytes::Bytes;
use proto_array::core::ProtoArray;
use serde::{Deserialize, Serialize};
use ssz::four_byte_option_impl;
@@ -528,6 +529,22 @@ impl BeaconNodeHttpClient {
self.post_with_response(path, &()).await
}

/// `POST lighthouse/database/import_blobs_ssz`
pub async fn post_lighthouse_database_import_blobs_ssz(
&self,
blobs: Bytes,
) -> Result<String, Error> {
let mut path = self.server.full.clone();

path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("lighthouse")
.push("database")
.push("import_blobs_ssz");

self.post_with_response(path, &blobs).await
}

/*
Analysis endpoints.
*/
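A rough sketch of how the new `post_lighthouse_database_import_blobs_ssz` helper above might be driven from a tool (not part of the diff — `SensitiveUrl`, `Timeouts`, and `MainnetEthSpec` here are assumptions for illustration; only the helper itself and the SSZ body shape come from this PR):

use bytes::Bytes;
use eth2::{BeaconNodeHttpClient, Error, SensitiveUrl, Timeouts};
use ssz::Encode;
use std::time::Duration;
use types::{BlobSidecarList, MainnetEthSpec};

// Hypothetical tool-side usage: SSZ-encode a batch of blob lists and push them
// to a node's HTTP API via the new helper.
async fn push_blobs(blob_lists: Vec<BlobSidecarList<MainnetEthSpec>>) -> Result<(), Error> {
    let url = SensitiveUrl::parse("http://localhost:5052").expect("valid URL");
    let client = BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(30)));

    // The server decodes the body as an SSZ-encoded `Vec<BlobSidecarList<E>>`.
    let body = Bytes::from(blob_lists.as_ssz_bytes());
    let response = client.post_lighthouse_database_import_blobs_ssz(body).await?;
    println!("node response: {response}");
    Ok(())
}

One thing worth confirming: as written, the helper builds only the path segments, so it never sets the `verify` query parameter that the route deserializes into `ImportBlobsQuery`; callers wanting verification may need that parameter added to the request.
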
5 changes: 5 additions & 0 deletions common/eth2/src/types.rs
@@ -803,6 +803,11 @@ pub struct LightClientUpdateResponseChunk {
pub payload: Vec<u8>,
}

#[derive(Clone, Serialize, Deserialize)]
pub struct ImportBlobsQuery {
pub verify: bool,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct BeaconCommitteeSubscription {
#[serde(with = "serde_utils::quoted_u64")]