On Web, implement Send for EventLoopProxy #2737

Closed · wants to merge 3 commits
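For context, what the change enables at the API level: an `EventLoopProxy` can now be moved to another thread on Web and used to send user events back to the event loop. A minimal sketch against winit's public 0.28 API (the `UserEvent` type is made up for illustration, and spawning real threads on wasm needs extra setup not shown here):

    use winit::event_loop::EventLoopBuilder;

    #[derive(Debug)]
    struct UserEvent(u32);

    fn main() {
        let event_loop = EventLoopBuilder::<UserEvent>::with_user_event().build();
        let proxy = event_loop.create_proxy();

        // With this PR, `EventLoopProxy` is `Send` on Web too, so it can be
        // moved into another thread.
        std::thread::spawn(move || {
            proxy.send_event(UserEvent(42)).expect("event loop closed");
        });

        event_loop.run(|_event, _target, _control_flow| {
            // Handle `Event::UserEvent(UserEvent(..))` here.
        });
    }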
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ And please only add new entries to the top of this list, right below the `# Unre
# 0.28.3

- Fix macOS memory leaks.
+- On Web, `EventLoopProxy` now implements `Send`.

# 0.28.2
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -148,8 +148,9 @@ features = [
    'WheelEvent'
]

-[target.'cfg(target_family = "wasm")'.dependencies.wasm-bindgen]
-version = "0.2.45"
+[target.'cfg(target_family = "wasm")'.dependencies]
+wasm-bindgen = "0.2.45"
+wasm-bindgen-futures = "0.4.31"

[target.'cfg(target_family = "wasm")'.dev-dependencies]
console_log = "0.2"
109 changes: 101 additions & 8 deletions src/platform_impl/web/event_loop/proxy.rs
@@ -1,26 +1,119 @@
-use super::runner;
-use crate::event::Event;
+use std::future;
+use std::rc::Rc;
+use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError};
+use std::sync::{Arc, Mutex};
+use std::task::{Poll, Waker};

use crate::event_loop::EventLoopClosed;

pub struct EventLoopProxy<T: 'static> {
-    runner: runner::Shared<T>,
+    sender: AsyncSender<T>,
}

impl<T: 'static> EventLoopProxy<T> {
-    pub fn new(runner: runner::Shared<T>) -> Self {
-        Self { runner }
+    pub fn new(sender: AsyncSender<T>) -> Self {
+        Self { sender }
    }

    pub fn send_event(&self, event: T) -> Result<(), EventLoopClosed<T>> {
-        self.runner.send_event(Event::UserEvent(event));
-        Ok(())
+        match self.sender.send(event) {
+            Ok(()) => Ok(()),
+            Err(SendError(val)) => Err(EventLoopClosed(val)),
+        }
    }
}

impl<T: 'static> Clone for EventLoopProxy<T> {
    fn clone(&self) -> Self {
        Self {
-            runner: self.runner.clone(),
+            sender: self.sender.clone(),
        }
    }
}

+pub fn channel<T: 'static>() -> (AsyncSender<T>, AsyncReceiver<T>) {
+    let (sender, receiver) = mpsc::channel();
+    let waker = Arc::new(Mutex::new(None));
+
+    let sender = AsyncSender {
+        sender: Some(sender),
+        waker: Arc::clone(&waker),
+    };
+    let receiver = AsyncReceiver {
+        receiver: Rc::new(receiver),
+        waker,
+    };
+
+    (sender, receiver)
+}
+
+pub struct AsyncSender<T: 'static> {
+    sender: Option<Sender<T>>,
+    waker: Arc<Mutex<Option<Waker>>>,
Member:

Is `Mutex` available on WASM? I always forget which synchronization primitives panic on blocking for non-atomic WASM. Although it looks like, in the single-threaded case, it shouldn't block. Is this right?

Member Author:

> Is `Mutex` available on WASM?

Yes.

> Although it looks like, in the single-threaded case, it shouldn't block. Is this right?

For single-threaded cases it indeed shouldn't block, but it can in multi-threaded cases. Considering we are just storing a single value and not holding the lock for long, in practice it's not a problem.

I would still like to propose that we don't use a Mutex here and use AtomicWaker instead. The crate has no dependencies and is really small.

If the dependency is still a concern, I would point to my suggestion above: hiding the Send support behind the +atomics target feature.

Member:

I would support using atomic-waker, but I'd ask the rest of the maintainers first.
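For illustration, a rough sketch of what the sender side could look like with atomic-waker instead of the `Mutex` (assuming that crate's `AtomicWaker::register`/`wake` API; this is a hypothetical alternative, not part of this PR):

    // Hypothetical alternative using the dependency-free `atomic-waker` crate.
    use std::sync::mpsc::{SendError, Sender};
    use std::sync::Arc;

    use atomic_waker::AtomicWaker;

    pub struct AsyncSender<T> {
        sender: Sender<T>,
        waker: Arc<AtomicWaker>,
    }

    impl<T> AsyncSender<T> {
        pub fn send(&self, event: T) -> Result<(), SendError<T>> {
            self.sender.send(event)?;
            // Lock-free wake: never blocks, even when another thread is
            // concurrently calling `register` on the receiver side.
            self.waker.wake();
            Ok(())
        }
    }

    // The receiver's `poll_fn` would call `self.waker.register(cx.waker())`
    // before re-checking the channel, instead of locking a `Mutex`.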

+}

+impl<T: 'static> AsyncSender<T> {
+    pub fn send(&self, event: T) -> Result<(), SendError<T>> {
+        self.sender.as_ref().unwrap().send(event)?;
+
+        if let Some(waker) = self.waker.lock().unwrap().take() {
+            waker.wake();
+        }
+
+        Ok(())
+    }
+}
+
+impl<T: 'static> Clone for AsyncSender<T> {
+    fn clone(&self) -> Self {
+        Self {
+            sender: self.sender.clone(),
+            waker: self.waker.clone(),
+        }
+    }
+}
+
+impl<T> Drop for AsyncSender<T> {
+    fn drop(&mut self) {
+        // The corresponding `Receiver` is used to spawn a future that waits
+        // for messages in a loop. The future itself needs to be cleaned up
+        // somehow, which is signalled by dropping the last `Sender`. But it
+        // will do nothing if not woken up.
+
+        // We have to drop the potentially last `Sender` **before** checking
+        // if this is the last `Sender`: `Arc::strong_count` doesn't prevent
+        // races.
+        self.sender.take().unwrap();
+
+        // This one + the one held by the future.
+        if Arc::strong_count(&self.waker) == 2 {
+            if let Some(waker) = self.waker.lock().unwrap().take() {
+                waker.wake();
+            }
+        }
+    }
+}
+
+pub struct AsyncReceiver<T: 'static> {
+    receiver: Rc<Receiver<T>>,
+    waker: Arc<Mutex<Option<Waker>>>,
+}
+
+impl<T: 'static> AsyncReceiver<T> {
+    pub async fn next(&mut self) -> Result<T, RecvError> {
+        future::poll_fn(|cx| match self.receiver.try_recv() {
+            Ok(event) => Poll::Ready(Ok(event)),
+            Err(TryRecvError::Empty) => {
+                *self.waker.lock().unwrap() = Some(cx.waker().clone());
Member:

You should use `will_wake` here in order to avoid unnecessarily cloning the waker twice.

daxpedda (Member Author), Mar 15, 2023:

I didn't know about this! I also noticed that neither futures nor atomic-waker uses this optimization in their AtomicWaker implementations. I opened an issue here: smol-rs/atomic-waker#11.

In our case this won't be possible, because to avoid a race condition we would have to hold the lock, which can lead to blocking in a multi-threaded environment.

Member:

Couldn't you do:

    {
        let mut waker = self.waker.lock().unwrap();
        if waker.as_ref().map_or(true, |waker| !waker.will_wake(cx.waker())) {
            *waker = Some(cx.waker().clone());
        }
    }

in order to drop the lock once the temporary scope ends?

daxpedda (Member Author), Mar 15, 2023:

I wasn't concerned with how to drop the lock, but with the fact that we hold it at all. The assumption in play here is that if we only assign a value, without any checks in between, the lock can't block the main thread, because a thread won't yield in the middle of a simple assignment. Not really a provable assumption, nor one I'm certain about, but it's what we have right now without using AtomicWaker.

If the worker yields between taking the lock and comparing and cloning the Waker, we are going to have a problem.
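For reference, the usual `will_wake` pattern in a purely single-threaded setting, where the slot is a plain `Option<Waker>` and no lock is involved (an illustrative sketch, not code from this PR):

    use std::task::{Context, Waker};

    // Clone the waker only when it actually changed, instead of on every
    // poll. Safe here because `slot` is not shared across threads.
    fn register(slot: &mut Option<Waker>, cx: &Context<'_>) {
        match slot {
            Some(existing) if existing.will_wake(cx.waker()) => {}
            _ => *slot = Some(cx.waker().clone()),
        }
    }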


+                match self.receiver.try_recv() {
+                    Ok(event) => Poll::Ready(Ok(event)),
+                    Err(TryRecvError::Empty) => Poll::Pending,
+                    Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
+                }
+            }
+            Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
+        })
+        .await
+    }
+}
28 changes: 26 additions & 2 deletions src/platform_impl/web/event_loop/runner.rs
@@ -1,3 +1,5 @@
+use super::proxy::{self, AsyncSender};
+use super::EventLoopProxy;
use super::{super::ScaleChangeArgs, backend, state::State};
use crate::event::{Event, StartCause};
use crate::event_loop::ControlFlow;
@@ -32,6 +34,7 @@ pub struct Execution<T: 'static> {
    destroy_pending: RefCell<VecDeque<WindowId>>,
    scale_change_detector: RefCell<Option<backend::ScaleChangeDetector>>,
    unload_event_handle: RefCell<Option<backend::UnloadEventHandle>>,
+    proxy_sender: AsyncSender<T>,
}

enum RunnerEnum<T: 'static> {
@@ -99,7 +102,9 @@ impl<T: 'static> Runner<T> {

impl<T: 'static> Shared<T> {
    pub fn new() -> Self {
-        Shared(Rc::new(Execution {
+        let (proxy_sender, mut proxy_receiver) = proxy::channel();
+
+        let this = Shared(Rc::new(Execution {
            runner: RefCell::new(RunnerEnum::Pending),
            events: RefCell::new(VecDeque::new()),
            id: RefCell::new(0),
@@ -108,7 +113,22 @@ impl<T: 'static> Shared<T> {
            destroy_pending: RefCell::new(VecDeque::new()),
            scale_change_detector: RefCell::new(None),
            unload_event_handle: RefCell::new(None),
-        }))
+            proxy_sender,
+        }));
+
+        wasm_bindgen_futures::spawn_local({
+            let runner = this.clone();
+            async move {
+                while let Ok(value) = proxy_receiver.next().await {
+                    runner.send_event(Event::UserEvent(value))
+                }
+
+                // An error was returned because the channel was closed, which
+                // happens when the event loop gets closed, so we can stop now.
+            }
+        });
+
+        this
    }

    pub fn add_canvas(&self, id: WindowId, canvas: &Rc<RefCell<backend::Canvas>>) {
@@ -155,6 +175,10 @@ impl<T: 'static> Shared<T> {
        *id
    }

+    pub fn create_proxy(&self) -> EventLoopProxy<T> {
+        EventLoopProxy::new(self.0.proxy_sender.clone())
+    }
+
    pub fn request_redraw(&self, id: WindowId) {
        self.0.redraw_pending.borrow_mut().insert(id);
    }
2 changes: 1 addition & 1 deletion src/platform_impl/web/event_loop/window_target.rs
@@ -36,7 +36,7 @@ impl<T> EventLoopWindowTarget<T> {
    }

    pub fn proxy(&self) -> EventLoopProxy<T> {
-        EventLoopProxy::new(self.runner.clone())
+        self.runner.create_proxy()
    }

    pub fn run(&self, event_handler: Box<runner::EventHandler<T>>) {
1 change: 0 additions & 1 deletion tests/send_objects.rs
@@ -1,7 +1,6 @@
#[allow(dead_code)]
fn needs_send<T: Send>() {}

-#[cfg(not(wasm_platform))]
#[test]
fn event_loop_proxy_send() {
    #[allow(dead_code)]
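The rest of the test is truncated above; it presumably just instantiates the `Send` bound, roughly along these lines (a reconstruction for readability, not the verbatim file):

    #[allow(dead_code)]
    fn needs_send<T: Send>() {}

    #[test]
    fn event_loop_proxy_send() {
        #[allow(dead_code)]
        fn is_send<T: 'static + Send>() {
            // Compile-time assertion: `EventLoopProxy<T>` must be `Send`.
            needs_send::<winit::event_loop::EventLoopProxy<T>>();
        }
    }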