This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

test(concurrency): multithreaded test for decrease_validation_index
barak-b-starkware committed May 26, 2024
1 parent 0105b9d commit c153712
Showing 6 changed files with 162 additions and 10 deletions.
11 changes: 8 additions & 3 deletions crates/blockifier/src/abi/sierra_types.rs
@@ -36,18 +36,23 @@ pub trait SierraType: Sized {
    fn from_memory(vm: &VirtualMachine, ptr: &mut Relocatable) -> SierraTypeResult<Self>;

    fn from_storage(
-       state: &mut dyn StateReader,
+       state: &dyn StateReader,
        contract_address: &ContractAddress,
        key: &StorageKey,
    ) -> SierraTypeResult<Self>;
}

// Utils.

pub fn felt_to_u128(felt: &Felt252) -> Result<u128, SierraTypeError> {
    felt.to_u128()
        .ok_or_else(|| SierraTypeError::ValueTooLargeForType { val: felt.clone(), ty: "u128" })
}

+pub fn stark_felt_to_u128(stark_felt: &StarkFelt) -> Result<u128, SierraTypeError> {
+   felt_to_u128(&stark_felt_to_felt(*stark_felt))
+}

// TODO(barak, 01/10/2023): Move to starknet_api under StorageKey implementation.
pub fn next_storage_key(key: &StorageKey) -> Result<StorageKey, StarknetApiError> {
    Ok(StorageKey(PatriciaKey::try_from(StarkFelt::from(
@@ -78,7 +83,7 @@ impl SierraType for SierraU128 {
    }

    fn from_storage(
-       state: &mut dyn StateReader,
+       state: &dyn StateReader,
        contract_address: &ContractAddress,
        key: &StorageKey,
    ) -> SierraTypeResult<Self> {
@@ -111,7 +116,7 @@ impl SierraType for SierraU256 {
    }

    fn from_storage(
-       state: &mut dyn StateReader,
+       state: &dyn StateReader,
        contract_address: &ContractAddress,
        key: &StorageKey,
    ) -> SierraTypeResult<Self> {
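The change above relaxes `from_storage` from `&mut dyn StateReader` to `&dyn StateReader`, which is what lets the new flow test read storage through a state proxy shared by several threads. A minimal, std-only sketch of the idea; the `StateReader` trait and `ConstantState` type here are illustrative stand-ins, not the crate's real API:

```rust
use std::sync::Arc;
use std::thread;

// Toy stand-in for a state-reading trait: reads never need `&mut`.
trait StateReader: Send + Sync {
    fn storage_at(&self, key: u64) -> u128;
}

struct ConstantState;

impl StateReader for ConstantState {
    fn storage_at(&self, _key: u64) -> u128 {
        27
    }
}

// Mirrors the new signature: an immutable trait object is enough to read.
fn from_storage(state: &dyn StateReader, key: u64) -> u128 {
    state.storage_at(key)
}

fn main() {
    let state: Arc<dyn StateReader> = Arc::new(ConstantState);
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let state = Arc::clone(&state);
            // Each thread reads through a shared `&dyn` reference; no `&mut` required.
            thread::spawn(move || from_storage(state.as_ref(), 0))
        })
        .collect();
    for handle in handles {
        assert_eq!(handle.join().unwrap(), 27);
    }
}
```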
136 changes: 136 additions & 0 deletions crates/blockifier/src/concurrency/flow_test.rs
@@ -0,0 +1,136 @@
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use rstest::rstest;
use starknet_api::core::{ContractAddress, PatriciaKey};
use starknet_api::hash::{StarkFelt, StarkHash};
use starknet_api::stark_felt;
use starknet_api::state::StorageKey;

use crate::abi::sierra_types::{stark_felt_to_u128, SierraType, SierraU128};
use crate::concurrency::scheduler::{Scheduler, Task, TransactionStatus};
use crate::concurrency::test_utils::{
    contract_address, safe_versioned_state_for_testing, DEFAULT_CHUNK_SIZE,
};
use crate::state::cached_state::{ContractClassMapping, StateMaps};
use crate::storage_key;
use crate::test_utils::dict_state_reader::DictStateReader;

#[rstest]
fn scheduler_flow_test(
    contract_address: ContractAddress,
    // TODO(barak, 01/07/2024): Add a separate identical test and use the package loom.
    #[values(1, 2, 4, 32, 64)] num_threads: u8,
) {
    // Tests the Scheduler under a heavy load of validation aborts. To do that, we simulate
    // multiple transactions with multiple threads, where every transaction depends on its
    // predecessor. Each transaction advances a shared counter by reading the previous value and
    // bumping it by 1.
    let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE));
    let storage_key = storage_key!(27_u8);
    let versioned_state = safe_versioned_state_for_testing(DictStateReader::default());
    let mut handles = vec![];

    for _ in 0..num_threads {
        let scheduler = Arc::clone(&scheduler);
        let versioned_state = versioned_state.clone();
        let handle = std::thread::spawn(move || {
            let mut task = Task::NoTask;
            loop {
                if let Some(mut transaction_committer) = scheduler.try_enter_commit_phase() {

                }
                task = match task {
                    Task::ExecutionTask(tx_index) => {
                        let state_proxy = versioned_state.pin_version(tx_index);
                        // Read the previous counter value.
                        let counter =
                            SierraU128::from_storage(&state_proxy, &contract_address, &storage_key)
                                .unwrap();
                        // Advance the counter and write the new value to storage.
                        let writes = state_maps_with_single_storage_entry(
                            contract_address,
                            storage_key,
                            counter.as_value() + 1,
                        );
                        state_proxy.apply_writes(&writes, &ContractClassMapping::default());
                        scheduler.finish_execution(tx_index);
                        Task::NoTask
                    }
                    Task::ValidationTask(tx_index) => {
                        // Access the state's internals to get the value this transaction wrote
                        // (the incremented counter). If nothing was written to
                        // (contract_address, storage_key), default to 1, the value produced by
                        // the first write to the counter.
                        let tx_written_value = stark_felt_to_u128(
                            versioned_state
                                .state()
                                .get_writes_of_index(tx_index)
                                .storage
                                .get(&(contract_address, storage_key))
                                .unwrap_or(&StarkFelt::ONE),
                        )
                        .unwrap();
                        // If transaction number tx_index wrote tx_written_value, it must have
                        // read tx_written_value - 1.
                        let read_set = state_maps_with_single_storage_entry(
                            contract_address,
                            storage_key,
                            tx_written_value - 1,
                        );
                        let state_proxy = versioned_state.pin_version(tx_index);
                        let read_set_valid = state_proxy.validate_reads(&read_set);
                        let aborted = !read_set_valid && scheduler.try_validation_abort(tx_index);
                        if aborted {
                            let write_set = state_maps_with_single_storage_entry(
                                contract_address,
                                storage_key,
                                tx_written_value,
                            );
                            state_proxy.delete_writes(&write_set, &ContractClassMapping::default());
                            scheduler.finish_abort(tx_index)
                        } else {
                            Task::NoTask
                        }
                    }
                    Task::NoTask => scheduler.next_task(),
                    Task::Done => break,
                }
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }

    // execution_index and validation_index can be strictly greater than chunk_size. This is a
    // side effect of using atomic variables instead of locks: a lock could encapsulate both the
    // check (whether to increment the variable or not) and the increment itself in a scope that
    // no other thread can access, whereas the two separate atomic operations cannot.
    assert!(scheduler.execution_index.load(Ordering::Acquire) >= DEFAULT_CHUNK_SIZE);
    assert!(scheduler.validation_index.load(Ordering::Acquire) >= DEFAULT_CHUNK_SIZE);
    // TODO(barak, 01/07/2024): Add a `commit_index` assertion once committing logic is added.
    // assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 0);
    assert!(scheduler.done_marker.load(Ordering::Acquire));
    for tx_index in 0..DEFAULT_CHUNK_SIZE {
        // TODO(barak, 01/07/2024): Change to `Committed` status once committing logic is added.
        assert_eq!(*scheduler.tx_statuses[tx_index].lock().unwrap(), TransactionStatus::Executed);
        let storage_writes = versioned_state.state().get_writes_of_index(tx_index).storage;
        assert_eq!(
            *storage_writes.get(&(contract_address, storage_key)).unwrap(),
            stark_felt!(format!("{:x}", tx_index + 1).as_str())
        );
    }
}

fn state_maps_with_single_storage_entry(
    contract_address: ContractAddress,
    storage_key: StorageKey,
    value: u128,
) -> StateMaps {
    StateMaps {
        storage: HashMap::from([((contract_address, storage_key), stark_felt!(value))]),
        ..Default::default()
    }
}
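The comment before the final assertions explains why `execution_index` and `validation_index` may end up strictly greater than `DEFAULT_CHUNK_SIZE`: a check on an atomic variable and the subsequent increment are two separate operations, while a lock bundles them into one critical section. A self-contained sketch of that difference; the `LIMIT`/`N_THREADS` constants and the counters are illustrative, not the scheduler's actual fields:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

const LIMIT: usize = 64;
const N_THREADS: usize = 8;

fn main() {
    // Check-then-increment on an atomic: two threads can both observe LIMIT - 1
    // and both increment, so the final value may overshoot LIMIT.
    let atomic_counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..N_THREADS)
        .map(|_| {
            let counter = Arc::clone(&atomic_counter);
            thread::spawn(move || {
                while counter.load(Ordering::Acquire) < LIMIT {
                    counter.fetch_add(1, Ordering::AcqRel);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Like the scheduler indices, the counter is only guaranteed to be >= LIMIT.
    assert!(atomic_counter.load(Ordering::Acquire) >= LIMIT);

    // Lock-guarded check-and-increment: the check and the bump happen inside one
    // critical section, so the counter can never pass LIMIT.
    let locked_counter = Arc::new(Mutex::new(0usize));
    let handles: Vec<_> = (0..N_THREADS)
        .map(|_| {
            let counter = Arc::clone(&locked_counter);
            thread::spawn(move || loop {
                let mut guard = counter.lock().unwrap();
                if *guard >= LIMIT {
                    break;
                }
                *guard += 1;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(*locked_counter.lock().unwrap(), LIMIT);
}
```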
9 changes: 4 additions & 5 deletions crates/blockifier/src/concurrency/scheduler.rs
@@ -1,4 +1,3 @@
-use std::cmp::min;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard, TryLockError};

@@ -9,6 +8,10 @@ use crate::concurrency::TxIndex;
#[path = "scheduler_test.rs"]
pub mod test;

+#[cfg(test)]
+#[path = "flow_test.rs"]
+pub mod flow_test;

pub struct TransactionCommitter<'a> {
    scheduler: &'a Scheduler,
    commit_index_guard: MutexGuard<'a, usize>,
@@ -84,10 +87,6 @@ impl Scheduler {
        let index_to_validate = self.validation_index.load(Ordering::Acquire);
        let index_to_execute = self.execution_index.load(Ordering::Acquire);

-       if min(index_to_validate, index_to_execute) >= self.chunk_size {
-           return Task::NoTask;
-       }

        if index_to_validate < index_to_execute {
            if let Some(tx_index) = self.next_version_to_validate() {
                return Task::ValidationTask(tx_index);
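With the early `min(index_to_validate, index_to_execute) >= self.chunk_size` return removed, `next_task` relies on `next_version_to_validate` and `next_version_to_execute` (whose bodies are not shown in this diff) to bound the indices. A rough, self-contained sketch of the selection order implied by the visible lines, with simplified stand-ins for those helpers; the real ones presumably also consult transaction statuses and the done marker:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, PartialEq)]
enum Task {
    ExecutionTask(usize),
    ValidationTask(usize),
    NoTask,
}

struct ToyScheduler {
    chunk_size: usize,
    execution_index: AtomicUsize,
    validation_index: AtomicUsize,
}

impl ToyScheduler {
    // Simplified stand-in: claim the current validation index if it is still
    // inside the chunk.
    fn next_version_to_validate(&self) -> Option<usize> {
        let index = self.validation_index.fetch_add(1, Ordering::AcqRel);
        (index < self.chunk_size).then_some(index)
    }

    // Simplified stand-in for the execution counterpart.
    fn next_version_to_execute(&self) -> Option<usize> {
        let index = self.execution_index.fetch_add(1, Ordering::AcqRel);
        (index < self.chunk_size).then_some(index)
    }

    // Mirrors the visible ordering: validation is preferred whenever it lags
    // behind execution; otherwise try to hand out the next execution task.
    fn next_task(&self) -> Task {
        let index_to_validate = self.validation_index.load(Ordering::Acquire);
        let index_to_execute = self.execution_index.load(Ordering::Acquire);
        if index_to_validate < index_to_execute {
            if let Some(tx_index) = self.next_version_to_validate() {
                return Task::ValidationTask(tx_index);
            }
        }
        if let Some(tx_index) = self.next_version_to_execute() {
            return Task::ExecutionTask(tx_index);
        }
        Task::NoTask
    }
}

fn main() {
    let scheduler = ToyScheduler {
        chunk_size: 2,
        execution_index: AtomicUsize::new(0),
        validation_index: AtomicUsize::new(0),
    };
    // The first task is an execution; once validation lags, it is prioritized.
    assert_eq!(scheduler.next_task(), Task::ExecutionTask(0));
    assert_eq!(scheduler.next_task(), Task::ValidationTask(0));
}
```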
3 changes: 1 addition & 2 deletions crates/blockifier/src/concurrency/scheduler_test.rs
@@ -6,11 +6,10 @@ use pretty_assertions::assert_eq;
use rstest::rstest;

use crate::concurrency::scheduler::{Scheduler, Task, TransactionStatus};
+use crate::concurrency::test_utils::DEFAULT_CHUNK_SIZE;
use crate::concurrency::TxIndex;
use crate::default_scheduler;

-const DEFAULT_CHUNK_SIZE: usize = 100;

#[rstest]
fn test_new(#[values(0, 1, 32)] chunk_size: usize) {
    let scheduler = Scheduler::new(chunk_size);
8 changes: 8 additions & 0 deletions crates/blockifier/src/concurrency/test_utils.rs
@@ -12,6 +12,10 @@ use crate::test_utils::dict_state_reader::DictStateReader;
use crate::transaction::account_transaction::AccountTransaction;
use crate::transaction::transactions::ExecutableTransaction;

+// Public Consts.

+pub const DEFAULT_CHUNK_SIZE: usize = 64;

// Fixtures.

#[fixture]
@@ -54,13 +58,17 @@ macro_rules! default_scheduler {
    };
}

+// Concurrency constructors.

// TODO(meshi, 01/06/2024): Consider making this a macro.
pub fn safe_versioned_state_for_testing(
    block_state: DictStateReader,
) -> ThreadSafeVersionedState<DictStateReader> {
    ThreadSafeVersionedState::new(VersionedState::new(block_state))
}

+// Utils.

// Note: this function does not mutate the state.
pub fn create_fee_transfer_call_info<S: StateReader>(
    state: &mut CachedState<S>,
5 changes: 5 additions & 0 deletions crates/blockifier/src/concurrency/versioned_state.rs
@@ -209,6 +209,11 @@ impl<S: StateReader> ThreadSafeVersionedState<S> {
    pub fn pin_version(&self, tx_index: TxIndex) -> VersionedStateProxy<S> {
        VersionedStateProxy { tx_index, state: self.0.clone() }
    }

+   #[cfg(test)]
+   pub fn state(&self) -> LockedVersionedState<'_, S> {
+       self.0.lock().expect("Failed to acquire state lock.")
+   }
}

impl<S: StateReader> Clone for ThreadSafeVersionedState<S> {
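The test-only `state()` accessor hands out the lock guard over the inner versioned state, which is how the flow test inspects per-transaction writes via `versioned_state.state().get_writes_of_index(tx_index)`. A minimal sketch of the pattern with toy types; `ToyVersionedState` and `ToyThreadSafeState` are illustrative stand-ins, not the real `VersionedState`/`ThreadSafeVersionedState`:

```rust
use std::sync::{Arc, Mutex, MutexGuard};

// Toy stand-in for the versioned state: just records some writes.
#[derive(Default)]
struct ToyVersionedState {
    writes: Vec<u128>,
}

// Toy stand-in for the thread-safe wrapper: a shared, lockable handle.
#[derive(Clone)]
struct ToyThreadSafeState(Arc<Mutex<ToyVersionedState>>);

impl ToyThreadSafeState {
    fn new(state: ToyVersionedState) -> Self {
        Self(Arc::new(Mutex::new(state)))
    }

    // Mirrors the new accessor (gated behind #[cfg(test)] in the real crate):
    // hand the caller the lock guard so tests can inspect the inner state.
    fn state(&self) -> MutexGuard<'_, ToyVersionedState> {
        self.0.lock().expect("Failed to acquire state lock.")
    }
}

fn main() {
    let shared = ToyThreadSafeState::new(ToyVersionedState::default());
    shared.state().writes.push(27);
    assert_eq!(shared.state().writes, vec![27]);
}
```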
