fix(state-parts-dump-check): check the right sync hash (#12975)
Once the current-epoch state sync feature is enabled, getting the state
roots from the last block of the previous epoch no longer works. So in
this PR, when that protocol feature is enabled, we find the right sync
hash by iterating over the chunks in the new epoch until we reach the
right point.
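The core of the change is the rule used to pick the sync point when current-epoch state sync is enabled: walk the blocks of the new epoch from its start height and stop at the first block where every shard has accumulated at least two new chunks. Below is a minimal, self-contained sketch of that selection rule; the `BlockInfo`/`ChunkInfo` types and the `find_sync_point` helper are hypothetical stand-ins for the RPC block views used by the tool and are not part of this PR.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the RPC block/chunk views used by the tool.
struct ChunkInfo {
    shard_id: u64,
    height_included: u64,
}

struct BlockInfo {
    height: u64,
    chunks: Vec<ChunkInfo>,
}

// Walk blocks of the new epoch in height order and return the height of the first
// block at which every shard has accumulated at least two new chunks. A chunk is
// "new" at a height when its `height_included` equals that height (i.e. it was not
// carried over from a block where the shard missed its chunk).
fn find_sync_point(blocks: &[BlockInfo], shard_ids: &[u64]) -> Option<u64> {
    let mut new_chunks: HashMap<u64, u32> = shard_ids.iter().map(|&s| (s, 0)).collect();
    for block in blocks {
        for chunk in &block.chunks {
            if chunk.height_included == block.height {
                if let Some(n) = new_chunks.get_mut(&chunk.shard_id) {
                    *n += 1;
                }
            }
        }
        if new_chunks.values().all(|&n| n >= 2) {
            return Some(block.height);
        }
    }
    None
}

fn main() {
    // Two shards; shard 1 misses its chunk at height 11, so the condition is only
    // satisfied at height 12.
    let blocks = vec![
        BlockInfo {
            height: 10,
            chunks: vec![
                ChunkInfo { shard_id: 0, height_included: 10 },
                ChunkInfo { shard_id: 1, height_included: 10 },
            ],
        },
        BlockInfo {
            height: 11,
            chunks: vec![
                ChunkInfo { shard_id: 0, height_included: 11 },
                ChunkInfo { shard_id: 1, height_included: 10 },
            ],
        },
        BlockInfo {
            height: 12,
            chunks: vec![
                ChunkInfo { shard_id: 0, height_included: 12 },
                ChunkInfo { shard_id: 1, height_included: 12 },
            ],
        },
    ];
    assert_eq!(find_sync_point(&blocks, &[0, 1]), Some(12));
}
```

In the actual change this walk is done with `block_by_id` RPC calls over the finalized chain, and instead of a height it returns the per-shard `prev_state_root`s of the block that satisfies the condition.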
marcelo-gonzalez authored and VanBarbascu committed Mar 3, 2025
1 parent e3d2d63 commit 3ed7e99
Showing 3 changed files with 104 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions tools/state-parts-dump-check/Cargo.toml
@@ -26,6 +26,7 @@ near-jsonrpc.workspace = true
near-primitives-core.workspace = true
near-o11y.workspace = true
reqwest.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
[features]
119 changes: 102 additions & 17 deletions tools/state-parts-dump-check/src/cli.rs
@@ -5,16 +5,18 @@ use near_client::sync::external::{
create_bucket_readonly, external_storage_location, external_storage_location_directory,
get_num_parts_from_filename, ExternalConnection, StateFileType,
};
use near_jsonrpc::client::{new_client, JsonRpcClient};
use near_jsonrpc::client::{JsonRpcClient, new_client};
use near_jsonrpc::primitives::errors::RpcErrorKind;
use near_jsonrpc::primitives::types::config::RpcProtocolConfigRequest;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::state_part::PartId;
use near_primitives::state_sync::ShardStateSyncResponseHeader;
use near_primitives::types::{
BlockId, BlockReference, EpochId, EpochReference, Finality, ShardId, StateRoot,
BlockHeight, BlockId, BlockReference, EpochId, EpochReference, Finality, ShardId, StateRoot,
};
use near_primitives::views::BlockView;
use near_primitives::version::ProtocolFeature;
use near_primitives::views::{BlockView, ChunkHeaderView};
use near_store::Trie;
use nearcore::state_sync::extract_part_id_from_part_file_name;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -210,7 +212,7 @@ struct DumpCheckIterInfo {
epoch_id: EpochId,
epoch_height: u64,
shard_layout: ShardLayout,
state_roots: Vec<CryptoHash>,
state_roots: HashMap<ShardId, CryptoHash>,
}

fn create_external_connection(
@@ -301,9 +303,12 @@ fn run_loop_all_shards(
sleep(Duration::from_secs(loop_interval));
continue;
}
let dump_check_iter_info = dump_check_iter_info_res?;
let Some(dump_check_iter_info) = dump_check_iter_info_res? else {
tracing::info!("sync_hash not yet known. sleeping for {loop_interval}s.");
sleep(Duration::from_secs(loop_interval));
continue;
};
for shard_info in dump_check_iter_info.shard_layout.shard_infos() {
let shard_index = shard_info.shard_index();
let shard_id = shard_info.shard_id();
tracing::info!(?shard_id, "started check");
let dump_check_iter_info = dump_check_iter_info.clone();
@@ -394,7 +399,7 @@ fn run_loop_all_shards(
dump_check_iter_info.epoch_id,
dump_check_iter_info.epoch_height,
shard_id,
dump_check_iter_info.state_roots[shard_index],
*dump_check_iter_info.state_roots.get(&shard_id).unwrap(),
root_dir,
s3_bucket,
s3_region,
@@ -856,10 +861,78 @@ async fn process_header(
Ok(())
}

fn chunk_state_roots(chunks: &[ChunkHeaderView]) -> HashMap<ShardId, CryptoHash> {
chunks.iter().map(|chunk| (chunk.shard_id, chunk.prev_state_root)).collect()
}

async fn get_prev_epoch_state_roots(
rpc_client: &JsonRpcClient,
epoch_id: CryptoHash,
) -> anyhow::Result<HashMap<ShardId, CryptoHash>> {
let prev_epoch_last_block_response =
get_previous_epoch_last_block_response(rpc_client, epoch_id).await?;
Ok(chunk_state_roots(&prev_epoch_last_block_response.chunks))
}

async fn get_current_epoch_state_roots(
rpc_client: &JsonRpcClient,
epoch_id: CryptoHash,
head_height: BlockHeight,
shard_layout: &ShardLayout,
) -> anyhow::Result<Option<HashMap<ShardId, CryptoHash>>> {
let current_epoch_response = rpc_client
.validators(Some(EpochReference::EpochId(EpochId(epoch_id))))
.await
.or_else(|_| Err(anyhow!("validators_by_epoch_id for current_epoch_id failed")))?;

// Currently we just have to iterate over the blocks of the epoch to find the prev_hash of the sync hash, since the rpc client
// doesn't provide an API to find the sync hash directly
let mut num_new_chunks: HashMap<_, _> = shard_layout.shard_ids().map(|s| (s, 0)).collect();

for height in current_epoch_response.epoch_start_height + 1..=head_height {
// Since head_height was gotten with Finality::Final, we know any of these are on the canonical chain
match rpc_client.block_by_id(BlockId::Height(height)).await {
Ok(block) => {
for chunk in block.chunks.iter() {
if chunk.height_included == height {
let Some(n) = num_new_chunks.get_mut(&chunk.shard_id) else {
anyhow::bail!(
"bad shard ID {} in chunks for #{}",
chunk.shard_id,
height
);
};
*n += 1;
}
}
if num_new_chunks.iter().all(|(_shard_id, new_chunks)| *new_chunks >= 2) {
return Ok(Some(chunk_state_roots(&block.chunks)));
}
}
Err(e) => {
if let Some(RpcErrorKind::HandlerError(serde_json::Value::Object(err))) =
&e.error_struct
{
if let Some(serde_json::Value::String(name)) = err.get("name") {
if name.as_str() == "UNKNOWN_BLOCK" {
continue;
}
}
}
anyhow::bail!("block_by_id failed for height {}: {:?}", height, e);
}
}
}
Ok(None)
}

// get epoch information of the latest epoch that's complete
// TODO: this function fetches all the state roots with RPC calls even if we already checked that epoch.
// Should just check if the epoch ID is new before doing that. The logic in the main loop can end up calling
// this a huge number of times for no reason in very quick succession.
async fn get_processing_epoch_information(
rpc_client: &JsonRpcClient,
) -> anyhow::Result<DumpCheckIterInfo> {
) -> anyhow::Result<Option<DumpCheckIterInfo>> {
let block_reference = BlockReference::Finality(Finality::Final);
let latest_block_response = rpc_client
.block(block_reference)
@@ -879,19 +952,31 @@ async fn get_processing_epoch_information(
.or_else(|err| Err(anyhow!("validators_by_epoch_id for latest_epoch_id failed: {err}")))?;

let latest_epoch_height = latest_epoch_response.epoch_height;
let prev_epoch_last_block_response =
get_previous_epoch_last_block_response(rpc_client, latest_epoch_id).await?;
let mut chunks = prev_epoch_last_block_response.chunks;
chunks.sort_by(|c1, c2| c1.shard_id.cmp(&c2.shard_id));
let prev_epoch_state_roots: Vec<CryptoHash> =
chunks.iter().map(|chunk| chunk.prev_state_root).collect();

Ok(DumpCheckIterInfo {
let state_roots = if ProtocolFeature::CurrentEpochStateSync
.enabled(protocol_config.config_view.protocol_version)
{
let Some(roots) = get_current_epoch_state_roots(
rpc_client,
latest_epoch_id,
latest_block_response.header.height,
&protocol_config.config_view.shard_layout,
)
.await?
else {
return Ok(None);
};
roots
} else {
get_prev_epoch_state_roots(rpc_client, latest_epoch_id).await?
};

Ok(Some(DumpCheckIterInfo {
epoch_id: EpochId(latest_epoch_id),
epoch_height: latest_epoch_height,
shard_layout: protocol_config.config_view.shard_layout,
state_roots: prev_epoch_state_roots,
})
state_roots,
}))
}

async fn get_previous_epoch_last_block_response(
