Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
chore(concurrency): sleep if there are no tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoni-Starkware committed Jun 25, 2024
1 parent c3267c0 commit db34c50
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 29 deletions.
9 changes: 5 additions & 4 deletions crates/blockifier/src/concurrency/flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn scheduler_flow_test(
let scheduler = Arc::clone(&scheduler);
let versioned_state = versioned_state.clone();
let handle = std::thread::spawn(move || {
let mut task = Task::NoTask;
let mut task = Task::AskForTask;
loop {
if let Some(mut transaction_committer) = scheduler.try_enter_commit_phase() {
while let Some(tx_index) = transaction_committer.try_commit() {
Expand Down Expand Up @@ -70,7 +70,7 @@ fn scheduler_flow_test(
&HashMap::default(),
);
scheduler.finish_execution(tx_index);
Task::NoTask
Task::AskForTask
}
Task::ValidationTask(tx_index) => {
let state_proxy = versioned_state.pin_version(tx_index);
Expand All @@ -82,10 +82,11 @@ fn scheduler_flow_test(
state_proxy.delete_writes(&writes, &ContractClassMapping::default());
scheduler.finish_abort(tx_index)
} else {
Task::NoTask
Task::AskForTask
}
}
Task::NoTask => scheduler.next_task(),
Task::NoTaskAvailable => Task::AskForTask,
Task::AskForTask => scheduler.next_task(),
Task::Done => break,
}
}
Expand Down
12 changes: 9 additions & 3 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::min;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard, TryLockError};

Expand Down Expand Up @@ -91,6 +92,10 @@ impl Scheduler {
let index_to_validate = self.validation_index.load(Ordering::Acquire);
let index_to_execute = self.execution_index.load(Ordering::Acquire);

if min(index_to_validate, index_to_execute) >= self.chunk_size {
return Task::NoTaskAvailable;
}

if index_to_validate < index_to_execute {
if let Some(tx_index) = self.next_version_to_validate() {
return Task::ValidationTask(tx_index);
Expand All @@ -101,7 +106,7 @@ impl Scheduler {
return Task::ExecutionTask(tx_index);
}

Task::NoTask
Task::AskForTask
}

/// Updates the Scheduler that an execution task has been finished and triggers the creation of
Expand Down Expand Up @@ -129,7 +134,7 @@ impl Scheduler {
if self.execution_index.load(Ordering::Acquire) > tx_index && self.try_incarnate(tx_index) {
Task::ExecutionTask(tx_index)
} else {
Task::NoTask
Task::AskForTask
}
}

Expand Down Expand Up @@ -248,7 +253,8 @@ impl Scheduler {
pub enum Task {
ExecutionTask(TxIndex),
ValidationTask(TxIndex),
NoTask,
AskForTask,
NoTaskAvailable,
Done,
}

Expand Down
23 changes: 12 additions & 11 deletions crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,17 @@ fn test_lock_tx_status_poisoned() {

#[rstest]
#[case::done(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::Done)]
#[case::no_task(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::NoTask)]
#[case::no_task(
DEFAULT_CHUNK_SIZE,
DEFAULT_CHUNK_SIZE,
TransactionStatus::Executed,
Task::NoTaskAvailable
)]
#[case::no_task_as_validation_index_not_executed(
DEFAULT_CHUNK_SIZE,
0,
TransactionStatus::ReadyToExecute,
Task::NoTask
Task::AskForTask
)]
#[case::execution_task(0, 0, TransactionStatus::ReadyToExecute, Task::ExecutionTask(0))]
#[case::execution_task_as_validation_index_not_executed(
Expand Down Expand Up @@ -163,10 +168,8 @@ fn test_set_executed_status(#[case] tx_status: TransactionStatus) {
#[case::reduces_validation_index(0, 10)]
#[case::does_not_reduce_validation_index(10, 0)]
fn test_finish_execution(#[case] tx_index: TxIndex, #[case] validation_index: TxIndex) {
let scheduler = default_scheduler!(
chunk_size: DEFAULT_CHUNK_SIZE,
validation_index: validation_index,
);
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, validation_index: validation_index);
scheduler.set_tx_status(tx_index, TransactionStatus::Executing);
scheduler.finish_execution(tx_index);
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executed);
Expand Down Expand Up @@ -217,18 +220,16 @@ fn test_try_validation_abort(#[case] tx_status: TransactionStatus) {
#[case::returns_execution_task(0, 10)]
#[case::does_not_return_execution_task(10, 0)]
fn test_finish_abort(#[case] tx_index: TxIndex, #[case] execution_index: TxIndex) {
let scheduler = default_scheduler!(
chunk_size: DEFAULT_CHUNK_SIZE,
execution_index: execution_index,
);
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index,);
scheduler.set_tx_status(tx_index, TransactionStatus::Aborting);
let result = scheduler.finish_abort(tx_index);
let new_status = scheduler.lock_tx_status(tx_index);
if execution_index > tx_index {
assert_eq!(result, Task::ExecutionTask(tx_index));
assert_eq!(*new_status, TransactionStatus::Executing);
} else {
assert_eq!(result, Task::NoTask);
assert_eq!(result, Task::AskForTask);
assert_eq!(*new_status, TransactionStatus::ReadyToExecute);
}
}
Expand Down
22 changes: 14 additions & 8 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

use num_traits::ToPrimitive;
use starknet_api::core::{ClassHash, ContractAddress};
Expand Down Expand Up @@ -88,16 +90,22 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
}

pub fn run(&self) {
let mut task = Task::NoTask;
let mut task = Task::AskForTask;
loop {
self.commit_while_possible();
task = match task {
Task::ExecutionTask(tx_index) => {
self.execute(tx_index);
Task::NoTask
Task::AskForTask
}
Task::ValidationTask(tx_index) => self.validate(tx_index),
Task::NoTask => self.scheduler.next_task(),
Task::NoTaskAvailable => {
// There's no available task at the moment; sleep for a bit to save CPU power.
// (since busy-looping might damage performance when using hyper-threads).
thread::sleep(Duration::from_micros(1));
Task::AskForTask
}
Task::AskForTask => self.scheduler.next_task(),
Task::Done => break,
};
}
Expand Down Expand Up @@ -171,7 +179,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
.delete_writes(&execution_output.writes, &execution_output.contract_classes);
self.scheduler.finish_abort(tx_index)
} else {
Task::NoTask
Task::AskForTask
}
}

Expand All @@ -190,11 +198,9 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
fn commit_tx(&self, tx_index: TxIndex) -> bool {
let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
let execution_output_ref = execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR);
let reads = &execution_output_ref.reads;

let tx = &self.chunk[tx_index];
let mut tx_versioned_state = self.state.pin_version(tx_index);

let reads = &execution_output_ref.reads;
let reads_valid = tx_versioned_state.validate_reads(reads);

// First, re-validate the transaction.
Expand Down Expand Up @@ -224,11 +230,11 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
let writes = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).writes;
let reads = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).reads;
let mut tx_state_changes_keys = StateChanges::from(writes.diff(reads)).into_keys();
let tx_context = self.block_context.to_tx_context(tx);
let tx_result =
&mut execution_output.as_mut().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).result;

if let Ok(tx_execution_info) = tx_result.as_mut() {
let tx_context = self.block_context.to_tx_context(&self.chunk[tx_index]);
// Add the deleted sequencer balance key to the storage keys.
tx_state_changes_keys.update_sequencer_key_in_storage(&tx_context, tx_execution_info);
// Ask the bouncer if there is room for the transaction in the block.
Expand Down
6 changes: 3 additions & 3 deletions crates/blockifier/src/concurrency/worker_logic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ fn test_worker_validate() {
// Validate succeeds.
let tx_index = 0;
let next_task = worker_executor.validate(tx_index);
assert_eq!(next_task, Task::NoTask);
assert_eq!(next_task, Task::AskForTask);
// Verify writes exist in state.
assert_eq!(
safe_versioned_state
Expand Down Expand Up @@ -531,7 +531,7 @@ fn test_worker_validate() {
assert_eq!(*worker_executor.scheduler.get_tx_status(tx_index), TransactionStatus::Executing);

let next_task2 = worker_executor.validate(tx_index);
assert_eq!(next_task2, Task::NoTask);
assert_eq!(next_task2, Task::AskForTask);
}
use cairo_felt::Felt252;
use rstest::rstest;
Expand Down Expand Up @@ -674,7 +674,7 @@ fn test_deploy_before_declare() {

// Successful validation for transaction 1.
let next_task = worker_executor.validate(1);
assert_eq!(next_task, Task::NoTask);
assert_eq!(next_task, Task::AskForTask);
}

#[test]
Expand Down

0 comments on commit db34c50

Please sign in to comment.