Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
feat(concurrency): add finish execution/validation methods to the sch…
Browse files Browse the repository at this point in the history
…eduler (#1800)
  • Loading branch information
avi-starkware authored May 6, 2024
1 parent dce31fb commit 9cf97eb
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 47 deletions.
125 changes: 85 additions & 40 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,36 +49,6 @@ impl Scheduler {
self.done_marker.load(Ordering::Acquire)
}

/// Checks if all transactions have been executed and validated.
fn check_done(&self) {
let observed_decrease_counter = self.decrease_counter.load(Ordering::Acquire);

if min(
self.validation_index.load(Ordering::Acquire),
self.execution_index.load(Ordering::Acquire),
) >= self.chunk_size
&& self.n_active_tasks.load(Ordering::Acquire) == 0
&& observed_decrease_counter == self.decrease_counter.load(Ordering::Acquire)
{
self.done_marker.store(true, Ordering::Release);
}
}

fn safe_decrement_n_active_tasks(&self) {
let previous_n_active_tasks = self.n_active_tasks.fetch_sub(1, Ordering::SeqCst);
assert!(previous_n_active_tasks > 0, "n_active_tasks underflow");
}

fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> {
self.tx_statuses[tx_index].lock().unwrap_or_else(|error| {
panic!(
"Status of transaction index {} is poisoned. Data: {:?}.",
tx_index,
*error.get_ref()
)
})
}

pub fn next_task(&self) -> Task {
if self.done() {
return Task::Done;
Expand All @@ -104,20 +74,92 @@ impl Scheduler {
Task::NoTask
}

// TODO(barak, 01/04/2024): Ensure documentation matches logic.
/// Updates the Scheduler that an execution task has been finished and triggers the creation of
/// new tasks accordingly: schedules validation for the current and higher transactions, if not
/// already scheduled.
pub fn finish_execution(&self) -> Task {
todo!()
pub fn finish_execution(&self, tx_index: TxIndex) {
self.set_executed_status(tx_index);
if self.validation_index.load(Ordering::Acquire) > tx_index {
self.decrease_validation_index(tx_index);
}
self.safe_decrement_n_active_tasks();
}

pub fn try_validation_abort(&self, tx_index: TxIndex) -> bool {
let mut status = self.lock_tx_status(tx_index);
if *status == TransactionStatus::Executed {
*status = TransactionStatus::Aborting;
return true;
}
false
}

// TODO(barak, 01/04/2024): Ensure documentation matches logic.
/// Updates the Scheduler that a validation task has been finished and triggers the creation of
/// new tasks in case of failure: schedules validation for higher transactions + re-executes the
/// current transaction (if ready).
pub fn finish_validation(&self) -> Task {
todo!()
pub fn finish_validation(&self, tx_index: TxIndex, aborted: bool) -> Option<Task> {
if aborted {
self.set_ready_status(tx_index);
if self.execution_index.load(Ordering::Acquire) > tx_index
&& self.try_incarnate(tx_index)
{
return Some(Task::ExecutionTask(tx_index));
}
}
self.safe_decrement_n_active_tasks();
None
}

/// Checks if all transactions have been executed and validated.
fn check_done(&self) {
let observed_decrease_counter = self.decrease_counter.load(Ordering::Acquire);

if min(
self.validation_index.load(Ordering::Acquire),
self.execution_index.load(Ordering::Acquire),
) >= self.chunk_size
&& self.n_active_tasks.load(Ordering::Acquire) == 0
&& observed_decrease_counter == self.decrease_counter.load(Ordering::Acquire)
{
self.done_marker.store(true, Ordering::Release);
}
}

fn safe_decrement_n_active_tasks(&self) {
let previous_n_active_tasks = self.n_active_tasks.fetch_sub(1, Ordering::SeqCst);
assert!(previous_n_active_tasks > 0, "n_active_tasks underflow");
}

fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> {
self.tx_statuses[tx_index].lock().unwrap_or_else(|error| {
panic!(
"Status of transaction index {} is poisoned. Data: {:?}.",
tx_index,
*error.get_ref()
)
})
}

fn set_executed_status(&self, tx_index: TxIndex) {
let mut status = self.lock_tx_status(tx_index);
assert_eq!(
*status,
TransactionStatus::Executing,
"Only executing transactions can gain status executed. Transaction {tx_index} is not \
executing. Transaction status: {status:?}."
);
*status = TransactionStatus::Executed;
}

fn set_ready_status(&self, tx_index: TxIndex) {
let mut status = self.lock_tx_status(tx_index);
assert_eq!(
*status,
TransactionStatus::Aborting,
"Only aborting transactions can be re-executed. Transaction {tx_index} is not \
aborting. Transaction status: {status:?}."
);
*status = TransactionStatus::ReadyToExecute;
}

fn decrease_validation_index(&self, target_index: TxIndex) {
Expand All @@ -137,16 +179,16 @@ impl Scheduler {
}

/// Updates a transaction's status to `Executing` if it is ready to execute.
fn try_incarnate(&self, tx_index: TxIndex) -> Option<TxIndex> {
fn try_incarnate(&self, tx_index: TxIndex) -> bool {
if tx_index < self.chunk_size {
let mut status = self.lock_tx_status(tx_index);
if *status == TransactionStatus::ReadyToExecute {
*status = TransactionStatus::Executing;
return Some(tx_index);
return true;
}
}
self.safe_decrement_n_active_tasks();
None
false
}

fn next_version_to_validate(&self) -> Option<TxIndex> {
Expand Down Expand Up @@ -175,7 +217,10 @@ impl Scheduler {
}
self.n_active_tasks.fetch_add(1, Ordering::SeqCst);
let index_to_execute = self.execution_index.fetch_add(1, Ordering::SeqCst);
self.try_incarnate(index_to_execute)
if self.try_incarnate(index_to_execute) {
return Some(index_to_execute);
}
None
}

#[cfg(test)]
Expand Down
14 changes: 7 additions & 7 deletions crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,20 @@ fn test_decrease_execution_index(#[case] target_index: TxIndex, #[case] executio
}

#[rstest]
#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, Some(0))]
#[case::executing(0, TransactionStatus::Executing, None)]
#[case::executed(0, TransactionStatus::Executed, None)]
#[case::aborting(0, TransactionStatus::Aborting, None)]
#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)]
#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, true)]
#[case::executing(0, TransactionStatus::Executing, false)]
#[case::executed(0, TransactionStatus::Executed, false)]
#[case::aborting(0, TransactionStatus::Aborting, false)]
#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, false)]
fn test_try_incarnate(
#[case] tx_index: TxIndex,
#[case] tx_status: TransactionStatus,
#[case] expected_output: Option<TxIndex>,
#[case] expected_output: bool,
) {
let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, n_active_tasks: 1);
scheduler.set_tx_status(tx_index, tx_status);
assert_eq!(scheduler.try_incarnate(tx_index), expected_output);
if expected_output.is_some() {
if expected_output {
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executing);
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 1);
} else {
Expand Down

0 comments on commit 9cf97eb

Please sign in to comment.