Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
test(concurrency): multithreaded test for decrease_validation_index
Browse files Browse the repository at this point in the history
  • Loading branch information
barak-b-starkware committed May 12, 2024
1 parent f7c2bd3 commit 1280c8b
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 13 deletions.
96 changes: 96 additions & 0 deletions crates/blockifier/src/concurrency/flow_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::collections::HashMap;
use std::sync::Arc;

use rstest::rstest;
use starknet_api::core::{ContractAddress, PatriciaKey};
use starknet_api::hash::{StarkFelt, StarkHash};
use starknet_api::stark_felt;

use crate::concurrency::scheduler::{Scheduler, Task};
use crate::concurrency::test_utils::{
contract_address, safe_versioned_state_for_testing, DEFAULT_CHUNK_SIZE,
};
use crate::state::cached_state::{ContractClassMapping, StateMaps};
use crate::state::state_api::StateReader;
use crate::storage_key;
use crate::test_utils::dict_state_reader::DictStateReader;

// TODO(barak, 01/07/2024): Check txs with "reads".
// TODO(barak, 01/07/2024): Check cached_initial_values in versioned_state_proxy flow test.
#[rstest]
fn scheduler_flow_test(contract_address: ContractAddress) {
// Simulate DEFAULT_CHUNK_SIZE txs. Each reads (CONTRACT_ADDRESS, STORAGE_KEY) and writes its tx
// index inside the chunk to the same storage cell.
let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE));
let storage_key = storage_key!(27_u8);
let storage_initial_value = stark_felt!(128_u8);
let versioned_state = safe_versioned_state_for_testing(DictStateReader {
storage_view: HashMap::from([((contract_address, storage_key), storage_initial_value)]),
..Default::default()
});
let mut handles = vec![];

let num_threads = 2_u8.pow(5);
for _ in 0..num_threads {
let scheduler = Arc::clone(&scheduler);
let versioned_state = versioned_state.clone();
let handle = std::thread::spawn(move || {
let mut task = Task::NoTask;
while !scheduler.done() {
match task {
Task::ExecutionTask(tx_index) => {
let versioned_state_proxy = versioned_state.pin_version(tx_index);
let storage_new_value: u8 = tx_index.try_into().unwrap();
let write_set = StateMaps {
storage: HashMap::from([(
(contract_address, storage_key),
stark_felt!(storage_new_value),
)]),
..Default::default()
};
versioned_state_proxy
.apply_writes(&write_set, &ContractClassMapping::default());
scheduler.finish_execution(tx_index);
task = Task::NoTask;
}
Task::ValidationTask(tx_index) => {
let versioned_state_proxy = versioned_state.pin_version(tx_index);
let current_cell_value = versioned_state_proxy
.get_storage_at(contract_address, storage_key)
.unwrap();
let read_set = StateMaps {
storage: HashMap::from([(
(contract_address, storage_key),
stark_felt!(current_cell_value),
)]),
..Default::default()
};
let read_set_valid = versioned_state_proxy.validate_read_set(&read_set);
let aborted = !read_set_valid && scheduler.try_validation_abort(tx_index);
if aborted {
versioned_state_proxy.delete_writes(tx_index)
}
task = scheduler.finish_validation(tx_index, aborted);
}
Task::NoTask => {
task = scheduler.next_task();
}
Task::Done => (),
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}

let chunk_size_as_u8: u8 = DEFAULT_CHUNK_SIZE.try_into().unwrap();
let final_storage_value = chunk_size_as_u8 - 1;
let storage_writes = versioned_state.state().get_writes(DEFAULT_CHUNK_SIZE).storage;
assert_eq!(
*storage_writes.get(&(contract_address, storage_key)).unwrap(),
stark_felt!(final_storage_value)
);
dbg!(storage_writes.get(&(contract_address, storage_key)).unwrap());
}
8 changes: 4 additions & 4 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use crate::concurrency::TxIndex;
#[path = "scheduler_test.rs"]
pub mod test;

#[cfg(test)]
#[path = "flow_test.rs"]
pub mod flow_test;

#[derive(Debug, Default)]
pub struct Scheduler {
execution_index: AtomicUsize,
Expand Down Expand Up @@ -53,10 +57,6 @@ impl Scheduler {
let index_to_validate = self.validation_index.load(Ordering::Acquire);
let index_to_execute = self.execution_index.load(Ordering::Acquire);

if min(index_to_validate, index_to_execute) >= self.chunk_size {
return Task::NoTask;
}

if index_to_validate < index_to_execute {
if let Some(tx_index) = self.next_version_to_validate() {
return Task::ValidationTask(tx_index);
Expand Down
3 changes: 1 addition & 2 deletions crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use pretty_assertions::assert_eq;
use rstest::rstest;

use crate::concurrency::scheduler::{Scheduler, Task, TransactionStatus};
use crate::concurrency::test_utils::DEFAULT_CHUNK_SIZE;
use crate::concurrency::TxIndex;
use crate::default_scheduler;

const DEFAULT_CHUNK_SIZE: usize = 100;

#[rstest]
fn test_new(#[values(0, 1, 32)] chunk_size: usize) {
let scheduler = Scheduler::new(chunk_size);
Expand Down
24 changes: 24 additions & 0 deletions crates/blockifier/src/concurrency/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use rstest::fixture;
use starknet_api::core::{ContractAddress, PatriciaKey};
use starknet_api::hash::StarkHash;
use starknet_api::{contract_address, patricia_key};

use crate::concurrency::versioned_state_proxy::{ThreadSafeVersionedState, VersionedState};
use crate::context::BlockContext;
use crate::execution::call_info::CallInfo;
Expand All @@ -7,6 +12,21 @@ use crate::test_utils::dict_state_reader::DictStateReader;
use crate::transaction::account_transaction::AccountTransaction;
use crate::transaction::transactions::ExecutableTransaction;

// Public Consts.

pub const DEFAULT_CHUNK_SIZE: usize = 64;

// Fixtures.

const TEST_CONTRACT_ADDRESS: &str = "0x18031991";

#[fixture]
pub fn contract_address() -> ContractAddress {
contract_address!(TEST_CONTRACT_ADDRESS)
}

// Macros.

#[macro_export]
macro_rules! default_scheduler {
($chunk_size:ident : $chunk:expr , $($field:ident $(: $value:expr)?),+ $(,)?) => {
Expand Down Expand Up @@ -35,13 +55,17 @@ macro_rules! default_scheduler {
};
}

// Concurrency constructors.

// TODO(meshi, 01/06/2024): Consider making this a macro.
pub fn safe_versioned_state_for_testing(
block_state: DictStateReader,
) -> ThreadSafeVersionedState<DictStateReader> {
ThreadSafeVersionedState::new(VersionedState::new(block_state))
}

// Utils.

// Note: this function does not mutate the state.
pub fn create_fee_transfer_call_info<S: StateReader>(
state: &mut CachedState<S>,
Expand Down
16 changes: 16 additions & 0 deletions crates/blockifier/src/concurrency/versioned_state_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ impl<S: StateReader> VersionedState<S> {
}
}

#[cfg(test)]
pub fn get_cached_initial_values(&self) -> StateMaps {
StateMaps {
storage: self.storage.get_cached_initial_values(),
nonces: self.nonces.get_cached_initial_values(),
class_hashes: self.class_hashes.get_cached_initial_values(),
compiled_class_hashes: self.compiled_class_hashes.get_cached_initial_values(),
declared_contracts: HashMap::new(),
}
}

pub fn commit<T>(&mut self, from_index: TxIndex, parent_state: &mut CachedState<T>)
where
T: StateReader,
Expand Down Expand Up @@ -165,6 +176,11 @@ impl<S: StateReader> ThreadSafeVersionedState<S> {
pub fn pin_version(&self, tx_index: TxIndex) -> VersionedStateProxy<S> {
VersionedStateProxy { tx_index, state: self.0.clone() }
}

#[cfg(test)]
pub fn state(&self) -> LockedVersionedState<'_, S> {
self.0.lock().expect("Failed to acquire state lock.")
}
}

impl<S: StateReader> Clone for ThreadSafeVersionedState<S> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use starknet_api::transaction::{Calldata, ContractAddressSalt, Fee, TransactionV
use starknet_api::{calldata, class_hash, contract_address, patricia_key, stark_felt};

use crate::abi::abi_utils::{get_fee_token_var_address, get_storage_var_address};
use crate::concurrency::test_utils::safe_versioned_state_for_testing;
use crate::concurrency::test_utils::{contract_address, safe_versioned_state_for_testing};
use crate::concurrency::versioned_state_proxy::{
ThreadSafeVersionedState, VersionedState, VersionedStateProxy,
};
Expand All @@ -27,14 +27,8 @@ use crate::transaction::test_utils::l1_resource_bounds;
use crate::transaction::transactions::ExecutableTransaction;
use crate::{compiled_class_hash, deploy_account_tx_args, nonce, storage_key};

const TEST_CONTRACT_ADDRESS: &str = "0x1";
const TEST_CLASS_HASH: u8 = 27_u8;

#[fixture]
pub fn contract_address() -> ContractAddress {
contract_address!(TEST_CONTRACT_ADDRESS)
}

#[fixture]
pub fn class_hash() -> ClassHash {
class_hash!(TEST_CLASS_HASH)
Expand Down
5 changes: 5 additions & 0 deletions crates/blockifier/src/concurrency/versioned_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,9 @@ where
}
writes
}

#[cfg(test)]
pub fn get_cached_initial_values(&self) -> HashMap<K, V> {
self.cached_initial_values.clone()
}
}

0 comments on commit 1280c8b

Please sign in to comment.