-
Notifications
You must be signed in to change notification settings - Fork 932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
On Web, implement Send
for EventLoopProxy
#2737
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,119 @@ | ||
use super::runner; | ||
use crate::event::Event; | ||
use std::future; | ||
use std::rc::Rc; | ||
use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError}; | ||
use std::sync::{Arc, Mutex}; | ||
use std::task::{Poll, Waker}; | ||
|
||
use crate::event_loop::EventLoopClosed; | ||
|
||
pub struct EventLoopProxy<T: 'static> { | ||
runner: runner::Shared<T>, | ||
sender: AsyncSender<T>, | ||
} | ||
|
||
impl<T: 'static> EventLoopProxy<T> { | ||
pub fn new(runner: runner::Shared<T>) -> Self { | ||
Self { runner } | ||
pub fn new(sender: AsyncSender<T>) -> Self { | ||
Self { sender } | ||
} | ||
|
||
pub fn send_event(&self, event: T) -> Result<(), EventLoopClosed<T>> { | ||
self.runner.send_event(Event::UserEvent(event)); | ||
Ok(()) | ||
match self.sender.send(event) { | ||
Ok(()) => Ok(()), | ||
Err(SendError(val)) => Err(EventLoopClosed(val)), | ||
} | ||
} | ||
} | ||
|
||
impl<T: 'static> Clone for EventLoopProxy<T> { | ||
fn clone(&self) -> Self { | ||
Self { | ||
runner: self.runner.clone(), | ||
sender: self.sender.clone(), | ||
} | ||
} | ||
} | ||
|
||
pub fn channel<T: 'static>() -> (AsyncSender<T>, AsyncReceiver<T>) { | ||
let (sender, receiver) = mpsc::channel(); | ||
let waker = Arc::new(Mutex::new(None)); | ||
|
||
let sender = AsyncSender { | ||
sender: Some(sender), | ||
waker: Arc::clone(&waker), | ||
}; | ||
let receiver = AsyncReceiver { | ||
receiver: Rc::new(receiver), | ||
waker, | ||
}; | ||
|
||
(sender, receiver) | ||
} | ||
|
||
pub struct AsyncSender<T: 'static> { | ||
sender: Option<Sender<T>>, | ||
waker: Arc<Mutex<Option<Waker>>>, | ||
} | ||
|
||
impl<T: 'static> AsyncSender<T> { | ||
pub fn send(&self, event: T) -> Result<(), SendError<T>> { | ||
self.sender.as_ref().unwrap().send(event)?; | ||
|
||
if let Some(waker) = self.waker.lock().unwrap().take() { | ||
waker.wake(); | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl<T: 'static> Clone for AsyncSender<T> { | ||
fn clone(&self) -> Self { | ||
Self { | ||
sender: self.sender.clone(), | ||
waker: self.waker.clone(), | ||
} | ||
} | ||
} | ||
|
||
impl<T> Drop for AsyncSender<T> { | ||
fn drop(&mut self) { | ||
// The corresponding `Receiver` is used to spawn a future that waits | ||
// for messages in a loop. The future itself needs to be cleaned up | ||
// somehow, which is signalled by dropping the last `Sender`. But it | ||
// will do nothing if not woken up. | ||
|
||
// We have to drop the potentially last `Sender` **before** checking if | ||
// this is the last `Sender`. `Arc::strong_count` doesn't prevent | ||
// races. | ||
self.sender.take().unwrap(); | ||
|
||
// This one + the one held by the future. | ||
if Arc::strong_count(&self.waker) == 2 { | ||
if let Some(waker) = self.waker.lock().unwrap().take() { | ||
waker.wake(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub struct AsyncReceiver<T: 'static> { | ||
receiver: Rc<Receiver<T>>, | ||
waker: Arc<Mutex<Option<Waker>>>, | ||
} | ||
|
||
impl<T: 'static> AsyncReceiver<T> { | ||
pub async fn next(&mut self) -> Result<T, RecvError> { | ||
future::poll_fn(|cx| match self.receiver.try_recv() { | ||
Ok(event) => Poll::Ready(Ok(event)), | ||
Err(TryRecvError::Empty) => { | ||
*self.waker.lock().unwrap() = Some(cx.waker().clone()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't know about this! I also noticed that neither In our case this won't be possible because to avoid a racing condition we would have to hold the lock, which can lead to blocking in a multi-threaded environment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Couldn't you do: {
let mut waker = self.waker.lock().unwrap();
if waker.map_or(true, |waker| !waker.will_wake(cx.waker())) {
*waker = Some(cx.waker().clone());
}
} in order to drop the lock once the temporary scope ends? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasn't concerned with how to drop the lock, but the fact that we hold it in the first place. The assumption in play here is that if you only assign a value without any checks in-between, optimizations can prevent this to block the main thread because it won't yield in-between a simple assignment. Not really a provable assumption, nor one I'm certain about, but it's what we have right now without using If the the worker yields between holding the lock and comparing and cloning the |
||
|
||
match self.receiver.try_recv() { | ||
Ok(event) => Poll::Ready(Ok(event)), | ||
Err(TryRecvError::Empty) => Poll::Pending, | ||
Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), | ||
} | ||
} | ||
Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), | ||
}) | ||
.await | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is
Mutex
available on WASM? I always forget what synchronization primitives panic on block for non-atomic WASM or not. Although, it looks like that, in single threaded cases, it shouldn't block. Is this right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
For single threaded cases it shouldn't block indeed, but it can for multi-threaded cases. Considering we are just storing a single value and not holding a lock in practice it's not a problem.
I would still like to propose that we don't use a
Mutex
here and useAtomicWaker
instead. The crate has no dependencies and is really small.If the dependency is still a concern, I would like to point to my suggestion above: hiding the
Send
support behind the+atomics
target feature.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would support using
atomic-waker
, but I'd ask the rest of the maintainers first.