Skip to content

Commit

Permalink
Merge pull request #56 from SOF3/deadlock-panic
Browse files Browse the repository at this point in the history
fix(scheduler): fix deadlock when a worker panics
  • Loading branch information
SOF3 authored Apr 10, 2024
2 parents 9ed893f + ba6e6a2 commit 0bdebcd
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 10 deletions.
103 changes: 93 additions & 10 deletions src/scheduler/executor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::atomic::{self, AtomicBool};
use std::time::Duration;

use parking_lot::{Condvar, Mutex, MutexGuard};

use super::planner::StealResult;
Expand Down Expand Up @@ -48,6 +51,7 @@ impl Executor {
ealloc_map: &mut ealloc::Map,
) {
let condvar = Condvar::new();
let had_panic = AtomicBool::new(false);

planner.get_mut().clone_from(topology.initial_planner());

Expand All @@ -66,7 +70,7 @@ impl Executor {
tracer.partition(node, partition);
}

let context = Context { topology, planner, condvar: &condvar };
let context = Context { topology, planner, condvar: &condvar, had_panic: &had_panic };

let prepare_ealloc_shards_context = tracer.start_prepare_ealloc_shards();
let mut ealloc_shards = ealloc_map.shards(self.concurrency + 1);
Expand Down Expand Up @@ -202,14 +206,23 @@ fn main_worker(
match planner_guard.steal_send(tracer, tracer::Thread::Main, context.topology) {
StealResult::CycleComplete => return,
StealResult::Pending => {
deadlock_counter.start_wait();
context.condvar.wait(&mut planner_guard);
match wait_for_task(
deadlock_counter,
&mut planner_guard,
context.condvar,
context.had_panic,
) {
TaskWait::HasTask => continue,
TaskWait::HadPanic => return,
}
}
StealResult::Ready(index) => {
MutexGuard::unlocked(&mut planner_guard, || {
let (debug_name, system) = send.state.get_send_system(index);

{
let mut panic_guard = context.panic_guard();

let mut system = system
.try_lock()
.expect("system should only be scheduled to one worker");
Expand All @@ -232,6 +245,8 @@ fn main_worker(
debug_name,
&mut **system,
);

panic_guard.done = true;
}
});

Expand All @@ -246,13 +261,22 @@ fn main_worker(
}
}
StealResult::Pending => {
deadlock_counter.start_wait();
context.condvar.wait(&mut planner_guard);
match wait_for_task(
deadlock_counter,
&mut planner_guard,
context.condvar,
context.had_panic,
) {
TaskWait::HasTask => continue,
TaskWait::HadPanic => return,
}
}
StealResult::Ready(index) => {
MutexGuard::unlocked(&mut planner_guard, || {
let (debug_name, system) = unsend.state.get_unsend_system_mut(index);

let mut panic_guard = context.panic_guard();

let run_context = tracer.start_run_unsendable(
tracer::Thread::Main,
Node::UnsendSystem(index),
Expand All @@ -273,6 +297,8 @@ fn main_worker(
debug_name,
&mut *system,
);

panic_guard.done = true;
});

planner_guard.complete(
Expand Down Expand Up @@ -304,14 +330,23 @@ fn threaded_worker(
match planner_guard.steal_send(tracer, thread, context.topology) {
StealResult::CycleComplete => return,
StealResult::Pending => {
deadlock_counter.start_wait();
context.condvar.wait(&mut planner_guard);
match wait_for_task(
deadlock_counter,
&mut planner_guard,
context.condvar,
context.had_panic,
) {
TaskWait::HasTask => continue,
TaskWait::HadPanic => return,
}
}
StealResult::Ready(index) => {
MutexGuard::unlocked(&mut planner_guard, || {
let (debug_name, system) = send.state.get_send_system(index);

{
let mut panic_guard = context.panic_guard();

let mut system = system
.try_lock()
.expect("system should only be scheduled to one worker");
Expand All @@ -329,6 +364,8 @@ fn threaded_worker(
debug_name,
&mut **system,
);

panic_guard.done = true;
}
});

Expand All @@ -344,6 +381,32 @@ fn threaded_worker(
}
}

enum TaskWait {
HasTask,
HadPanic,
}

fn wait_for_task(
deadlock_counter: &DeadlockCounter,
planner_guard: &mut MutexGuard<'_, Planner>,
condvar: &Condvar,
had_panic: &AtomicBool,
) -> TaskWait {
deadlock_counter.start_wait();

loop {
// wait for condvar to be notified, or poll for panic interrupts every second
let result = condvar.wait_for(planner_guard, Duration::from_secs(1));
if had_panic.load(atomic::Ordering::Acquire) {
return TaskWait::HadPanic;
}

if !result.timed_out() {
return TaskWait::HasTask;
}
}
}

#[cfg(debug_assertions)]
mod deadlock_counter {
use std::sync::atomic::{self, AtomicUsize};
Expand Down Expand Up @@ -381,9 +444,29 @@ pub(crate) use deadlock_counter::DeadlockCounter;

#[derive(Clone, Copy)]
struct Context<'t> {
topology: &'t Topology,
planner: &'t Mutex<Planner>,
condvar: &'t Condvar,
topology: &'t Topology,
planner: &'t Mutex<Planner>,
condvar: &'t Condvar,
had_panic: &'t AtomicBool,
}

impl<'t> Context<'t> {
fn panic_guard(&self) -> PanicGuard<'_> {
PanicGuard { done: false, had_panic: self.had_panic }
}
}

struct PanicGuard<'t> {
done: bool,
had_panic: &'t AtomicBool,
}

impl<'t> Drop for PanicGuard<'t> {
fn drop(&mut self) {
if !self.done {
self.had_panic.store(true, atomic::Ordering::Release);
}
}
}

#[derive(Clone, Copy)]
Expand Down
12 changes: 12 additions & 0 deletions src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,18 @@ fn test_zero_concurrency_single_unsend() {
);
}

#[test]
#[should_panic = "system panic"]
fn test_send_panic() {
bootstrap(3, || (), |_builder, [_], []| {}, || |_| panic!("system panic"), |_| {})
}

#[test]
#[should_panic = "system panic"]
fn test_unsend_panic() {
bootstrap(3, || (), |_builder, [], [_]| {}, || |_| panic!("system panic"), |_| {})
}

/// Bootstraps a test function for the scheduler.
///
/// This function performs the following:
Expand Down

0 comments on commit 0bdebcd

Please sign in to comment.