use atomic_waker::AtomicWaker; use std::future; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{self, Receiver, RecvError, SendError, Sender, TryRecvError}; use std::sync::{Arc, Mutex}; use std::task::Poll; // NOTE: This channel doesn't wake up when all senders or receivers are // dropped. This is acceptable as long as it's only used in `Dispatcher`, which // has it's own `Drop` behavior. pub fn channel() -> (AsyncSender, AsyncReceiver) { let (sender, receiver) = mpsc::channel(); let sender = Arc::new(Mutex::new(sender)); let inner = Arc::new(Inner { closed: AtomicBool::new(false), waker: AtomicWaker::new(), }); let sender = AsyncSender { sender, inner: Arc::clone(&inner), }; let receiver = AsyncReceiver { receiver: Rc::new(receiver), inner, }; (sender, receiver) } pub struct AsyncSender { // We need to wrap it into a `Mutex` to make it `Sync`. So the sender can't // be accessed on the main thread, as it could block. Additionally we need // to wrap it in an `Arc` to make it clonable on the main thread without // having to block. sender: Arc>>, inner: Arc, } impl AsyncSender { pub fn send(&self, event: T) -> Result<(), SendError> { self.sender.lock().unwrap().send(event)?; self.inner.waker.wake(); Ok(()) } pub fn close(&self) { self.inner.closed.store(true, Ordering::Relaxed); self.inner.waker.wake() } } impl Clone for AsyncSender { fn clone(&self) -> Self { Self { sender: Arc::clone(&self.sender), inner: Arc::clone(&self.inner), } } } pub struct AsyncReceiver { receiver: Rc>, inner: Arc, } impl AsyncReceiver { pub async fn next(&self) -> Result { future::poll_fn(|cx| match self.receiver.try_recv() { Ok(event) => Poll::Ready(Ok(event)), Err(TryRecvError::Empty) => { self.inner.waker.register(cx.waker()); match self.receiver.try_recv() { Ok(event) => Poll::Ready(Ok(event)), Err(TryRecvError::Empty) => { if self.inner.closed.load(Ordering::Relaxed) { Poll::Ready(Err(RecvError)) } else { Poll::Pending } } Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), } } Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)), }) .await } pub fn try_recv(&self) -> Result, RecvError> { match self.receiver.try_recv() { Ok(value) => Ok(Some(value)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Disconnected) => Err(RecvError), } } } impl Clone for AsyncReceiver { fn clone(&self) -> Self { Self { receiver: Rc::clone(&self.receiver), inner: Arc::clone(&self.inner), } } } struct Inner { closed: AtomicBool, waker: AtomicWaker, }