audio: Don't recreate mpris proxies and streams
Fixes https://github.com/pop-os/cosmic-applets/issues/306 and should overall be more correct and performant.
This commit is contained in:
parent
56d3a754e2
commit
1475f8f32b
3 changed files with 168 additions and 172 deletions
13
Cargo.lock
generated
13
Cargo.lock
generated
|
|
@ -1179,7 +1179,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cosmic-dbus-networkmanager"
|
name = "cosmic-dbus-networkmanager"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/pop-os/dbus-settings-bindings#8b9767f6cedede2def12941ce89e14bfcd913aeb"
|
source = "git+https://github.com/pop-os/dbus-settings-bindings#c81f428acec4c8633efb62b4f8284202dad9a492"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags 2.5.0",
|
"bitflags 2.5.0",
|
||||||
"derive_builder",
|
"derive_builder",
|
||||||
|
|
@ -3534,8 +3534,9 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mpris2-zbus"
|
name = "mpris2-zbus"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/pop-os/dbus-settings-bindings#8b9767f6cedede2def12941ce89e14bfcd913aeb"
|
source = "git+https://github.com/pop-os/dbus-settings-bindings#c81f428acec4c8633efb62b4f8284202dad9a492"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
"serde",
|
"serde",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"time",
|
"time",
|
||||||
|
|
@ -4943,7 +4944,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "switcheroo-control"
|
name = "switcheroo-control"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/pop-os/dbus-settings-bindings#8b9767f6cedede2def12941ce89e14bfcd913aeb"
|
source = "git+https://github.com/pop-os/dbus-settings-bindings#c81f428acec4c8633efb62b4f8284202dad9a492"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"zbus",
|
"zbus",
|
||||||
]
|
]
|
||||||
|
|
@ -5842,7 +5843,7 @@ dependencies = [
|
||||||
"js-sys",
|
"js-sys",
|
||||||
"log",
|
"log",
|
||||||
"naga",
|
"naga",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.11.2",
|
||||||
"profiling",
|
"profiling",
|
||||||
"raw-window-handle 0.6.0",
|
"raw-window-handle 0.6.0",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
|
|
@ -5869,7 +5870,7 @@ dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"naga",
|
"naga",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.11.2",
|
||||||
"profiling",
|
"profiling",
|
||||||
"raw-window-handle 0.6.0",
|
"raw-window-handle 0.6.0",
|
||||||
"rustc-hash",
|
"rustc-hash",
|
||||||
|
|
@ -5909,7 +5910,7 @@ dependencies = [
|
||||||
"naga",
|
"naga",
|
||||||
"objc",
|
"objc",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.11.2",
|
||||||
"profiling",
|
"profiling",
|
||||||
"range-alloc",
|
"range-alloc",
|
||||||
"raw-window-handle 0.6.0",
|
"raw-window-handle 0.6.0",
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ const PLAY: &str = "media-playback-start-symbolic";
|
||||||
|
|
||||||
pub fn run() -> cosmic::iced::Result {
|
pub fn run() -> cosmic::iced::Result {
|
||||||
localize();
|
localize();
|
||||||
cosmic::applet::run::<Audio>(true, ())
|
cosmic::applet::run::<Audio>(false, ())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
|
|
|
||||||
|
|
@ -1,16 +1,21 @@
|
||||||
use std::{borrow::Cow, fmt::Debug, hash::Hash, path::PathBuf};
|
use std::{borrow::Cow, collections::HashMap, fmt::Debug, hash::Hash, path::PathBuf};
|
||||||
|
|
||||||
use cosmic::{
|
use cosmic::{
|
||||||
iced::{self, subscription},
|
iced::{self, subscription},
|
||||||
iced_futures::futures::{self, SinkExt, StreamExt},
|
iced_futures::futures::{self, future::OptionFuture, FutureExt, SinkExt, StreamExt},
|
||||||
};
|
};
|
||||||
use mpris2_zbus::{
|
use mpris2_zbus::{
|
||||||
|
enumerator,
|
||||||
media_player::MediaPlayer,
|
media_player::MediaPlayer,
|
||||||
player::{PlaybackStatus, Player},
|
player::{PlaybackStatus, Player},
|
||||||
};
|
};
|
||||||
use tokio::join;
|
use tokio::join;
|
||||||
use urlencoding::decode;
|
use urlencoding::decode;
|
||||||
use zbus::{fdo::DBusProxy, Connection};
|
use zbus::{
|
||||||
|
names::{BusName, OwnedBusName},
|
||||||
|
zvariant::OwnedValue,
|
||||||
|
Connection,
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct PlayerStatus {
|
pub struct PlayerStatus {
|
||||||
|
|
@ -77,19 +82,30 @@ pub fn mpris_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
|
||||||
id: I,
|
id: I,
|
||||||
) -> iced::Subscription<MprisUpdate> {
|
) -> iced::Subscription<MprisUpdate> {
|
||||||
subscription::channel(id, 50, move |mut output| async move {
|
subscription::channel(id, 50, move |mut output| async move {
|
||||||
let mut state = State::Setup;
|
run(&mut output).await;
|
||||||
|
let _ = output.send(MprisUpdate::Finished).await;
|
||||||
loop {
|
futures::future::pending().await
|
||||||
state = update(state, &mut output).await;
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum State {
|
struct MprisPlayer {
|
||||||
Setup,
|
player: Player,
|
||||||
Player(Player, DBusProxy<'static>),
|
#[allow(dead_code)]
|
||||||
Finished,
|
media_player: MediaPlayer,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MprisPlayer {
|
||||||
|
async fn new(conn: &Connection, name: OwnedBusName) -> mpris2_zbus::error::Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
player: Player::new(conn, name.clone()).await?,
|
||||||
|
media_player: MediaPlayer::new(conn, name).await?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn name(&self) -> &BusName {
|
||||||
|
self.player.destination()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
@ -107,162 +123,145 @@ pub enum MprisRequest {
|
||||||
Previous,
|
Previous,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update(state: State, output: &mut futures::channel::mpsc::Sender<MprisUpdate>) -> State {
|
struct State {
|
||||||
match state {
|
conn: Connection,
|
||||||
State::Setup => {
|
enumerator_stream:
|
||||||
let Ok(conn) = Connection::session().await else {
|
Box<dyn futures::Stream<Item = zbus::Result<enumerator::Event>> + Unpin + Send>,
|
||||||
tracing::error!("Failed to connect to session bus.");
|
players: Vec<MprisPlayer>,
|
||||||
_ = output.send(MprisUpdate::Finished).await;
|
active_player: Option<MprisPlayer>,
|
||||||
return State::Finished;
|
active_player_metadata_stream:
|
||||||
};
|
Option<zbus::PropertyStream<'static, HashMap<String, OwnedValue>>>,
|
||||||
let mut players = mpris2_zbus::media_player::MediaPlayer::new_all(&conn)
|
any_player_state_stream: futures::stream::SelectAll<zbus::PropertyStream<'static, String>>,
|
||||||
.await
|
}
|
||||||
.unwrap_or_else(|_| Vec::new());
|
|
||||||
let Ok(dbus_proxy) = zbus::fdo::DBusProxy::builder(&conn)
|
impl State {
|
||||||
.path("/org/freedesktop/DBus")
|
async fn new() -> Result<Self, zbus::Error> {
|
||||||
.unwrap()
|
let conn = Connection::session().await?;
|
||||||
.build()
|
|
||||||
.await
|
let enumerator = enumerator::Enumerator::new(&conn).await?;
|
||||||
else {
|
let enumerator_stream = enumerator.receive_changes().await?;
|
||||||
tracing::error!("Failed to create dbus proxy.");
|
|
||||||
return State::Finished;
|
let player_names = enumerator.players().await?;
|
||||||
};
|
let mut players = Vec::with_capacity(player_names.len());
|
||||||
if players.is_empty() {
|
for name in player_names {
|
||||||
let Ok(mut stream) = dbus_proxy.receive_name_owner_changed().await else {
|
match MprisPlayer::new(&conn, name).await {
|
||||||
tracing::error!("Failed to receive name owner changed signal.");
|
Ok(player) => {
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
players.push(player);
|
||||||
// restart from the beginning
|
}
|
||||||
return State::Setup;
|
Err(err) => {
|
||||||
};
|
tracing::error!("Failed to add player: {}", err);
|
||||||
while let Some(c) = stream.next().await {
|
|
||||||
if let Ok(args) = c.args() {
|
|
||||||
if args.name.contains("org.mpris.MediaPlayer2") {
|
|
||||||
if let Ok(p) =
|
|
||||||
MediaPlayer::new(&conn, args.name().to_owned().into()).await
|
|
||||||
{
|
|
||||||
players.push(p);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let Some(player) = find_active(players).await else {
|
|
||||||
tracing::error!("Failed to find active media player.");
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
|
||||||
return State::Setup;
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(player_status) = PlayerStatus::new(player.clone()).await else {
|
|
||||||
tracing::error!("Failed to get player status.");
|
|
||||||
return State::Setup;
|
|
||||||
};
|
|
||||||
|
|
||||||
_ = output.send(MprisUpdate::Player(player_status)).await;
|
|
||||||
State::Player(player, dbus_proxy)
|
|
||||||
}
|
}
|
||||||
State::Player(player, dbus_proxy) => {
|
|
||||||
let Ok(mut name_owner_changed) = player.receive_owner_changed().await else {
|
|
||||||
tracing::error!("Failed to receive owner changed signal.");
|
|
||||||
// restart from the beginning
|
|
||||||
return State::Setup;
|
|
||||||
};
|
|
||||||
let mut metadata_changed = player.receive_metadata_changed().await;
|
|
||||||
let Ok(mut new_mpris) = dbus_proxy.receive_name_owner_changed().await else {
|
|
||||||
tracing::error!("Failed to receive name owner changed signal.");
|
|
||||||
// restart from the beginning
|
|
||||||
return State::Setup;
|
|
||||||
};
|
|
||||||
let conn = player.connection();
|
|
||||||
let media_players = mpris2_zbus::media_player::MediaPlayer::new_all(conn)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|_| Vec::new());
|
|
||||||
|
|
||||||
let mut players = Vec::with_capacity(media_players.len());
|
// pre-sort by path so that the same player is always selected
|
||||||
for p in media_players {
|
players.sort_by(|a, b| a.name().cmp(&b.name()));
|
||||||
if let Ok(p) = p.player().await {
|
|
||||||
players.push(p);
|
let mut state = Self {
|
||||||
}
|
conn,
|
||||||
|
enumerator_stream: Box::new(enumerator_stream),
|
||||||
|
players,
|
||||||
|
active_player: None,
|
||||||
|
active_player_metadata_stream: None,
|
||||||
|
any_player_state_stream: futures::stream::select_all(Vec::new()),
|
||||||
|
};
|
||||||
|
state.update_active_player().await;
|
||||||
|
state.update_any_player_state_stream().await;
|
||||||
|
Ok(state)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn add_player(&mut self, name: OwnedBusName) {
|
||||||
|
let player = match MprisPlayer::new(&self.conn, name).await {
|
||||||
|
Ok(player) => player,
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!("Failed to add player: {}", err);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
self.players.push(player);
|
||||||
|
self.players.sort_by(|a, b| a.name().cmp(&b.name()));
|
||||||
|
self.update_any_player_state_stream().await;
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
async fn remove_player(&mut self, name: OwnedBusName) {
|
||||||
let mut listeners = Vec::with_capacity(players.len());
|
if let Some(idx) = self.players.iter().position(|p| p.name() == &name) {
|
||||||
for p in &players {
|
self.players.remove(idx);
|
||||||
listeners.push(p.receive_playback_status_changed().await);
|
|
||||||
}
|
|
||||||
let mut player_state_changed_list = Vec::with_capacity(listeners.len());
|
|
||||||
for l in &mut listeners {
|
|
||||||
player_state_changed_list.push(Box::pin(async move {
|
|
||||||
let changed = l.next().await;
|
|
||||||
if let Some(c) = changed {
|
|
||||||
c.get().await.ok()
|
|
||||||
} else {
|
|
||||||
tracing::error!("Failed to receive playback status changed signal.");
|
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
let any_player_state_changed =
|
|
||||||
futures::future::select_all(player_state_changed_list);
|
|
||||||
let keep_going = tokio::select! {
|
|
||||||
m = metadata_changed.next() => {
|
|
||||||
m.is_some()
|
|
||||||
},
|
|
||||||
n = name_owner_changed.next() => {
|
|
||||||
n.map(|n| n.is_some()).unwrap_or_default()
|
|
||||||
},
|
|
||||||
_ = new_mpris.next() => {
|
|
||||||
true
|
|
||||||
},
|
|
||||||
_ = any_player_state_changed => {
|
|
||||||
true
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
if !keep_going {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(update) = PlayerStatus::new(player.clone()).await {
|
|
||||||
if matches!(update.status, PlaybackStatus::Stopped) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if paused check if any players are playing
|
|
||||||
// if they are, break
|
|
||||||
if !matches!(update.status, PlaybackStatus::Playing) {
|
|
||||||
let conn = player.connection();
|
|
||||||
let players = mpris2_zbus::media_player::MediaPlayer::new_all(conn)
|
|
||||||
.await
|
|
||||||
.unwrap_or_else(|_| Vec::new());
|
|
||||||
if let Some(active) = find_active(players).await {
|
|
||||||
if active.destination() != player.destination() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = output.send(MprisUpdate::Player(update)).await;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = output.send(MprisUpdate::Setup).await;
|
|
||||||
State::Setup
|
|
||||||
}
|
}
|
||||||
State::Finished => iced::futures::future::pending().await,
|
self.update_any_player_state_stream().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_active_player(&mut self) {
|
||||||
|
let new_active_player = find_active(&self.players).await;
|
||||||
|
if self.active_player.as_ref().map(|p| p.name()) != new_active_player.map(|p| p.name()) {
|
||||||
|
self.active_player = new_active_player.cloned();
|
||||||
|
if let Some(player) = new_active_player {
|
||||||
|
self.active_player_metadata_stream =
|
||||||
|
Some(player.player.receive_metadata_changed().await);
|
||||||
|
} else {
|
||||||
|
self.active_player_metadata_stream = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_any_player_state_stream(&mut self) {
|
||||||
|
let mut listeners = Vec::with_capacity(self.players.len());
|
||||||
|
for p in &self.players {
|
||||||
|
listeners.push(p.player.receive_playback_status_changed().await);
|
||||||
|
}
|
||||||
|
self.any_player_state_stream = futures::stream::select_all(listeners);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn find_active(mut players: Vec<MediaPlayer>) -> Option<Player> {
|
async fn run(output: &mut futures::channel::mpsc::Sender<MprisUpdate>) {
|
||||||
// pre-sort by path so that the same player is always selected
|
let mut state = match State::new().await {
|
||||||
players.sort_by(|a, b| {
|
Ok(state) => state,
|
||||||
let a = a.destination();
|
Err(err) => {
|
||||||
let b = b.destination();
|
tracing::error!("Faile do monitor for mpris clients: {}", err);
|
||||||
a.cmp(b)
|
return;
|
||||||
});
|
}
|
||||||
let mut best = (0, None);
|
};
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Some(player) = &state.active_player {
|
||||||
|
if let Some(player_status) = PlayerStatus::new(player.player.clone()).await {
|
||||||
|
_ = output.send(MprisUpdate::Player(player_status)).await;
|
||||||
|
} else {
|
||||||
|
tracing::error!("Failed to get player status.");
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
let _ = output.send(MprisUpdate::Setup).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let metadata_changed_next = OptionFuture::from(
|
||||||
|
state
|
||||||
|
.active_player_metadata_stream
|
||||||
|
.as_mut()
|
||||||
|
.map(|s| s.next()),
|
||||||
|
);
|
||||||
|
tokio::select! {
|
||||||
|
_ = metadata_changed_next, if state.active_player.is_some() => {
|
||||||
|
},
|
||||||
|
event = state.enumerator_stream.next() => {
|
||||||
|
match dbg!(event) {
|
||||||
|
Some(Ok(enumerator::Event::Add(name))) => state.add_player(name).await,
|
||||||
|
Some(Ok(enumerator::Event::Remove(name))) => state.remove_player(name).await,
|
||||||
|
Some(Err(err)) => {
|
||||||
|
tracing::error!("Error listening for mpris clients: {:?}", err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
|
}
|
||||||
|
state.update_active_player().await;
|
||||||
|
}
|
||||||
|
_ = state.any_player_state_stream.next(), if !state.players.is_empty() => {
|
||||||
|
state.update_active_player().await;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn find_active<'a>(players: &'a Vec<MprisPlayer>) -> Option<&'a MprisPlayer> {
|
||||||
|
let mut best = (0, None::<&'a MprisPlayer>);
|
||||||
let eval = |p: Player| async move {
|
let eval = |p: Player| async move {
|
||||||
let v = {
|
let v = {
|
||||||
let status = p.playback_status().await;
|
let status = p.playback_status().await;
|
||||||
|
|
@ -277,12 +276,8 @@ async fn find_active(mut players: Vec<MediaPlayer>) -> Option<Player> {
|
||||||
v + p.metadata().await.is_ok() as i32
|
v + p.metadata().await.is_ok() as i32
|
||||||
};
|
};
|
||||||
|
|
||||||
for p in players {
|
for p in players.iter() {
|
||||||
let p = match p.player().await {
|
let v = eval(p.player.clone()).await;
|
||||||
Ok(p) => p,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
let v = eval(p.clone()).await;
|
|
||||||
if v > best.0 {
|
if v > best.0 {
|
||||||
best = (v, Some(p));
|
best = (v, Some(p));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue