Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip parallel pw handling #12641

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use itertools::Itertools;
use lru::LruCache;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub struct PartialWitnessActor {
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime: Arc<dyn RuntimeAdapter>,
/// Tracks the parts of the state witness sent from chunk producers to chunk validators.
partial_witness_tracker: PartialEncodedStateWitnessTracker,
partial_witness_tracker: Arc<Mutex<PartialEncodedStateWitnessTracker>>,
partial_deploys_tracker: PartialEncodedContractDeploysTracker,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
state_witness_tracker: ChunkStateWitnessTracker,
Expand All @@ -75,6 +75,7 @@ pub struct PartialWitnessActor {
/// Same as above for contract deploys.
contract_deploys_encoders: ReedSolomonEncoderCache,
compile_contracts_spawner: Arc<dyn AsyncComputationSpawner>,
partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
/// AccountId in the key corresponds to the requester (chunk validator).
processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>,
}
Expand Down Expand Up @@ -166,9 +167,12 @@ impl PartialWitnessActor {
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime: Arc<dyn RuntimeAdapter>,
compile_contracts_spawner: Arc<dyn AsyncComputationSpawner>,
partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
) -> Self {
let partial_witness_tracker =
PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone());
let partial_witness_tracker = Arc::new(Mutex::new(PartialEncodedStateWitnessTracker::new(
client_sender,
epoch_manager.clone(),
)));
Self {
network_adapter,
my_signer,
Expand All @@ -182,6 +186,7 @@ impl PartialWitnessActor {
CONTRACT_DEPLOYS_RATIO_DATA_PARTS,
),
compile_contracts_spawner,
partial_witness_spawner,
processed_contract_code_requests: LruCache::new(
NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(),
),
Expand Down Expand Up @@ -368,7 +373,7 @@ impl PartialWitnessActor {
}

/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
fn forward_state_witness_part(
fn _forward_state_witness_part(
&self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
Expand Down Expand Up @@ -403,17 +408,60 @@ impl PartialWitnessActor {
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage");

let signer = self.my_validator_signer()?;
// Validate the partial encoded state witness and forward the part to all the chunk validators.
if validate_partial_encoded_state_witness(
self.epoch_manager.as_ref(),
&partial_witness,
&signer,
self.runtime.store(),
)? {
self.forward_state_witness_part(partial_witness)?;
}
let epoch_manager = self.epoch_manager.clone();
let runtime_adapter = self.runtime.clone();

let ChunkProductionKey { shard_id, epoch_id, height_created } =
partial_witness.chunk_production_key();

let chunk_producer = self
.epoch_manager
.get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })?
.take_account_id();

// Forward witness part to chunk validators except the validator that produced the chunk and witness.
let target_chunk_validators = self
.epoch_manager
.get_chunk_validator_assignments(&epoch_id, shard_id, height_created)?
.ordered_chunk_validators()
.into_iter()
.filter(|validator| validator != &chunk_producer)
.collect();

let network_adapter = self.network_adapter.clone();

self.partial_witness_spawner.spawn("handle_partial_encoded_state_witness", move || {
// Validate the partial encoded state witness and forward the part to all the chunk validators.
let validation = validate_partial_encoded_state_witness(
epoch_manager.as_ref(),
&partial_witness,
&signer,
runtime_adapter.store(),
);
match validation {
Ok(true) => {
forward_state_witness_part_v2(
partial_witness,
target_chunk_validators,
network_adapter,
);
}
Ok(false) => {
tracing::warn!(
target: "client",
"Received partial encoded state witness that is not valid"
);
}
Err(err) => {
tracing::warn!(
target: "client",
"Encountered error during validation: {}",
err
);
}
}
});

Ok(())
}
Expand All @@ -426,15 +474,42 @@ impl PartialWitnessActor {
tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage");

let signer = self.my_validator_signer()?;
// Validate the partial encoded state witness and store the partial encoded state witness.
if validate_partial_encoded_state_witness(
self.epoch_manager.as_ref(),
&partial_witness,
&signer,
self.runtime.store(),
)? {
self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?;
}
let partial_witness_tracker = self.partial_witness_tracker.clone();
let epoch_manager = self.epoch_manager.clone();
let runtime_adapter = self.runtime.clone();
self.partial_witness_spawner.spawn(
"handle_partial_encoded_state_witness_forward",
move || {
// Validate the partial encoded state witness and store the partial encoded state witness.
let validation = validate_partial_encoded_state_witness(
epoch_manager.as_ref(),
&partial_witness,
&signer,
runtime_adapter.store(),
);
match validation {
Ok(true) => {
let mut partial_witness_tracker = partial_witness_tracker.lock().unwrap();
partial_witness_tracker
.store_partial_encoded_state_witness(partial_witness)
.unwrap();
}
Ok(false) => {
tracing::warn!(
target: "client",
"Received partial encoded state witness that is not valid"
);
}
Err(err) => {
tracing::warn!(
target: "client",
"Encountered error during validation: {}",
err
);
}
}
},
);

Ok(())
}
Expand Down Expand Up @@ -576,8 +651,11 @@ impl PartialWitnessActor {
if missing_contract_hashes.is_empty() {
return Ok(());
}
self.partial_witness_tracker
.store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?;
{
let mut partial_witness_tracker = self.partial_witness_tracker.lock().unwrap();
partial_witness_tracker
.store_accessed_contract_hashes(key.clone(), missing_contract_hashes.clone())?;
}
let random_chunk_producer = {
let mut chunk_producers = self
.epoch_manager
Expand All @@ -598,7 +676,7 @@ impl PartialWitnessActor {

/// Sends the contract accesses to the same chunk validators
/// (except for the chunk producers that track the same shard),
/// which will receive the state witness for the new chunk.
/// which will receive the state witness for the new chunk.
fn send_contract_accesses_to_chunk_validators(
&self,
key: ChunkProductionKey,
Expand Down Expand Up @@ -745,7 +823,8 @@ impl PartialWitnessActor {
) -> Result<(), Error> {
let key = response.chunk_production_key().clone();
let contracts = response.decompress_contracts()?;
self.partial_witness_tracker.store_accessed_contract_codes(key, contracts)
let mut partial_witness_tracker = self.partial_witness_tracker.lock().unwrap();
partial_witness_tracker.store_accessed_contract_codes(key, contracts)
}

fn my_validator_signer(&self) -> Result<Arc<ValidatorSigner>, Error> {
Expand Down Expand Up @@ -799,3 +878,17 @@ fn contracts_cache_contains_contract(
let cache_key = get_contract_cache_key(contract_hash.0, &runtime_config.wasm_config);
cache.memory_cache().contains(cache_key) || cache.has(&cache_key).is_ok_and(|has| has)
}

/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
fn forward_state_witness_part_v2(
partial_witness: PartialEncodedStateWitness,
target_chunk_validators: Vec<AccountId>,
network_adapter: PeerManagerAdapter,
) {
network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(
target_chunk_validators,
partial_witness,
),
));
}
1 change: 1 addition & 0 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub fn setup(
epoch_manager.clone(),
runtime.clone(),
Arc::new(RayonAsyncComputationSpawner),
Arc::new(RayonAsyncComputationSpawner),
));
let partial_witness_adapter = partial_witness_addr.with_auto_span_context();

Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/test_loop/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ impl TestLoopBuilder {
epoch_manager.clone(),
runtime_adapter.clone(),
Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
Arc::new(self.test_loop.async_computation_spawner(|_| Duration::milliseconds(80))),
);

let gc_actor = GCActor::new(
Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/tests/network/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ fn setup_network_node(
epoch_manager,
runtime,
Arc::new(RayonAsyncComputationSpawner),
Arc::new(RayonAsyncComputationSpawner),
));
shards_manager_adapter.bind(shards_manager_actor.with_auto_span_context());
let peer_manager = PeerManagerActor::spawn(
Expand Down
1 change: 1 addition & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ pub fn start_with_config_and_synchronization(
epoch_manager.clone(),
runtime.clone(),
Arc::new(RayonAsyncComputationSpawner),
Arc::new(RayonAsyncComputationSpawner),
));

let (_gc_actor, gc_arbiter) = spawn_actix_actor(GCActor::new(
Expand Down
Loading