Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feat: parallel partial witness handling in the partial witnes… #12741

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ rand_hc = "0.3.1"
rand_xorshift = "0.3"
rayon = "1.5"
redis = "0.23.0"
reed-solomon-erasure = { version = "6.0.0", features = ["simd-accel"] }
reed-solomon-erasure = "6.0.0"
regex = "1.7.1"
region = "3.0"
reqwest = { version = "0.11.14", features = ["blocking"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub struct PartialWitnessActor {
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime: Arc<dyn RuntimeAdapter>,
/// Tracks the parts of the state witness sent from chunk producers to chunk validators.
partial_witness_tracker: Arc<PartialEncodedStateWitnessTracker>,
partial_witness_tracker: PartialEncodedStateWitnessTracker,
partial_deploys_tracker: PartialEncodedContractDeploysTracker,
/// Tracks a collection of state witnesses sent from chunk producers to chunk validators.
state_witness_tracker: ChunkStateWitnessTracker,
Expand All @@ -75,7 +75,6 @@ pub struct PartialWitnessActor {
/// Same as above for contract deploys.
contract_deploys_encoders: ReedSolomonEncoderCache,
compile_contracts_spawner: Arc<dyn AsyncComputationSpawner>,
partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
/// AccountId in the key corresponds to the requester (chunk validator).
processed_contract_code_requests: LruCache<(ChunkProductionKey, AccountId), ()>,
}
Expand Down Expand Up @@ -167,10 +166,9 @@ impl PartialWitnessActor {
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime: Arc<dyn RuntimeAdapter>,
compile_contracts_spawner: Arc<dyn AsyncComputationSpawner>,
partial_witness_spawner: Arc<dyn AsyncComputationSpawner>,
) -> Self {
let partial_witness_tracker =
Arc::new(PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone()));
PartialEncodedStateWitnessTracker::new(client_sender, epoch_manager.clone());
Self {
network_adapter,
my_signer,
Expand All @@ -184,7 +182,6 @@ impl PartialWitnessActor {
CONTRACT_DEPLOYS_RATIO_DATA_PARTS,
),
compile_contracts_spawner,
partial_witness_spawner,
processed_contract_code_requests: LruCache::new(
NonZeroUsize::new(PROCESSED_CONTRACT_CODE_REQUESTS_CACHE_SIZE).unwrap(),
),
Expand Down Expand Up @@ -368,20 +365,13 @@ impl PartialWitnessActor {
));
}

/// Function to handle receiving partial_encoded_state_witness message from chunk producer.
fn handle_partial_encoded_state_witness(
&mut self,
/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
fn forward_state_witness_part(
&self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage");
let signer = self.my_validator_signer()?;
let validator_account_id = signer.validator_id().clone();
let epoch_manager = self.epoch_manager.clone();
let runtime_adapter = self.runtime.clone();

let ChunkProductionKey { shard_id, epoch_id, height_created } =
partial_witness.chunk_production_key();

let chunk_producer = self
.epoch_manager
.get_chunk_producer_info(&ChunkProductionKey { epoch_id, height_created, shard_id })?
Expand All @@ -396,40 +386,32 @@ impl PartialWitnessActor {
.filter(|validator| validator != &chunk_producer)
.collect();

let network_adapter = self.network_adapter.clone();

self.partial_witness_spawner.spawn("handle_partial_encoded_state_witness", move || {
// Validate the partial encoded state witness and forward the part to all the chunk validators.
match validate_partial_encoded_state_witness(
epoch_manager.as_ref(),
&partial_witness,
&validator_account_id,
runtime_adapter.store(),
) {
Ok(true) => {
network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(
target_chunk_validators,
partial_witness,
),
));
}
Ok(false) => {
// TODO: ban sending peer
tracing::warn!(
target: "client",
"Received invalid partial encoded state witness"
);
}
Err(err) => {
tracing::warn!(
target: "client",
"Encountered error during validation: {}",
err
);
}
}
});
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitnessForward(
target_chunk_validators,
partial_witness,
),
));
Ok(())
}

/// Handles a `PartialEncodedStateWitness` message received from a chunk producer.
///
/// The part is validated with `validate_partial_encoded_state_witness` using this
/// node's validator signer, the epoch manager and the runtime store. A part that
/// passes validation is handed to `forward_state_witness_part`; a part that fails
/// validation is dropped and `Ok(())` is returned.
///
/// # Errors
///
/// Propagates any error from obtaining the validator signer, from validation,
/// or from forwarding the part.
fn handle_partial_encoded_state_witness(
    &mut self,
    partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
    tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessMessage");

    let signer = self.my_validator_signer()?;
    let is_valid = validate_partial_encoded_state_witness(
        self.epoch_manager.as_ref(),
        &partial_witness,
        &signer,
        self.runtime.store(),
    )?;

    // Nothing to forward when the part did not pass validation.
    if !is_valid {
        return Ok(());
    }

    self.forward_state_witness_part(partial_witness)
}
Expand All @@ -442,42 +424,15 @@ impl PartialWitnessActor {
tracing::debug!(target: "client", ?partial_witness, "Receive PartialEncodedStateWitnessForwardMessage");

let signer = self.my_validator_signer()?;
let validator_account_id = signer.validator_id().clone();
let partial_witness_tracker = self.partial_witness_tracker.clone();
let epoch_manager = self.epoch_manager.clone();
let runtime_adapter = self.runtime.clone();
self.partial_witness_spawner.spawn(
"handle_partial_encoded_state_witness_forward",
move || {
// Validate the partial encoded state witness and store the partial encoded state witness.
match validate_partial_encoded_state_witness(
epoch_manager.as_ref(),
&partial_witness,
&validator_account_id,
runtime_adapter.store(),
) {
Ok(true) => {
if let Err(err) = partial_witness_tracker.store_partial_encoded_state_witness(partial_witness) {
tracing::error!(target: "client", "Failed to store partial encoded state witness: {}", err);
}
}
Ok(false) => {
// TODO: ban sending peer
tracing::warn!(
target: "client",
"Received invalid partial encoded state witness"
);
}
Err(err) => {
tracing::warn!(
target: "client",
"Encountered error during validation: {}",
err
);
}
}
},
);
// Validate the partial encoded state witness and store the partial encoded state witness.
if validate_partial_encoded_state_witness(
self.epoch_manager.as_ref(),
&partial_witness,
&signer,
self.runtime.store(),
)? {
self.partial_witness_tracker.store_partial_encoded_state_witness(partial_witness)?;
}

Ok(())
}
Expand Down Expand Up @@ -641,7 +596,7 @@ impl PartialWitnessActor {

/// Sends the contract accesses to the same chunk validators
/// (except for the chunk producers that track the same shard),
/// which will receive the state witness for the new chunk.
/// which will receive the state witness for the new chunk.
fn send_contract_accesses_to_chunk_validators(
&self,
key: ChunkProductionKey,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use lru::LruCache;
use near_async::messaging::CanSend;
use near_async::time::Instant;
use near_cache::SyncLruCache;
use near_chain::chain::ChunkStateWitnessMessage;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
Expand Down Expand Up @@ -309,13 +308,13 @@ pub struct PartialEncodedStateWitnessTracker {
/// Epoch manager to get the set of chunk validators
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Keeps track of state witness parts received from chunk producers.
parts_cache: Mutex<LruCache<ChunkProductionKey, CacheEntry>>,
parts_cache: LruCache<ChunkProductionKey, CacheEntry>,
/// Keeps track of the already decoded witnesses. This is needed
/// to protect chunk validator from processing the same witness multiple
/// times.
processed_witnesses: SyncLruCache<ChunkProductionKey, ()>,
processed_witnesses: LruCache<ChunkProductionKey, ()>,
/// Reed Solomon encoder for decoding state witness parts.
encoders: Mutex<ReedSolomonEncoderCache>,
encoders: ReedSolomonEncoderCache,
}

impl PartialEncodedStateWitnessTracker {
Expand All @@ -326,16 +325,16 @@ impl PartialEncodedStateWitnessTracker {
Self {
client_sender,
epoch_manager,
parts_cache: Mutex::new(LruCache::new(
NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap(),
)),
processed_witnesses: SyncLruCache::new(PROCESSED_WITNESSES_CACHE_SIZE),
encoders: Mutex::new(ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS)),
parts_cache: LruCache::new(NonZeroUsize::new(WITNESS_PARTS_CACHE_SIZE).unwrap()),
processed_witnesses: LruCache::new(
NonZeroUsize::new(PROCESSED_WITNESSES_CACHE_SIZE).unwrap(),
),
encoders: ReedSolomonEncoderCache::new(WITNESS_RATIO_DATA_PARTS),
}
}

pub fn store_partial_encoded_state_witness(
&self,
&mut self,
partial_witness: PartialEncodedStateWitness,
) -> Result<(), Error> {
tracing::debug!(target: "client", ?partial_witness, "store_partial_encoded_state_witness");
Expand All @@ -346,7 +345,7 @@ impl PartialEncodedStateWitnessTracker {
}

pub fn store_accessed_contract_hashes(
&self,
&mut self,
key: ChunkProductionKey,
hashes: HashSet<CodeHash>,
) -> Result<(), Error> {
Expand All @@ -356,7 +355,7 @@ impl PartialEncodedStateWitnessTracker {
}

pub fn store_accessed_contract_codes(
&self,
&mut self,
key: ChunkProductionKey,
codes: Vec<CodeBytes>,
) -> Result<(), Error> {
Expand All @@ -366,7 +365,7 @@ impl PartialEncodedStateWitnessTracker {
}

fn process_update(
&self,
&mut self,
key: ChunkProductionKey,
create_if_not_exists: bool,
update: CacheUpdate,
Expand All @@ -383,23 +382,17 @@ impl PartialEncodedStateWitnessTracker {
if create_if_not_exists {
self.maybe_insert_new_entry_in_parts_cache(&key);
}
let mut parts_cache = self.parts_cache.lock().unwrap();
let Some(entry) = parts_cache.get_mut(&key) else {
let Some(entry) = self.parts_cache.get_mut(&key) else {
return Ok(());
};
let total_size: usize = if let Some((decode_result, accessed_contracts)) =
entry.update(update)
{
if let Some((decode_result, accessed_contracts)) = entry.update(update) {
// Record the time taken from receiving first part to decoding partial witness.
let time_to_last_part = Instant::now().signed_duration_since(entry.created_at);
metrics::PARTIAL_WITNESS_TIME_TO_LAST_PART
.with_label_values(&[key.shard_id.to_string().as_str()])
.observe(time_to_last_part.as_seconds_f64());

parts_cache.pop(&key);
let total_size = parts_cache.iter().map(|(_, entry)| entry.total_size()).sum();
drop(parts_cache);

self.parts_cache.pop(&key);
self.processed_witnesses.push(key.clone(), ());

let encoded_witness = match decode_result {
Expand Down Expand Up @@ -435,33 +428,26 @@ impl PartialEncodedStateWitnessTracker {

tracing::debug!(target: "client", ?key, "Sending encoded witness to client.");
self.client_sender.send(ChunkStateWitnessMessage { witness, raw_witness_size });

total_size
} else {
parts_cache.iter().map(|(_, entry)| entry.total_size()).sum()
};
metrics::PARTIAL_WITNESS_CACHE_SIZE.set(total_size as f64);

}
self.record_total_parts_cache_size_metric();
Ok(())
}

fn get_encoder(&self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
fn get_encoder(&mut self, key: &ChunkProductionKey) -> Result<Arc<ReedSolomonEncoder>, Error> {
// The expected number of parts for the Reed Solomon encoding is the number of chunk validators.
let num_parts = self
.epoch_manager
.get_chunk_validator_assignments(&key.epoch_id, key.shard_id, key.height_created)?
.len();
let mut encoders = self.encoders.lock().unwrap();
Ok(encoders.entry(num_parts))
Ok(self.encoders.entry(num_parts))
}

// Function to insert a new entry into the cache for the chunk hash if it does not already exist
// We additionally check if an evicted entry has been fully decoded and processed.
fn maybe_insert_new_entry_in_parts_cache(&self, key: &ChunkProductionKey) {
let mut parts_cache = self.parts_cache.lock().unwrap();
if !parts_cache.contains(key) {
fn maybe_insert_new_entry_in_parts_cache(&mut self, key: &ChunkProductionKey) {
if !self.parts_cache.contains(key) {
if let Some((evicted_key, evicted_entry)) =
parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
self.parts_cache.push(key.clone(), CacheEntry::new(key.shard_id))
{
tracing::warn!(
target: "client",
Expand All @@ -474,6 +460,11 @@ impl PartialEncodedStateWitnessTracker {
}
}

/// Publishes the combined size of all entries currently held in `parts_cache`
/// to the `PARTIAL_WITNESS_CACHE_SIZE` metric.
fn record_total_parts_cache_size_metric(&self) {
    // Accumulate the per-entry sizes reported by `total_size()`.
    let mut cache_bytes: usize = 0;
    for (_, cached) in self.parts_cache.iter() {
        cache_bytes += cached.total_size();
    }
    metrics::PARTIAL_WITNESS_CACHE_SIZE.set(cache_bytes as f64);
}

fn decode_state_witness(
&self,
encoded_witness: &EncodedChunkStateWitness,
Expand Down
4 changes: 2 additions & 2 deletions chain/client/src/stateless_validation/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const MAX_HEIGHTS_AHEAD: BlockHeightDelta = 5;
pub fn validate_partial_encoded_state_witness(
epoch_manager: &dyn EpochManagerAdapter,
partial_witness: &PartialEncodedStateWitness,
validator_account_id: &AccountId,
signer: &ValidatorSigner,
store: &Store,
) -> Result<bool, Error> {
let ChunkProductionKey { shard_id, epoch_id, height_created } =
Expand Down Expand Up @@ -56,7 +56,7 @@ pub fn validate_partial_encoded_state_witness(
if !validate_chunk_relevant_as_validator(
epoch_manager,
&partial_witness.chunk_production_key(),
validator_account_id,
signer.validator_id(),
store,
)? {
return Ok(false);
Expand Down
1 change: 0 additions & 1 deletion chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ pub fn setup(
epoch_manager.clone(),
runtime.clone(),
Arc::new(RayonAsyncComputationSpawner),
Arc::new(RayonAsyncComputationSpawner),
));
let partial_witness_adapter = partial_witness_addr.with_auto_span_context();

Expand Down
Loading
Loading