Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Move rest of the locks to SchedulerState #148

Merged
merged 2 commits into from
Mar 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 26 additions & 48 deletions crates/node/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,20 @@ struct RunningTask {
}

struct SchedulerState {
mempool: Arc<RwLock<Mempool>>,
program_manager: ProgramManager,

pending_programs: VecDeque<(Hash, Hash)>,
running_tasks: HashMap<Hash, RunningTask>,
running_vms: HashMap<Hash, ProgramHandle>,
task_queue: HashMap<Hash, VecDeque<(Task, Instant)>>,
}

impl SchedulerState {
fn new() -> Self {
fn new(mempool: Arc<RwLock<Mempool>>, program_manager: ProgramManager) -> Self {
Self {
mempool,
program_manager,
pending_programs: VecDeque::new(),
running_tasks: HashMap::new(),
running_vms: HashMap::new(),
Expand All @@ -84,9 +89,7 @@ impl SchedulerState {
// It could use some re-structuring in order to not be so convoluted, but
// otherwise it seems to be okayish.
pub struct Scheduler {
mempool: Arc<RwLock<Mempool>>,
database: Arc<Database>,
program_manager: Mutex<ProgramManager>,
workflow_engine: Arc<WorkflowEngine>,
node_key: SecretKey,

Expand Down Expand Up @@ -183,13 +186,11 @@ impl Scheduler {
tx_sender: TxEventSender<TxResultSender>,
) -> Self {
Self {
mempool,
database,
program_manager: Mutex::new(program_manager),
workflow_engine,
node_key,

state: Arc::new(Mutex::new(SchedulerState::new())),
state: Arc::new(Mutex::new(SchedulerState::new(mempool, program_manager))),
data_directory,
http_download_host,
tx_sender,
Expand All @@ -205,10 +206,8 @@ impl Scheduler {
{
let mut state = self.state.lock().await;
while let Some((tx_hash, program_id)) = state.pending_programs.pop_front() {
match self
match state
.program_manager
.lock()
.await
.start_program(tx_hash, program_id, None)
.await
{
Expand Down Expand Up @@ -262,27 +261,23 @@ impl Scheduler {
}

// Push the task into program's work queue.
{
let mut state = self.state.lock().await;
if let Some(program_task_queue) = state.task_queue.get_mut(&task.tx) {
program_task_queue.push_back((task.clone(), Instant::now()));
} else {
let mut queue = VecDeque::new();
queue.push_back((task.clone(), Instant::now()));
state.task_queue.insert(task.tx, queue);
}
let mut state = self.state.lock().await;
if let Some(program_task_queue) = state.task_queue.get_mut(&task.tx) {
program_task_queue.push_back((task.clone(), Instant::now()));
} else {
let mut queue = VecDeque::new();
queue.push_back((task.clone(), Instant::now()));
state.task_queue.insert(task.tx, queue);
}

// Start the program.
match self
match state
.program_manager
.lock()
.await
.start_program(task.tx, task.program_id, None)
.await
{
Ok(p) => {
self.state.lock().await.running_vms.insert(task.tx, p);
state.running_vms.insert(task.tx, p);
}
Err(ref err) => {
if let Some(err) = err.downcast_ref::<ResourceError>() {
Expand All @@ -302,16 +297,18 @@ impl Scheduler {
// Drop the program's task queue. The program's not
// there so no need to have queue for it either.
let program_id = &task.program_id.clone();
self.state.lock().await.task_queue.remove(&task.tx);
state.task_queue.remove(&task.tx);
}
}
}
}
}

async fn pick_task(&self) -> Option<Task> {
let state = self.state.lock().await;

// Acquire write lock.
let mut mempool = self.mempool.write().await;
let mut mempool = state.mempool.write().await;

// Check if next tx is ready for processing?

Expand Down Expand Up @@ -409,13 +406,7 @@ impl Scheduler {
// - Remove the task from the `running_tasks`.
//
if let Some(vm_handle) = state.running_vms.remove(&task_tx) {
if let Err(err) = self
.program_manager
.lock()
.await
.stop_program(vm_handle)
.await
{
if let Err(err) = state.program_manager.stop_program(vm_handle).await {
tracing::error!("failed to stop VMvm_handle {}", err);
}
}
Expand Down Expand Up @@ -450,13 +441,7 @@ impl Scheduler {

if let Some(idx) = state.running_vms.get(&tx_hash) {
if let Some(vm_handle) = state.running_vms.remove(&tx_hash) {
if let Err(err) = self
.program_manager
.lock()
.await
.stop_program(vm_handle)
.await
{
if let Err(err) = state.program_manager.stop_program(vm_handle).await {
tracing::error!("failed to stop VM for Tx {}: {}", tx_hash, err);
}
}
Expand Down Expand Up @@ -489,13 +474,7 @@ impl TaskManager for Scheduler {
tracing::debug!("terminating VM running tx {}", tx_hash);

if let Some(program_handle) = state.running_vms.remove(&tx_hash) {
if let Err(err) = self
.program_manager
.lock()
.await
.stop_program(program_handle)
.await
{
if let Err(err) = state.program_manager.stop_program(program_handle).await {
tracing::error!("failed to stop program {}: {}", tx_hash, err);
}
}
Expand Down Expand Up @@ -628,9 +607,8 @@ impl TaskManager for Scheduler {

match state.running_vms.remove(&tx_hash) {
Some(program_handle) => {
self.program_manager
.lock()
.await
state
.program_manager
.stop_program(program_handle)
.await
.map_err(|err| format!("failed to stop program {}: {}", program, err))?;
Expand Down
Loading