fix(state-parts-dump-check): check the right sync hash (#12975) #13028

Merged · 1 commit · Mar 3, 2025
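
For context: once the CurrentEpochStateSync protocol feature is enabled, state parts are dumped for the current epoch's "sync block" rather than for the last block of the previous epoch, so this checker must derive its expected state roots the same way. The rule the new get_current_epoch_state_roots helper implements below is: walk the finalized blocks from the epoch start and take the per-shard prev_state_root values from the first block at which every shard has accumulated at least two new chunks. A minimal standalone sketch of that selection rule, using hypothetical simplified stand-ins for the RPC view types (not the tool's real API):

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the RPC block/chunk views.
struct ChunkView {
    shard_id: u64,
    height_included: u64,
    prev_state_root: [u8; 32],
}
struct BlockView {
    height: u64,
    chunks: Vec<ChunkView>,
}

// Returns the per-shard state roots of the first block in the epoch at which
// every shard has accumulated at least two new chunks (the "sync block"), or
// None if the chain has not reached such a block yet.
fn sync_block_state_roots(
    epoch_blocks: impl Iterator<Item = BlockView>,
    shard_ids: &[u64],
) -> Option<HashMap<u64, [u8; 32]>> {
    let mut new_chunks: HashMap<u64, u32> = shard_ids.iter().map(|s| (*s, 0)).collect();
    for block in epoch_blocks {
        for chunk in &block.chunks {
            // A chunk is "new" in this block if it was produced at this height,
            // not carried over from an earlier height because a chunk was missed.
            if chunk.height_included == block.height {
                if let Some(n) = new_chunks.get_mut(&chunk.shard_id) {
                    *n += 1;
                }
            }
        }
        if new_chunks.values().all(|n| *n >= 2) {
            return Some(block.chunks.iter().map(|c| (c.shard_id, c.prev_state_root)).collect());
        }
    }
    None
}

The real helper in the diff additionally treats an UNKNOWN_BLOCK RPC error as a skipped height and bails out on unexpected shard IDs; if the epoch has not yet produced a qualifying block, the whole check is postponed, hence the new Option return type.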
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; the diff is not rendered.)

1 change: 1 addition & 0 deletions tools/state-parts-dump-check/Cargo.toml
@@ -26,6 +26,7 @@ near-jsonrpc.workspace = true
 near-primitives-core.workspace = true
 near-o11y.workspace = true
 reqwest.workspace = true
+serde_json.workspace = true
 tokio.workspace = true
 tracing.workspace = true
 [features]
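
Why serde_json is added here: the checker now has to classify RPC failures while scanning heights, and near-jsonrpc hands the handler error back as a raw JSON value. A height with no block on the canonical chain surfaces as a handler error whose body carries "name": "UNKNOWN_BLOCK", which the scan in cli.rs below treats as "keep going" rather than as fatal. A minimal sketch of that classification (the error shape is as matched in the diff's Err arm; the helper name is ours):

use serde_json::{json, Value};

// Returns true if an RPC handler-error payload denotes a missing/skipped block.
fn is_unknown_block(err: &Value) -> bool {
    matches!(err.get("name"), Some(Value::String(name)) if name.as_str() == "UNKNOWN_BLOCK")
}

fn main() {
    // The shape matched by the Err arm in get_current_epoch_state_roots.
    assert!(is_unknown_block(&json!({ "name": "UNKNOWN_BLOCK" })));
    assert!(!is_unknown_block(&json!({ "name": "SOME_OTHER_ERROR" })));
}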
117 changes: 101 additions & 16 deletions tools/state-parts-dump-check/src/cli.rs
@@ -6,15 +6,17 @@
     get_num_parts_from_filename, ExternalConnection, StateFileType,
 };
 use near_jsonrpc::client::{new_client, JsonRpcClient};
+use near_jsonrpc::primitives::errors::RpcErrorKind;
 use near_jsonrpc::primitives::types::config::RpcProtocolConfigRequest;
 use near_primitives::hash::CryptoHash;
 use near_primitives::shard_layout::ShardLayout;
 use near_primitives::state_part::PartId;
 use near_primitives::state_sync::ShardStateSyncResponseHeader;
 use near_primitives::types::{
-    BlockId, BlockReference, EpochId, EpochReference, Finality, ShardId, StateRoot,
+    BlockHeight, BlockId, BlockReference, EpochId, EpochReference, Finality, ShardId, StateRoot,
 };
-use near_primitives::views::BlockView;
+use near_primitives::version::ProtocolFeature;
+use near_primitives::views::{BlockView, ChunkHeaderView};
 use near_store::Trie;
 use nearcore::state_sync::extract_part_id_from_part_file_name;
 use std::collections::{HashMap, HashSet};
@@ -210,7 +212,7 @@
     epoch_id: EpochId,
     epoch_height: u64,
     shard_layout: ShardLayout,
-    state_roots: Vec<CryptoHash>,
+    state_roots: HashMap<ShardId, CryptoHash>,
 }
 
 fn create_external_connection(
@@ -301,9 +303,12 @@
             sleep(Duration::from_secs(loop_interval));
             continue;
         }
-        let dump_check_iter_info = dump_check_iter_info_res?;
+        let Some(dump_check_iter_info) = dump_check_iter_info_res? else {
+            tracing::info!("sync_hash not yet known. sleeping for {loop_interval}s.");
+            sleep(Duration::from_secs(loop_interval));
+            continue;
+        };
         for shard_info in dump_check_iter_info.shard_layout.shard_infos() {
-            let shard_index = shard_info.shard_index();
             let shard_id = shard_info.shard_id();
             tracing::info!(?shard_id, "started check");
             let dump_check_iter_info = dump_check_iter_info.clone();
@@ -351,7 +356,7 @@
                     .set(1);
                 reset_num_parts_metrics(&chain_id, shard_id);
             } else {
                 // this check would be working on the same epoch as last check, so we don't reset the num parts metrics to 0 repeatedlyd
[GitHub Actions / spellcheck annotation on line 359: unknown word "repeatedlyd"]
                 tracing::info!(
                     ?epoch_height,
                     ?parts_done,
@@ -394,7 +399,7 @@
                 dump_check_iter_info.epoch_id,
                 dump_check_iter_info.epoch_height,
                 shard_id,
-                dump_check_iter_info.state_roots[shard_index],
+                *dump_check_iter_info.state_roots.get(&shard_id).unwrap(),
                 root_dir,
                 s3_bucket,
                 s3_region,
@@ -856,10 +861,78 @@
     Ok(())
 }
 
+fn chunk_state_roots(chunks: &[ChunkHeaderView]) -> HashMap<ShardId, CryptoHash> {
+    chunks.iter().map(|chunk| (chunk.shard_id, chunk.prev_state_root)).collect()
+}
+
+async fn get_prev_epoch_state_roots(
+    rpc_client: &JsonRpcClient,
+    epoch_id: CryptoHash,
+) -> anyhow::Result<HashMap<ShardId, CryptoHash>> {
+    let prev_epoch_last_block_response =
+        get_previous_epoch_last_block_response(rpc_client, epoch_id).await?;
+    Ok(chunk_state_roots(&prev_epoch_last_block_response.chunks))
+}
+
+async fn get_current_epoch_state_roots(
+    rpc_client: &JsonRpcClient,
+    epoch_id: CryptoHash,
+    head_height: BlockHeight,
+    shard_layout: &ShardLayout,
+) -> anyhow::Result<Option<HashMap<ShardId, CryptoHash>>> {
+    let current_epoch_response = rpc_client
+        .validators(Some(EpochReference::EpochId(EpochId(epoch_id))))
+        .await
+        .or_else(|_| Err(anyhow!("validators_by_epoch_id for current_epoch_id failed")))?;
+
+    // Currently we just have to iterate over them to find the prev_hash of the sync hash, since the rpc client
+    // doesn't provide an API to find the sync hash
+    let mut num_new_chunks: HashMap<_, _> = shard_layout.shard_ids().map(|s| (s, 0)).collect();
+
+    for height in current_epoch_response.epoch_start_height + 1..=head_height {
+        // Since head_height was gotten with Finality::Final, we know any of these are on the canonical chain
+        match rpc_client.block_by_id(BlockId::Height(height)).await {
+            Ok(block) => {
+                for chunk in block.chunks.iter() {
+                    if chunk.height_included == height {
+                        let Some(n) = num_new_chunks.get_mut(&chunk.shard_id) else {
+                            anyhow::bail!(
+                                "bad shard ID {} in chunks for #{}",
+                                chunk.shard_id,
+                                height
+                            );
+                        };
+                        *n += 1;
+                    }
+                }
+                if num_new_chunks.iter().all(|(_shard_id, new_chunks)| *new_chunks >= 2) {
+                    return Ok(Some(chunk_state_roots(&block.chunks)));
+                }
+            }
+            Err(e) => {
+                if let Some(RpcErrorKind::HandlerError(serde_json::Value::Object(err))) =
+                    &e.error_struct
+                {
+                    if let Some(serde_json::Value::String(name)) = err.get("name") {
+                        if name.as_str() == "UNKNOWN_BLOCK" {
+                            continue;
+                        }
+                    }
+                }
+                anyhow::bail!("block_by_id failed for height {}: {:?}", height, e);
+            }
+        }
+    }
+    Ok(None)
+}
+
 // get epoch information of the latest epoch that's complete
+// TODO: this function fetches all the state roots with RPC calls even if we already checked that epoch.
+// Should just check if the epoch ID is new before doing that. The logic in the main loop can end up calling
+// this a huge number of times for no reason in very quick succession.
 async fn get_processing_epoch_information(
     rpc_client: &JsonRpcClient,
-) -> anyhow::Result<DumpCheckIterInfo> {
+) -> anyhow::Result<Option<DumpCheckIterInfo>> {
     let block_reference = BlockReference::Finality(Finality::Final);
     let latest_block_response = rpc_client
         .block(block_reference)
@@ -879,19 +952,31 @@
         .or_else(|err| Err(anyhow!("validators_by_epoch_id for latest_epoch_id failed: {err}")))?;
 
     let latest_epoch_height = latest_epoch_response.epoch_height;
-    let prev_epoch_last_block_response =
-        get_previous_epoch_last_block_response(rpc_client, latest_epoch_id).await?;
-    let mut chunks = prev_epoch_last_block_response.chunks;
-    chunks.sort_by(|c1, c2| c1.shard_id.cmp(&c2.shard_id));
-    let prev_epoch_state_roots: Vec<CryptoHash> =
-        chunks.iter().map(|chunk| chunk.prev_state_root).collect();
+    let state_roots = if ProtocolFeature::CurrentEpochStateSync
+        .enabled(protocol_config.config_view.protocol_version)
+    {
+        let Some(roots) = get_current_epoch_state_roots(
+            rpc_client,
+            latest_epoch_id,
+            latest_block_response.header.height,
+            &protocol_config.config_view.shard_layout,
+        )
+        .await?
+        else {
+            return Ok(None);
+        };
+        roots
+    } else {
+        get_prev_epoch_state_roots(rpc_client, latest_epoch_id).await?
+    };
 
-    Ok(DumpCheckIterInfo {
+    Ok(Some(DumpCheckIterInfo {
         epoch_id: EpochId(latest_epoch_id),
         epoch_height: latest_epoch_height,
         shard_layout: protocol_config.config_view.shard_layout,
-        state_roots: prev_epoch_state_roots,
-    })
+        state_roots,
+    }))
 }
 
 async fn get_previous_epoch_last_block_response(