audio: Use custom, cancellable, single threaded future type

This commit is contained in:
Ian Douglas Scott 2022-07-21 13:16:08 -07:00
parent 35a2d6905b
commit 0aaa0dd74d
2 changed files with 152 additions and 107 deletions

View file

@ -0,0 +1,63 @@
use libpulse_binding::operation::Operation;
use std::{
cell::RefCell,
future::{self, Future},
pin::Pin,
rc::Rc,
task::{self, Poll, Waker},
};
struct PAFutInner<T> {
res: Option<T>,
waker: Option<Waker>,
}
pub struct PAFutWaker<T>(Rc<RefCell<PAFutInner<T>>>);
impl<T> PAFutWaker<T> {
pub fn wake(&self, res: T) {
let mut inner = self.0.borrow_mut();
inner.res = Some(res);
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
}
pub struct PAFut<T, F: ?Sized> {
inner: Rc<RefCell<PAFutInner<T>>>,
operation: Operation<F>,
}
impl<T, F: ?Sized> PAFut<T, F> {
pub fn new(cb: impl FnOnce(PAFutWaker<T>) -> Operation<F>) -> Self {
let inner = Rc::new(RefCell::new(PAFutInner {
res: None,
waker: None,
}));
let operation = cb(PAFutWaker(inner.clone()));
Self { inner, operation }
}
}
impl<T, F: ?Sized> Future for PAFut<T, F> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
let mut inner = self.inner.borrow_mut();
if let Some(res) = inner.res.take() {
Poll::Ready(res)
} else {
inner.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl<T, F: ?Sized> Drop for PAFut<T, F> {
fn drop(&mut self) {
self.operation.cancel();
}
}

View file

@ -1,4 +1,3 @@
use futures::{channel::oneshot, future::poll_fn, task::Poll};
use gtk4::glib;
use libpulse_binding::{
callbacks::ListResult,
@ -12,10 +11,13 @@ use libpulse_binding::{
};
use libpulse_glib_binding::Mainloop;
use std::{
cell::{Ref, RefCell},
cell::RefCell,
rc::Rc,
};
mod future;
use future::{PAFut, PAFutWaker};
pub struct DeviceInfo {
pub name: Option<String>,
pub description: Option<String>,
@ -90,37 +92,33 @@ impl PA {
}
pub async fn get_server_info(&self) -> ServerInfo {
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
self.introspect().get_server_info(move |info| {
sender.take().unwrap().send(ServerInfo {
default_sink_name: info.default_sink_name.clone().map(|x| x.into_owned()),
default_source_name: info.default_source_name.clone().map(|x| x.into_owned()),
});
});
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect().get_server_info(move |info| {
waker.wake(ServerInfo {
default_sink_name: info.default_sink_name.clone().map(|x| x.into_owned()),
default_source_name: info.default_source_name.clone().map(|x| x.into_owned()),
});
})
})
.await
}
pub async fn get_sink_info_list(&self) -> Result<Vec<DeviceInfo>, ()> {
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
let mut items = Some(Vec::new());
self.introspect()
.get_sink_info_list(move |result| match result {
ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
}),
ListResult::End => {
sender.take().unwrap().send(Ok(items.take().unwrap()));
}
ListResult::Error => {
sender.take().unwrap().send(Err(()));
}
});
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect()
.get_sink_info_list(move |result| match result {
ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
}),
ListResult::End => waker.wake(Ok(items.take().unwrap())),
ListResult::Error => waker.wake(Err(())),
})
})
.await
}
pub async fn get_default_sink(&self) -> Result<DeviceInfo, ()> {
@ -130,27 +128,23 @@ impl PA {
return Err(());
}
};
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
let mut sink = None;
self.introspect()
.get_sink_info_by_name(&name, move |result| match result {
ListResult::Item(item) => {
sink = Some(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
});
}
ListResult::End => {
sender.take().unwrap().send(sink.take().ok_or(()));
}
ListResult::Error => {
sender.take().unwrap().send(Err(()));
}
});
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect()
.get_sink_info_by_name(&name, move |result| match result {
ListResult::Item(item) => {
sink = Some(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
});
}
ListResult::End => waker.wake(sink.take().ok_or(())),
ListResult::Error => waker.wake(Err(())),
})
})
.await
}
// XXX async wait and handle error
@ -163,25 +157,21 @@ impl PA {
}
pub async fn get_source_info_list(&self) -> Result<Vec<DeviceInfo>, ()> {
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
let mut items = Some(Vec::new());
self.introspect()
.get_source_info_list(move |result| match result {
ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
}),
ListResult::End => {
sender.take().unwrap().send(Ok(items.take().unwrap()));
}
ListResult::Error => {
sender.take().unwrap().send(Err(()));
}
});
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect()
.get_source_info_list(move |result| match result {
ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
}),
ListResult::End => waker.wake(Ok(items.take().unwrap())),
ListResult::Error => waker.wake(Err(())),
})
})
.await
}
pub async fn get_default_source(&self) -> Result<DeviceInfo, ()> {
@ -191,52 +181,44 @@ impl PA {
return Err(());
}
};
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
let mut source = None;
self.introspect()
.get_source_info_by_name(&name, move |result| match result {
ListResult::Item(item) => {
source = Some(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
});
}
ListResult::End => {
sender.take().unwrap().send(source.take().ok_or(()));
}
ListResult::Error => {
sender.take().unwrap().send(Err(()));
}
});
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect()
.get_source_info_by_name(&name, move |result| match result {
ListResult::Item(item) => {
source = Some(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()),
description: item.description.clone().map(|x| x.into_owned()),
volume: item.volume,
index: item.index,
});
}
ListResult::End => waker.wake(source.take().ok_or(())),
ListResult::Error => waker.wake(Err(())),
})
})
.await
}
pub async fn set_sink_volume_by_name(&self, name: &str, volume: &ChannelVolumes) -> bool {
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
self.introspect().set_sink_volume_by_name(
name,
volume,
Some(Box::new(move |success| {
sender.take().unwrap().send(success);
})),
);
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect().set_sink_volume_by_name(
name,
volume,
Some(Box::new(move |success| waker.wake(success))),
)
})
.await
}
pub async fn set_source_volume_by_name(&self, name: &str, volume: &ChannelVolumes) -> bool {
let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender);
self.introspect().set_source_volume_by_name(
name,
volume,
Some(Box::new(move |success| {
sender.take().unwrap().send(success);
})),
);
receiver.await.unwrap()
PAFut::new(|waker| {
self.introspect().set_source_volume_by_name(
name,
volume,
Some(Box::new(move |success| waker.wake(success))),
)
})
.await
}
}