From a76353981f604e85349ce5671bba8be360feed92 Mon Sep 17 00:00:00 2001 From: Ashley Wulber Date: Tue, 31 Jan 2023 14:04:47 -0500 Subject: [PATCH] feat: update pulse connection every time the audio applet popup is opened --- Cargo.lock | 2 + cosmic-applet-audio/Cargo.toml | 2 + cosmic-applet-audio/src/main.rs | 197 ++++++++++++++++------------ cosmic-applet-audio/src/pulse.rs | 218 ++++++++++++++++++++----------- debian/control | 2 + 5 files changed, 262 insertions(+), 159 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0d13a123..abd3d2ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -524,6 +524,8 @@ dependencies = [ "libcosmic", "libpulse-binding", "libpulse-glib-binding", + "log", + "pretty_env_logger", "smithay-client-toolkit", "tokio", ] diff --git a/cosmic-applet-audio/Cargo.toml b/cosmic-applet-audio/Cargo.toml index fdd9ca08..2eb3d81a 100644 --- a/cosmic-applet-audio/Cargo.toml +++ b/cosmic-applet-audio/Cargo.toml @@ -12,3 +12,5 @@ libpulse-glib-binding = "2.25.0" tokio = { version = "1.20.1", features=["full"] } libcosmic = { git = "https://github.com/pop-os/libcosmic/", branch = "master", default-features = false, features = ["tokio", "wayland", "applet"] } sctk = { package = "smithay-client-toolkit", git = "https://github.com/Smithay/client-toolkit", rev = "3776d4a" } +log = "0.4.14" +pretty_env_logger = "0.4.0" \ No newline at end of file diff --git a/cosmic-applet-audio/src/main.rs b/cosmic-applet-audio/src/main.rs index 7b1b4b52..d768527f 100644 --- a/cosmic-applet-audio/src/main.rs +++ b/cosmic-applet-audio/src/main.rs @@ -25,6 +25,8 @@ use crate::pulse::DeviceInfo; use libpulse_binding::volume::VolumeLinear; pub fn main() -> cosmic::iced::Result { + pretty_env_logger::init(); + let helper = CosmicAppletHelper::default(); Audio::run(helper.window_settings()) } @@ -80,7 +82,6 @@ impl Application for Audio { current_input: None, outputs: vec![], inputs: vec![], - pulse_state: PulseState::Disconnected, icon_name: "audio-volume-high-symbolic".to_string(), ..Default::default() }, @@ -113,6 +114,9 @@ impl Application for Audio { if let Some(p) = self.popup.take() { return destroy_popup(p); } else { + if let Some(conn) = self.pulse_state.connection() { + conn.send(pulse::Message::UpdateConnection); + } self.id_ctr += 1; let new_id = window::Id::new(self.id_ctr); self.popup.replace(new_id); @@ -156,7 +160,7 @@ impl Application for Audio { if let PulseState::Connected(connection) = &mut self.pulse_state { if let Some(device) = &self.current_input { if let Some(name) = &device.name { - println!("increasing volume of {}", name); + log::info!("increasing volume of {}", name); connection.send(pulse::Message::SetSourceVolumeByName( name.clone(), device.volume, @@ -165,8 +169,8 @@ impl Application for Audio { } } } - Message::OutputChanged(val) => println!("changed output {}", val), - Message::InputChanged(val) => println!("changed input {}", val), + Message::OutputChanged(val) => log::info!("changed output {}", val), + Message::InputChanged(val) => log::info!("changed input {}", val), Message::OutputToggle => { self.is_open = if self.is_open == IsOpen::Output { IsOpen::None @@ -182,12 +186,16 @@ impl Application for Audio { } } Message::Pulse(event) => match event { - pulse::Event::Connected(mut connection) => { - connection.send(pulse::Message::GetSinks); - connection.send(pulse::Message::GetSources); - connection.send(pulse::Message::GetDefaultSink); - connection.send(pulse::Message::GetDefaultSource); - self.pulse_state = PulseState::Connected(connection); + pulse::Event::Init(conn) => self.pulse_state = PulseState::Disconnected(conn), + pulse::Event::Connected => { + self.pulse_state.connected(); + + if let Some(conn) = self.pulse_state.connection() { + conn.send(pulse::Message::GetSinks); + conn.send(pulse::Message::GetSources); + conn.send(pulse::Message::GetDefaultSink); + conn.send(pulse::Message::GetDefaultSource); + } } pulse::Event::MessageReceived(msg) => { match msg { @@ -215,15 +223,11 @@ impl Application for Audio { panic!("Subscriton error handling is bad. This should never happen.") } _ => { - println!("Received misc message") + log::trace!("Received misc message") } } } - // TODO: view() should gray out buttons/slider when state is disconnected - pulse::Event::Disconnected => { - println!("setting state to disconnected"); - self.pulse_state = PulseState::Disconnected - } + pulse::Event::Disconnected => self.pulse_state.disconnected(), }, Message::Ignore => {} Message::ToggleMediaControlsInTopPanel(enabled) => { @@ -247,6 +251,7 @@ impl Application for Audio { .on_press(Message::TogglePopup) .into(), SurfaceIdWrapper::Popup(_) => { + let audio_disabled = matches!(self.pulse_state, PulseState::Disconnected(_)); let out_f64 = VolumeLinear::from( self.current_output .as_ref() @@ -262,68 +267,79 @@ impl Application for Audio { ) .0 * 100.0; + let audio_content = if audio_disabled { + column![text("PulseAudio Disconnected") + .width(Length::Fill) + .horizontal_alignment(Horizontal::Center) + .size(24),] + } else { + column![ + row![ + icon("audio-volume-high-symbolic", 32) + .width(Length::Units(24)) + .height(Length::Units(24)) + .style(Svg::Symbolic), + slider(0.0..=100.0, out_f64, Message::SetOutputVolume) + .width(Length::FillPortion(5)), + text(format!("{}%", out_f64.round())) + .width(Length::FillPortion(1)) + .horizontal_alignment(Horizontal::Right) + ] + .spacing(12) + .align_items(Alignment::Center) + .padding([8, 24]), + row![ + icon("audio-input-microphone-symbolic", 32) + .width(Length::Units(24)) + .height(Length::Units(24)) + .style(Svg::Symbolic), + slider(0.0..=100.0, in_f64, Message::SetInputVolume) + .width(Length::FillPortion(5)), + text(format!("{}%", in_f64.round())) + .width(Length::FillPortion(1)) + .horizontal_alignment(Horizontal::Right) + ] + .spacing(12) + .align_items(Alignment::Center) + .padding([8, 24]), + container(horizontal_rule(1)) + .padding([12, 24]) + .width(Length::Fill), + revealer( + self.is_open == IsOpen::Output, + "Output", + match &self.current_output { + Some(output) => pretty_name(output.description.clone()), + None => String::from("No device selected"), + }, + self.outputs + .clone() + .into_iter() + .map(|output| pretty_name(output.description)) + .collect(), + Message::OutputToggle, + Message::OutputChanged(String::from("test")), + ), + revealer( + self.is_open == IsOpen::Input, + "Input", + match &self.current_input { + Some(input) => pretty_name(input.description.clone()), + None => String::from("No device selected"), + }, + self.inputs + .clone() + .into_iter() + .map(|input| pretty_name(input.description)) + .collect(), + Message::InputToggle, + Message::InputChanged(String::from("test")), + ) + ] + .align_items(Alignment::Start) + }; let content = column![ - row![ - icon("audio-volume-high-symbolic", 32) - .width(Length::Units(24)) - .height(Length::Units(24)) - .style(Svg::Symbolic), - slider(0.0..=100.0, out_f64, Message::SetOutputVolume) - .width(Length::FillPortion(5)), - text(format!("{}%", out_f64.round())) - .width(Length::FillPortion(1)) - .horizontal_alignment(Horizontal::Right) - ] - .spacing(12) - .align_items(Alignment::Center) - .padding([8, 24]), - row![ - icon("audio-input-microphone-symbolic", 32) - .width(Length::Units(24)) - .height(Length::Units(24)) - .style(Svg::Symbolic), - slider(0.0..=100.0, in_f64, Message::SetInputVolume) - .width(Length::FillPortion(5)), - text(format!("{}%", in_f64.round())) - .width(Length::FillPortion(1)) - .horizontal_alignment(Horizontal::Right) - ] - .spacing(12) - .align_items(Alignment::Center) - .padding([8, 24]), - container(horizontal_rule(1)) - .padding([12, 24]) - .width(Length::Fill), - revealer( - self.is_open == IsOpen::Output, - "Output", - match &self.current_output { - Some(output) => pretty_name(output.description.clone()), - None => String::from("No device selected"), - }, - self.outputs - .clone() - .into_iter() - .map(|output| pretty_name(output.description)) - .collect(), - Message::OutputToggle, - Message::OutputChanged(String::from("test")), - ), - revealer( - self.is_open == IsOpen::Input, - "Input", - match &self.current_input { - Some(input) => pretty_name(input.description.clone()), - None => String::from("No device selected"), - }, - self.inputs - .clone() - .into_iter() - .map(|input| pretty_name(input.description)) - .collect(), - Message::InputToggle, - Message::InputChanged(String::from("test")), - ), + audio_content, container(horizontal_rule(1)) .padding([12, 24]) .width(Length::Fill), @@ -393,14 +409,33 @@ fn pretty_name(name: Option) -> String { } } +#[derive(Default)] enum PulseState { - Disconnected, + #[default] + Init, + Disconnected(pulse::Connection), Connected(pulse::Connection), } -impl Default for PulseState { - fn default() -> Self { - Self::Disconnected +impl PulseState { + fn connection(&mut self) -> Option<&mut pulse::Connection> { + match self { + PulseState::Disconnected(c) => Some(c), + PulseState::Connected(c) => Some(c), + PulseState::Init => None, + } + } + + fn connected(&mut self) { + if let PulseState::Disconnected(c) = self { + *self = PulseState::Connected(c.clone()); + } + } + + fn disconnected(&mut self) { + if let PulseState::Connected(c) = self { + *self = PulseState::Disconnected(c.clone()); + } } } diff --git a/cosmic-applet-audio/src/pulse.rs b/cosmic-applet-audio/src/pulse.rs index 7e832e8f..25c541d5 100644 --- a/cosmic-applet-audio/src/pulse.rs +++ b/cosmic-applet-audio/src/pulse.rs @@ -20,40 +20,61 @@ pub fn connect() -> Subscription { subscription::unfold( std::any::TypeId::of::(), - State::Disconnected, + State::Init, |state| async move { match state { - // if app just started, or we are re-trying match here. Returns coenncting - // message. We should store this in our app's state, but it isn't safe to - // send messages until we get a conencted message. Which will be received - // by the `State::Connecting` message below - State::Disconnected => match PulseHandle::create() { - Ok(pulse_handle) => (None, State::Connecting(pulse_handle)), - Err(_) => (Some(Event::Disconnected), State::Disconnected), - }, - // Just a buffer to make sure the GUI doesn't send messages until pulse is ready + State::Init => { + let PulseHandle { + to_pulse, + from_pulse, + } = PulseHandle::new(); + ( + Some(Event::Init(Connection(to_pulse))), + State::Connecting(from_pulse), + ) + } + // Waiting for Connection to succeed // The GUI doesn't have to monitor this state, as it is never sent to the GUI - State::Connecting(mut pulse_handle) => { - match pulse_handle.from_pulse.recv().await { - Some(Message::Connected) => {( - Some(Event::Connected(Connection(pulse_handle.to_pulse))), - State::Connected(pulse_handle.from_pulse), - )} - Some(Message::Disconnected) => (Some(Event::Disconnected), State::Disconnected), - _ => panic!("Pulse subscription logic is faulty as the PulseServer shouldn't send unique messages until connection is successful") - } + State::Connecting(mut from_pulse) => match from_pulse.recv().await { + Some(Message::Connected) => { + (Some(Event::Connected), State::Connected(from_pulse)) + } + Some(Message::Disconnected) => { + (Some(Event::Disconnected), State::Connecting(from_pulse)) + } + Some(m) => { + log::error!("Unexpected message: {:?}", m); + (None, State::Connecting(from_pulse)) + } + None => { + panic!("Pulse Sender dropped, something has gone wrong!"); + } }, State::Connected(mut from_pulse) => { // This is where we match messages from the pulse server to pass to the gui - match from_pulse.recv().await { - Some(Message::SetSinks(sinks)) => (Some(Event::MessageReceived(Message::SetSinks(sinks))), State::Connected(from_pulse)), - Some(Message::SetSources(sources)) => (Some(Event::MessageReceived(Message::SetSources(sources))), State::Connected(from_pulse)), - Some(Message::SetDefaultSink(sink)) => (Some(Event::MessageReceived(Message::SetDefaultSink(sink))), State::Connected(from_pulse)), - Some(Message::SetDefaultSource(source)) => (Some(Event::MessageReceived(Message::SetDefaultSource(source))), State::Connected(from_pulse)), - Some(Message::Disconnected) => (Some(Event::Disconnected), State::Disconnected), - None => (Some(Event::Disconnected), State::Disconnected), - _ => (None, State::Connected(from_pulse)), + match from_pulse.recv().await { + Some(Message::SetSinks(sinks)) => ( + Some(Event::MessageReceived(Message::SetSinks(sinks))), + State::Connected(from_pulse), + ), + Some(Message::SetSources(sources)) => ( + Some(Event::MessageReceived(Message::SetSources(sources))), + State::Connected(from_pulse), + ), + Some(Message::SetDefaultSink(sink)) => ( + Some(Event::MessageReceived(Message::SetDefaultSink(sink))), + State::Connected(from_pulse), + ), + Some(Message::SetDefaultSource(source)) => ( + Some(Event::MessageReceived(Message::SetDefaultSource(source))), + State::Connected(from_pulse), + ), + Some(Message::Disconnected) => { + (Some(Event::Disconnected), State::Connecting(from_pulse)) } + None => (Some(Event::Disconnected), State::Connecting(from_pulse)), + _ => (None, State::Connected(from_pulse)), + } } } }, @@ -62,14 +83,15 @@ pub fn connect() -> Subscription { // #[derive(Debug)] enum State { - Disconnected, - Connecting(PulseHandle), + Init, + Connecting(tokio::sync::mpsc::Receiver), Connected(tokio::sync::mpsc::Receiver), } #[derive(Debug, Clone)] pub enum Event { - Connected(Connection), + Init(Connection), + Connected, Disconnected, MessageReceived(Message), } @@ -91,6 +113,7 @@ pub enum Message { Disconnected, GetSinks, GetSources, + UpdateConnection, SetSinks(Vec), SetSources(Vec), GetDefaultSink, @@ -108,34 +131,40 @@ struct PulseHandle { impl PulseHandle { // Create pulse server thread, and bidirectional comms - pub fn create() -> Result { + pub fn new() -> PulseHandle { let (to_pulse, mut to_pulse_recv) = tokio::sync::mpsc::channel(10); let (mut from_pulse_send, from_pulse) = tokio::sync::mpsc::channel(10); - //let from_pulse = Arc::new(Mutex::new(vec![])); - //let mut from_pulse2 = from_pulse.clone(); + // get initial connection status + to_pulse + .try_send(Message::UpdateConnection) + .expect("Failed to send initial connection update message"); // this thread should complete by pushing a completed message, // or fail message. This should never complete/fail without pushing // a message. This lets the iced subscription go to sleep while init // finishes. TLDR: be very careful with error handling thread::spawn(move || { - if let Ok(mut server) = PulseServer::connect().and_then(|server| server.init()) { - PulseHandle::blocking_send_connected(&mut from_pulse_send); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); - // take `PulseServer` and handle reciver into async context - // to listen for messages that need to be passed to the pulseserver - // this lets us put the thread to sleep, but keep hold a single - // thread, because pulse audio's API is not multithreaded... at all - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + // take `PulseServer` and handle reciver into async context + // to listen for messages that need to be passed to the pulseserver + // this lets us put the thread to sleep, but keep hold a single + // thread, because pulse audio's API is not multithreaded... at all + rt.block_on(async { + let mut server: Option = None; - rt.block_on(async { - loop { - // This is where the we match messages from the GUI to pass to the pulse server - if let Some(msg) = to_pulse_recv.recv().await { - match msg { - Message::GetDefaultSink => match server.get_default_sink() { + loop { + // This is where the we match messages from the GUI to pass to the pulse server + if let Some(msg) = to_pulse_recv.recv().await { + match msg { + Message::GetDefaultSink => { + let server = match server.as_mut() { + Some(s) => s, + None => continue, + }; + match server.get_default_sink() { Ok(sink) => from_pulse_send .send(Message::SetDefaultSink(sink)) .await @@ -143,18 +172,30 @@ impl PulseHandle { Err(_) => { PulseHandle::send_disconnected(&mut from_pulse_send).await } - }, - Message::GetDefaultSource => match server.get_default_source() { + } + } + Message::GetDefaultSource => { + let server = match server.as_mut() { + Some(s) => s, + None => continue, + }; + match server.get_default_source() { Ok(source) => from_pulse_send .send(Message::SetDefaultSource(source)) .await .unwrap(), Err(e) => { - println!("ERROR! {:?}", e); + log::error!("ERROR! {:?}", e); PulseHandle::send_disconnected(&mut from_pulse_send).await; } - }, - Message::GetSinks => match server.get_sinks() { + } + } + Message::GetSinks => { + let server = match server.as_mut() { + Some(s) => s, + None => continue, + }; + match server.get_sinks() { Ok(sinks) => from_pulse_send .send(Message::SetSinks(sinks)) .await @@ -162,8 +203,14 @@ impl PulseHandle { Err(_) => { PulseHandle::send_disconnected(&mut from_pulse_send).await } - }, - Message::GetSources => match server.get_sources() { + } + } + Message::GetSources => { + let server = match server.as_mut() { + Some(s) => s, + None => continue, + }; + match server.get_sources() { Ok(sinks) => from_pulse_send .send(Message::SetSources(sinks)) .await @@ -171,36 +218,51 @@ impl PulseHandle { Err(_) => { PulseHandle::send_disconnected(&mut from_pulse_send).await } - }, - Message::SetSinkVolumeByName(name, channel_volumes) => { - server.set_sink_volume_by_name(&name, &channel_volumes) } - Message::SetSourceVolumeByName(name, channel_volumes) => { - server.set_source_volume_by_name(&name, &channel_volumes) - } - _ => { - println!("message doesn't match") + } + Message::SetSinkVolumeByName(name, channel_volumes) => { + let server = match server.as_mut() { + Some(s) => s, + None => continue, + }; + server.set_sink_volume_by_name(&name, &channel_volumes) + } + Message::SetSourceVolumeByName(name, channel_volumes) => { + let server = match server.as_mut() { + Some(s) => s, + None => continue, + }; + server.set_source_volume_by_name(&name, &channel_volumes) + } + Message::UpdateConnection => { + log::trace!("Updating Connection {:?}", server.is_some()); + if let Some(mut cur_server) = server.take() { + log::trace!("getting server info..."); + if let Err(_) = cur_server.get_server_info() { + PulseHandle::send_disconnected(&mut from_pulse_send).await; + } else { + server = Some(cur_server); + } + } else if let Ok(new_server) = + PulseServer::connect().and_then(|server| server.init()) + { + log::trace!("got new server..."); + PulseHandle::send_connected(&mut from_pulse_send).await; + server = Some(new_server); } } + _ => { + log::warn!("message doesn't match") + } } } - }); - } - // Always report that server is disconnected - PulseHandle::blocking_send_disconnected(&mut from_pulse_send); + } + }); }); - Ok(PulseHandle { + PulseHandle { to_pulse, from_pulse, - }) - } - - fn blocking_send_disconnected(sender: &mut tokio::sync::mpsc::Sender) { - sender.blocking_send(Message::Disconnected).unwrap() - } - - fn blocking_send_connected(sender: &mut tokio::sync::mpsc::Sender) { - sender.blocking_send(Message::Connected).unwrap() + } } async fn send_disconnected(sender: &mut tokio::sync::mpsc::Sender) { diff --git a/debian/control b/debian/control index 4beeaadf..afa01454 100644 --- a/debian/control +++ b/debian/control @@ -21,4 +21,6 @@ Architecture: amd64 arm64 Depends: ${misc:Depends}, ${shlibs:Depends} +Recommends: + pipewire-pulse Description: Cosmic Applets