fix(cosmic-config): attempt to reconnect to the settings daemon

This commit is contained in:
Ashley Wulber 2024-05-12 14:17:43 -04:00 committed by Michael Murphy
parent 3cfc5c16a3
commit 6a97435263
5 changed files with 114 additions and 28 deletions

View file

@ -34,8 +34,15 @@ serde-keycode = ["iced_core/serde"]
single-instance = ["dep:zbus", "serde", "ron"] single-instance = ["dep:zbus", "serde", "ron"]
# smol async runtime # smol async runtime
smol = ["iced/smol", "zbus?/async-io"] smol = ["iced/smol", "zbus?/async-io"]
tokio = [
"dep:tokio",
"ashpd?/tokio",
"iced/tokio",
"rfd?/tokio",
"zbus?/tokio",
"cosmic-config/tokio",
]
# Tokio async runtime # Tokio async runtime
tokio = ["dep:tokio", "ashpd?/tokio", "iced/tokio", "rfd?/tokio", "zbus?/tokio"]
# Wayland window support # Wayland window support
wayland = [ wayland = [
"ashpd?/wayland", "ashpd?/wayland",

View file

@ -17,12 +17,15 @@ notify = "6.0.0"
ron = "0.8.0" ron = "0.8.0"
serde = "1.0.152" serde = "1.0.152"
cosmic-config-derive = { path = "../cosmic-config-derive/", optional = true } cosmic-config-derive = { path = "../cosmic-config-derive/", optional = true }
iced = { path = "../iced/", default-features = false, optional = true } iced = { path = "../iced/", default-features = false, optional = true }
iced_futures = { path = "../iced/futures/", default-features = false, optional = true } iced_futures = { path = "../iced/futures/", default-features = false, optional = true }
once_cell = "1.19.0" once_cell = "1.19.0"
cosmic-settings-daemon = { git = "https://github.com/pop-os/dbus-settings-bindings", branch = "cosmic-settings-daemon", optional = true } cosmic-settings-daemon = { git = "https://github.com/pop-os/dbus-settings-bindings", branch = "cosmic-settings-daemon", optional = true }
futures-util = { version = "0.3", optional = true } futures-util = { version = "0.3", optional = true }
dirs.workspace = true dirs.workspace = true
tokio = { version = "1.0", optional = true, features = ["time"] }
async-std = { version = "1.10", optional = true }
tracing = "0.1"
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
xdg = "2.1" xdg = "2.1"

View file

@ -1,9 +1,10 @@
use std::ops::Deref; use std::ops::Deref;
use crate::{CosmicConfigEntry, Update}; use crate::{CosmicConfigEntry, Update};
use cosmic_settings_daemon::{ConfigProxy, CosmicSettingsDaemonProxy}; use cosmic_settings_daemon::{Changed, ConfigProxy, CosmicSettingsDaemonProxy};
use futures_util::SinkExt; use futures_util::SinkExt;
use iced_futures::futures::{future::pending, StreamExt}; use iced_futures::futures::{self, future::pending, StreamExt};
pub async fn settings_daemon_proxy() -> zbus::Result<CosmicSettingsDaemonProxy<'static>> { pub async fn settings_daemon_proxy() -> zbus::Result<CosmicSettingsDaemonProxy<'static>> {
let conn = zbus::Connection::session().await?; let conn = zbus::Connection::session().await?;
CosmicSettingsDaemonProxy::new(&conn).await CosmicSettingsDaemonProxy::new(&conn).await
@ -51,11 +52,17 @@ impl Watcher {
} }
} }
#[allow(clippy::too_many_lines)]
pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>( pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
settings_daemon: CosmicSettingsDaemonProxy<'static>, settings_daemon: CosmicSettingsDaemonProxy<'static>,
config_id: &'static str, config_id: &'static str,
is_state: bool, is_state: bool,
) -> iced_futures::Subscription<Update<T>> { ) -> iced_futures::Subscription<Update<T>> {
enum Change {
Changes(Changed),
OwnerChanged(bool),
}
let id = std::any::TypeId::of::<T>(); let id = std::any::TypeId::of::<T>();
iced_futures::subscription::channel((is_state, config_id, id), 5, move |mut tx| async move { iced_futures::subscription::channel((is_state, config_id, id), 5, move |mut tx| async move {
let version = T::VERSION; let version = T::VERSION;
@ -68,6 +75,7 @@ pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'stat
pending::<()>().await; pending::<()>().await;
unreachable!(); unreachable!();
}; };
let mut config = match T::get_entry(&cosmic_config) { let mut config = match T::get_entry(&cosmic_config) {
Ok(config) => config, Ok(config) => config,
Err((errors, default)) => { Err((errors, default)) => {
@ -77,6 +85,7 @@ pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'stat
default default
} }
}; };
if let Err(err) = tx if let Err(err) = tx
.send(Update { .send(Update {
errors: Vec::new(), errors: Vec::new(),
@ -88,24 +97,93 @@ pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'stat
eprintln!("Failed to send config: {err}"); eprintln!("Failed to send config: {err}");
} }
let watcher = if is_state { let mut attempts = 0;
Watcher::new_state(&settings_daemon, config_id, version).await
} else {
Watcher::new_config(&settings_daemon, config_id, version).await
};
let Ok(watcher) = watcher else {
pending::<()>().await;
unreachable!();
};
loop { loop {
let Ok(mut changes) = watcher.receive_changed().await else { let watcher = if is_state {
pending::<()>().await; Watcher::new_state(&settings_daemon, config_id, version).await
unreachable!(); } else {
Watcher::new_config(&settings_daemon, config_id, version).await
}; };
while let Some(change) = changes.next().await { let Ok(watcher) = watcher else {
tracing::error!("Failed to create watcher for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let Ok(changes) = watcher.receive_changed().await else {
tracing::error!("Failed to listen for changes for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let mut changes = changes.map(Change::Changes).fuse();
let Ok(owner_changed) = watcher.receive_owner_changed().await else {
tracing::error!("Failed to listen for owner changes for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let mut owner_changed = owner_changed
.map(|c| Change::OwnerChanged(c.is_some()))
.fuse();
loop {
let change: Changed = futures::select! {
c = changes.next() => {
let Some(Change::Changes(c)) = c else {
break;
};
c
}
c = owner_changed.next() => {
let Some(Change::OwnerChanged(cont)) = c else {
break;
};
if cont {
continue;
} else {
// The settings daemon has exited
break;
}
},
};
// Reset the attempts counter if we received a change
attempts = 0;
let Ok(args) = change.args() else { let Ok(args) = change.args() else {
continue; // The settings daemon has exited
break;
}; };
let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]); let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]);
if !keys.is_empty() { if !keys.is_empty() {

View file

@ -71,20 +71,18 @@ async fn start_listening<
match state { match state {
ConfigState::Init(config_id, version, is_state) => { ConfigState::Init(config_id, version, is_state) => {
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100);
let config = match if is_state { let Ok(config) = (if is_state {
Config::new_state(&config_id, version) Config::new_state(&config_id, version)
} else { } else {
Config::new(&config_id, version) Config::new(&config_id, version)
} { }) else {
Ok(c) => c, return ConfigState::Failed;
Err(_) => return ConfigState::Failed,
}; };
let watcher = match config.watch(move |_helper, keys| { let Ok(watcher) = config.watch(move |_helper, keys| {
let mut tx = tx.clone(); let mut tx = tx.clone();
let _ = tx.try_send(keys.to_vec()); let _ = tx.try_send(keys.to_vec());
}) { }) else {
Ok(w) => w, return ConfigState::Failed;
Err(_) => return ConfigState::Failed,
}; };
match T::get_entry(&config) { match T::get_entry(&config) {
@ -115,7 +113,7 @@ async fn start_listening<
if !changed.is_empty() { if !changed.is_empty() {
_ = output _ = output
.send(crate::Update { .send(crate::Update {
errors: errors, errors,
keys: changed, keys: changed,
config: conf_data.clone(), config: conf_data.clone(),
}) })

View file

@ -11,4 +11,4 @@ tracing-log = "0.2.0"
[dependencies.libcosmic] [dependencies.libcosmic]
path = "../../" path = "../../"
default-features = false default-features = false
features = ["debug", "winit", "tokio", "xdg-portal"] features = ["debug", "winit", "tokio", "xdg-portal", "dbus-config"]