Skip to content

Commit

Permalink
Close the scheduler on end
Browse files Browse the repository at this point in the history
  • Loading branch information
syrusakbary committed Oct 24, 2024
1 parent 6ba2d99 commit ae2c753
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 9 deletions.
18 changes: 17 additions & 1 deletion src/tasks/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ impl Scheduler {
async move {
while let Some(msg) = receiver.recv().await {
tracing::trace!(?msg, "Executing a message");

if let SchedulerMessage::Close = msg {
break;
}
if let Err(e) = scheduler.execute(msg) {
tracing::error!(error = &*e, "An error occurred while handling a message");
}
Expand Down Expand Up @@ -96,6 +98,14 @@ impl Scheduler {
Ok(())
}
}

pub fn close(&self) {
self.channel.send(SchedulerMessage::Close).unwrap();
}

pub fn is_closed(&self) -> bool {
self.channel.is_closed()
}
}

// Safety: The only way our !Send messages will be sent to the scheduler is if
Expand Down Expand Up @@ -136,6 +146,12 @@ impl SchedulerState {

fn execute(&mut self, message: SchedulerMessage) -> Result<(), Error> {
match message {
SchedulerMessage::Close => {
tracing::debug!("Scheduler received Close message");
self.idle.clear();
// self.busy.clear();
Ok(())
}
SchedulerMessage::SpawnAsync(task) => {
self.post_message(PostMessagePayload::Async(AsyncJob::Thunk(task)))
}
Expand Down
4 changes: 4 additions & 0 deletions src/tasks/scheduler_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::{
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) enum SchedulerMessage {
/// Close the scheduler.
Close,
/// Run a promise on a worker thread.
SpawnAsync(#[derivative(Debug(format_with = "crate::utils::hidden"))] AsyncTask),
/// Run a blocking operation on a worker thread.
Expand Down Expand Up @@ -131,6 +133,7 @@ impl SchedulerMessage {

pub(crate) fn into_js(self) -> Result<JsValue, Error> {
match self {
SchedulerMessage::Close => Serializer::new(consts::TYPE_CLOSE).finish(),
SchedulerMessage::SpawnAsync(task) => Serializer::new(consts::TYPE_SPAWN_ASYNC)
.boxed(consts::PTR, task)
.finish(),
Expand Down Expand Up @@ -177,6 +180,7 @@ impl SchedulerMessage {
}

mod consts {
pub const TYPE_CLOSE: &str = "close";
pub const TYPE_SPAWN_ASYNC: &str = "spawn-async";
pub const TYPE_SPAWN_BLOCKING: &str = "spawn-blocking";
pub const TYPE_WORKER_IDLE: &str = "worker-idle";
Expand Down
4 changes: 4 additions & 0 deletions src/tasks/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl ThreadPool {
pub(crate) fn send(&self, msg: SchedulerMessage) {
self.scheduler.send(msg).expect("scheduler is dead");
}

pub fn close(&self) {
self.scheduler.close();
}
}

impl Drop for ThreadPool {
Expand Down
11 changes: 8 additions & 3 deletions src/tasks/worker_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,19 @@ fn on_message(msg: web_sys::MessageEvent, sender: &Scheduler, worker_id: u32) {
let result = unsafe { WorkerMessage::try_from_js(msg.data()) }
.map_err(|e| crate::utils::js_error(e.into()))
.context("Unable to parse the worker message")
.and_then(|msg| {
.and_then(|base_msg| {
tracing::trace!(
?msg,
?base_msg,
worker.id = worker_id,
"Received a message from worker"
);

let msg = match msg {
if sender.is_closed() && matches!(&base_msg, WorkerMessage::MarkIdle) {
tracing::warn!("Scheduler is closed, dropping message {:?}", msg);
return Ok(());
}

let msg = match base_msg {
WorkerMessage::MarkBusy => SchedulerMessage::WorkerBusy { worker_id },
WorkerMessage::MarkIdle => SchedulerMessage::WorkerIdle { worker_id },
WorkerMessage::Scheduler(msg) => msg,
Expand Down
10 changes: 5 additions & 5 deletions src/wasmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use web_sys::{ReadableStream, WritableStream};
use webc::{indexmap::IndexMap, metadata::Command as MetadataCommand, wasmer_package::Package};

use crate::{
instance::ExitCondition,
runtime::Runtime,
utils::{Error, GlobalScope},
Instance, JsRuntime, SpawnOptions,
instance::ExitCondition, runtime::Runtime, tasks::ThreadPool, utils::{Error, GlobalScope}, Instance, JsRuntime, SpawnOptions
};

/// A package from the Wasmer registry.
Expand Down Expand Up @@ -218,7 +215,9 @@ pub struct Command {
impl Command {
pub async fn run(&self, options: Option<SpawnOptions>) -> Result<Instance, Error> {
// We set the default pool as it may be not set
let runtime = Arc::new(self.runtime.with_default_pool());
let thread_pool = Arc::new(ThreadPool::new());
let runtime = Arc::new(self.runtime.with_task_manager(thread_pool.clone()));
// let runtime = Arc::new(self.runtime.with_default_pool());
let pkg = Arc::clone(&self.pkg);
let tasks = Arc::clone(runtime.task_manager());

Expand All @@ -237,6 +236,7 @@ impl Command {
tasks.task_dedicated(Box::new(move || {
let result = runner.run_command(&command_name, &pkg, runtime);
let _ = sender.send(ExitCondition::from_result(result));
thread_pool.close();
}))?;

Ok(Instance {
Expand Down

0 comments on commit ae2c753

Please sign in to comment.