From ca8c83df99f8bfb0e7a71930528a83b80b304745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Mon, 18 Mar 2024 19:33:58 +0200 Subject: [PATCH] Move rest of the locks to `SchedulerState` `Scheduler` had at least one more deadlock hiding in it's code. Most probably it was the `ProgramManager`. Moving both, the `mempool` and `program_manager` to `SchedulerState` doesn't reveal the deadlock anymore. --- crates/node/src/scheduler/mod.rs | 74 +++++++++++--------------------- 1 file changed, 26 insertions(+), 48 deletions(-) diff --git a/crates/node/src/scheduler/mod.rs b/crates/node/src/scheduler/mod.rs index 2f4ba44d..5663177d 100644 --- a/crates/node/src/scheduler/mod.rs +++ b/crates/node/src/scheduler/mod.rs @@ -60,6 +60,9 @@ struct RunningTask { } struct SchedulerState { + mempool: Arc>, + program_manager: ProgramManager, + pending_programs: VecDeque<(Hash, Hash)>, running_tasks: HashMap, running_vms: HashMap, @@ -67,8 +70,10 @@ struct SchedulerState { } impl SchedulerState { - fn new() -> Self { + fn new(mempool: Arc>, program_manager: ProgramManager) -> Self { Self { + mempool, + program_manager, pending_programs: VecDeque::new(), running_tasks: HashMap::new(), running_vms: HashMap::new(), @@ -84,9 +89,7 @@ impl SchedulerState { // It could use some re-structuring in order to not be so convoluted, but // otherwise it seems to be okayish. pub struct Scheduler { - mempool: Arc>, database: Arc, - program_manager: Mutex, workflow_engine: Arc, node_key: SecretKey, @@ -183,13 +186,11 @@ impl Scheduler { tx_sender: TxEventSender, ) -> Self { Self { - mempool, database, - program_manager: Mutex::new(program_manager), workflow_engine, node_key, - state: Arc::new(Mutex::new(SchedulerState::new())), + state: Arc::new(Mutex::new(SchedulerState::new(mempool, program_manager))), data_directory, http_download_host, tx_sender, @@ -205,10 +206,8 @@ impl Scheduler { { let mut state = self.state.lock().await; while let Some((tx_hash, program_id)) = state.pending_programs.pop_front() { - match self + match state .program_manager - .lock() - .await .start_program(tx_hash, program_id, None) .await { @@ -262,27 +261,23 @@ impl Scheduler { } // Push the task into program's work queue. - { - let mut state = self.state.lock().await; - if let Some(program_task_queue) = state.task_queue.get_mut(&task.tx) { - program_task_queue.push_back((task.clone(), Instant::now())); - } else { - let mut queue = VecDeque::new(); - queue.push_back((task.clone(), Instant::now())); - state.task_queue.insert(task.tx, queue); - } + let mut state = self.state.lock().await; + if let Some(program_task_queue) = state.task_queue.get_mut(&task.tx) { + program_task_queue.push_back((task.clone(), Instant::now())); + } else { + let mut queue = VecDeque::new(); + queue.push_back((task.clone(), Instant::now())); + state.task_queue.insert(task.tx, queue); } // Start the program. - match self + match state .program_manager - .lock() - .await .start_program(task.tx, task.program_id, None) .await { Ok(p) => { - self.state.lock().await.running_vms.insert(task.tx, p); + state.running_vms.insert(task.tx, p); } Err(ref err) => { if let Some(err) = err.downcast_ref::() { @@ -302,7 +297,7 @@ impl Scheduler { // Drop the program's task queue. The program's not // there so no need to have queue for it either. let program_id = &task.program_id.clone(); - self.state.lock().await.task_queue.remove(&task.tx); + state.task_queue.remove(&task.tx); } } } @@ -310,8 +305,10 @@ impl Scheduler { } async fn pick_task(&self) -> Option { + let state = self.state.lock().await; + // Acquire write lock. - let mut mempool = self.mempool.write().await; + let mut mempool = state.mempool.write().await; // Check if next tx is ready for processing? @@ -409,13 +406,7 @@ impl Scheduler { // - Remove the task from the `running_tasks`. // if let Some(vm_handle) = state.running_vms.remove(&task_tx) { - if let Err(err) = self - .program_manager - .lock() - .await - .stop_program(vm_handle) - .await - { + if let Err(err) = state.program_manager.stop_program(vm_handle).await { tracing::error!("failed to stop VMvm_handle {}", err); } } @@ -450,13 +441,7 @@ impl Scheduler { if let Some(idx) = state.running_vms.get(&tx_hash) { if let Some(vm_handle) = state.running_vms.remove(&tx_hash) { - if let Err(err) = self - .program_manager - .lock() - .await - .stop_program(vm_handle) - .await - { + if let Err(err) = state.program_manager.stop_program(vm_handle).await { tracing::error!("failed to stop VM for Tx {}: {}", tx_hash, err); } } @@ -489,13 +474,7 @@ impl TaskManager for Scheduler { tracing::debug!("terminating VM running tx {}", tx_hash); if let Some(program_handle) = state.running_vms.remove(&tx_hash) { - if let Err(err) = self - .program_manager - .lock() - .await - .stop_program(program_handle) - .await - { + if let Err(err) = state.program_manager.stop_program(program_handle).await { tracing::error!("failed to stop program {}: {}", tx_hash, err); } } @@ -628,9 +607,8 @@ impl TaskManager for Scheduler { match state.running_vms.remove(&tx_hash) { Some(program_handle) => { - self.program_manager - .lock() - .await + state + .program_manager .stop_program(program_handle) .await .map_err(|err| format!("failed to stop program {}: {}", program, err))?;