Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
test(concurrency): multithreaded test for decrease_validation_index
Browse files Browse the repository at this point in the history
  • Loading branch information
barak-b-starkware committed May 9, 2024
1 parent 25b259b commit a428c02
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 4 deletions.
102 changes: 102 additions & 0 deletions crates/blockifier/src/concurrency/flow_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use std::collections::HashMap;
use std::sync::Arc;

use rstest::rstest;
use starknet_api::core::{ContractAddress, PatriciaKey};
use starknet_api::hash::{StarkFelt, StarkHash};
use starknet_api::{contract_address, patricia_key, stark_felt};

use crate::concurrency::scheduler::{Scheduler, Task};
use crate::concurrency::test_utils::safe_versioned_state_for_testing;
use crate::state::cached_state::{ContractClassMapping, StateMaps};
use crate::state::state_api::StateReader;
use crate::storage_key;
use crate::test_utils::dict_state_reader::DictStateReader;

const DEFAULT_CHUNK_SIZE: usize = 64;

#[rstest]
fn aaaflow_test() {
// Simulate DEFAULT_CHUNK_SIZE txs. Each reads (CONTRACT_ADDRESS, STORAGE_KEY) and writes its tx
// index inside the chunk to the same storage cell.
let scheduler = Arc::new(Scheduler::new(4));
let versioned_state = safe_versioned_state_for_testing(DictStateReader {
storage_view: HashMap::from([(
(contract_address!("0x4"), storage_key!(27_u8)),
stark_felt!(128_u8),
)]),
..Default::default()
});
let mut handles = vec![];

let num_threads = 2_u8.pow(2);
for _ in 0..num_threads {
let scheduler = Arc::clone(&scheduler);
let versioned_state = versioned_state.clone();
let handle = std::thread::spawn(move || {
let mut task = Task::NoTask;
while !scheduler.done() {
match task {
Task::ExecutionTask(tx_index) => {
let storage_value: u8 = tx_index.try_into().unwrap();
let versioned_state_proxy = versioned_state.pin_version(tx_index);
// Simulate read to modify the cached_initial_values.
versioned_state_proxy
.get_storage_at(contract_address!("0x4"), storage_key!(27_u8))
.unwrap();
let write_set = StateMaps {
storage: HashMap::from([(
(contract_address!("0x4"), storage_key!(27_u8)),
stark_felt!(storage_value),
)]),
..Default::default()
};
versioned_state_proxy
.apply_writes(&write_set, &ContractClassMapping::default());
scheduler.finish_execution(tx_index);
task = Task::NoTask;
}
Task::ValidationTask(tx_index) => {
let versioned_state_proxy = versioned_state.pin_version(tx_index);
let current_cell_value = versioned_state_proxy
.get_storage_at(contract_address!("0x4"), storage_key!(27_u8))
.unwrap();
let read_set = StateMaps {
storage: HashMap::from([(
(contract_address!("0x4"), storage_key!(27_u8)),
stark_felt!(current_cell_value),
)]),
..Default::default()
};
let read_set_valid = versioned_state_proxy.validate_read_set(&read_set);
let aborted = !read_set_valid && scheduler.try_validation_abort(tx_index);
if aborted {
versioned_state_proxy.delete_writes(tx_index)
}
task = scheduler.finish_validation(tx_index, aborted);
}
Task::NoTask => {
task = scheduler.next_task();
}
Task::Done => (),
}
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let storage_writes = versioned_state.state().get_writes(DEFAULT_CHUNK_SIZE).storage;
let storage_ini_vals = versioned_state.state().get_cached_initial_values().storage;
assert_eq!(
*storage_writes.get(&(contract_address!("0x4"), storage_key!(27_u8))).unwrap(),
stark_felt!(3_u8)
);
assert_eq!(
*storage_ini_vals.get(&(contract_address!("0x4"), storage_key!(27_u8))).unwrap(),
stark_felt!(128_u8)
);
dbg!(storage_writes.get(&(contract_address!("0x4"), storage_key!(27_u8))).unwrap());
dbg!(storage_ini_vals.get(&(contract_address!("0x4"), storage_key!(27_u8))).unwrap());
}
8 changes: 4 additions & 4 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ use crate::concurrency::TxIndex;
#[path = "scheduler_test.rs"]
pub mod test;

#[cfg(test)]
#[path = "flow_test.rs"]
pub mod flow_test;

// TODO(Avi, 01/04/2024): Remove dead_code attribute.
#[allow(dead_code)]
#[derive(Debug, Default)]
Expand Down Expand Up @@ -57,10 +61,6 @@ impl Scheduler {
let index_to_validate = self.validation_index.load(Ordering::Acquire);
let index_to_execute = self.execution_index.load(Ordering::Acquire);

if min(index_to_validate, index_to_execute) >= self.chunk_size {
return Task::NoTask;
}

if index_to_validate < index_to_execute {
if let Some(tx_index) = self.next_version_to_validate() {
return Task::ValidationTask(tx_index);
Expand Down
16 changes: 16 additions & 0 deletions crates/blockifier/src/concurrency/versioned_state_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ impl<S: StateReader> VersionedState<S> {
}
}

#[cfg(test)]
pub fn get_cached_initial_values(&self) -> StateMaps {
StateMaps {
storage: self.storage.get_cached_initial_values(),
nonces: self.nonces.get_cached_initial_values(),
class_hashes: self.class_hashes.get_cached_initial_values(),
compiled_class_hashes: self.compiled_class_hashes.get_cached_initial_values(),
declared_contracts: HashMap::new(),
}
}

pub fn commit<T>(&mut self, from_index: TxIndex, parent_state: &mut CachedState<T>)
where
T: StateReader,
Expand Down Expand Up @@ -160,6 +171,11 @@ impl<S: StateReader> ThreadSafeVersionedState<S> {
pub fn pin_version(&self, tx_index: TxIndex) -> VersionedStateProxy<S> {
VersionedStateProxy { tx_index, state: self.0.clone() }
}

#[cfg(test)]
pub fn state(&self) -> LockedVersionedState<'_, S> {
self.0.lock().expect("Failed to acquire state lock.")
}
}

impl<S: StateReader> Clone for ThreadSafeVersionedState<S> {
Expand Down
5 changes: 5 additions & 0 deletions crates/blockifier/src/concurrency/versioned_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ where
}
writes
}

#[cfg(test)]
pub fn get_cached_initial_values(&self) -> HashMap<K, V> {
self.cached_initial_values.clone()
}
}

0 comments on commit a428c02

Please sign in to comment.