diff --git a/crates/blockifier/src/concurrency/flow_test.rs b/crates/blockifier/src/concurrency/flow_test.rs index 0a4efe15e6..19989ae46f 100644 --- a/crates/blockifier/src/concurrency/flow_test.rs +++ b/crates/blockifier/src/concurrency/flow_test.rs @@ -37,7 +37,7 @@ fn scheduler_flow_test( let scheduler = Arc::clone(&scheduler); let versioned_state = versioned_state.clone(); let handle = std::thread::spawn(move || { - let mut task = Task::NoTask; + let mut task = Task::AskForTask; loop { if let Some(mut transaction_committer) = scheduler.try_enter_commit_phase() { while let Some(tx_index) = transaction_committer.try_commit() { @@ -70,7 +70,7 @@ fn scheduler_flow_test( &HashMap::default(), ); scheduler.finish_execution(tx_index); - Task::NoTask + Task::AskForTask } Task::ValidationTask(tx_index) => { let state_proxy = versioned_state.pin_version(tx_index); @@ -82,10 +82,11 @@ fn scheduler_flow_test( state_proxy.delete_writes(&writes, &ContractClassMapping::default()); scheduler.finish_abort(tx_index) } else { - Task::NoTask + Task::AskForTask } } - Task::NoTask => scheduler.next_task(), + Task::NoTaskAvailable => Task::AskForTask, + Task::AskForTask => scheduler.next_task(), Task::Done => break, } } diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 0e37a70e23..172918b365 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -1,3 +1,4 @@ +use std::cmp::min; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Mutex, MutexGuard, TryLockError}; @@ -91,6 +92,10 @@ impl Scheduler { let index_to_validate = self.validation_index.load(Ordering::Acquire); let index_to_execute = self.execution_index.load(Ordering::Acquire); + if min(index_to_validate, index_to_execute) >= self.chunk_size { + return Task::NoTaskAvailable; + } + if index_to_validate < index_to_execute { if let Some(tx_index) = self.next_version_to_validate() { return Task::ValidationTask(tx_index); @@ -101,7 +106,7 @@ impl Scheduler { return Task::ExecutionTask(tx_index); } - Task::NoTask + Task::AskForTask } /// Updates the Scheduler that an execution task has been finished and triggers the creation of @@ -129,7 +134,7 @@ impl Scheduler { if self.execution_index.load(Ordering::Acquire) > tx_index && self.try_incarnate(tx_index) { Task::ExecutionTask(tx_index) } else { - Task::NoTask + Task::AskForTask } } @@ -248,7 +253,8 @@ impl Scheduler { pub enum Task { ExecutionTask(TxIndex), ValidationTask(TxIndex), - NoTask, + AskForTask, + NoTaskAvailable, Done, } diff --git a/crates/blockifier/src/concurrency/scheduler_test.rs b/crates/blockifier/src/concurrency/scheduler_test.rs index ff62d79bda..d2cf9b440e 100644 --- a/crates/blockifier/src/concurrency/scheduler_test.rs +++ b/crates/blockifier/src/concurrency/scheduler_test.rs @@ -47,12 +47,17 @@ fn test_lock_tx_status_poisoned() { #[rstest] #[case::done(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::Done)] -#[case::no_task(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::NoTask)] +#[case::no_task( + DEFAULT_CHUNK_SIZE, + DEFAULT_CHUNK_SIZE, + TransactionStatus::Executed, + Task::NoTaskAvailable +)] #[case::no_task_as_validation_index_not_executed( DEFAULT_CHUNK_SIZE, 0, TransactionStatus::ReadyToExecute, - Task::NoTask + Task::AskForTask )] #[case::execution_task(0, 0, TransactionStatus::ReadyToExecute, Task::ExecutionTask(0))] #[case::execution_task_as_validation_index_not_executed( @@ -163,10 +168,8 @@ fn test_set_executed_status(#[case] tx_status: TransactionStatus) { #[case::reduces_validation_index(0, 10)] #[case::does_not_reduce_validation_index(10, 0)] fn test_finish_execution(#[case] tx_index: TxIndex, #[case] validation_index: TxIndex) { - let scheduler = default_scheduler!( - chunk_size: DEFAULT_CHUNK_SIZE, - validation_index: validation_index, - ); + let scheduler = + default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, validation_index: validation_index); scheduler.set_tx_status(tx_index, TransactionStatus::Executing); scheduler.finish_execution(tx_index); assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executed); @@ -217,10 +220,8 @@ fn test_try_validation_abort(#[case] tx_status: TransactionStatus) { #[case::returns_execution_task(0, 10)] #[case::does_not_return_execution_task(10, 0)] fn test_finish_abort(#[case] tx_index: TxIndex, #[case] execution_index: TxIndex) { - let scheduler = default_scheduler!( - chunk_size: DEFAULT_CHUNK_SIZE, - execution_index: execution_index, - ); + let scheduler = + default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index,); scheduler.set_tx_status(tx_index, TransactionStatus::Aborting); let result = scheduler.finish_abort(tx_index); let new_status = scheduler.lock_tx_status(tx_index); @@ -228,7 +229,7 @@ fn test_finish_abort(#[case] tx_index: TxIndex, #[case] execution_index: TxIndex assert_eq!(result, Task::ExecutionTask(tx_index)); assert_eq!(*new_status, TransactionStatus::Executing); } else { - assert_eq!(result, Task::NoTask); + assert_eq!(result, Task::AskForTask); assert_eq!(*new_status, TransactionStatus::ReadyToExecute); } } diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs index 58644cdfb8..9702be3b62 100644 --- a/crates/blockifier/src/concurrency/worker_logic.rs +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -1,6 +1,8 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::Mutex; +use std::thread; +use std::time::Duration; use num_traits::ToPrimitive; use starknet_api::core::{ClassHash, ContractAddress}; @@ -88,16 +90,22 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { } pub fn run(&self) { - let mut task = Task::NoTask; + let mut task = Task::AskForTask; loop { self.commit_while_possible(); task = match task { Task::ExecutionTask(tx_index) => { self.execute(tx_index); - Task::NoTask + Task::AskForTask } Task::ValidationTask(tx_index) => self.validate(tx_index), - Task::NoTask => self.scheduler.next_task(), + Task::NoTaskAvailable => { + // There's no available task at the moment; sleep for a bit to save CPU power. + // (since busy-looping might damage performance when using hyper-threads). + thread::sleep(Duration::from_micros(1)); + Task::AskForTask + } + Task::AskForTask => self.scheduler.next_task(), Task::Done => break, }; } @@ -171,7 +179,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { .delete_writes(&execution_output.writes, &execution_output.contract_classes); self.scheduler.finish_abort(tx_index) } else { - Task::NoTask + Task::AskForTask } } @@ -190,11 +198,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { fn commit_tx(&self, tx_index: TxIndex) -> bool { let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index); let execution_output_ref = execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR); + let reads = &execution_output_ref.reads; - let tx = &self.chunk[tx_index]; let mut tx_versioned_state = self.state.pin_version(tx_index); - - let reads = &execution_output_ref.reads; let reads_valid = tx_versioned_state.validate_reads(reads); // First, re-validate the transaction. @@ -224,11 +230,11 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { let writes = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).writes; let reads = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).reads; let mut tx_state_changes_keys = StateChanges::from(writes.diff(reads)).into_keys(); - let tx_context = self.block_context.to_tx_context(tx); let tx_result = &mut execution_output.as_mut().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).result; if let Ok(tx_execution_info) = tx_result.as_mut() { + let tx_context = self.block_context.to_tx_context(&self.chunk[tx_index]); // Add the deleted sequencer balance key to the storage keys. tx_state_changes_keys.update_sequencer_key_in_storage(&tx_context, tx_execution_info); // Ask the bouncer if there is room for the transaction in the block. diff --git a/crates/blockifier/src/concurrency/worker_logic_test.rs b/crates/blockifier/src/concurrency/worker_logic_test.rs index 2e63e56c2c..8a0dbf25b4 100644 --- a/crates/blockifier/src/concurrency/worker_logic_test.rs +++ b/crates/blockifier/src/concurrency/worker_logic_test.rs @@ -503,7 +503,7 @@ fn test_worker_validate() { // Validate succeeds. let tx_index = 0; let next_task = worker_executor.validate(tx_index); - assert_eq!(next_task, Task::NoTask); + assert_eq!(next_task, Task::AskForTask); // Verify writes exist in state. assert_eq!( safe_versioned_state @@ -531,7 +531,7 @@ fn test_worker_validate() { assert_eq!(*worker_executor.scheduler.get_tx_status(tx_index), TransactionStatus::Executing); let next_task2 = worker_executor.validate(tx_index); - assert_eq!(next_task2, Task::NoTask); + assert_eq!(next_task2, Task::AskForTask); } use cairo_felt::Felt252; use rstest::rstest; @@ -674,7 +674,7 @@ fn test_deploy_before_declare() { // Successful validation for transaction 1. let next_task = worker_executor.validate(1); - assert_eq!(next_task, Task::NoTask); + assert_eq!(next_task, Task::AskForTask); } #[test]