Refactor PA to handle interior mutability / ref counting itself

This commit is contained in:
Ian Douglas Scott 2022-06-23 12:46:38 -07:00
parent 9f3803fedc
commit a431457468
2 changed files with 79 additions and 38 deletions

View file

@ -29,7 +29,6 @@ use libpulse_binding::{
}; };
use mpris2_zbus::metadata::Metadata; use mpris2_zbus::metadata::Metadata;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{cell::RefCell, rc::Rc};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("failed to build tokio runtime")); static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("failed to build tokio runtime"));
@ -45,12 +44,12 @@ fn main() {
fn app(application: &Application) { fn app(application: &Application) {
// XXX handle no pulseaudio daemon? // XXX handle no pulseaudio daemon?
let mut pa = PA::new().unwrap(); let pa = PA::new().unwrap();
let (refresh_output_tx, mut refresh_output_rx) = mpsc::unbounded(); let (refresh_output_tx, mut refresh_output_rx) = mpsc::unbounded();
let (refresh_input_tx, mut refresh_input_rx) = mpsc::unbounded(); let (refresh_input_tx, mut refresh_input_rx) = mpsc::unbounded();
let (now_playing_tx, mut now_playing_rx) = mpsc::unbounded::<Vec<Metadata>>(); let (now_playing_tx, mut now_playing_rx) = mpsc::unbounded::<Vec<Metadata>>();
pa.context pa
.set_subscribe_callback(Some(Box::new(clone!(@strong refresh_output_tx, @strong refresh_input_tx => move |facility, operation, _idx| { .set_subscribe_callback(clone!(@strong refresh_output_tx, @strong refresh_input_tx => move |facility, operation, _idx| {
if !matches!(operation, Some(Operation::Changed)) { if !matches!(operation, Some(Operation::Changed)) {
return; return;
} }
@ -63,23 +62,19 @@ fn app(application: &Application) {
} }
_ => {} _ => {}
} }
})))); }));
let pa = Rc::new(RefCell::new(pa)); pa.set_state_callback(move |pa, state| {
pa.borrow_mut() if state == State::Ready {
.context pa.subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE);
.set_state_callback(Some(Box::new(clone!(@strong pa => move || { refresh_output_tx
let mut pa = pa.borrow_mut(); .unbounded_send(())
if pa.context.get_state() == State::Ready { .expect("failed to send output refresh message");
pa.context refresh_input_tx
.subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE, |_| {}); .unbounded_send(())
refresh_output_tx.unbounded_send(()).expect("failed to send output refresh message"); .expect("failed to send output refresh message");
refresh_input_tx.unbounded_send(()).expect("failed to send output refresh message"); }
} });
})))); pa.connect().unwrap(); // XXX unwrap
pa.borrow_mut()
.context
.connect(None, FlagSet::empty(), None)
.unwrap();
view! { view! {
window = ApplicationWindow { window = ApplicationWindow {
set_application: Some(application), set_application: Some(application),
@ -168,7 +163,6 @@ fn app(application: &Application) {
glib::MainContext::default().spawn_local( glib::MainContext::default().spawn_local(
clone!(@weak inputs, @weak current_input, @weak input_volume, @strong pa => async move { clone!(@weak inputs, @weak current_input, @weak input_volume, @strong pa => async move {
while let Some(()) = refresh_input_rx.next().await { while let Some(()) = refresh_input_rx.next().await {
let pa = pa.borrow();
input::refresh_input_widgets(&pa, &inputs).await; input::refresh_input_widgets(&pa, &inputs).await;
let default_input = input::refresh_default_input(&pa, &current_input).await; let default_input = input::refresh_default_input(&pa, &current_input).await;
volume::update_volume(&default_input, &input_volume); volume::update_volume(&default_input, &input_volume);
@ -178,7 +172,6 @@ fn app(application: &Application) {
glib::MainContext::default().spawn_local( glib::MainContext::default().spawn_local(
clone!(@weak outputs, @weak current_output, @weak output_volume, @strong pa => async move { clone!(@weak outputs, @weak current_output, @weak output_volume, @strong pa => async move {
while let Some(()) = refresh_output_rx.next().await { while let Some(()) = refresh_output_rx.next().await {
let pa = pa.borrow();
output::refresh_output_widgets(&pa, &outputs); output::refresh_output_widgets(&pa, &outputs);
let default_output = output::refresh_default_output(&pa, &current_output).await; let default_output = output::refresh_default_output(&pa, &current_output).await;
volume::update_volume(&default_output, &output_volume); volume::update_volume(&default_output, &output_volume);

View file

@ -1,11 +1,19 @@
use futures::{channel::oneshot, future::poll_fn, task::Poll}; use futures::{channel::oneshot, future::poll_fn, task::Poll};
use libpulse_binding::{ use libpulse_binding::{
callbacks::ListResult, callbacks::ListResult,
context::{introspect::SinkInfo, Context}, context::{
introspect::{Introspector, SinkInfo},
subscribe::{Facility, InterestMaskSet, Operation},
Context, FlagSet, State,
},
error::PAErr,
volume::ChannelVolumes, volume::ChannelVolumes,
}; };
use libpulse_glib_binding::Mainloop; use libpulse_glib_binding::Mainloop;
use std::rc::Rc; use std::{
cell::{Ref, RefCell},
rc::Rc,
};
pub struct DeviceInfo { pub struct DeviceInfo {
pub name: Option<String>, pub name: Option<String>,
@ -18,22 +26,66 @@ pub struct ServerInfo {
pub default_source_name: Option<String>, pub default_source_name: Option<String>,
} }
pub struct PA { struct PAInner {
main_loop: Mainloop, main_loop: Mainloop,
pub context: Context, pub context: RefCell<Context>,
} }
#[derive(Clone)]
pub struct PA(Rc<PAInner>);
impl PA { impl PA {
pub fn new() -> Option<Self> { pub fn new() -> Option<Self> {
let main_loop = Mainloop::new(None)?; let main_loop = Mainloop::new(None)?;
let context = Context::new(&main_loop, "com.system76.cosmic.applets.audio")?; let context = Context::new(&main_loop, "com.system76.cosmic.applets.audio")?;
Some(Self { main_loop, context }) Some(Self(Rc::new(PAInner {
main_loop,
context: RefCell::new(context),
})))
}
pub fn set_state_callback<F: Fn(&Self, State) + 'static>(&self, cb: F) {
let pa = self.clone(); // TODO: weak ref?
self.0
.context
.borrow_mut()
.set_state_callback(Some(Box::new(move || {
let state = pa.0.context.borrow().get_state();
cb(&pa, state);
})));
}
// TODO: builder pattern?
pub fn set_subscribe_callback<F: FnMut(Option<Facility>, Option<Operation>, u32) + 'static>(
&self,
cb: F,
) {
self.0
.context
.borrow_mut()
.set_subscribe_callback(Some(Box::new(cb)));
}
pub fn subscribe(&self, mask: InterestMaskSet) {
// XXX cb; operation; async
self.0.context.borrow_mut().subscribe(mask, |_| {});
}
pub fn connect(&self) -> Result<(), PAErr> {
self.0
.context
.borrow_mut()
.connect(None, FlagSet::empty(), None)
}
fn introspect(&self) -> Introspector {
self.0.context.borrow().introspect()
} }
pub async fn get_server_info(&self) -> ServerInfo { pub async fn get_server_info(&self) -> ServerInfo {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender); let mut sender = Some(sender);
self.context.introspect().get_server_info(move |info| { self.introspect().get_server_info(move |info| {
sender.take().unwrap().send(ServerInfo { sender.take().unwrap().send(ServerInfo {
default_sink_name: info.default_sink_name.clone().map(|x| x.into_owned()), default_sink_name: info.default_sink_name.clone().map(|x| x.into_owned()),
default_source_name: info.default_source_name.clone().map(|x| x.into_owned()), default_source_name: info.default_source_name.clone().map(|x| x.into_owned()),
@ -46,8 +98,7 @@ impl PA {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender); let mut sender = Some(sender);
let mut items = Some(Vec::new()); let mut items = Some(Vec::new());
self.context self.introspect()
.introspect()
.get_sink_info_list(move |result| match result { .get_sink_info_list(move |result| match result {
ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo { ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()), name: item.name.clone().map(|x| x.into_owned()),
@ -74,8 +125,7 @@ impl PA {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender); let mut sender = Some(sender);
let mut sink = None; let mut sink = None;
self.context self.introspect()
.introspect()
.get_sink_info_by_name(&name, move |result| match result { .get_sink_info_by_name(&name, move |result| match result {
ListResult::Item(item) => { ListResult::Item(item) => {
sink = Some(DeviceInfo { sink = Some(DeviceInfo {
@ -97,7 +147,7 @@ impl PA {
/* /*
// XXX async wait and handle error // XXX async wait and handle error
pub fn set_default_sink(&mut self, name: &str) { pub fn set_default_sink(&mut self, name: &str) {
self.context.set_default_sink(name, |_| {}); self.0.context.set_default_sink(name, |_| {});
} }
*/ */
@ -105,8 +155,7 @@ impl PA {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender); let mut sender = Some(sender);
let mut items = Some(Vec::new()); let mut items = Some(Vec::new());
self.context self.introspect()
.introspect()
.get_source_info_list(move |result| match result { .get_source_info_list(move |result| match result {
ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo { ListResult::Item(item) => items.as_mut().unwrap().push(DeviceInfo {
name: item.name.clone().map(|x| x.into_owned()), name: item.name.clone().map(|x| x.into_owned()),
@ -133,8 +182,7 @@ impl PA {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
let mut sender = Some(sender); let mut sender = Some(sender);
let mut source = None; let mut source = None;
self.context self.introspect()
.introspect()
.get_source_info_by_name(&name, move |result| match result { .get_source_info_by_name(&name, move |result| match result {
ListResult::Item(item) => { ListResult::Item(item) => {
source = Some(DeviceInfo { source = Some(DeviceInfo {