Move Web backend to winit-web

This commit is contained in:
Mads Marquart 2025-05-16 01:23:35 +02:00 committed by Kirill Chibisov
parent 2d4b9938f0
commit e542a78deb
50 changed files with 259 additions and 273 deletions

View file

@ -0,0 +1,96 @@
use std::error::Error;
use std::fmt::{self, Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use pin_project::pin_project;
use super::AtomicWaker;
#[pin_project]
pub struct Abortable<F: Future> {
#[pin]
future: F,
shared: Arc<Shared>,
}
impl<F: Future> Abortable<F> {
pub fn new(handle: AbortHandle, future: F) -> Self {
Self { future, shared: handle.0 }
}
}
impl<F: Future> Future for Abortable<F> {
type Output = Result<F::Output, Aborted>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.shared.aborted.load(Ordering::Relaxed) {
return Poll::Ready(Err(Aborted));
}
if let Poll::Ready(value) = self.as_mut().project().future.poll(cx) {
return Poll::Ready(Ok(value));
}
self.shared.waker.register(cx.waker());
if self.shared.aborted.load(Ordering::Relaxed) {
return Poll::Ready(Err(Aborted));
}
Poll::Pending
}
}
#[derive(Debug)]
struct Shared {
waker: AtomicWaker,
aborted: AtomicBool,
}
#[derive(Clone, Debug)]
pub struct AbortHandle(Arc<Shared>);
impl AbortHandle {
pub fn new() -> Self {
Self(Arc::new(Shared { waker: AtomicWaker::new(), aborted: AtomicBool::new(false) }))
}
pub fn abort(&self) {
self.0.aborted.store(true, Ordering::Relaxed);
self.0.waker.wake()
}
}
#[derive(Debug)]
pub struct DropAbortHandle(AbortHandle);
impl DropAbortHandle {
pub fn new(handle: AbortHandle) -> Self {
Self(handle)
}
pub fn handle(&self) -> AbortHandle {
self.0.clone()
}
}
impl Drop for DropAbortHandle {
fn drop(&mut self) {
self.0.abort()
}
}
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct Aborted;
impl Display for Aborted {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "`Abortable` future has been aborted")
}
}
impl Error for Aborted {}

View file

@ -0,0 +1,35 @@
use std::cell::RefCell;
use std::ops::Deref;
use std::task::Waker;
#[derive(Debug)]
pub struct AtomicWaker(RefCell<Option<Waker>>);
impl AtomicWaker {
pub const fn new() -> Self {
Self(RefCell::new(None))
}
pub fn register(&self, waker: &Waker) {
let mut this = self.0.borrow_mut();
if let Some(old_waker) = this.deref() {
if old_waker.will_wake(waker) {
return;
}
}
*this = Some(waker.clone());
}
pub fn wake(&self) {
if let Some(waker) = self.0.borrow_mut().take() {
waker.wake();
}
}
}
// SAFETY: Wasm without the `atomics` target feature is single-threaded.
unsafe impl Send for AtomicWaker {}
// SAFETY: Wasm without the `atomics` target feature is single-threaded.
unsafe impl Sync for AtomicWaker {}

View file

@ -0,0 +1,81 @@
use std::future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, RecvError, SendError, TryRecvError};
use std::sync::Arc;
use std::task::Poll;
use super::AtomicWaker;
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (sender, receiver) = mpsc::channel();
let shared = Arc::new(Shared { closed: AtomicBool::new(false), waker: AtomicWaker::new() });
let sender = Sender { sender, shared: Arc::clone(&shared) };
let receiver = Receiver { receiver, shared };
(sender, receiver)
}
pub struct Sender<T> {
sender: mpsc::Sender<T>,
shared: Arc<Shared>,
}
impl<T> Sender<T> {
pub fn send(&self, event: T) -> Result<(), SendError<T>> {
self.sender.send(event)?;
self.shared.waker.wake();
Ok(())
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.shared.closed.store(true, Ordering::Relaxed);
self.shared.waker.wake();
}
}
pub struct Receiver<T> {
receiver: mpsc::Receiver<T>,
shared: Arc<Shared>,
}
impl<T> Receiver<T> {
pub async fn next(&self) -> Result<T, RecvError> {
future::poll_fn(|cx| match self.receiver.try_recv() {
Ok(event) => Poll::Ready(Ok(event)),
Err(TryRecvError::Empty) => {
self.shared.waker.register(cx.waker());
match self.receiver.try_recv() {
Ok(event) => Poll::Ready(Ok(event)),
Err(TryRecvError::Empty) => {
if self.shared.closed.load(Ordering::Relaxed) {
Poll::Ready(Err(RecvError))
} else {
Poll::Pending
}
},
Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
}
},
Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
})
.await
}
pub fn try_recv(&self) -> Result<Option<T>, RecvError> {
match self.receiver.try_recv() {
Ok(value) => Ok(Some(value)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(RecvError),
}
}
}
struct Shared {
closed: AtomicBool,
waker: AtomicWaker,
}

View file

@ -0,0 +1,52 @@
use std::cell::{Cell, RefCell};
#[derive(Debug)]
pub struct ConcurrentQueue<T> {
queue: RefCell<Vec<T>>,
closed: Cell<bool>,
}
pub enum PushError<T> {
#[allow(dead_code)]
Full(T),
Closed(T),
}
pub enum PopError {
Empty,
Closed,
}
impl<T> ConcurrentQueue<T> {
pub fn unbounded() -> Self {
Self { queue: RefCell::new(Vec::new()), closed: Cell::new(false) }
}
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
if self.closed.get() {
return Err(PushError::Closed(value));
}
self.queue.borrow_mut().push(value);
Ok(())
}
pub fn pop(&self) -> Result<T, PopError> {
self.queue.borrow_mut().pop().ok_or_else(|| {
if self.closed.get() {
PopError::Closed
} else {
PopError::Empty
}
})
}
pub fn close(&self) -> bool {
!self.closed.replace(true)
}
}
// SAFETY: Wasm without the `atomics` target feature is single-threaded.
unsafe impl<T> Send for ConcurrentQueue<T> {}
// SAFETY: Wasm without the `atomics` target feature is single-threaded.
unsafe impl<T> Sync for ConcurrentQueue<T> {}

View file

@ -0,0 +1,149 @@
use std::cell::Ref;
use std::cmp::Ordering;
use std::fmt::{self, Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::rc::Rc;
use std::sync::{Arc, Condvar, Mutex};
use super::super::main_thread::MainThreadMarker;
use super::{channel, Receiver, Sender, Wrapper};
pub struct Dispatcher<T: 'static>(Wrapper<T, Arc<Sender<Closure<T>>>, Closure<T>>);
struct Closure<T>(Box<dyn FnOnce(&T) + Send>);
impl<T> Clone for Dispatcher<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> Debug for Dispatcher<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Dispatcher").finish_non_exhaustive()
}
}
impl<T> Eq for Dispatcher<T> {}
impl<T> Hash for Dispatcher<T> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
impl<T> Ord for Dispatcher<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.0.cmp(&other.0)
}
}
impl<T> PartialEq for Dispatcher<T> {
fn eq(&self, other: &Self) -> bool {
self.0.eq(&other.0)
}
}
impl<T> PartialOrd for Dispatcher<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T> Dispatcher<T> {
pub fn new(main_thread: MainThreadMarker, value: T) -> (Self, DispatchRunner<T>) {
let (sender, receiver) = channel::<Closure<T>>();
let sender = Arc::new(sender);
let receiver = Rc::new(receiver);
let wrapper = Wrapper::new(
main_thread,
value,
|value, Closure(closure)| {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do
// anything funny with it here. See `Self::queue()`.
closure(value.borrow().as_ref().unwrap())
},
{
let receiver = Rc::clone(&receiver);
move |value| async move {
while let Ok(Closure(closure)) = receiver.next().await {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't
// do anything funny with it here. See `Self::queue()`.
closure(value.borrow().as_ref().unwrap())
}
}
},
sender,
|sender, closure| {
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do
// anything funny with it here. See `Self::queue()`.
sender.send(closure).unwrap()
},
);
(Self(wrapper.clone()), DispatchRunner { wrapper, receiver })
}
pub fn value(&self, main_thread: MainThreadMarker) -> Ref<'_, T> {
self.0.value(main_thread)
}
pub fn dispatch(&self, f: impl 'static + FnOnce(&T) + Send) {
if let Some(main_thread) = MainThreadMarker::new() {
f(&self.0.value(main_thread))
} else {
self.0.send(Closure(Box::new(f)))
}
}
pub fn queue<R: Send>(&self, f: impl FnOnce(&T) -> R + Send) -> R {
if let Some(main_thread) = MainThreadMarker::new() {
f(&self.0.value(main_thread))
} else {
let pair = Arc::new((Mutex::new(None), Condvar::new()));
let closure = Box::new({
let pair = pair.clone();
move |value: &T| {
*pair.0.lock().unwrap() = Some(f(value));
pair.1.notify_one();
}
}) as Box<dyn FnOnce(&T) + Send>;
// SAFETY: The `transmute` is necessary because `Closure` requires `'static`. This is
// safe because this function won't return until `f` has finished executing. See
// `Self::new()`.
let closure = Closure(unsafe {
std::mem::transmute::<
Box<dyn FnOnce(&T) + Send>,
Box<dyn FnOnce(&T) + Send + 'static>,
>(closure)
});
self.0.send(closure);
let mut started = pair.0.lock().unwrap();
while started.is_none() {
started = pair.1.wait(started).unwrap();
}
started.take().unwrap()
}
}
}
pub struct DispatchRunner<T: 'static> {
wrapper: Wrapper<T, Arc<Sender<Closure<T>>>, Closure<T>>,
receiver: Rc<Receiver<Closure<T>>>,
}
impl<T> DispatchRunner<T> {
pub fn run(&self, main_thread: MainThreadMarker) {
while let Some(Closure(closure)) =
self.receiver.try_recv().expect("should only be closed when `Dispatcher` is dropped")
{
// SAFETY: The given `Closure` here isn't really `'static`, so we shouldn't do anything
// funny with it here. See `Self::queue()`.
closure(&self.wrapper.value(main_thread))
}
}
}

View file

@ -0,0 +1,18 @@
mod abortable;
#[cfg(not(target_feature = "atomics"))]
mod atomic_waker;
mod channel;
#[cfg(not(target_feature = "atomics"))]
mod concurrent_queue;
mod dispatcher;
mod notifier;
mod wrapper;
pub(crate) use atomic_waker::AtomicWaker;
use concurrent_queue::{ConcurrentQueue, PushError};
pub use self::abortable::{AbortHandle, Abortable, DropAbortHandle};
pub use self::channel::{channel, Receiver, Sender};
pub use self::dispatcher::{DispatchRunner, Dispatcher};
pub use self::notifier::{Notified, Notifier};
pub(crate) use self::wrapper::Wrapper;

View file

@ -0,0 +1,75 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll, Waker};
use super::{ConcurrentQueue, PushError};
#[derive(Debug)]
pub struct Notifier<T: Clone>(Arc<Inner<T>>);
impl<T: Clone> Notifier<T> {
pub fn new() -> Self {
Self(Arc::new(Inner { queue: ConcurrentQueue::unbounded(), value: OnceLock::new() }))
}
pub fn notify(self, value: T) {
if self.0.value.set(value).is_err() {
unreachable!("value set before")
}
}
pub fn notified(&self) -> Notified<T> {
Notified(Some(Arc::clone(&self.0)))
}
}
impl<T: Clone> Drop for Notifier<T> {
fn drop(&mut self) {
self.0.queue.close();
while let Ok(waker) = self.0.queue.pop() {
waker.wake()
}
}
}
#[derive(Clone, Debug)]
pub struct Notified<T: Clone>(Option<Arc<Inner<T>>>);
impl<T: Clone> Future for Notified<T> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.0.take().expect("`Receiver` polled after completion");
if this.value.get().is_none() {
match this.queue.push(cx.waker().clone()) {
Ok(()) => {
if this.value.get().is_none() {
self.0 = Some(this);
return Poll::Pending;
}
},
Err(PushError::Closed(_)) => (),
Err(PushError::Full(_)) => {
unreachable!("found full queue despite using unbounded queue")
},
}
}
match Arc::try_unwrap(this)
.map(|mut inner| inner.value.take())
.map_err(|this| this.value.get().cloned())
{
Ok(Some(value)) | Err(Some(value)) => Poll::Ready(Some(value)),
_ => Poll::Ready(None),
}
}
}
#[derive(Debug)]
struct Inner<T> {
queue: ConcurrentQueue<Waker>,
value: OnceLock<T>,
}

View file

@ -0,0 +1,113 @@
use std::cell::{Ref, RefCell};
use std::cmp;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::sync::Arc;
use super::super::main_thread::MainThreadMarker;
// Unsafe wrapper type that allows us to use `T` when it's not `Send` from other threads.
// `value` **must** only be accessed on the main thread.
#[derive(Debug)]
pub struct Wrapper<V: 'static, S: Clone + Send, E> {
value: Value<V>,
handler: fn(&RefCell<Option<V>>, E),
sender_data: S,
sender_handler: fn(&S, E),
}
#[derive(Debug)]
struct Value<V> {
// SAFETY:
// This value must not be accessed if not on the main thread.
//
// - We wrap this in an `Arc` to allow it to be safely cloned without accessing the value.
// - The `RefCell` lets us mutably access in the main thread but is safe to drop in any thread
// because it has no `Drop` behavior.
// - The `Option` lets us safely drop `T` only in the main thread.
value: Arc<RefCell<Option<V>>>,
// Prevent's `Send` or `Sync` to be automatically implemented.
local: PhantomData<*const ()>,
}
// SAFETY: See `Self::value`.
unsafe impl<V> Send for Value<V> {}
// SAFETY: See `Self::value`.
unsafe impl<V> Sync for Value<V> {}
impl<V, S: Clone + Send, E> Wrapper<V, S, E> {
pub fn new<R: Future<Output = ()>>(
_: MainThreadMarker,
value: V,
handler: fn(&RefCell<Option<V>>, E),
receiver: impl 'static + FnOnce(Arc<RefCell<Option<V>>>) -> R,
sender_data: S,
sender_handler: fn(&S, E),
) -> Self {
let value = Arc::new(RefCell::new(Some(value)));
wasm_bindgen_futures::spawn_local({
let value = Arc::clone(&value);
async move {
receiver(Arc::clone(&value)).await;
drop(value.borrow_mut().take().unwrap());
}
});
Self { value: Value { value, local: PhantomData }, handler, sender_data, sender_handler }
}
pub fn send(&self, event: E) {
if MainThreadMarker::new().is_some() {
(self.handler)(&self.value.value, event)
} else {
(self.sender_handler)(&self.sender_data, event)
}
}
pub fn value(&self, _: MainThreadMarker) -> Ref<'_, V> {
Ref::map(self.value.value.borrow(), |value| value.as_ref().unwrap())
}
pub fn with_sender_data<T>(&self, f: impl FnOnce(&S) -> T) -> T {
f(&self.sender_data)
}
}
impl<V, S: Clone + Send, E> Clone for Wrapper<V, S, E> {
fn clone(&self) -> Self {
Self {
value: Value { value: self.value.value.clone(), local: PhantomData },
handler: self.handler,
sender_data: self.sender_data.clone(),
sender_handler: self.sender_handler,
}
}
}
impl<V, S: Clone + Send, E> Eq for Wrapper<V, S, E> {}
impl<V, S: Clone + Send, E> Hash for Wrapper<V, S, E> {
fn hash<H: Hasher>(&self, state: &mut H) {
Arc::as_ptr(&self.value.value).hash(state)
}
}
impl<V, S: Clone + Send, E> Ord for Wrapper<V, S, E> {
fn cmp(&self, other: &Self) -> cmp::Ordering {
Arc::as_ptr(&self.value.value).cmp(&Arc::as_ptr(&other.value.value))
}
}
impl<V, S: Clone + Send, E> PartialOrd for Wrapper<V, S, E> {
fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
Some(self.cmp(other))
}
}
impl<V, S: Clone + Send, E> PartialEq for Wrapper<V, S, E> {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.value.value, &other.value.value)
}
}