feat: merge subscriptions crate into cosmic-settings repo

This commit is contained in:
Michael Aaron Murphy 2025-10-08 08:19:35 +02:00 committed by Michael Murphy
parent a2f53f2239
commit 600720b7d1
47 changed files with 8399 additions and 63 deletions

View file

@ -0,0 +1,827 @@
// Copyright 2024 System76 <info@system76.com>
// SPDX-License-Identifier: MPL-2.0
pub mod pipewire;
pub mod pulse;
use cosmic::Task;
use cosmic::iced_futures::MaybeSend;
use futures::{Stream, StreamExt};
use indexmap::IndexMap;
use std::{collections::BTreeMap, sync::Arc, time::Duration};
pub type NodeId = u32;
pub type ProfileId = u32;
pub fn watch() -> impl Stream<Item = Message> + MaybeSend + 'static {
async_fn_stream::fn_stream(|emitter| async move {
let (cancel_tx, mut cancel_rx) = futures::channel::oneshot::channel::<()>();
let (tx, mut pulse_rx) = futures::channel::mpsc::channel(1);
let _pulse_handle = std::thread::spawn(move || {
pulse::thread(tx);
});
let (tx, mut pw_rx) = futures::channel::mpsc::channel(1);
let (_pipewire_handle, pipewire_terminate) = pipewire::thread(tx);
emitter
.emit(
Message::SubHandle(Arc::new(SubscriptionHandle {
cancel_tx,
pipewire: pipewire_terminate,
}))
.into(),
)
.await;
let mut pulse_channels = None;
let mut balance = None;
let mut source_volume = None;
let mut sink_volume = None;
let mut events = Vec::new();
let mut timer = tokio::time::interval(Duration::from_millis(64));
loop {
tokio::select! {
event = pulse_rx.next() => {
let Some(event) = event else {
break;
};
match event {
pulse::Event::Channels(channels) => pulse_channels = Some(channels),
pulse::Event::SinkVolume(volume) => sink_volume = Some(volume),
pulse::Event::SourceVolume(volume) => source_volume = Some(volume),
pulse::Event::Balance(value) => balance = Some(value),
_ => {
events.push(Server::Pulse(event));
timer.reset();
}
}
}
event = pw_rx.next() => {
let Some(event) = event else {
break;
};
timer.reset();
events.push(Server::Pipewire(event));
}
_ = timer.tick() => {
if let Some(channels) = pulse_channels.take() {
events.push(Server::Pulse(pulse::Event::Channels(channels)));
}
if let Some(volume) = sink_volume.take() {
events.push(Server::Pulse(pulse::Event::SinkVolume(volume)));
}
if let Some(volume) = source_volume.take() {
events.push(Server::Pulse(pulse::Event::SourceVolume(volume)));
}
if let Some(balance) = balance.take() {
events.push(Server::Pulse(pulse::Event::Balance(balance)));
}
if !events.is_empty() {
emitter
.emit(Message::Server(Arc::from(std::mem::take(&mut events))))
.await;
}
}
_ = &mut cancel_rx => break,
}
}
drop(pulse_rx);
drop(pw_rx);
futures::future::pending::<Message>().await;
})
}
#[derive(Default)]
pub struct Model {
subscription_handle: Option<SubscriptionHandle>,
sink_channels: Option<pulse::PulseChannels>,
devices: BTreeMap<DeviceId, Card>,
card_names: IndexMap<DeviceId, String>,
card_profiles: IndexMap<DeviceId, Vec<pulse::CardProfile>>,
active_profiles: IndexMap<DeviceId, Option<String>>,
/** Sink devices */
/// Product names for source sink devices.
sinks: Vec<String>,
/// Pipewire object IDs for sink devices.
sink_pw_ids: Vec<NodeId>,
/// Profile IDs for the actively-selected sink device.
sink_profiles: Vec<String>,
/// Names of profiles for the actively-selected sink device.
sink_profile_names: Vec<String>,
/// Device ID of active sink device.
active_sink_device: Option<DeviceId>,
/// Index of active sink device.
active_sink: Option<usize>,
/// Card profile index of active sink device.
active_sink_profile: Option<usize>,
/** Source devices */
/// Product names for source devices.
sources: Vec<String>,
/// Pipewire object IDs for source devices.
source_pw_ids: Vec<NodeId>,
/// Profile IDs for the actively-selected source device.
source_profiles: Vec<String>,
/// Names of profiles for the actively-selected source device.
source_profile_names: Vec<String>,
/// Device ID of active source device.
active_source_device: Option<DeviceId>,
/// Index of active source device.
active_source: Option<usize>,
/// Card profile index of active source device.
active_source_profile: Option<usize>,
/// Device identifier of the default sink.
default_sink: String,
/// Device identifier of the default source.
default_source: String,
pub sink_volume_text: String,
pub source_volume_text: String,
pub sink_balance_text: Option<String>,
pub sink_balance: Option<f32>,
pub sink_volume: u32,
pub source_volume: u32,
pub sink_mute: bool,
sink_volume_debounce: bool,
sink_balance_debounce: bool,
pub source_mute: bool,
source_volume_debounce: bool,
changing_sink_profile: Option<DeviceId>,
changing_source_profile: Option<DeviceId>,
}
impl Model {
pub fn active_sink(&self) -> Option<usize> {
self.active_sink
}
pub fn active_sink_profile(&self) -> Option<usize> {
self.active_sink_profile
}
pub fn active_source(&self) -> Option<usize> {
self.active_source
}
pub fn active_source_profile(&self) -> Option<usize> {
self.active_source_profile
}
pub fn sinks(&self) -> &[String] {
&self.sinks
}
pub fn sink_profiles(&self) -> &[String] {
&self.sink_profiles
}
pub fn sources(&self) -> &[String] {
&self.sources
}
pub fn source_profiles(&self) -> &[String] {
&self.source_profiles
}
pub fn clear(&mut self) {
if let Some(handle) = self.subscription_handle.take() {
_ = handle.cancel_tx.send(());
_ = handle.pipewire.send(());
}
if let Some(channel) = self.sink_channels.take() {
channel.quit();
}
}
pub fn sink_balance_changed(&mut self, balance: u32) -> Task<Message> {
self.sink_balance = Some((balance as f32 - 100.) / 100.);
self.sink_balance_text = Some(format!("{balance:.2}"));
if self.sink_balance_debounce {
return Task::none();
}
if !self
.sink_pw_ids
.get(self.active_sink.unwrap_or(0))
.is_none()
{
self.sink_balance_debounce = true;
return cosmic::Task::future(async move {
tokio::time::sleep(Duration::from_millis(64)).await;
Message::SinkBalanceApply.into()
});
}
Task::none()
}
pub fn sink_changed(&mut self, pos: usize) -> Task<Message> {
if let Some(&node_id) = self.sink_pw_ids.get(pos) {
for card in self.devices.values() {
for (&nid, port) in &card.ports {
if node_id == nid {
self.active_sink = Some(pos);
let identifier = port.identifier.clone();
return cosmic::Task::future(async move {
wpctl_set_default(nid).await;
Message::SetDefaultSink(identifier).into()
});
}
}
}
}
Task::none()
}
pub fn sink_mute_toggle(&mut self) {
self.sink_mute = !self.sink_mute;
if let Some(&node_id) = self.sink_pw_ids.get(self.active_sink.unwrap_or(0)) {
wpctl_set_mute(node_id, self.sink_mute);
}
}
pub fn sink_profile_changed(&mut self, profile: usize) -> Task<Message> {
self.active_sink_profile = Some(profile);
if let Some(profile) = self.sink_profile_names.get(profile).cloned() {
if let Some(device_id) = self.active_sink_device.clone() {
if let Some(name) = self.card_names.get(&device_id).cloned() {
self.active_profiles
.insert(device_id.clone(), Some(profile.clone()));
self.changing_sink_profile = Some(device_id);
return cosmic::Task::future(async move {
pactl_set_card_profile(name, profile).await;
})
.discard();
}
}
}
Task::none()
}
pub fn sink_volume_changed(&mut self, volume: u32) -> Task<Message> {
self.sink_volume = volume;
self.sink_volume_text = volume.to_string();
if self.sink_volume_debounce {
return Task::none();
}
if let Some(&node_id) = self.sink_pw_ids.get(self.active_sink.unwrap_or(0)) {
self.sink_volume_debounce = true;
return cosmic::Task::future(async move {
tokio::time::sleep(Duration::from_millis(64)).await;
Message::SinkVolumeApply(node_id).into()
});
}
Task::none()
}
pub fn source_changed(&mut self, pos: usize) -> Task<Message> {
if let Some(&node_id) = self.source_pw_ids.get(pos) {
for card in self.devices.values() {
for (&nid, port) in &card.ports {
if node_id == nid {
self.active_source = Some(pos);
let identifier = port.identifier.clone();
return cosmic::Task::future(async move {
wpctl_set_default(nid).await;
Message::SetDefaultSource(identifier).into()
});
}
}
}
}
Task::none()
}
pub fn source_mute_toggle(&mut self) {
self.source_mute = !self.source_mute;
if let Some(&node_id) = self.source_pw_ids.get(self.active_source.unwrap_or(0)) {
wpctl_set_mute(node_id, self.source_mute);
}
}
pub fn source_profile_changed(&mut self, profile: usize) -> Task<Message> {
self.active_source_profile = Some(profile);
if let Some(profile) = self.source_profile_names.get(profile).cloned() {
if let Some(device_id) = self.active_source_device.clone() {
if let Some(name) = self.card_names.get(&device_id).cloned() {
self.active_profiles
.insert(device_id.clone(), Some(profile.clone()));
self.changing_source_profile = Some(device_id.clone());
return cosmic::Task::future(async move {
pactl_set_card_profile(name, profile).await;
})
.discard();
}
}
}
Task::none()
}
pub fn source_volume_changed(&mut self, volume: u32) -> Task<Message> {
self.source_volume = volume;
self.source_volume_text = volume.to_string();
if self.source_volume_debounce {
return Task::none();
}
if let Some(&node_id) = self.source_pw_ids.get(self.active_source.unwrap_or(0)) {
self.source_volume_debounce = true;
return cosmic::Task::future(async move {
tokio::time::sleep(Duration::from_millis(64)).await;
Message::SourceVolumeApply(node_id).into()
});
}
Task::none()
}
pub fn update(&mut self, message: Message) -> Task<Message> {
match message {
Message::Server(events) => {
for event in Arc::into_inner(events).into_iter().flatten() {
match event {
Server::Pulse(event) => match event {
pulse::Event::SourceVolume(volume) => {
if self.sink_volume_debounce {
return Task::none();
}
self.source_volume = volume;
self.source_volume_text = volume.to_string();
}
pulse::Event::SinkVolume(volume) => {
if self.sink_volume_debounce {
return Task::none();
}
self.sink_volume = volume;
self.sink_volume_text = volume.to_string();
}
pulse::Event::CardInfo(card) => {
let device_id = match card.variant {
pulse::DeviceVariant::Alsa { alsa_card, .. } => {
DeviceId::Alsa(alsa_card)
}
pulse::DeviceVariant::Bluez5 { address, .. } => {
DeviceId::Bluez5(address)
}
};
eprintln!(
"inserting card {:?}: name={}, active_profile={:?}, profiles={:?}",
device_id,
card.name,
card.active_profile.as_ref().map(|p| p.name.as_str()),
card.profiles
);
self.card_names.insert(device_id.clone(), card.name);
self.card_profiles.insert(device_id.clone(), card.profiles);
self.active_profiles
.insert(device_id, card.active_profile.map(|p| p.name));
}
pulse::Event::DefaultSink(sink) => {
if !self.changing_sink_profile.is_some() {
self.set_default_sink(sink);
}
}
pulse::Event::DefaultSource(source) => {
if !self.changing_source_profile.is_some() {
self.set_default_source(source);
}
}
pulse::Event::SinkMute(mute) => {
self.sink_mute = mute;
}
pulse::Event::SourceMute(mute) => {
self.source_mute = mute;
}
pulse::Event::Balance(balance) => {
self.sink_balance = balance;
self.sink_balance_text = balance.map(|b| format!("{b:.2}"));
}
pulse::Event::Channels(channels) => {
self.sink_channels = Some(channels);
}
},
Server::Pipewire(event) => match event {
pipewire::DeviceEvent::Add(device) => {
let device_id = match device.variant {
pipewire::DeviceVariant::Alsa { alsa_card, .. } => {
DeviceId::Alsa(alsa_card)
}
pipewire::DeviceVariant::Bluez5 { address, .. } => {
DeviceId::Bluez5(address)
}
pipewire::DeviceVariant::Unknown {} => DeviceId::Unknown {},
};
match device.media_class {
pipewire::MediaClass::Sink => {
self.sinks.push(device.product_name.clone());
self.sink_pw_ids.push(device.object_id);
sort_pulse_devices(&mut self.sinks, &mut self.sink_pw_ids);
if self.default_sink == device.node_name {
self.active_sink_device = Some(device_id.clone());
self.active_sink = self
.sinks
.iter()
.position(|s| *s == device.product_name);
self.set_sink_profiles(&device_id);
}
}
pipewire::MediaClass::Source => {
self.sources.push(device.product_name.clone());
self.source_pw_ids.push(device.object_id);
sort_pulse_devices(
&mut self.sources,
&mut self.source_pw_ids,
);
if self.default_source == device.node_name {
self.active_source = self
.sources
.iter()
.position(|s| *s == device.product_name);
self.active_source_device = Some(device_id.clone());
self.set_source_profiles(&device_id);
}
}
}
let card = self.devices.entry(device_id).or_insert_with(|| Card {
ports: IndexMap::new(),
});
card.ports.insert(
device.object_id,
CardPort {
class: device.media_class,
identifier: device.node_name,
description: device.product_name,
},
);
card.ports.sort_unstable_by(|_, av, _, bv| {
av.description.cmp(&bv.description)
});
}
pipewire::DeviceEvent::Remove(node_id) => {
let mut remove = None;
for (card_id, card) in &mut self.devices {
if card.ports.shift_remove(&node_id).is_some() {
if card.ports.is_empty() {
remove = Some(card_id.clone());
}
break;
}
}
if let Some(card_id) = remove {
_ = self.devices.remove(&card_id);
}
if let Some(pos) =
self.sink_pw_ids.iter().position(|&id| id == node_id)
{
_ = self.sink_pw_ids.remove(pos);
_ = self.sinks.remove(pos);
if self.active_sink == Some(pos) {
self.active_sink = None;
self.active_sink_device = None;
self.active_sink_profile = None;
} else {
self.active_sink = self.active_sink.map(|active_pos| {
if active_pos > pos {
active_pos - 1
} else {
active_pos
}
});
}
} else if let Some(pos) =
self.source_pw_ids.iter().position(|&id| id == node_id)
{
_ = self.source_pw_ids.remove(pos);
_ = self.sources.remove(pos);
if self.active_source == Some(pos) {
self.active_source = None;
self.active_source_device = None;
self.active_source_profile = None;
}
}
}
},
}
}
let mut tasks = Task::none();
if let Some(device_id) = self.changing_sink_profile.take() {
tasks = tasks.chain(self.sink_profile_select(device_id));
}
if let Some(device_id) = self.changing_source_profile.take() {
tasks = tasks.chain(self.source_profile_select(device_id));
}
return tasks;
}
Message::SinkBalanceApply => {
self.sink_balance_debounce = false;
if let Some((balance, channels)) =
self.sink_balance.zip(self.sink_channels.as_mut())
{
channels.set_balance(balance);
}
}
Message::SinkVolumeApply(_) => {
self.sink_volume_debounce = false;
if let Some(channels) = self.sink_channels.as_mut() {
channels.set_volume(self.sink_volume as f32 / 100.);
}
}
Message::SourceVolumeApply(node_id) => {
self.source_volume_debounce = false;
wpctl_set_volume(node_id, self.source_volume);
}
Message::SetDefaultSink(identifier) => self.set_default_sink(identifier),
Message::SetDefaultSource(identifier) => self.set_default_source(identifier),
Message::SubHandle(handle) => {
if let Some(handle) = Arc::into_inner(handle) {
self.subscription_handle = Some(handle);
}
}
}
Task::none()
}
fn device_profiles(&self, device_id: &DeviceId) -> (Vec<String>, Vec<String>, Option<usize>) {
let (profiles, profile_descriptions): (Vec<String>, Vec<String>) = self
.card_profiles
.get(device_id)
.map_or((Vec::new(), Vec::new()), |profiles| {
profiles
.iter()
.filter(|p| p.available && p.name != "off")
.map(|p| (p.name.clone(), p.description.clone()))
.collect()
});
let active_profile = self.active_profiles.get(device_id).and_then(|profile| {
profile
.as_ref()
.and_then(|profile| profiles.iter().position(|p| p == profile))
});
(profiles, profile_descriptions, active_profile)
}
/// Update the state of the default sink and its profiles.
fn set_default_sink(&mut self, sink: String) {
if self.default_sink == sink {
return;
}
self.default_sink = sink;
for (device_id, card) in &self.devices {
for (&node_id, card_port) in &card.ports {
if let pipewire::MediaClass::Sink = card_port.class {
if &card_port.identifier == &self.default_sink {
let device_id = device_id.clone();
self.set_sink_profiles(&device_id);
self.active_sink = self.sink_pw_ids.iter().position(|&id| id == node_id);
self.active_sink_device = Some(device_id);
return;
}
}
}
}
}
fn set_default_source(&mut self, source: String) {
if self.default_source == source {
return;
}
self.default_source = source;
for (device_id, card) in &self.devices {
for (&node_id, card_ports) in &card.ports {
if let pipewire::MediaClass::Source = card_ports.class {
if card_ports.identifier == self.default_source {
self.active_source =
self.source_pw_ids.iter().position(|&id| id == node_id);
let device_id = device_id.clone();
self.set_source_profiles(&device_id);
self.active_source_device = Some(device_id);
return;
}
}
}
}
}
fn set_sink_profiles(&mut self, device_id: &DeviceId) {
(
self.sink_profile_names,
self.sink_profiles,
self.active_sink_profile,
) = self.device_profiles(device_id);
}
fn set_source_profiles(&mut self, device_id: &DeviceId) {
(
self.source_profile_names,
self.source_profiles,
self.active_source_profile,
) = self.device_profiles(device_id);
}
fn sink_profile_select(&mut self, device_id: DeviceId) -> Task<Message> {
let sink_pos = self.active_sink.unwrap_or(0);
if let Some(card) = self.devices.get(&device_id) {
if let Some((&nid, port)) = card.ports.get_index(sink_pos) {
let identifier = port.identifier.clone();
return cosmic::Task::future(async move {
wpctl_set_default(nid).await;
Message::SetDefaultSink(identifier)
});
}
}
Task::none()
}
fn source_profile_select(&mut self, device_id: DeviceId) -> Task<Message> {
self.changing_source_profile = None;
let source_pos = self.active_source.unwrap_or(0);
if let Some(card) = self.devices.get(&device_id) {
if let Some((&nid, port)) = card.ports.get_index(source_pos) {
let identifier = port.identifier.clone();
return cosmic::Task::future(async move {
wpctl_set_default(nid).await;
Message::SetDefaultSource(identifier)
});
}
}
Task::none()
}
}
#[derive(Debug)]
struct Card {
ports: IndexMap<NodeId, CardPort>,
}
#[derive(Debug)]
struct CardPort {
class: pipewire::MediaClass,
identifier: String,
description: String,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub enum DeviceId {
Alsa(u32),
Bluez5(String),
Unknown(),
}
#[derive(Clone, Debug)]
pub enum Message {
/// Handle messages from the sound server.
Server(Arc<Vec<Server>>),
/// Set the default sink.
SetDefaultSink(String),
/// Set the default source.
SetDefaultSource(String),
/// Change the output volume.
SinkVolumeApply(NodeId),
/// Change the output balance.
SinkBalanceApply,
/// Change the input volume.
SourceVolumeApply(NodeId),
/// On init of the subscription, channels for closing background threads are given to the app.
SubHandle(Arc<SubscriptionHandle>),
}
#[derive(Clone, Debug)]
pub enum Server {
/// Get default sinks/sources and their volumes/mute status.
Pulse(pulse::Event),
/// Get ALSA cards and their profiles.
Pipewire(pipewire::DeviceEvent),
}
pub struct SubscriptionHandle {
cancel_tx: futures::channel::oneshot::Sender<()>,
pipewire: pipewire::Sender<()>,
}
impl std::fmt::Debug for SubscriptionHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("SubscriptionHandle")
}
}
fn sort_pulse_devices(descriptions: &mut Vec<String>, node_ids: &mut Vec<NodeId>) {
let mut tmp: Vec<(String, NodeId)> = std::mem::take(descriptions)
.into_iter()
.zip(std::mem::take(node_ids))
.collect();
tmp.sort_unstable_by(|(ak, _), (bk, _)| ak.cmp(bk));
(*descriptions, *node_ids) = tmp.into_iter().collect();
}
async fn pactl_set_card_profile(id: String, profile: String) {
tracing::debug!("pactl set-card-profile {id} {profile}");
_ = tokio::process::Command::new("pactl")
.args(["set-card-profile", id.as_str(), profile.as_str()])
.status()
.await
}
async fn wpctl_set_default(id: u32) {
tracing::debug!("wpctl set-default {id}");
let id = id.to_string();
_ = tokio::process::Command::new("wpctl")
.args(["set-default", id.as_str()])
.status()
.await;
}
fn wpctl_set_mute(id: u32, mute: bool) {
tokio::task::spawn(async move {
let default = id.to_string();
_ = tokio::process::Command::new("wpctl")
.args(["set-mute", default.as_str(), if mute { "1" } else { "0" }])
.status()
.await;
});
}
fn wpctl_set_volume(id: u32, volume: u32) {
tokio::task::spawn(async move {
let id = id.to_string();
let volume = format!("{}.{:02}", volume / 100, volume % 100);
_ = tokio::process::Command::new("wpctl")
.args(["set-volume", id.as_str(), volume.as_str()])
.status()
.await;
});
}

View file

@ -0,0 +1,279 @@
// Copyright 2024 System76 <info@system76.com>
// SPDX-License-Identifier: MPL-2.0
// #![deny(missing_docs)]
pub use pipewire::channel::Sender;
use cosmic::iced_futures::{self, Subscription, stream};
use futures::{SinkExt, executor::block_on};
use pipewire::{
context::Context as PwContext,
main_loop::MainLoop as PwMainLoop,
node::{Node, NodeInfoRef, NodeState},
proxy::{Listener, ProxyT},
types::ObjectType,
};
use std::{
cell::RefCell,
collections::{BTreeMap, HashMap},
rc::Rc,
thread::JoinHandle,
};
pub fn subscription() -> iced_futures::Subscription<DeviceEvent> {
Subscription::run_with_id(
"pipewire",
stream::channel(20, |sender| async {
_ = thread(sender);
futures::future::pending().await
}),
)
}
pub fn thread(
on_event: futures::channel::mpsc::Sender<DeviceEvent>,
) -> (JoinHandle<()>, pipewire::channel::Sender<()>) {
let (pw_tx, pw_rx) = pipewire::channel::channel();
let handle = std::thread::spawn(move || {
devices_from_socket(pw_rx, on_event);
});
(handle, pw_tx)
}
/// Node event`
#[derive(Debug)]
pub enum NodeEvent<'a> {
/// Node info
NodeInfo(u32, &'a NodeInfoRef),
/// Node removal
Remove(u32),
}
/// Device event
#[derive(Clone, Debug)]
pub enum DeviceEvent {
/// A new device was detected.
Add(Device),
/// A device with the given object_id was removed.
Remove(u32),
}
/// Device information
#[must_use]
#[derive(Clone, Debug)]
pub struct Device {
pub object_id: u32,
pub variant: DeviceVariant,
pub media_class: MediaClass,
pub product_name: String,
pub node_name: String,
pub state: DeviceState,
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub enum DeviceVariant {
Alsa { alsa_card: u32 },
Bluez5 { address: String },
Unknown {},
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DeviceState {
Idle,
Running,
Creating,
Suspended,
Error(String),
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MediaClass {
Source,
Sink,
}
impl Device {
/// Attains process info from a pipewire info node.
#[must_use]
pub fn from_node(info: &NodeInfoRef) -> Option<Self> {
let props = info.props()?;
let (variant, product_name) = if let Some(alsa_card) =
props.get("alsa.card").and_then(|v| v.parse::<u32>().ok())
{
let device_profile_description = props.get("device.profile.description")?.to_owned();
let description = props.get("node.description")?;
let description = description
.strip_suffix(&device_profile_description)
.map(str::trim_end)
.unwrap_or(description)
.replace("High Definition Audio", "HD Audio");
(DeviceVariant::Alsa { alsa_card }, description)
} else if let Some(address) = props
.get("api.bluez5.address")
.and_then(|v| v.parse::<String>().ok())
{
(
DeviceVariant::Bluez5 {
address: address.to_owned(),
},
props.get("node.description")?.to_owned(),
)
} else {
(
DeviceVariant::Unknown {},
props.get("node.description")?.to_owned(),
)
};
Some(Device {
object_id: props.get("object.id")?.parse::<u32>().ok()?,
variant,
media_class: match props.get("media.class")? {
"Audio/Sink" => MediaClass::Sink,
"Audio/Source" => MediaClass::Source,
_ => return None,
},
product_name,
node_name: props.get("node.name")?.to_owned(),
state: match info.state() {
NodeState::Idle => DeviceState::Idle,
NodeState::Running => DeviceState::Running,
NodeState::Creating => DeviceState::Creating,
NodeState::Suspended => DeviceState::Suspended,
NodeState::Error(why) => DeviceState::Error(why.to_owned()),
},
})
}
}
/// Monitors the devices from a given ``PipeWire`` socket.
///
/// ``PipeWire`` sockets are found in `/run/user/{{UID}}/pipewire-0`.
pub fn devices_from_socket(
pw_cancel: pipewire::channel::Receiver<()>,
mut on_event: futures::channel::mpsc::Sender<DeviceEvent>,
) {
let mut managed = BTreeMap::new();
let _res = nodes_from_socket(pw_cancel, move |main_loop, event| match event {
NodeEvent::NodeInfo(pw_id, info) => {
if let Some(device) = Device::from_node(info) {
if managed.insert(pw_id, device.object_id).is_none() {
if block_on(on_event.send(DeviceEvent::Add(device))).is_err() {
main_loop.quit();
}
}
}
}
NodeEvent::Remove(pw_id) => {
if let Some(object_id) = managed.remove(&pw_id) {
if block_on(on_event.send(DeviceEvent::Remove(object_id))).is_err() {
main_loop.quit();
}
}
}
});
}
/// Listens to information about nodes, passing that info into a callback.
///
/// # Errors
///
/// Errors if the pipewire connection fails
pub fn nodes_from_socket(
pw_cancel: pipewire::channel::Receiver<()>,
on_event: impl FnMut(&PwMainLoop, NodeEvent) + 'static,
) -> Result<(), Box<dyn std::error::Error>> {
let main_loop = PwMainLoop::new(None)?;
let context = PwContext::new(&main_loop)?;
let core = context.connect(None)?;
// Exit main loop on receivering terminate message.
let _cancel_rx = pw_cancel.attach(main_loop.loop_(), {
let main_loop = main_loop.clone();
move |_| main_loop.quit()
});
let registry = Rc::new(core.get_registry()?);
let registry_weak = Rc::downgrade(&registry);
let proxies = Rc::new(RefCell::new(HashMap::new()));
let on_event = Rc::new(RefCell::new(on_event));
let main_loop_clone = main_loop.clone();
let _registry_listener = registry
.add_listener_local()
.global(move |obj| {
let Some(registry) = registry_weak.upgrade() else {
return;
};
let attached_proxy: Option<(Box<dyn ProxyT>, Box<dyn Listener>)> = match obj.type_ {
ObjectType::Node => {
let Ok(node): Result<Node, _> = registry.bind(obj) else {
return;
};
let on_event_weak = Rc::downgrade(&on_event);
let main_loop = main_loop_clone.clone();
let id = node.upcast_ref().id();
let listener = node
.add_listener_local()
.info(move |info| {
if let Some(on_event) = on_event_weak.upgrade() {
on_event.borrow_mut()(&main_loop, NodeEvent::NodeInfo(id, info));
}
})
.register();
Some((Box::new(node), Box::new(listener)))
}
_ => None,
};
if let Some((proxy_spe, listener)) = attached_proxy {
let proxy = proxy_spe.upcast_ref();
let id = proxy.id();
let (object_type, _object_version) = proxy.get_type();
let proxies_weak = Rc::downgrade(&proxies);
let on_event_weak = Rc::downgrade(&on_event);
let main_loop = main_loop_clone.clone();
let remove_listener = proxy
.add_listener_local()
.removed(move || {
if object_type == ObjectType::Node {
if let Some(on_event) = on_event_weak.upgrade() {
on_event.borrow_mut()(&main_loop, NodeEvent::Remove(id));
}
}
if let Some(proxies) = proxies_weak.upgrade() {
proxies.borrow_mut().remove(&id);
}
})
.register();
proxies
.borrow_mut()
.insert(id, (proxy_spe, listener, remove_listener));
}
})
.register();
main_loop.run();
Ok(())
}

View file

@ -0,0 +1,752 @@
// Copyright 2024 System76 <info@system76.com>
// SPDX-License-Identifier: MPL-2.0
// Make sure not to fail if pulse not found, and reconnect?
// change to device shouldn't send osd?
use cosmic::iced_futures::{self, Subscription, stream};
use futures::{SinkExt, executor::block_on};
use libpulse_binding::{
callbacks::ListResult,
channelmap::Map,
context::{
Context, FlagSet, State,
introspect::{CardInfo, CardProfileInfo, Introspector, ServerInfo, SinkInfo, SourceInfo},
subscribe::{Facility, InterestMaskSet, Operation},
},
def::{PortAvailable, Retval},
mainloop::{
api::MainloopApi,
events::io::IoEventInternal,
standard::{IterateResult, Mainloop},
},
volume::{ChannelVolumes, Volume},
};
use std::{
borrow::Cow,
cell::{Cell, RefCell},
convert::Infallible,
io::{Read, Write},
os::{
fd::{FromRawFd, IntoRawFd, RawFd},
raw::c_void,
},
rc::Rc,
str::FromStr,
sync::mpsc,
};
pub fn subscription() -> iced_futures::Subscription<Event> {
Subscription::run_with_id(
"pulse",
stream::channel(20, |sender| async {
std::thread::spawn(move || thread(sender));
futures::future::pending().await
}),
)
}
pub fn thread(sender: futures::channel::mpsc::Sender<Event>) {
let Some(mut main_loop) = Mainloop::new() else {
log::error!("Failed to create PA main loop");
return;
};
let Some(mut context) = Context::new(&main_loop, "cosmic-osd") else {
log::error!("Failed to create PA context");
return;
};
let data = Rc::new(Data {
main_loop: RefCell::new(Mainloop {
_inner: Rc::clone(&main_loop._inner),
}),
introspector: context.introspect(),
sink_volume: Cell::new(None),
sink_mute: Cell::new(None),
source_volume: Cell::new(None),
source_mute: Cell::new(None),
default_sink_name: RefCell::new(None),
default_source_name: RefCell::new(None),
sender: RefCell::new(sender.clone()),
});
let data_clone = data.clone();
context.set_subscribe_callback(Some(Box::new(move |facility, operation, index| {
data_clone.subscribe_cb(facility.unwrap(), operation, index);
})));
let _ = context.connect(None, FlagSet::NOFAIL, None);
loop {
if sender.is_closed() {
return;
}
match main_loop.iterate(false) {
IterateResult::Success(_) => {}
IterateResult::Err(_e) => {
return;
}
IterateResult::Quit(_e) => {
return;
}
}
if context.get_state() == State::Ready {
break;
}
}
// Inspect all available cards on startup
data.introspector.get_card_info_list({
let data_weak = Rc::downgrade(&data);
move |card_info_res| {
if let Some(data) = data_weak.upgrade() {
data.card_info_cb(card_info_res)
}
}
});
data.get_server_info();
context.subscribe(
InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE,
|_| {},
);
if let Err((err, retval)) = main_loop.run() {
log::error!("PA main loop returned {:?}, error {}", retval, err);
}
}
#[derive(Clone, Debug)]
pub enum Event {
Balance(Option<f32>),
CardInfo(Card),
DefaultSink(String),
DefaultSource(String),
SinkVolume(u32),
Channels(PulseChannels),
SinkMute(bool),
SourceVolume(u32),
SourceMute(bool),
}
enum Request {
Volume(u32, f32),
Balance(u32, f32),
Quit,
}
#[derive(Debug)]
pub struct PulseChannels {
tx: mpsc::Sender<Request>,
pipe_tx: std::fs::File,
index: u32,
}
impl Clone for PulseChannels {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
pipe_tx: self
.pipe_tx
.try_clone()
.expect("failed to clone PulseChannels pipe writer"),
index: self.index,
}
}
}
/// Data used by the [`handle_balance_io_new`] callback.
struct HandleBalanceData(
Context,
ChannelVolumes,
Map,
std::sync::mpsc::Receiver<Request>,
);
/// Callback for creating an IO event source [`MainloopApi::io_new`].
extern "C" fn handle_balance_io_new(
api: *const MainloopApi,
event: *mut IoEventInternal,
reader_fd: RawFd,
_flags: libpulse_binding::mainloop::events::io::FlagSet,
data: *mut c_void,
) {
// Take ownership of the data and borrow its contents.
let mut data = unsafe { Box::<HandleBalanceData>::from_raw(data as _) };
let HandleBalanceData(ctx, volumes, map, rx) = data.as_mut();
// Return early if the context is not ready, and give the data back.
if ctx.get_state() != State::Ready {
let _ = Box::leak(data);
return;
}
// If the first byte cannot be read, destroy this event source with its reader and data.
let mut buf = [0u8; 1];
let mut reader = unsafe { std::fs::File::from_raw_fd(reader_fd) };
if reader.read_exact(&mut buf).is_err() {
(unsafe { &*api })
.io_free
.as_ref()
.expect("io_free function is missing")(event);
return;
}
// Give ownership of the reader back.
_ = reader.into_raw_fd();
while let Ok(req) = rx.try_recv() {
match req {
Request::Volume(index, volume_scale) => {
let mut intro = ctx.introspect();
let new_scale = Volume((volume_scale * Volume::NORMAL.0 as f32).round() as u32);
if let Some(v) = volumes.scale(new_scale) {
_ = intro.set_sink_volume_by_index(
index,
v,
Some(Box::new(|success| {
if !success {
tracing::error!("Failed to set sink balance");
}
})),
);
}
}
Request::Balance(index, new_balance) => {
if map.can_balance() {
if let Some(v) = volumes.set_balance(&map, new_balance) {
let mut intro = ctx.introspect();
_ = intro.set_sink_volume_by_index(
index,
v,
Some(Box::new(|success| {
if !success {
tracing::error!("Failed to set sink balance");
}
})),
);
}
}
}
Request::Quit => unsafe { &*api }
.quit
.as_ref()
.expect("quit function missing")(api, 0),
}
}
let _ = Box::leak(data);
}
impl PulseChannels {
fn new(
volumes: ChannelVolumes,
map: Map,
api: &MainloopApi,
index: u32,
ctx: Context,
) -> PulseChannels {
let (reader, writer) = rustix::pipe::pipe_with(rustix::pipe::PipeFlags::CLOEXEC)
.expect("failed to crate pipe");
let (tx, rx) = mpsc::channel::<Request>();
// Create IO event source object for handling speaker balance.
let event_source = api.io_new.as_ref().unwrap()(
api as *const _,
reader.into_raw_fd(),
libpulse_binding::mainloop::events::io::FlagSet::INPUT,
Some(handle_balance_io_new),
Box::into_raw(Box::new(HandleBalanceData(ctx, volumes, map, rx))) as *mut c_void,
);
if let Some(enable) = api.io_enable.as_ref() {
enable(
event_source,
libpulse_binding::mainloop::events::io::FlagSet::INPUT,
);
}
Self {
tx,
pipe_tx: std::fs::File::from(writer),
index,
}
}
/// Change the active index.
#[inline]
pub fn set_index(&mut self, index: u32) {
self.index = index;
}
/// Set the speaker balance of the active sink.
pub fn set_balance(&mut self, balance: f32) {
if let Err(err) = self.tx.send(Request::Balance(self.index, balance)) {
tracing::error!(?err, "Failed to send new balance to channel");
} else {
self.pipe_tx
.write_all(&[1])
.expect("PulseChannels pipe write failed");
}
}
/// Set the volume of the active sink.
pub fn set_volume(&mut self, volume: f32) {
if let Err(err) = self.tx.send(Request::Volume(self.index, volume)) {
tracing::error!(?err, "Failed to send new volume to channel");
} else {
self.pipe_tx
.write_all(&[1])
.expect("PulseChannels pipe write failed");
}
}
/// Request the pulse thread to quit.
pub fn quit(mut self) {
_ = self.tx.send(Request::Quit);
_ = self.pipe_tx.write_all(&[1]);
}
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct Card {
pub object_id: u32,
pub name: String,
pub product_name: String,
pub variant: DeviceVariant,
pub ports: Vec<CardPort>,
pub profiles: Vec<CardProfile>,
pub active_profile: Option<CardProfile>,
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct CardPort {
pub name: String,
pub description: String,
pub direction: Direction,
pub port_type: PortType,
pub profile_port: u32,
pub priority: u32,
pub profiles: Vec<CardProfile>,
pub availability: Availability,
}
#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
pub enum Availability {
Unknown,
No,
Yes,
}
impl From<PortAvailable> for Availability {
fn from(pa: PortAvailable) -> Self {
match pa {
PortAvailable::Unknown => Availability::Unknown,
PortAvailable::No => Availability::No,
PortAvailable::Yes => Availability::Yes,
}
}
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct CardProfile {
pub name: String,
pub description: String,
pub available: bool,
pub n_sinks: u32,
pub n_sources: u32,
pub priority: u32,
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub enum DeviceVariant {
Alsa { alsa_card: u32 },
Bluez5 { address: String },
}
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub enum Direction {
Input,
Output,
Both,
}
#[derive(Default, Clone, Debug, Hash, Eq, PartialEq)]
pub enum PortType {
Mic,
Speaker,
Headphones,
Headset,
Digital,
#[default]
Unknown,
}
impl FromStr for PortType {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"mic" => Ok(PortType::Mic),
"speaker" => Ok(PortType::Speaker),
"headphones" => Ok(PortType::Headphones),
"headset" => Ok(PortType::Headset),
"digital" => Ok(PortType::Digital),
_ => Ok(PortType::Unknown),
}
}
}
struct Data {
main_loop: RefCell<Mainloop>,
default_sink_name: RefCell<Option<String>>,
default_source_name: RefCell<Option<String>>,
sink_volume: Cell<Option<u32>>,
sink_mute: Cell<Option<bool>>,
source_volume: Cell<Option<u32>>,
source_mute: Cell<Option<bool>>,
introspector: Introspector,
sender: RefCell<futures::channel::mpsc::Sender<Event>>,
}
impl Data {
fn card_info_cb(self: &Rc<Self>, card_info: ListResult<&CardInfo>) {
if let ListResult::Item(card_info) = card_info {
let Some(object_id) = card_info
.proplist
.get_str("object.id")
.and_then(|v| v.parse::<u32>().ok())
else {
return;
};
let variant = if let Some(alsa_card) = card_info
.proplist
.get_str("alsa.card")
.and_then(|v| v.parse::<u32>().ok())
{
DeviceVariant::Alsa { alsa_card }
} else if let Some(address) = card_info.proplist.get_str("api.bluez5.address") {
DeviceVariant::Bluez5 { address }
} else {
return;
};
let card = Card {
name: card_info
.name
.as_ref()
.map(Cow::to_string)
.unwrap_or_default(),
product_name: card_info
.proplist
.get_str("device.product.name")
.unwrap_or_default(),
object_id,
variant,
ports: card_info
.ports
.iter()
.map(|port| CardPort {
name: port.name.as_ref().map(Cow::to_string).unwrap_or_default(),
description: port
.description
.as_ref()
.map(Cow::to_string)
.unwrap_or_default(),
direction: match port.direction.bits() {
x if x == libpulse_binding::direction::FlagSet::INPUT.bits() => {
Direction::Input
}
x if x == libpulse_binding::direction::FlagSet::OUTPUT.bits() => {
Direction::Output
}
_ => Direction::Both,
},
port_type: port
.proplist
.get_str("port.type")
.as_deref()
.map(|s| PortType::from_str(s).unwrap())
.unwrap_or_default(),
profile_port: port
.proplist
.get_str("card.profile.port")
.and_then(|v| v.parse::<u32>().ok())
.unwrap_or(0),
priority: port.priority,
profiles: collect_profiles(&port.profiles),
availability: port.available.into(),
})
.collect(),
profiles: collect_profiles(&card_info.profiles),
active_profile: card_info.active_profile.as_deref().map(CardProfile::from),
};
if block_on(self.sender.borrow_mut().send(Event::CardInfo(card))).is_err() {
self.main_loop.borrow_mut().quit(Retval(0));
}
}
}
fn server_info_cb(self: &Rc<Self>, server_info: &ServerInfo) {
let new_default_sink_name = server_info
.default_sink_name
.as_ref()
.map(|x| x.clone().into_owned());
let mut default_sink_name = self.default_sink_name.borrow_mut();
if new_default_sink_name != *default_sink_name {
if let Some(name) = &new_default_sink_name {
_ = block_on(
self.sender
.borrow_mut()
.send(Event::DefaultSink(name.clone())),
);
self.get_sink_info_by_name(name);
}
*default_sink_name = new_default_sink_name;
}
let new_default_source_name = server_info
.default_source_name
.as_ref()
.map(|x| x.clone().into_owned());
let mut default_source_name = self.default_source_name.borrow_mut();
if new_default_source_name != *default_source_name {
if let Some(name) = &new_default_source_name {
_ = block_on(
self.sender
.borrow_mut()
.send(Event::DefaultSource(name.clone())),
);
self.get_source_info_by_name(name);
}
*default_source_name = new_default_source_name;
}
}
fn get_server_info(self: &Rc<Self>) {
let data = self.clone();
self.introspector
.get_server_info(move |server_info| data.server_info_cb(server_info));
}
fn sink_info_cb(&self, sink_info_res: ListResult<&SinkInfo>) {
if let ListResult::Item(sink_info) = sink_info_res {
if sink_info.name.as_deref() != self.default_sink_name.borrow().as_deref() {
return;
}
let balance = (sink_info.channel_map.can_balance()
&& sink_info.base_volume.is_normal())
.then(|| sink_info.volume.get_balance(&sink_info.channel_map));
let volume = sink_info.volume.max().0 / (Volume::NORMAL.0 / 100);
if self.sink_mute.get() != Some(sink_info.mute) {
self.sink_mute.set(Some(sink_info.mute));
if block_on(
self.sender
.borrow_mut()
.send(Event::SinkMute(sink_info.mute)),
)
.is_err()
{
self.main_loop.borrow_mut().quit(Retval(0));
}
}
if self.sink_volume.get() != Some(volume) {
self.sink_volume.set(Some(volume));
if block_on(self.sender.borrow_mut().send(Event::SinkVolume(volume))).is_err() {
self.main_loop.borrow_mut().quit(Retval(0));
}
}
if block_on(self.sender.borrow_mut().send(Event::Balance(balance))).is_err() {
self.main_loop.borrow_mut().quit(Retval(0));
}
let mut main_loop = self.main_loop.borrow_mut();
let api = main_loop.get_api();
if let Some(mut ctx) = Context::new(&*main_loop, "balance") {
let _ = ctx.connect(None, FlagSet::NOFAIL, None);
let channels = PulseChannels::new(
sink_info.volume,
sink_info.channel_map,
api,
sink_info.index,
ctx,
);
if block_on(self.sender.borrow_mut().send(Event::Channels(channels))).is_err() {
main_loop.quit(Retval(0));
}
}
}
}
fn source_info_cb(&self, source_info_res: ListResult<&SourceInfo>) {
if let ListResult::Item(source_info) = source_info_res {
if source_info.name.as_deref() != self.default_source_name.borrow().as_deref() {
return;
}
let volume = source_info.volume.max().0 / (Volume::NORMAL.0 / 100);
if self.source_mute.get() != Some(source_info.mute) {
self.source_mute.set(Some(source_info.mute));
if block_on(
self.sender
.borrow_mut()
.send(Event::SourceMute(source_info.mute)),
)
.is_err()
{
self.main_loop.borrow_mut().quit(Retval(0));
}
}
if self.source_volume.get() != Some(volume) {
self.source_volume.set(Some(volume));
if block_on(self.sender.borrow_mut().send(Event::SourceVolume(volume))).is_err() {
self.main_loop.borrow_mut().quit(Retval(0));
}
}
}
}
fn get_card_info_by_index(self: &Rc<Self>, index: u32) {
let data = self.clone();
self.introspector
.get_card_info_by_index(index, move |card_info_res| {
data.card_info_cb(card_info_res);
});
}
fn get_sink_info_by_index(self: &Rc<Self>, index: u32) {
let data = self.clone();
self.introspector.get_sink_info_by_index(
index,
move |sink_info_res: ListResult<&SinkInfo<'_>>| {
if let ListResult::Item(ref info) = sink_info_res {
if let Some(card_index) = info.card {
let data_clone = data.clone();
data.introspector.get_card_info_by_index(
card_index,
move |card_info_res| {
data_clone.card_info_cb(card_info_res);
},
);
}
}
data.sink_info_cb(sink_info_res);
},
);
}
fn get_sink_info_by_name(self: &Rc<Self>, name: &str) {
let data = self.clone();
self.introspector
.get_sink_info_by_name(name, move |sink_info_res| {
if let ListResult::Item(ref info) = sink_info_res {
if let Some(card_index) = info.card {
let data_clone = data.clone();
data.introspector.get_card_info_by_index(
card_index,
move |card_info_res| {
data_clone.card_info_cb(card_info_res);
},
);
}
}
data.sink_info_cb(sink_info_res);
});
}
fn get_source_info_by_index(self: &Rc<Self>, index: u32) {
let data = self.clone();
self.introspector
.get_source_info_by_index(index, move |source_info_res| {
if let ListResult::Item(ref info) = source_info_res {
if let Some(card_index) = info.card {
let data_clone = data.clone();
data.introspector.get_card_info_by_index(
card_index,
move |card_info_res| {
data_clone.card_info_cb(card_info_res);
},
);
}
}
data.source_info_cb(source_info_res);
});
}
fn get_source_info_by_name(self: &Rc<Self>, name: &str) {
let data = self.clone();
self.introspector
.get_source_info_by_name(name, move |source_info_res| {
if let ListResult::Item(ref info) = source_info_res {
if let Some(card_index) = info.card {
let data_clone = data.clone();
data.introspector.get_card_info_by_index(
card_index,
move |card_info_res| {
data_clone.card_info_cb(card_info_res);
},
);
}
}
data.source_info_cb(source_info_res);
});
}
fn subscribe_cb(
self: &Rc<Self>,
facility: Facility,
_operation: Option<Operation>,
index: u32,
) {
match facility {
Facility::Server => {
self.get_server_info();
}
Facility::Sink => {
self.get_sink_info_by_index(index);
}
Facility::Source => {
self.get_source_info_by_index(index);
}
Facility::Card => {
self.get_card_info_by_index(index);
}
_ => {}
}
}
}
fn collect_profiles(profiles: &[CardProfileInfo]) -> Vec<CardProfile> {
profiles.iter().map(CardProfile::from).collect()
}
impl From<&CardProfileInfo<'_>> for CardProfile {
fn from(profile: &CardProfileInfo) -> Self {
CardProfile {
name: profile
.name
.as_ref()
.map(Cow::to_string)
.unwrap_or_default(),
description: profile
.description
.as_ref()
.map(Cow::to_string)
.unwrap_or_default(),
available: profile.available,
n_sinks: profile.n_sinks,
n_sources: profile.n_sources,
priority: profile.priority,
}
}
}