Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

forknet big witness parallel #12652

Draft
wants to merge 23 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ fn get_state_witness_block_range(
last_chunk_shard_id: ShardId,
}

let initial_prev_hash = *state_witness.chunk_header.prev_block_hash();
let initial_prev_hash = *state_witness.inner.chunk_header.prev_block_hash();
let initial_prev_block = store.get_block(&initial_prev_hash)?;
let initial_shard_layout =
epoch_manager.get_shard_layout_from_prev_block(&initial_prev_hash)?;
let initial_shard_id = state_witness.chunk_header.shard_id();
let initial_shard_id = state_witness.inner.chunk_header.shard_id();
// Check that shard id is present in current epoch.
// TODO: consider more proper way to validate this.
let _ = initial_shard_layout.get_shard_index(initial_shard_id)?;
Expand Down Expand Up @@ -327,8 +327,8 @@ pub fn pre_validate_chunk_state_witness(

// Ensure that the chunk header version is supported in this protocol version
let protocol_version =
epoch_manager.get_epoch_info(&state_witness.epoch_id)?.protocol_version();
state_witness.chunk_header.validate_version(protocol_version)?;
epoch_manager.get_epoch_info(&state_witness.inner.epoch_id)?.protocol_version();
state_witness.inner.chunk_header.validate_version(protocol_version)?;

// First, go back through the blockchain history to locate the last new chunk
// and last last new chunk for the shard.
Expand All @@ -342,19 +342,19 @@ pub fn pre_validate_chunk_state_witness(

let receipts_to_apply = validate_source_receipt_proofs(
epoch_manager,
&state_witness.source_receipt_proofs,
&state_witness.inner.source_receipt_proofs,
&blocks_after_last_last_chunk,
last_chunk_shard_layout,
last_chunk_shard_id,
)?;
let applied_receipts_hash = hash(&borsh::to_vec(receipts_to_apply.as_slice()).unwrap());
if applied_receipts_hash != state_witness.applied_receipts_hash {
if applied_receipts_hash != state_witness.inner.applied_receipts_hash {
return Err(Error::InvalidChunkStateWitness(format!(
"Receipts hash {:?} does not match expected receipts hash {:?}",
applied_receipts_hash, state_witness.applied_receipts_hash
applied_receipts_hash, state_witness.inner.applied_receipts_hash
)));
}
let (tx_root_from_state_witness, _) = merklize(&state_witness.transactions);
let (tx_root_from_state_witness, _) = merklize(&state_witness.inner.transactions);
let last_chunk_block = blocks_after_last_last_chunk.first().ok_or_else(|| {
Error::Other("blocks_after_last_last_chunk is empty, this should be impossible!".into())
})?;
Expand All @@ -368,15 +368,15 @@ pub fn pre_validate_chunk_state_witness(
}

let current_protocol_version =
epoch_manager.get_epoch_protocol_version(&state_witness.epoch_id)?;
epoch_manager.get_epoch_protocol_version(&state_witness.inner.epoch_id)?;
if !checked_feature!(
"protocol_feature_relaxed_chunk_validation",
RelaxedChunkValidation,
current_protocol_version
) {
let new_transactions = &state_witness.new_transactions;
let new_transactions = &state_witness.inner.new_transactions;
let (new_tx_root_from_state_witness, _) = merklize(&new_transactions);
let chunk_tx_root = state_witness.chunk_header.tx_root();
let chunk_tx_root = state_witness.inner.chunk_header.tx_root();
if new_tx_root_from_state_witness != chunk_tx_root {
return Err(Error::InvalidChunkStateWitness(format!(
"Witness new transactions root {:?} does not match chunk {:?}",
Expand All @@ -386,21 +386,21 @@ pub fn pre_validate_chunk_state_witness(
// Verify that all proposed transactions are valid.
if !new_transactions.is_empty() {
let transactions_validation_storage_config = RuntimeStorageConfig {
state_root: state_witness.chunk_header.prev_state_root(),
state_root: state_witness.inner.chunk_header.prev_state_root(),
use_flat_storage: true,
source: StorageDataSource::Recorded(PartialStorage {
nodes: state_witness.new_transactions_validation_state.clone(),
nodes: state_witness.inner.new_transactions_validation_state.clone(),
}),
state_patch: Default::default(),
};

match validate_prepared_transactions(
chain,
runtime_adapter,
&state_witness.chunk_header,
&state_witness.inner.chunk_header,
transactions_validation_storage_config,
&new_transactions,
&state_witness.transactions,
&state_witness.inner.transactions,
) {
Ok(result) => {
if result.transactions.len() != new_transactions.len() {
Expand Down Expand Up @@ -444,7 +444,7 @@ pub fn pre_validate_chunk_state_witness(
} else {
MainTransition::NewChunk(NewChunkData {
chunk_header: last_chunk_block.chunks().get(last_chunk_shard_index).unwrap().clone(),
transactions: state_witness.transactions.clone(),
transactions: state_witness.inner.transactions.clone(),
receipts: receipts_to_apply,
block: Chain::get_apply_chunk_block_context(
epoch_manager,
Expand All @@ -455,7 +455,7 @@ pub fn pre_validate_chunk_state_witness(
is_first_block_with_chunk_of_version: false,
storage_context: StorageContext {
storage_data_source: StorageDataSource::Recorded(PartialStorage {
nodes: state_witness.main_state_transition.base_state.clone(),
nodes: state_witness.inner.main_state_transition.base_state.clone(),
}),
state_patch: Default::default(),
},
Expand Down Expand Up @@ -590,13 +590,13 @@ pub fn validate_chunk_state_witness(
main_state_transition_cache: &MainStateTransitionCache,
) -> Result<(), Error> {
let _timer = crate::stateless_validation::metrics::CHUNK_STATE_WITNESS_VALIDATION_TIME
.with_label_values(&[&state_witness.chunk_header.shard_id().to_string()])
.with_label_values(&[&state_witness.inner.chunk_header.shard_id().to_string()])
.start_timer();
let span = tracing::debug_span!(target: "client", "validate_chunk_state_witness").entered();
let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.epoch_id)?;
let witness_chunk_shard_id = state_witness.chunk_header.shard_id();
let witness_shard_layout = epoch_manager.get_shard_layout(&state_witness.inner.epoch_id)?;
let witness_chunk_shard_id = state_witness.inner.chunk_header.shard_id();
let witness_chunk_shard_uid =
epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.epoch_id)?;
epoch_manager.shard_id_to_uid(witness_chunk_shard_id, &state_witness.inner.epoch_id)?;
let block_hash = pre_validation_output.main_transition_params.block_hash();
let epoch_id = epoch_manager.get_epoch_id(&block_hash)?;
let shard_id = pre_validation_output.main_transition_params.shard_id();
Expand Down Expand Up @@ -628,14 +628,14 @@ pub fn validate_chunk_state_witness(
}
(_, Some(result)) => (result.chunk_extra, result.outgoing_receipts),
};
if chunk_extra.state_root() != &state_witness.main_state_transition.post_state_root {
if chunk_extra.state_root() != &state_witness.inner.main_state_transition.post_state_root {
// This is an early check, it's not for correctness, only for better
// error reporting in case of an invalid state witness due to a bug.
// Only the final state root check against the chunk header is required.
return Err(Error::InvalidChunkStateWitness(format!(
"Post state root {:?} for main transition does not match expected post state root {:?}",
chunk_extra.state_root(),
state_witness.main_state_transition.post_state_root,
state_witness.inner.main_state_transition.post_state_root,
)));
}

Expand All @@ -648,7 +648,7 @@ pub fn validate_chunk_state_witness(
&mut outgoing_receipts,
protocol_version,
&witness_shard_layout,
state_witness.chunk_header.shard_id(),
state_witness.inner.chunk_header.shard_id(),
shard_id,
)?;
}
Expand All @@ -670,19 +670,19 @@ pub fn validate_chunk_state_witness(
}

if pre_validation_output.implicit_transition_params.len()
!= state_witness.implicit_transitions.len()
!= state_witness.inner.implicit_transitions.len()
{
return Err(Error::InvalidChunkStateWitness(format!(
"Implicit transitions count mismatch. Expected {}, found {}",
pre_validation_output.implicit_transition_params.len(),
state_witness.implicit_transitions.len(),
state_witness.inner.implicit_transitions.len(),
)));
}

for (implicit_transition_params, transition) in pre_validation_output
.implicit_transition_params
.into_iter()
.zip(state_witness.implicit_transitions.into_iter())
.zip(state_witness.inner.implicit_transitions.into_iter())
{
let (shard_uid, new_state_root) = match implicit_transition_params {
ImplicitTransitionParams::ApplyOldChunk(block, shard_uid) => {
Expand Down Expand Up @@ -738,7 +738,7 @@ pub fn validate_chunk_state_witness(
let (outgoing_receipts_root, _) = merklize(&outgoing_receipts_hashes);
validate_chunk_with_chunk_extra_and_receipts_root(
&chunk_extra,
&state_witness.chunk_header,
&state_witness.inner.chunk_header,
&outgoing_receipts_root,
)?;

Expand Down Expand Up @@ -772,9 +772,9 @@ impl Chain {
runtime_adapter: &dyn RuntimeAdapter,
processing_done_tracker: Option<ProcessingDoneTracker>,
) -> Result<(), Error> {
let shard_id = witness.chunk_header.shard_id();
let height_created = witness.chunk_header.height_created();
let chunk_hash = witness.chunk_header.chunk_hash();
let shard_id = witness.inner.chunk_header.shard_id();
let height_created = witness.inner.chunk_header.height_created();
let chunk_hash = witness.inner.chunk_header.chunk_hash();
let parent_span = tracing::debug_span!(
target: "chain", "shadow_validate", ?shard_id, height_created);
let (encoded_witness, raw_witness_size) = {
Expand Down
10 changes: 5 additions & 5 deletions chain/chain/src/stateless_validation/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn record_witness_size_metrics_fallible(
encoded_size: usize,
witness: &ChunkStateWitness,
) -> Result<(), std::io::Error> {
let shard_id = witness.chunk_header.shard_id().to_string();
let shard_id = witness.inner.chunk_header.shard_id().to_string();
CHUNK_STATE_WITNESS_RAW_SIZE
.with_label_values(&[shard_id.as_str()])
.observe(decoded_size as f64);
Expand All @@ -188,16 +188,16 @@ fn record_witness_size_metrics_fallible(
.observe(encoded_size as f64);
CHUNK_STATE_WITNESS_MAIN_STATE_TRANSISTION_SIZE
.with_label_values(&[shard_id.as_str()])
.observe(borsh::to_vec(&witness.main_state_transition)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.main_state_transition)?.len() as f64);
CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.new_transactions)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.new_transactions)?.len() as f64);
CHUNK_STATE_WITNESS_NEW_TRANSACTIONS_STATE_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.new_transactions_validation_state)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.new_transactions_validation_state)?.len() as f64);
CHUNK_STATE_WITNESS_SOURCE_RECEIPT_PROOFS_SIZE
.with_label_values(&[&shard_id.as_str()])
.observe(borsh::to_vec(&witness.source_receipt_proofs)?.len() as f64);
.observe(borsh::to_vec(&witness.inner.source_receipt_proofs)?.len() as f64);
Ok(())
}

Expand Down
12 changes: 6 additions & 6 deletions chain/chain/src/store/latest_witnesses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ impl ChainStore {
let _span = tracing::info_span!(
target: "client",
"save_latest_chunk_state_witness",
witness_height = witness.chunk_header.height_created(),
witness_shard = ?witness.chunk_header.shard_id(),
witness_height = witness.inner.chunk_header.height_created(),
witness_shard = ?witness.inner.chunk_header.shard_id(),
)
.entered();

Expand Down Expand Up @@ -172,9 +172,9 @@ impl ChainStore {
let mut random_uuid = [0u8; 16];
OsRng.fill_bytes(&mut random_uuid);
let key = LatestWitnessesKey {
height: witness.chunk_header.height_created(),
shard_id: witness.chunk_header.shard_id().into(),
epoch_id: witness.epoch_id,
height: witness.inner.chunk_header.height_created(),
shard_id: witness.inner.chunk_header.shard_id().into(),
epoch_id: witness.inner.epoch_id,
witness_size: serialized_witness_size,
random_uuid,
};
Expand All @@ -195,7 +195,7 @@ impl ChainStore {

let store_commit_time = start_time.elapsed().saturating_sub(store_update_time);

let shard_id_str = witness.chunk_header.shard_id().to_string();
let shard_id_str = witness.inner.chunk_header.shard_id().to_string();
stateless_validation::metrics::SAVE_LATEST_WITNESS_GENERATE_UPDATE_TIME
.with_label_values(&[shard_id_str.as_str()])
.observe(store_update_time.as_secs_f64());
Expand Down
10 changes: 10 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,16 @@ pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_decode_time",
"State witness decoding time from the partial state witness parts in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_time_to_last_part",
Expand Down
24 changes: 12 additions & 12 deletions chain/client/src/stateless_validation/chunk_validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ impl ChunkValidator {
processing_done_tracker: Option<ProcessingDoneTracker>,
signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
let prev_block_hash = state_witness.chunk_header.prev_block_hash();
let shard_id = state_witness.chunk_header.shard_id();
let prev_block_hash = state_witness.inner.chunk_header.prev_block_hash();
let shard_id = state_witness.inner.chunk_header.shard_id();
let epoch_id = self.epoch_manager.get_epoch_id_from_prev_block(prev_block_hash)?;
if epoch_id != state_witness.epoch_id {
if epoch_id != state_witness.inner.epoch_id {
return Err(Error::InvalidChunkStateWitness(format!(
"Invalid EpochId {:?} for previous block {}, expected {:?}",
state_witness.epoch_id, prev_block_hash, epoch_id
state_witness.inner.epoch_id, prev_block_hash, epoch_id
)));
}

Expand All @@ -92,7 +92,7 @@ impl ChunkValidator {
self.runtime_adapter.as_ref(),
)?;

let chunk_header = state_witness.chunk_header.clone();
let chunk_header = state_witness.inner.chunk_header.clone();
let network_sender = self.network_sender.clone();
let epoch_manager = self.epoch_manager.clone();

Expand Down Expand Up @@ -230,8 +230,8 @@ impl Client {
) -> Result<(), Error> {
tracing::debug!(
target: "client",
chunk_hash=?witness.chunk_header.chunk_hash(),
shard_id=?witness.chunk_header.shard_id(),
chunk_hash=?witness.inner.chunk_header.chunk_hash(),
shard_id=?witness.inner.chunk_header.shard_id(),
"process_chunk_state_witness",
);

Expand All @@ -252,7 +252,7 @@ impl Client {
self.chain.chain_store.save_latest_chunk_state_witness(&witness)?;
}

match self.chain.get_block(witness.chunk_header.prev_block_hash()) {
match self.chain.get_block(witness.inner.chunk_header.prev_block_hash()) {
Ok(block) => self.process_chunk_state_witness_with_prev_block(
witness,
&block,
Expand All @@ -273,7 +273,7 @@ impl Client {
// produced the witness. However some tests bypass PartialWitnessActor, thus when a chunk producer
// receives its own state witness, we log a warning instead of panicking.
// TODO: Make sure all tests run with "test_features" and panic for non-test builds.
if signer.validator_id() == &witness.chunk_producer {
if signer.validator_id() == &witness.inner.chunk_producer {
tracing::warn!(
"Validator {:?} received state witness from itself. Witness={:?}",
signer.validator_id(),
Expand All @@ -283,7 +283,7 @@ impl Client {
}
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::ChunkStateWitnessAck(
witness.chunk_producer.clone(),
witness.inner.chunk_producer.clone(),
ChunkStateWitnessAck::new(witness),
),
));
Expand All @@ -296,10 +296,10 @@ impl Client {
processing_done_tracker: Option<ProcessingDoneTracker>,
signer: &Arc<ValidatorSigner>,
) -> Result<(), Error> {
if witness.chunk_header.prev_block_hash() != prev_block.hash() {
if witness.inner.chunk_header.prev_block_hash() != prev_block.hash() {
return Err(Error::Other(format!(
"process_chunk_state_witness_with_prev_block - prev_block doesn't match ({} != {})",
witness.chunk_header.prev_block_hash(),
witness.inner.chunk_header.prev_block_hash(),
prev_block.hash()
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl Client {
witness: ChunkStateWitness,
witness_size: usize,
) -> Result<HandleOrphanWitnessOutcome, Error> {
let chunk_header = &witness.chunk_header;
let chunk_header = &witness.inner.chunk_header;
let witness_height = chunk_header.height_created();
let witness_shard = chunk_header.shard_id();

Expand Down Expand Up @@ -83,7 +83,7 @@ impl Client {
.orphan_witness_pool
.take_state_witnesses_waiting_for_block(new_block.hash());
for witness in ready_witnesses {
let header = &witness.chunk_header;
let header = &witness.inner.chunk_header;
tracing::debug!(
target: "client",
witness_height = header.height_created(),
Expand Down
Loading
Loading