From b73d8cc26d55ef3739497667906a1964a14aa6d8 Mon Sep 17 00:00:00 2001 From: Barak Date: Sat, 27 Apr 2024 09:30:20 +0300 Subject: [PATCH] test(concurrency): multithreaded test for decrease_validation_index --- .../blockifier/src/concurrency/flow_test.rs | 96 +++++++++++++++++++ .../blockifier/src/concurrency/scheduler.rs | 8 +- .../src/concurrency/scheduler_test.rs | 3 +- .../blockifier/src/concurrency/test_utils.rs | 12 +++ .../src/concurrency/versioned_state_proxy.rs | 16 ++++ .../src/concurrency/versioned_storage.rs | 5 + 6 files changed, 134 insertions(+), 6 deletions(-) create mode 100644 crates/blockifier/src/concurrency/flow_test.rs diff --git a/crates/blockifier/src/concurrency/flow_test.rs b/crates/blockifier/src/concurrency/flow_test.rs new file mode 100644 index 0000000000..74d66683f9 --- /dev/null +++ b/crates/blockifier/src/concurrency/flow_test.rs @@ -0,0 +1,96 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use rstest::rstest; +use starknet_api::core::{ContractAddress, PatriciaKey}; +use starknet_api::hash::{StarkFelt, StarkHash}; +use starknet_api::stark_felt; + +use crate::concurrency::scheduler::{Scheduler, Task}; +use crate::concurrency::test_utils::{ + contract_address, safe_versioned_state_for_testing, DEFAULT_CHUNK_SIZE, +}; +use crate::state::cached_state::{ContractClassMapping, StateMaps}; +use crate::state::state_api::StateReader; +use crate::storage_key; +use crate::test_utils::dict_state_reader::DictStateReader; + +// TODO(barak, 01/07/2024): Check txs with "reads". +// TODO(barak, 01/07/2024): Check cached_initial_values in versioned_state_proxy flow test. +#[rstest] +fn scheduler_flow_test(contract_address: ContractAddress) { + // Simulate DEFAULT_CHUNK_SIZE txs. Each reads (CONTRACT_ADDRESS, STORAGE_KEY) and writes its tx + // index inside the chunk to the same storage cell. + let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE)); + let storage_key = storage_key!(27_u8); + let storage_initial_value = stark_felt!(128_u8); + let versioned_state = safe_versioned_state_for_testing(DictStateReader { + storage_view: HashMap::from([((contract_address, storage_key), storage_initial_value)]), + ..Default::default() + }); + let mut handles = vec![]; + + let num_threads = 2_u8.pow(5); + for _ in 0..num_threads { + let scheduler = Arc::clone(&scheduler); + let versioned_state = versioned_state.clone(); + let handle = std::thread::spawn(move || { + let mut task = Task::NoTask; + while !scheduler.done() { + match task { + Task::ExecutionTask(tx_index) => { + let versioned_state_proxy = versioned_state.pin_version(tx_index); + let storage_new_value: u8 = tx_index.try_into().unwrap(); + let write_set = StateMaps { + storage: HashMap::from([( + (contract_address, storage_key), + stark_felt!(storage_new_value), + )]), + ..Default::default() + }; + versioned_state_proxy + .apply_writes(&write_set, &ContractClassMapping::default()); + scheduler.finish_execution(tx_index); + task = Task::NoTask; + } + Task::ValidationTask(tx_index) => { + let versioned_state_proxy = versioned_state.pin_version(tx_index); + let current_cell_value = versioned_state_proxy + .get_storage_at(contract_address, storage_key) + .unwrap(); + let read_set = StateMaps { + storage: HashMap::from([( + (contract_address, storage_key), + stark_felt!(current_cell_value), + )]), + ..Default::default() + }; + let read_set_valid = versioned_state_proxy.validate_read_set(&read_set); + let aborted = !read_set_valid && scheduler.try_validation_abort(tx_index); + if aborted { + versioned_state_proxy.delete_writes(tx_index) + } + task = scheduler.finish_validation(tx_index, aborted); + } + Task::NoTask => { + task = scheduler.next_task(); + } + Task::Done => (), + } + } + }); + handles.push(handle); + } + for handle in handles { + handle.join().unwrap(); + } + + let chunk_size_as_u8: u8 = DEFAULT_CHUNK_SIZE.try_into().unwrap(); + let final_storage_value = chunk_size_as_u8 - 1; + let storage_writes = versioned_state.state().get_writes(DEFAULT_CHUNK_SIZE).storage; + assert_eq!( + *storage_writes.get(&(contract_address, storage_key)).unwrap(), + stark_felt!(final_storage_value) + ); + dbg!(storage_writes.get(&(contract_address, storage_key)).unwrap()); +} diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index d5ebabc9fa..1e8848b638 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -8,6 +8,10 @@ use crate::concurrency::TxIndex; #[path = "scheduler_test.rs"] pub mod test; +#[cfg(test)] +#[path = "flow_test.rs"] +pub mod flow_test; + #[derive(Debug, Default)] pub struct Scheduler { execution_index: AtomicUsize, @@ -53,10 +57,6 @@ impl Scheduler { let index_to_validate = self.validation_index.load(Ordering::Acquire); let index_to_execute = self.execution_index.load(Ordering::Acquire); - if min(index_to_validate, index_to_execute) >= self.chunk_size { - return Task::NoTask; - } - if index_to_validate < index_to_execute { if let Some(tx_index) = self.next_version_to_validate() { return Task::ValidationTask(tx_index); diff --git a/crates/blockifier/src/concurrency/scheduler_test.rs b/crates/blockifier/src/concurrency/scheduler_test.rs index 8fa558aa90..05ebf4c25a 100644 --- a/crates/blockifier/src/concurrency/scheduler_test.rs +++ b/crates/blockifier/src/concurrency/scheduler_test.rs @@ -6,11 +6,10 @@ use pretty_assertions::assert_eq; use rstest::rstest; use crate::concurrency::scheduler::{Scheduler, Task, TransactionStatus}; +use crate::concurrency::test_utils::DEFAULT_CHUNK_SIZE; use crate::concurrency::TxIndex; use crate::default_scheduler; -const DEFAULT_CHUNK_SIZE: usize = 100; - #[rstest] fn test_new(#[values(0, 1, 32)] chunk_size: usize) { let scheduler = Scheduler::new(chunk_size); diff --git a/crates/blockifier/src/concurrency/test_utils.rs b/crates/blockifier/src/concurrency/test_utils.rs index ee92fb3620..424e381571 100644 --- a/crates/blockifier/src/concurrency/test_utils.rs +++ b/crates/blockifier/src/concurrency/test_utils.rs @@ -7,6 +7,14 @@ use crate::test_utils::dict_state_reader::DictStateReader; use crate::transaction::account_transaction::AccountTransaction; use crate::transaction::transactions::ExecutableTransaction; +// Public Consts. + +pub const DEFAULT_CHUNK_SIZE: usize = 64; + +// Fixtures. + +// Macros. + #[macro_export] macro_rules! default_scheduler { ($chunk_size:ident : $chunk:expr , $($field:ident $(: $value:expr)?),+ $(,)?) => { @@ -35,6 +43,8 @@ macro_rules! default_scheduler { }; } +// Concurrency constructors. + // TODO(meshi, 01/06/2024): Consider making this a macro. pub fn safe_versioned_state_for_testing( block_state: DictStateReader, @@ -42,6 +52,8 @@ pub fn safe_versioned_state_for_testing( ThreadSafeVersionedState::new(VersionedState::new(block_state)) } +// Utils. + // Note: this function does not mutate the state. pub fn create_fee_transfer_call_info( state: &mut CachedState, diff --git a/crates/blockifier/src/concurrency/versioned_state_proxy.rs b/crates/blockifier/src/concurrency/versioned_state_proxy.rs index 340edae4da..b4dfb7125f 100644 --- a/crates/blockifier/src/concurrency/versioned_state_proxy.rs +++ b/crates/blockifier/src/concurrency/versioned_state_proxy.rs @@ -52,6 +52,17 @@ impl VersionedState { } } + #[cfg(test)] + pub fn get_cached_initial_values(&self) -> StateMaps { + StateMaps { + storage: self.storage.get_cached_initial_values(), + nonces: self.nonces.get_cached_initial_values(), + class_hashes: self.class_hashes.get_cached_initial_values(), + compiled_class_hashes: self.compiled_class_hashes.get_cached_initial_values(), + declared_contracts: HashMap::new(), + } + } + pub fn commit(&mut self, from_index: TxIndex, parent_state: &mut CachedState) where T: StateReader, @@ -165,6 +176,11 @@ impl ThreadSafeVersionedState { pub fn pin_version(&self, tx_index: TxIndex) -> VersionedStateProxy { VersionedStateProxy { tx_index, state: self.0.clone() } } + + #[cfg(test)] + pub fn state(&self) -> LockedVersionedState<'_, S> { + self.0.lock().expect("Failed to acquire state lock.") + } } impl Clone for ThreadSafeVersionedState { diff --git a/crates/blockifier/src/concurrency/versioned_storage.rs b/crates/blockifier/src/concurrency/versioned_storage.rs index 2710231eaa..37776afe11 100644 --- a/crates/blockifier/src/concurrency/versioned_storage.rs +++ b/crates/blockifier/src/concurrency/versioned_storage.rs @@ -81,4 +81,9 @@ where } writes } + + #[cfg(test)] + pub fn get_cached_initial_values(&self) -> HashMap { + self.cached_initial_values.clone() + } }