diff --git a/src/tasks/scheduler.rs b/src/tasks/scheduler.rs index df363677..ad0b8832 100644 --- a/src/tasks/scheduler.rs +++ b/src/tasks/scheduler.rs @@ -41,7 +41,9 @@ impl Scheduler { async move { while let Some(msg) = receiver.recv().await { tracing::trace!(?msg, "Executing a message"); - + if let SchedulerMessage::Close = msg { + break; + } if let Err(e) = scheduler.execute(msg) { tracing::error!(error = &*e, "An error occurred while handling a message"); } @@ -96,6 +98,14 @@ impl Scheduler { Ok(()) } } + + pub fn close(&self) { + self.channel.send(SchedulerMessage::Close).unwrap(); + } + + pub fn is_closed(&self) -> bool { + self.channel.is_closed() + } } // Safety: The only way our !Send messages will be sent to the scheduler is if @@ -136,6 +146,12 @@ impl SchedulerState { fn execute(&mut self, message: SchedulerMessage) -> Result<(), Error> { match message { + SchedulerMessage::Close => { + tracing::debug!("Scheduler received Close message"); + self.idle.clear(); + // self.busy.clear(); + Ok(()) + } SchedulerMessage::SpawnAsync(task) => { self.post_message(PostMessagePayload::Async(AsyncJob::Thunk(task))) } diff --git a/src/tasks/scheduler_message.rs b/src/tasks/scheduler_message.rs index 6ccf1bd3..e54c01db 100644 --- a/src/tasks/scheduler_message.rs +++ b/src/tasks/scheduler_message.rs @@ -20,6 +20,8 @@ use crate::{ #[derive(Derivative)] #[derivative(Debug)] pub(crate) enum SchedulerMessage { + /// Close the scheduler. + Close, /// Run a promise on a worker thread. SpawnAsync(#[derivative(Debug(format_with = "crate::utils::hidden"))] AsyncTask), /// Run a blocking operation on a worker thread. @@ -131,6 +133,7 @@ impl SchedulerMessage { pub(crate) fn into_js(self) -> Result { match self { + SchedulerMessage::Close => Serializer::new(consts::TYPE_CLOSE).finish(), SchedulerMessage::SpawnAsync(task) => Serializer::new(consts::TYPE_SPAWN_ASYNC) .boxed(consts::PTR, task) .finish(), @@ -177,6 +180,7 @@ impl SchedulerMessage { } mod consts { + pub const TYPE_CLOSE: &str = "close"; pub const TYPE_SPAWN_ASYNC: &str = "spawn-async"; pub const TYPE_SPAWN_BLOCKING: &str = "spawn-blocking"; pub const TYPE_WORKER_IDLE: &str = "worker-idle"; diff --git a/src/tasks/thread_pool.rs b/src/tasks/thread_pool.rs index c5bcb20d..584fbdc0 100644 --- a/src/tasks/thread_pool.rs +++ b/src/tasks/thread_pool.rs @@ -48,6 +48,10 @@ impl ThreadPool { pub(crate) fn send(&self, msg: SchedulerMessage) { self.scheduler.send(msg).expect("scheduler is dead"); } + + pub fn close(&self) { + self.scheduler.close(); + } } impl Drop for ThreadPool { diff --git a/src/tasks/worker_handle.rs b/src/tasks/worker_handle.rs index 098a48b1..02239954 100644 --- a/src/tasks/worker_handle.rs +++ b/src/tasks/worker_handle.rs @@ -94,14 +94,19 @@ fn on_message(msg: web_sys::MessageEvent, sender: &Scheduler, worker_id: u32) { let result = unsafe { WorkerMessage::try_from_js(msg.data()) } .map_err(|e| crate::utils::js_error(e.into())) .context("Unable to parse the worker message") - .and_then(|msg| { + .and_then(|base_msg| { tracing::trace!( - ?msg, + ?base_msg, worker.id = worker_id, "Received a message from worker" ); - let msg = match msg { + if sender.is_closed() && matches!(&base_msg, WorkerMessage::MarkIdle) { + tracing::warn!("Scheduler is closed, dropping message {:?}", msg); + return Ok(()); + } + + let msg = match base_msg { WorkerMessage::MarkBusy => SchedulerMessage::WorkerBusy { worker_id }, WorkerMessage::MarkIdle => SchedulerMessage::WorkerIdle { worker_id }, WorkerMessage::Scheduler(msg) => msg, diff --git a/src/wasmer.rs b/src/wasmer.rs index 82f3eea0..a7bc2c9d 100644 --- a/src/wasmer.rs +++ b/src/wasmer.rs @@ -20,10 +20,7 @@ use web_sys::{ReadableStream, WritableStream}; use webc::{indexmap::IndexMap, metadata::Command as MetadataCommand, wasmer_package::Package}; use crate::{ - instance::ExitCondition, - runtime::Runtime, - utils::{Error, GlobalScope}, - Instance, JsRuntime, SpawnOptions, + instance::ExitCondition, runtime::Runtime, tasks::ThreadPool, utils::{Error, GlobalScope}, Instance, JsRuntime, SpawnOptions }; /// A package from the Wasmer registry. @@ -218,7 +215,9 @@ pub struct Command { impl Command { pub async fn run(&self, options: Option) -> Result { // We set the default pool as it may be not set - let runtime = Arc::new(self.runtime.with_default_pool()); + let thread_pool = Arc::new(ThreadPool::new()); + let runtime = Arc::new(self.runtime.with_task_manager(thread_pool.clone())); + // let runtime = Arc::new(self.runtime.with_default_pool()); let pkg = Arc::clone(&self.pkg); let tasks = Arc::clone(runtime.task_manager()); @@ -237,6 +236,7 @@ impl Command { tasks.task_dedicated(Box::new(move || { let result = runner.run_command(&command_name, &pkg, runtime); let _ = sender.send(ExitCondition::from_result(result)); + thread_pool.close(); }))?; Ok(Instance {