Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
test(concurrency): multithreaded test for decrease_validation_index
Browse files Browse the repository at this point in the history
  • Loading branch information
barak-b-starkware committed May 9, 2024
1 parent ae1ab0f commit 00d12f6
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 6 deletions.
96 changes: 96 additions & 0 deletions crates/blockifier/src/concurrency/flow_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::collections::HashMap;
use std::sync::Arc;

use rstest::rstest;
use starknet_api::core::{ContractAddress, PatriciaKey};
use starknet_api::hash::{StarkFelt, StarkHash};
use starknet_api::stark_felt;

use crate::concurrency::scheduler::{Scheduler, Task};
use crate::concurrency::test_utils::{
contract_address, safe_versioned_state_for_testing, DEFAULT_CHUNK_SIZE,
};
use crate::state::cached_state::{ContractClassMapping, StateMaps};
use crate::state::state_api::StateReader;
use crate::storage_key;
use crate::test_utils::dict_state_reader::DictStateReader;

// TODO(barak, 01/07/2024): Check txs with "reads".
// TODO(barak, 01/07/2024): Check cached_initial_values in versioned_state_proxy flow test.
#[rstest]
fn scheduler_flow_test(contract_address: ContractAddress) {
// Simulate DEFAULT_CHUNK_SIZE txs. Each reads (CONTRACT_ADDRESS, STORAGE_KEY) and writes its tx
// index inside the chunk to the same storage cell.
let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE));
let storage_key = storage_key!(27_u8);
let storage_initial_value = stark_felt!(128_u8);
let versioned_state = safe_versioned_state_for_testing(DictStateReader {
storage_view: HashMap::from([((contract_address, storage_key), storage_initial_value)]),
..Default::default()
});
let mut handles = vec![];

let num_threads = 2_u8.pow(5);
for _ in 0..num_threads {
let scheduler = Arc::clone(&scheduler);
let versioned_state = versioned_state.clone();
let handle = std::thread::spawn(move || {
let mut task = Task::NoTask;
while !scheduler.done() {
match task {
Task::ExecutionTask(tx_index) => {
let versioned_state_proxy = versioned_state.pin_version(tx_index);
let storage_new_value: u8 = tx_index.try_into().unwrap();
let write_set = StateMaps {
storage: HashMap::from([(
(contract_address, storage_key),
stark_felt!(storage_new_value),
)]),
..Default::default()
};
versioned_state_proxy
.apply_writes(&write_set, &ContractClassMapping::default());
scheduler.finish_execution(tx_index);
task = Task::NoTask;
}
Task::ValidationTask(tx_index) => {
let versioned_state_proxy = versioned_state.pin_version(tx_index);
let current_cell_value = versioned_state_proxy
.get_storage_at(contract_address, storage_key)
.unwrap();
let read_set = StateMaps {
storage: HashMap::from([(
(contract_address, storage_key),
stark_felt!(current_cell_value),
)]),
..Default::default()
};
let read_set_valid = versioned_state_proxy.validate_read_set(&read_set);
let aborted = !read_set_valid && scheduler.try_validation_abort(tx_index);
if aborted {
versioned_state_proxy.delete_writes(tx_index)
}
task = scheduler.finish_validation(tx_index, aborted);
}
Task::NoTask => {
task = scheduler.next_task();
}
Task::Done => (),
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}

let chunk_size_as_u8: u8 = DEFAULT_CHUNK_SIZE.try_into().unwrap();
let final_storage_value = chunk_size_as_u8 - 1;
let storage_writes = versioned_state.state().get_writes(DEFAULT_CHUNK_SIZE).storage;
assert_eq!(
*storage_writes.get(&(contract_address, storage_key)).unwrap(),
stark_felt!(final_storage_value)
);
dbg!(storage_writes.get(&(contract_address, storage_key)).unwrap());
}
8 changes: 4 additions & 4 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use crate::concurrency::TxIndex;
#[path = "scheduler_test.rs"]
pub mod test;

#[cfg(test)]
#[path = "flow_test.rs"]
pub mod flow_test;

// TODO(Avi, 01/04/2024): Remove dead_code attribute.
#[allow(dead_code)]
#[derive(Debug, Default)]
Expand Down Expand Up @@ -57,10 +61,6 @@ impl Scheduler {
let index_to_validate = self.validation_index.load(Ordering::Acquire);
let index_to_execute = self.execution_index.load(Ordering::Acquire);

if min(index_to_validate, index_to_execute) >= self.chunk_size {
return Task::NoTask;
}

if index_to_validate < index_to_execute {
if let Some(tx_index) = self.next_version_to_validate() {
return Task::ValidationTask(tx_index);
Expand Down
3 changes: 1 addition & 2 deletions crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use pretty_assertions::assert_eq;
use rstest::rstest;

use crate::concurrency::scheduler::{Scheduler, Task, TransactionStatus};
use crate::concurrency::test_utils::DEFAULT_CHUNK_SIZE;
use crate::concurrency::TxIndex;
use crate::default_scheduler;

const DEFAULT_CHUNK_SIZE: usize = 100;

#[rstest]
fn test_new(#[values(0, 1, 32)] chunk_size: usize) {
let scheduler = Scheduler::new(chunk_size);
Expand Down
9 changes: 9 additions & 0 deletions crates/blockifier/src/concurrency/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@ use crate::state::state_api::StateReader;
use crate::test_utils::dict_state_reader::DictStateReader;
use crate::transaction::account_transaction::AccountTransaction;
use crate::transaction::transactions::ExecutableTransaction;

// Public Consts.

pub const DEFAULT_CHUNK_SIZE: usize = 64;

// Fixtures.

<<<<<<< HEAD
const TEST_CONTRACT_ADDRESS: &str = "0x18031991";

=======
const TEST_CONTRACT_ADDRESS: &str = "0x1";
>>>>>>> 689a2d45... test(concurrency): multithreaded test for decrease_validation_index
#[fixture]
pub fn contract_address() -> ContractAddress {
contract_address!(TEST_CONTRACT_ADDRESS)
Expand Down
16 changes: 16 additions & 0 deletions crates/blockifier/src/concurrency/versioned_state_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ impl<S: StateReader> VersionedState<S> {
}
}

#[cfg(test)]
pub fn get_cached_initial_values(&self) -> StateMaps {
StateMaps {
storage: self.storage.get_cached_initial_values(),
nonces: self.nonces.get_cached_initial_values(),
class_hashes: self.class_hashes.get_cached_initial_values(),
compiled_class_hashes: self.compiled_class_hashes.get_cached_initial_values(),
declared_contracts: HashMap::new(),
}
}

pub fn commit<T>(&mut self, from_index: TxIndex, parent_state: &mut CachedState<T>)
where
T: StateReader,
Expand Down Expand Up @@ -160,6 +171,11 @@ impl<S: StateReader> ThreadSafeVersionedState<S> {
pub fn pin_version(&self, tx_index: TxIndex) -> VersionedStateProxy<S> {
VersionedStateProxy { tx_index, state: self.0.clone() }
}

#[cfg(test)]
pub fn state(&self) -> LockedVersionedState<'_, S> {
self.0.lock().expect("Failed to acquire state lock.")
}
}

impl<S: StateReader> Clone for ThreadSafeVersionedState<S> {
Expand Down
5 changes: 5 additions & 0 deletions crates/blockifier/src/concurrency/versioned_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ where
}
writes
}

#[cfg(test)]
pub fn get_cached_initial_values(&self) -> HashMap<K, V> {
self.cached_initial_values.clone()
}
}

0 comments on commit 00d12f6

Please sign in to comment.