diff --git a/Cargo.lock b/Cargo.lock index 853db61..46cee13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1828,7 +1828,6 @@ name = "cosmic-settings-sound-subscription" version = "1.0.0-beta6" dependencies = [ "cosmic-pipewire", - "crossbeam-queue", "futures", "intmap", "libcosmic", diff --git a/crates/cosmic-pipewire/src/lib.rs b/crates/cosmic-pipewire/src/lib.rs index a93aa7b..2f899ae 100644 --- a/crates/cosmic-pipewire/src/lib.rs +++ b/crates/cosmic-pipewire/src/lib.rs @@ -530,10 +530,6 @@ impl State { } fn add_node(&mut self, id: PipewireId, node: Node) { - eprintln!( - "adding node {} with card.profile.device {:?}", - node.object_id, node.card_profile_device - ); // Map the device's pipewire ID to its device ID if let Some(entry) = self.proxies.nodes.get_mut(id) { entry.0 = node.object_id; @@ -573,7 +569,6 @@ impl State { } routes[index as usize] = route.clone(); - eprintln!("add route on device {id}[{index}]: {}", route.name); self.on_event(Event::AddRoute(id, index, route)); } @@ -590,12 +585,10 @@ impl State { } fn default_sink(&mut self, name: String) { - eprintln!("default sink set to {name}"); self.on_event(Event::DefaultSink(name)); } fn default_source(&mut self, name: String) { - eprintln!("default source set to {name}"); self.on_event(Event::DefaultSource(name)); } @@ -813,7 +806,6 @@ impl State { } fn set_profile(&mut self, id: DeviceId, index: u32, save: bool) { - eprintln!("set profile {id}[{index}]: {save}"); let Some(device) = self.device(id) else { return; }; @@ -854,7 +846,6 @@ impl State { volume: f32, balance: Option, ) { - eprintln!("set volume on {id} route device {route_device}"); let Some(device) = self.device(id) else { return; }; diff --git a/subscriptions/sound/Cargo.toml b/subscriptions/sound/Cargo.toml index c362b56..0865fe0 100644 --- a/subscriptions/sound/Cargo.toml +++ b/subscriptions/sound/Cargo.toml @@ -8,7 +8,6 @@ publish = true [dependencies] cosmic-pipewire = { path = "../../crates/cosmic-pipewire" } -crossbeam-queue = "0.3.12" futures = "0.3.31" intmap = "3.1.2" libcosmic = { git = "https://github.com/pop-os/libcosmic", default-features = false } diff --git a/subscriptions/sound/src/lib.rs b/subscriptions/sound/src/lib.rs index 13f9816..58cd094 100644 --- a/subscriptions/sound/src/lib.rs +++ b/subscriptions/sound/src/lib.rs @@ -4,10 +4,14 @@ use cosmic::Task; use cosmic::iced_futures::MaybeSend; use cosmic_pipewire as pipewire; -use futures::{FutureExt, SinkExt, Stream}; +use futures::{SinkExt, Stream}; use intmap::IntMap; use pipewire::Availability; -use std::{process::Stdio, sync::Arc, time::Duration}; +use std::{ + process::Stdio, + sync::{Arc, Mutex}, + time::Duration, +}; pub type DeviceId = u32; pub type NodeId = u32; @@ -16,46 +20,37 @@ pub type RouteId = u32; pub fn watch() -> impl Stream + MaybeSend + 'static { cosmic::iced_futures::stream::channel(1, |mut emitter| async move { - let (cancel_tx, mut cancel_rx) = futures::channel::oneshot::channel::<()>(); - let events = Arc::new(crossbeam_queue::SegQueue::new()); - - _ = emitter - .send( - Message::SubHandle(Arc::new(SubscriptionHandle { - cancel_tx, - pipewire: pipewire::run({ - let events = events.clone(); - move |event| { - events.push(event); - } - }), - })) - .into(), - ) - .await; - - let mut timer = tokio::time::interval(Duration::from_millis(64)); - loop { - futures::select! { - _ = timer.tick().fuse() => { - if !events.is_empty() { - let mut batched = Vec::with_capacity(events.len()); - while let Some(event) = events.pop() { - batched.push(event); - } + let (cancel_tx, cancel_rx) = futures::channel::oneshot::channel::<()>(); + let sender = Arc::new((Mutex::new(Vec::new()), tokio::sync::Notify::const_new())); + let receiver = sender.clone(); - _ = emitter - .send(Message::Server(Arc::from(batched))) - .await; + _ = emitter + .send( + Message::SubHandle(Arc::new(SubscriptionHandle { + cancel_tx, + pipewire: pipewire::run(move |event| { + sender.0.lock().unwrap().push(event); + sender.1.notify_one(); + }), + })) + .into(), + ) + .await; + + let forwarder = Box::pin(async { + loop { + _ = receiver.1.notified().await; + let events = std::mem::take(&mut *receiver.0.lock().unwrap()); + if !events.is_empty() { + _ = emitter.send(Message::Server(Arc::from(events))).await; + tokio::time::sleep(Duration::from_millis(64)).await; } } + }); - _ = &mut cancel_rx => break, - } + futures::future::select(cancel_rx, forwarder).await; } - - futures::future::pending::().await; }) } @@ -73,6 +68,7 @@ pub struct Model { device_ids: IntMap, node_names: IntMap, card_profile_devices: IntMap, + node_route_indexes: IntMap, device_names: IntMap, device_profiles: IntMap>, @@ -240,13 +236,28 @@ impl Model { self.set_default_sink_id(node_id); // Use pactl if the node is not a device node. - let virtual_sink_name: Option = if self.device_ids.contains_key(node_id) { - None - } else if let Some(name) = self.node_names.get(node_id) { - Some(name.clone()) - } else { - None - }; + let virtual_sink_name: Option = + if let Some(device) = self.device_ids.get(node_id).cloned() { + // Get route index of the selected node and apply it to the device. + if let Some((card_profile_device, route_index)) = self + .card_profile_devices + .get(node_id) + .cloned() + .zip(self.node_route_indexes.get(node_id).cloned()) + { + self.pipewire_send(pipewire::Request::SetRoute( + device, + card_profile_device, + route_index as u32, + )); + } + + None + } else if let Some(name) = self.node_names.get(node_id) { + Some(name.clone()) + } else { + None + }; tokio::task::spawn(async move { if let Some(node_name) = virtual_sink_name { @@ -304,13 +315,28 @@ impl Model { self.set_default_source_id(node_id); // Use pactl if the node is not a device node. - let virtual_source_name: Option = if self.device_ids.contains_key(node_id) { - None - } else if let Some(name) = self.node_names.get(node_id) { - Some(name.clone()) - } else { - None - }; + let virtual_source_name: Option = + if let Some(device) = self.device_ids.get(node_id).cloned() { + // Get route index of the selected node and apply it to the device. + if let Some((card_profile_device, route_index)) = self + .card_profile_devices + .get(node_id) + .cloned() + .zip(self.node_route_indexes.get(node_id).cloned()) + { + self.pipewire_send(pipewire::Request::SetRoute( + device, + card_profile_device, + route_index as u32, + )); + } + + None + } else if let Some(name) = self.node_names.get(node_id) { + Some(name.clone()) + } else { + None + }; tokio::task::spawn(async move { if let Some(node_name) = virtual_source_name { @@ -771,7 +797,9 @@ impl Model { continue; }; + tracing::debug!(target: "sound", "matched route {} on {}: {}", route.index, id, route.description); devices[pos] = [&route.description, " - ", device_name].concat(); + self.node_route_indexes.insert(node, route.index); break; }