Web: async improvements (#3805)

- Internal: Fix dropping `Notifier` without sending a result causing `Future`s to never complete. This should never happen anyway, but now we get a panic instead of nothing if we hit a bug.
- Internal: Remove a bunch of `unwrap()`s that aren't required when correctly using `MainThreadMarker`.
- `Window::canvas()` is now able to return a reference instead of an owned value.

Extracted from #3801.
This commit is contained in:
daxpedda 2024-07-23 16:47:35 +02:00 committed by GitHub
parent 5ec934b1b0
commit ef580b817d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 238 additions and 219 deletions

View file

@ -10,13 +10,12 @@ pub struct Dispatcher<T: 'static>(Wrapper<T, Arc<Sender<Closure<T>>>, Closure<T>
struct Closure<T>(Box<dyn FnOnce(&T) + Send>);
impl<T> Dispatcher<T> {
#[track_caller]
pub fn new(main_thread: MainThreadMarker, value: T) -> Option<(Self, DispatchRunner<T>)> {
pub fn new(main_thread: MainThreadMarker, value: T) -> (Self, DispatchRunner<T>) {
let (sender, receiver) = channel::<Closure<T>>();
let sender = Arc::new(sender);
let receiver = Rc::new(receiver);
Wrapper::new(
let wrapper = Wrapper::new(
main_thread,
value,
|value, Closure(closure)| {
@ -29,8 +28,7 @@ impl<T> Dispatcher<T> {
move |value| async move {
while let Ok(Closure(closure)) = receiver.next().await {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't
// do anything funny with it here. See
// `Self::queue()`.
// do anything funny with it here. See `Self::queue()`.
closure(value.borrow().as_ref().unwrap())
}
}
@ -41,25 +39,25 @@ impl<T> Dispatcher<T> {
// anything funny with it here. See `Self::queue()`.
sender.send(closure).unwrap()
},
)
.map(|wrapper| (Self(wrapper.clone()), DispatchRunner { wrapper, receiver }))
);
(Self(wrapper.clone()), DispatchRunner { wrapper, receiver })
}
pub fn value(&self) -> Option<Ref<'_, T>> {
self.0.value()
pub fn value(&self, main_thread: MainThreadMarker) -> Ref<'_, T> {
self.0.value(main_thread)
}
pub fn dispatch(&self, f: impl 'static + FnOnce(&T) + Send) {
if let Some(value) = self.0.value() {
f(&value)
if let Some(main_thread) = MainThreadMarker::new() {
f(&self.0.value(main_thread))
} else {
self.0.send(Closure(Box::new(f)))
}
}
pub fn queue<R: Send>(&self, f: impl FnOnce(&T) -> R + Send) -> R {
if let Some(value) = self.0.value() {
f(&value)
if let Some(main_thread) = MainThreadMarker::new() {
f(&self.0.value(main_thread))
} else {
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let closure = Box::new({
@ -98,13 +96,13 @@ pub struct DispatchRunner<T: 'static> {
}
impl<T> DispatchRunner<T> {
pub fn run(&self) {
pub fn run(&self, main_thread: MainThreadMarker) {
while let Some(Closure(closure)) =
self.receiver.try_recv().expect("should only be closed when `Dispatcher` is dropped")
{
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything
// funny with it here. See `Self::queue()`.
closure(&self.wrapper.value().expect("don't call this outside the main thread"))
closure(&self.wrapper.value(main_thread))
}
}
}

View file

@ -17,12 +17,6 @@ impl<T: Clone> Notifier<T> {
if self.0.value.set(value).is_err() {
unreachable!("value set before")
}
self.0.queue.close();
while let Ok(waker) = self.0.queue.pop() {
waker.wake()
}
}
pub fn notified(&self) -> Notified<T> {
@ -30,11 +24,21 @@ impl<T: Clone> Notifier<T> {
}
}
impl<T: Clone> Drop for Notifier<T> {
fn drop(&mut self) {
self.0.queue.close();
while let Ok(waker) = self.0.queue.pop() {
waker.wake()
}
}
}
#[derive(Clone, Debug)]
pub struct Notified<T: Clone>(Option<Arc<Inner<T>>>);
impl<T: Clone> Future for Notified<T> {
type Output = T;
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.0.take().expect("`Receiver` polled after completion");
@ -54,14 +58,13 @@ impl<T: Clone> Future for Notified<T> {
}
}
let (Ok(Some(value)) | Err(Some(value))) = Arc::try_unwrap(this)
match Arc::try_unwrap(this)
.map(|mut inner| inner.value.take())
.map_err(|this| this.value.get().cloned())
else {
unreachable!("found no value despite being ready")
};
Poll::Ready(value)
{
Ok(Some(value)) | Err(Some(value)) => Poll::Ready(Some(value)),
_ => Poll::Ready(None),
}
}
}

View file

@ -19,8 +19,7 @@ struct Handler<T> {
struct Sender(Arc<Inner>);
impl<T> WakerSpawner<T> {
#[track_caller]
pub fn new(main_thread: MainThreadMarker, value: T, handler: fn(&T, bool)) -> Option<Self> {
pub fn new(main_thread: MainThreadMarker, value: T, handler: fn(&T, bool)) -> Self {
let inner = Arc::new(Inner {
awoken: AtomicBool::new(false),
waker: AtomicWaker::new(),
@ -31,7 +30,7 @@ impl<T> WakerSpawner<T> {
let sender = Sender(Arc::clone(&inner));
let wrapper = Wrapper::new(
Self(Wrapper::new(
main_thread,
handler,
|handler, _| {
@ -73,9 +72,7 @@ impl<T> WakerSpawner<T> {
inner.0.awoken.store(true, Ordering::Relaxed);
inner.0.waker.wake();
},
)?;
Some(Self(wrapper))
))
}
pub fn waker(&self) -> Waker<T> {

View file

@ -33,7 +33,6 @@ unsafe impl<V> Send for Value<V> {}
unsafe impl<V> Sync for Value<V> {}
impl<V, S: Clone + Send, E> Wrapper<V, S, E> {
#[track_caller]
pub fn new<R: Future<Output = ()>>(
_: MainThreadMarker,
value: V,
@ -41,7 +40,7 @@ impl<V, S: Clone + Send, E> Wrapper<V, S, E> {
receiver: impl 'static + FnOnce(Arc<RefCell<Option<V>>>) -> R,
sender_data: S,
sender_handler: fn(&S, E),
) -> Option<Self> {
) -> Self {
let value = Arc::new(RefCell::new(Some(value)));
wasm_bindgen_futures::spawn_local({
@ -52,12 +51,7 @@ impl<V, S: Clone + Send, E> Wrapper<V, S, E> {
}
});
Some(Self {
value: Value { value, local: PhantomData },
handler,
sender_data,
sender_handler,
})
Self { value: Value { value, local: PhantomData }, handler, sender_data, sender_handler }
}
pub fn send(&self, event: E) {
@ -68,9 +62,8 @@ impl<V, S: Clone + Send, E> Wrapper<V, S, E> {
}
}
pub fn value(&self) -> Option<Ref<'_, V>> {
MainThreadMarker::new()
.map(|_| Ref::map(self.value.value.borrow(), |value| value.as_ref().unwrap()))
pub fn value(&self, _: MainThreadMarker) -> Ref<'_, V> {
Ref::map(self.value.value.borrow(), |value| value.as_ref().unwrap())
}
pub fn with_sender_data<T>(&self, f: impl FnOnce(&S) -> T) -> T {