Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

chore(concurrency): add log messages to the concurrency module #1990

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/blockifier/src/concurrency/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,11 @@ pub fn lock_mutex_in_array<T: Debug>(array: &[Mutex<T>], tx_index: TxIndex) -> M
panic!("Cell of transaction index {} is poisoned. Data: {:?}.", tx_index, *error.get_ref())
})
}

#[macro_export]
macro_rules! debug_thread {
($($arg:tt)*) => {{
let thread_id = std::thread::current().id();
log::debug!("[{:?}] {}", thread_id, format!($($arg)*));
}}
}
10 changes: 10 additions & 0 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::concurrency::utils::lock_mutex_in_array;
use crate::concurrency::versioned_state::ThreadSafeVersionedState;
use crate::concurrency::TxIndex;
use crate::context::BlockContext;
use crate::debug_thread;
use crate::execution::execution_utils::stark_felt_to_felt;
use crate::fee::fee_utils::get_sequencer_balance_keys;
use crate::state::cached_state::{
Expand Down Expand Up @@ -88,6 +89,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
}

pub fn run(&self) {
debug_thread!("Worker started.");
let mut task = Task::NoTask;
loop {
self.commit_while_possible();
Expand All @@ -105,17 +107,22 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

fn commit_while_possible(&self) {
if let Some(mut transaction_committer) = self.scheduler.try_enter_commit_phase() {
// log_debug_thread!("Entered commit phase.");
while let Some(tx_index) = transaction_committer.try_commit() {
debug_thread!("Committing transaction index {}.", tx_index);
let commit_succeeded = self.commit_tx(tx_index);
if !commit_succeeded {
debug_thread!("No room for transaction index {}. Halting scheduler.", tx_index,);
transaction_committer.halt_scheduler();
}
}
}
}

fn execute(&self, tx_index: TxIndex) {
debug_thread!("Executing transaction index {}.", tx_index);
self.execute_tx(tx_index);
debug_thread!("Finished executing transaction index {}.", tx_index);
self.scheduler.finish_execution(tx_index)
}

Expand Down Expand Up @@ -159,6 +166,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
}

fn validate(&self, tx_index: TxIndex) -> Task {
debug_thread!("Validating transaction index {}.", tx_index);
let tx_versioned_state = self.state.pin_version(tx_index);
let execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
let execution_output = execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR);
Expand All @@ -167,10 +175,12 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

let aborted = !reads_valid && self.scheduler.try_validation_abort(tx_index);
if aborted {
debug_thread!("Aborted transaction index {}.", tx_index);
tx_versioned_state
.delete_writes(&execution_output.writes, &execution_output.contract_classes);
self.scheduler.finish_abort(tx_index)
} else {
debug_thread!("Finished validating transaction index {} with no abort.", tx_index);
Task::NoTask
}
}
Expand Down
Loading