From 9ee9a5eb49066aae712c4ef4e75a2f8f220d5e1d Mon Sep 17 00:00:00 2001 From: dAxpeDDa Date: Mon, 13 Mar 2023 14:05:32 +0100 Subject: [PATCH 1/3] On Web, `EventLoopProxy` now implements `Send` --- CHANGELOG.md | 1 + Cargo.toml | 5 +- src/platform_impl/web/event_loop/proxy.rs | 86 +++++++++++++++++-- src/platform_impl/web/event_loop/runner.rs | 28 +++++- .../web/event_loop/window_target.rs | 2 +- tests/send_objects.rs | 1 - 6 files changed, 109 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9711dde3c8..de204c044e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ And please only add new entries to the top of this list, right below the `# Unre # 0.28.3 - Fix macOS memory leaks. +- On Web, `EventLoopProxy` now implements `Send`. # 0.28.2 diff --git a/Cargo.toml b/Cargo.toml index c86287cf82..fb293d66f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -148,8 +148,9 @@ features = [ 'WheelEvent' ] -[target.'cfg(target_family = "wasm")'.dependencies.wasm-bindgen] -version = "0.2.45" +[target.'cfg(target_family = "wasm")'.dependencies] +wasm-bindgen = "0.2.45" +wasm-bindgen-futures = "0.4.31" [target.'cfg(target_family = "wasm")'.dev-dependencies] console_log = "0.2" diff --git a/src/platform_impl/web/event_loop/proxy.rs b/src/platform_impl/web/event_loop/proxy.rs index 1c70992317..f3595cfb92 100644 --- a/src/platform_impl/web/event_loop/proxy.rs +++ b/src/platform_impl/web/event_loop/proxy.rs @@ -1,26 +1,96 @@ -use super::runner; -use crate::event::Event; +use std::future::Future; +use std::pin::Pin; +use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; + use crate::event_loop::EventLoopClosed; pub struct EventLoopProxy { - runner: runner::Shared, + sender: AsyncSender, } impl EventLoopProxy { - pub fn new(runner: runner::Shared) -> Self { - Self { runner } + pub fn new(sender: AsyncSender) -> Self { + Self { sender } } pub fn send_event(&self, event: T) -> Result<(), EventLoopClosed> { - self.runner.send_event(Event::UserEvent(event)); - Ok(()) + match self.sender.send(event) { + Ok(()) => Ok(()), + Err(SendError(val)) => Err(EventLoopClosed(val)), + } } } impl Clone for EventLoopProxy { fn clone(&self) -> Self { Self { - runner: self.runner.clone(), + sender: self.sender.clone(), + } + } +} + +pub fn channel() -> (AsyncSender, AsyncReceiver) { + let (sender, receiver) = mpsc::channel(); + let waker = Arc::new(Mutex::new(None)); + + let sender = AsyncSender { + sender, + waker: Arc::clone(&waker), + }; + let receiver = AsyncReceiver { receiver, waker }; + + (sender, receiver) +} + +pub struct AsyncSender { + sender: Sender, + waker: Arc>>, +} + +impl AsyncSender { + pub fn send(&self, event: T) -> Result<(), SendError> { + self.sender.send(event)?; + + if let Some(waker) = self.waker.lock().unwrap().take() { + waker.wake(); + } + + Ok(()) + } +} + +impl Clone for AsyncSender { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + waker: self.waker.clone(), + } + } +} + +pub struct AsyncReceiver { + receiver: Receiver, + waker: Arc>>, +} + +impl Future for AsyncReceiver { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.receiver.try_recv() { + Ok(event) => Poll::Ready(Ok(event)), + Err(TryRecvError::Empty) => { + *self.waker.lock().unwrap() = Some(cx.waker().clone()); + + match self.receiver.try_recv() { + Ok(event) => Poll::Ready(Ok(event)), + Err(TryRecvError::Empty) => Poll::Pending, + Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), + } + } + Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), } } } diff --git a/src/platform_impl/web/event_loop/runner.rs b/src/platform_impl/web/event_loop/runner.rs index bdb68ff466..7bcc48f5b3 100644 --- a/src/platform_impl/web/event_loop/runner.rs +++ b/src/platform_impl/web/event_loop/runner.rs @@ -1,3 +1,5 @@ +use super::proxy::{self, AsyncSender}; +use super::EventLoopProxy; use super::{super::ScaleChangeArgs, backend, state::State}; use crate::event::{Event, StartCause}; use crate::event_loop::ControlFlow; @@ -32,6 +34,7 @@ pub struct Execution { destroy_pending: RefCell>, scale_change_detector: RefCell>, unload_event_handle: RefCell>, + proxy_sender: AsyncSender, } enum RunnerEnum { @@ -99,7 +102,9 @@ impl Runner { impl Shared { pub fn new() -> Self { - Shared(Rc::new(Execution { + let (proxy_sender, mut proxy_receiver) = proxy::channel(); + + let this = Shared(Rc::new(Execution { runner: RefCell::new(RunnerEnum::Pending), events: RefCell::new(VecDeque::new()), id: RefCell::new(0), @@ -108,7 +113,22 @@ impl Shared { destroy_pending: RefCell::new(VecDeque::new()), scale_change_detector: RefCell::new(None), unload_event_handle: RefCell::new(None), - })) + proxy_sender, + })); + + wasm_bindgen_futures::spawn_local({ + let runner = this.clone(); + async move { + while let Ok(value) = (&mut proxy_receiver).await { + runner.send_event(Event::UserEvent(value)) + } + + // An error was returned because the channel was closed, which + // happens when the event loop gets closed, so we can stop now. + } + }); + + this } pub fn add_canvas(&self, id: WindowId, canvas: &Rc>) { @@ -155,6 +175,10 @@ impl Shared { *id } + pub fn create_proxy(&self) -> EventLoopProxy { + EventLoopProxy::new(self.0.proxy_sender.clone()) + } + pub fn request_redraw(&self, id: WindowId) { self.0.redraw_pending.borrow_mut().insert(id); } diff --git a/src/platform_impl/web/event_loop/window_target.rs b/src/platform_impl/web/event_loop/window_target.rs index 17aafd946d..1c4d6f8be8 100644 --- a/src/platform_impl/web/event_loop/window_target.rs +++ b/src/platform_impl/web/event_loop/window_target.rs @@ -36,7 +36,7 @@ impl EventLoopWindowTarget { } pub fn proxy(&self) -> EventLoopProxy { - EventLoopProxy::new(self.runner.clone()) + self.runner.create_proxy() } pub fn run(&self, event_handler: Box>) { diff --git a/tests/send_objects.rs b/tests/send_objects.rs index 15fd879339..bda19c2cb3 100644 --- a/tests/send_objects.rs +++ b/tests/send_objects.rs @@ -1,7 +1,6 @@ #[allow(dead_code)] fn needs_send() {} -#[cfg(not(wasm_platform))] #[test] fn event_loop_proxy_send() { #[allow(dead_code)] From f579aeee224d3a4efad3620efbdf746b80d39c09 Mon Sep 17 00:00:00 2001 From: dAxpeDDa Date: Wed, 15 Mar 2023 17:23:36 +0100 Subject: [PATCH 2/3] Use more idiomatic interface for `AsyncReceiver` --- src/platform_impl/web/event_loop/proxy.rs | 24 ++++++++++++---------- src/platform_impl/web/event_loop/runner.rs | 2 +- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/platform_impl/web/event_loop/proxy.rs b/src/platform_impl/web/event_loop/proxy.rs index f3595cfb92..545b55e89f 100644 --- a/src/platform_impl/web/event_loop/proxy.rs +++ b/src/platform_impl/web/event_loop/proxy.rs @@ -1,8 +1,8 @@ -use std::future::Future; -use std::pin::Pin; +use std::future; +use std::rc::Rc; use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError}; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll, Waker}; +use std::task::{Poll, Waker}; use crate::event_loop::EventLoopClosed; @@ -39,7 +39,10 @@ pub fn channel() -> (AsyncSender, AsyncReceiver) { sender, waker: Arc::clone(&waker), }; - let receiver = AsyncReceiver { receiver, waker }; + let receiver = AsyncReceiver { + receiver: Rc::new(receiver), + waker, + }; (sender, receiver) } @@ -71,15 +74,13 @@ impl Clone for AsyncSender { } pub struct AsyncReceiver { - receiver: Receiver, + receiver: Rc>, waker: Arc>>, } -impl Future for AsyncReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.receiver.try_recv() { +impl AsyncReceiver { + pub async fn next(&mut self) -> Result { + future::poll_fn(|cx| match self.receiver.try_recv() { Ok(event) => Poll::Ready(Ok(event)), Err(TryRecvError::Empty) => { *self.waker.lock().unwrap() = Some(cx.waker().clone()); @@ -91,6 +92,7 @@ impl Future for AsyncReceiver { } } Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), - } + }) + .await } } diff --git a/src/platform_impl/web/event_loop/runner.rs b/src/platform_impl/web/event_loop/runner.rs index 7bcc48f5b3..b2fc56dff5 100644 --- a/src/platform_impl/web/event_loop/runner.rs +++ b/src/platform_impl/web/event_loop/runner.rs @@ -119,7 +119,7 @@ impl Shared { wasm_bindgen_futures::spawn_local({ let runner = this.clone(); async move { - while let Ok(value) = (&mut proxy_receiver).await { + while let Ok(value) = proxy_receiver.next().await { runner.send_event(Event::UserEvent(value)) } From 105fd3aa8f725a7aea89399e2ee32e3326a0767c Mon Sep 17 00:00:00 2001 From: dAxpeDDa Date: Fri, 17 Mar 2023 00:05:38 +0100 Subject: [PATCH 3/3] Make sure the spawned `Future` is cleaned up --- src/platform_impl/web/event_loop/proxy.rs | 27 ++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/src/platform_impl/web/event_loop/proxy.rs b/src/platform_impl/web/event_loop/proxy.rs index 545b55e89f..901f15ffe1 100644 --- a/src/platform_impl/web/event_loop/proxy.rs +++ b/src/platform_impl/web/event_loop/proxy.rs @@ -36,7 +36,7 @@ pub fn channel() -> (AsyncSender, AsyncReceiver) { let waker = Arc::new(Mutex::new(None)); let sender = AsyncSender { - sender, + sender: Some(sender), waker: Arc::clone(&waker), }; let receiver = AsyncReceiver { @@ -48,13 +48,13 @@ pub fn channel() -> (AsyncSender, AsyncReceiver) { } pub struct AsyncSender { - sender: Sender, + sender: Option>, waker: Arc>>, } impl AsyncSender { pub fn send(&self, event: T) -> Result<(), SendError> { - self.sender.send(event)?; + self.sender.as_ref().unwrap().send(event)?; if let Some(waker) = self.waker.lock().unwrap().take() { waker.wake(); @@ -73,6 +73,27 @@ impl Clone for AsyncSender { } } +impl Drop for AsyncSender { + fn drop(&mut self) { + // The corresponding `Receiver` is used to spawn a future that waits + // for messages in a loop. The future itself needs to be cleaned up + // somehow, which is signalled by dropping the last `Sender`. But it + // will do nothing if not woken up. + + // We have to drop the potentially last `Sender` **before** checking if + // this is the last `Sender`. `Arc::strong_count` doesn't prevent + // races. + self.sender.take().unwrap(); + + // This one + the one held by the future. + if Arc::strong_count(&self.waker) == 2 { + if let Some(waker) = self.waker.lock().unwrap().take() { + waker.wake(); + } + } + } +} + pub struct AsyncReceiver { receiver: Rc>, waker: Arc>>,