refactor: optional config subscriptions using dbus
This commit is contained in:
parent
a4d1b1b651
commit
06c33dcf06
14 changed files with 381 additions and 139 deletions
|
|
@ -5,11 +5,13 @@ edition = "2021"
|
|||
|
||||
[features]
|
||||
default = ["macro", "subscription"]
|
||||
dbus = ["dep:zbus", "cosmic-settings-daemon", "futures-util", "subscription"]
|
||||
macro = ["cosmic-config-derive"]
|
||||
subscription = ["iced_futures"]
|
||||
|
||||
[dependencies]
|
||||
# For redox support
|
||||
zbus = { version = "3.14.1", default-features = false, optional = true }
|
||||
atomicwrites = { git = "https://github.com/jackpot51/rust-atomicwrites" }
|
||||
calloop = { version = "0.12.2", optional = true }
|
||||
dirs = "5.0.1"
|
||||
|
|
@ -19,4 +21,6 @@ serde = "1.0.152"
|
|||
cosmic-config-derive = { path = "../cosmic-config-derive/", optional = true }
|
||||
iced = { path = "../iced/", default-features = false, optional = true }
|
||||
iced_futures = { path = "../iced/futures/", default-features = false, optional = true }
|
||||
|
||||
once_cell = "1.19.0"
|
||||
cosmic-settings-daemon = { git = "https://github.com/pop-os/dbus-settings-bindings", branch = "cosmic-settings-daemon", optional = true }
|
||||
futures-util = { version = "0.3", optional = true }
|
||||
|
|
|
|||
137
cosmic-config/src/dbus.rs
Normal file
137
cosmic-config/src/dbus.rs
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
use std::ops::Deref;
|
||||
|
||||
use crate::CosmicConfigEntry;
|
||||
use cosmic_settings_daemon::{Changed, ConfigProxy, CosmicSettingsDaemonProxy, Ping};
|
||||
use futures_util::SinkExt;
|
||||
use iced_futures::futures::{future::pending, StreamExt};
|
||||
pub async fn settings_daemon_proxy() -> zbus::Result<CosmicSettingsDaemonProxy<'static>> {
|
||||
let conn = zbus::Connection::session().await?;
|
||||
CosmicSettingsDaemonProxy::new(&conn).await
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Watcher {
|
||||
proxy: ConfigProxy<'static>,
|
||||
}
|
||||
|
||||
impl Deref for Watcher {
|
||||
type Target = ConfigProxy<'static>;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.proxy
|
||||
}
|
||||
}
|
||||
|
||||
impl Watcher {
|
||||
pub async fn new_config(
|
||||
settings_daemon_proxy: &CosmicSettingsDaemonProxy<'static>,
|
||||
id: &str,
|
||||
version: u64,
|
||||
) -> zbus::Result<Self> {
|
||||
let (path, name) = settings_daemon_proxy.watch_config(id, version).await?;
|
||||
ConfigProxy::builder(settings_daemon_proxy.connection())
|
||||
.path(path)?
|
||||
.destination(name)?
|
||||
.build()
|
||||
.await
|
||||
.map(|proxy| Self { proxy })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConfigUpdate<T> {
|
||||
pub errors: Vec<crate::Error>,
|
||||
pub keys: Vec<&'static str>,
|
||||
pub config: T,
|
||||
}
|
||||
|
||||
pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
|
||||
settings_daemon: CosmicSettingsDaemonProxy<'static>,
|
||||
config_id: &'static str,
|
||||
) -> iced_futures::Subscription<ConfigUpdate<T>> {
|
||||
let id = std::any::TypeId::of::<T>();
|
||||
iced_futures::subscription::channel((config_id, id), 5, move |mut tx| async move {
|
||||
let version = T::VERSION;
|
||||
let Ok(cosmic_config) = crate::Config::new(config_id, version) else {
|
||||
pending::<()>().await;
|
||||
unreachable!();
|
||||
};
|
||||
dbg!(config_id, version, &cosmic_config);
|
||||
let mut config = match T::get_entry(&cosmic_config) {
|
||||
Ok(config) => config,
|
||||
Err((errors, default)) => {
|
||||
if !errors.is_empty() {
|
||||
eprintln!("Failed to get config: {errors:?}");
|
||||
}
|
||||
default
|
||||
}
|
||||
};
|
||||
if let Err(err) = tx
|
||||
.send(ConfigUpdate {
|
||||
errors: Vec::new(),
|
||||
keys: Vec::new(),
|
||||
config: config.clone(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
eprintln!("Failed to send config: {err}");
|
||||
}
|
||||
|
||||
dbg!("sent init");
|
||||
|
||||
let Ok(watcher) = Watcher::new_config(&settings_daemon, config_id, version).await else {
|
||||
dbg!("failed to create watcher");
|
||||
pending::<()>().await;
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
dbg!("watcher created");
|
||||
|
||||
loop {
|
||||
let Ok(changes) = watcher.receive_changed().await else {
|
||||
pending::<()>().await;
|
||||
unreachable!();
|
||||
};
|
||||
let Ok(pings) = watcher.receive_ping().await else {
|
||||
pending::<()>().await;
|
||||
unreachable!();
|
||||
};
|
||||
let mut streams = futures_util::stream_select!(
|
||||
changes.map(Message::ConfigChanged),
|
||||
pings.map(Message::ConfigPing)
|
||||
);
|
||||
while let Some(v) = streams.next().await {
|
||||
match v {
|
||||
Message::ConfigChanged(change) => {
|
||||
let Ok(args) = change.args() else {
|
||||
continue;
|
||||
};
|
||||
let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]);
|
||||
if !keys.is_empty() {
|
||||
if let Err(err) = tx
|
||||
.send(ConfigUpdate {
|
||||
errors,
|
||||
keys,
|
||||
config: config.clone(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
eprintln!("Failed to send config update: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::ConfigPing(_) => {
|
||||
// send pong
|
||||
if let Err(err) = watcher.pong().await {
|
||||
eprintln!("Failed to send pong: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub enum Message {
|
||||
ConfigChanged(Changed),
|
||||
ConfigPing(Ping),
|
||||
}
|
||||
|
|
@ -1,20 +1,22 @@
|
|||
use iced_futures::futures::SinkExt;
|
||||
#[cfg(feature = "subscription")]
|
||||
use iced_futures::{futures::channel::mpsc, subscription};
|
||||
use notify::{
|
||||
event::{EventKind, ModifyKind},
|
||||
RecommendedWatcher, Watcher,
|
||||
Watcher,
|
||||
};
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
fmt, fs,
|
||||
hash::Hash,
|
||||
io::Write,
|
||||
path::{Path, PathBuf},
|
||||
sync::Mutex,
|
||||
};
|
||||
|
||||
#[cfg(feature = "subscription")]
|
||||
mod subscription;
|
||||
pub use subscription::*;
|
||||
|
||||
#[cfg(all(feature = "dbus", feature = "subscription"))]
|
||||
pub mod dbus;
|
||||
|
||||
#[cfg(feature = "macro")]
|
||||
pub use cosmic_config_derive;
|
||||
|
||||
|
|
@ -322,24 +324,12 @@ impl<'a> ConfigSet for ConfigTransaction<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "subscription")]
|
||||
pub enum ConfigState<T> {
|
||||
Init(Cow<'static, str>, u64, bool),
|
||||
Waiting(T, RecommendedWatcher, mpsc::Receiver<Vec<String>>, Config),
|
||||
Failed,
|
||||
}
|
||||
|
||||
#[cfg(feature = "subscription")]
|
||||
pub enum ConfigUpdate<T> {
|
||||
Update(T),
|
||||
UpdateError(T, Vec<crate::Error>),
|
||||
Failed,
|
||||
}
|
||||
|
||||
pub trait CosmicConfigEntry
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
const VERSION: u64;
|
||||
|
||||
fn write_entry(&self, config: &Config) -> Result<(), crate::Error>;
|
||||
fn get_entry(config: &Config) -> Result<Self, (Vec<crate::Error>, Self)>;
|
||||
/// Returns the keys that were updated
|
||||
|
|
@ -347,108 +337,5 @@ where
|
|||
&mut self,
|
||||
config: &Config,
|
||||
changed_keys: &[T],
|
||||
) -> (Vec<crate::Error>, Vec<&str>);
|
||||
}
|
||||
|
||||
#[cfg(feature = "subscription")]
|
||||
pub fn config_subscription<
|
||||
I: 'static + Copy + Send + Sync + Hash,
|
||||
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
|
||||
>(
|
||||
id: I,
|
||||
config_id: Cow<'static, str>,
|
||||
config_version: u64,
|
||||
) -> iced_futures::Subscription<(I, Result<T, (Vec<crate::Error>, T)>)> {
|
||||
subscription::channel(id, 100, move |mut output| {
|
||||
let config_id = config_id.clone();
|
||||
async move {
|
||||
let config_id = config_id.clone();
|
||||
let mut state = ConfigState::Init(config_id, config_version, false);
|
||||
|
||||
loop {
|
||||
state = start_listening(state, &mut output, id).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "subscription")]
|
||||
pub fn config_state_subscription<
|
||||
I: 'static + Copy + Send + Sync + Hash,
|
||||
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
|
||||
>(
|
||||
id: I,
|
||||
config_id: Cow<'static, str>,
|
||||
config_version: u64,
|
||||
) -> iced_futures::Subscription<(I, Result<T, (Vec<crate::Error>, T)>)> {
|
||||
subscription::channel(id, 100, move |mut output| {
|
||||
let config_id = config_id.clone();
|
||||
async move {
|
||||
let config_id = config_id.clone();
|
||||
let mut state = ConfigState::Init(config_id, config_version, true);
|
||||
|
||||
loop {
|
||||
state = start_listening(state, &mut output, id).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn start_listening<
|
||||
I: Copy,
|
||||
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
|
||||
>(
|
||||
state: ConfigState<T>,
|
||||
output: &mut mpsc::Sender<(I, Result<T, (Vec<crate::Error>, T)>)>,
|
||||
id: I,
|
||||
) -> ConfigState<T> {
|
||||
use iced_futures::futures::{future::pending, StreamExt};
|
||||
|
||||
match state {
|
||||
ConfigState::Init(config_id, version, is_state) => {
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let config = match if is_state {
|
||||
Config::new_state(&config_id, version)
|
||||
} else {
|
||||
Config::new(&config_id, version)
|
||||
} {
|
||||
Ok(c) => c,
|
||||
Err(_) => return ConfigState::Failed,
|
||||
};
|
||||
let watcher = match config.watch(move |_helper, keys| {
|
||||
let mut tx = tx.clone();
|
||||
let _ = tx.try_send(keys.to_vec());
|
||||
}) {
|
||||
Ok(w) => w,
|
||||
Err(_) => return ConfigState::Failed,
|
||||
};
|
||||
|
||||
match T::get_entry(&config) {
|
||||
Ok(t) => {
|
||||
_ = output.send((id, Ok(t.clone()))).await;
|
||||
ConfigState::Waiting(t, watcher, rx, config)
|
||||
}
|
||||
Err((errors, t)) => {
|
||||
_ = output.send((id, Err((errors, t.clone())))).await;
|
||||
ConfigState::Waiting(t, watcher, rx, config)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConfigState::Waiting(mut conf_data, watcher, mut rx, config) => match rx.next().await {
|
||||
Some(keys) => {
|
||||
let (errors, changed) = conf_data.update_keys(&config, &keys);
|
||||
|
||||
if !changed.is_empty() {
|
||||
if errors.is_empty() {
|
||||
_ = output.send((id, Ok(conf_data.clone()))).await;
|
||||
} else {
|
||||
_ = output.send((id, Err((errors, conf_data.clone())))).await;
|
||||
}
|
||||
}
|
||||
ConfigState::Waiting(conf_data, watcher, rx, config)
|
||||
}
|
||||
None => ConfigState::Failed,
|
||||
},
|
||||
ConfigState::Failed => pending().await,
|
||||
}
|
||||
) -> (Vec<crate::Error>, Vec<&'static str>);
|
||||
}
|
||||
|
|
|
|||
119
cosmic-config/src/subscription.rs
Normal file
119
cosmic-config/src/subscription.rs
Normal file
|
|
@ -0,0 +1,119 @@
|
|||
use iced_futures::futures::SinkExt;
|
||||
use iced_futures::{futures::channel::mpsc, subscription};
|
||||
use notify::RecommendedWatcher;
|
||||
use std::{borrow::Cow, hash::Hash};
|
||||
|
||||
use crate::{Config, CosmicConfigEntry};
|
||||
|
||||
pub enum ConfigState<T> {
|
||||
Init(Cow<'static, str>, u64, bool),
|
||||
Waiting(T, RecommendedWatcher, mpsc::Receiver<Vec<String>>, Config),
|
||||
Failed,
|
||||
}
|
||||
|
||||
pub enum ConfigUpdate<T> {
|
||||
Update(T),
|
||||
UpdateError(T, Vec<crate::Error>),
|
||||
Failed,
|
||||
}
|
||||
|
||||
pub fn config_subscription<
|
||||
I: 'static + Copy + Send + Sync + Hash,
|
||||
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
|
||||
>(
|
||||
id: I,
|
||||
config_id: Cow<'static, str>,
|
||||
config_version: u64,
|
||||
) -> iced_futures::Subscription<(I, Result<T, (Vec<crate::Error>, T)>)> {
|
||||
subscription::channel(id, 100, move |mut output| {
|
||||
let config_id = config_id.clone();
|
||||
async move {
|
||||
let config_id = config_id.clone();
|
||||
let mut state = ConfigState::Init(config_id, config_version, false);
|
||||
|
||||
loop {
|
||||
state = start_listening(state, &mut output, id).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn config_state_subscription<
|
||||
I: 'static + Copy + Send + Sync + Hash,
|
||||
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
|
||||
>(
|
||||
id: I,
|
||||
config_id: Cow<'static, str>,
|
||||
config_version: u64,
|
||||
) -> iced_futures::Subscription<(I, Result<T, (Vec<crate::Error>, T)>)> {
|
||||
subscription::channel(id, 100, move |mut output| {
|
||||
let config_id = config_id.clone();
|
||||
async move {
|
||||
let config_id = config_id.clone();
|
||||
let mut state = ConfigState::Init(config_id, config_version, true);
|
||||
|
||||
loop {
|
||||
state = start_listening(state, &mut output, id).await;
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn start_listening<
|
||||
I: Copy,
|
||||
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
|
||||
>(
|
||||
state: ConfigState<T>,
|
||||
output: &mut mpsc::Sender<(I, Result<T, (Vec<crate::Error>, T)>)>,
|
||||
id: I,
|
||||
) -> ConfigState<T> {
|
||||
use iced_futures::futures::{future::pending, StreamExt};
|
||||
|
||||
match state {
|
||||
ConfigState::Init(config_id, version, is_state) => {
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let config = match if is_state {
|
||||
Config::new_state(&config_id, version)
|
||||
} else {
|
||||
Config::new(&config_id, version)
|
||||
} {
|
||||
Ok(c) => c,
|
||||
Err(_) => return ConfigState::Failed,
|
||||
};
|
||||
let watcher = match config.watch(move |_helper, keys| {
|
||||
let mut tx = tx.clone();
|
||||
let _ = tx.try_send(keys.to_vec());
|
||||
}) {
|
||||
Ok(w) => w,
|
||||
Err(_) => return ConfigState::Failed,
|
||||
};
|
||||
|
||||
match T::get_entry(&config) {
|
||||
Ok(t) => {
|
||||
_ = output.send((id, Ok(t.clone()))).await;
|
||||
ConfigState::Waiting(t, watcher, rx, config)
|
||||
}
|
||||
Err((errors, t)) => {
|
||||
_ = output.send((id, Err((errors, t.clone())))).await;
|
||||
ConfigState::Waiting(t, watcher, rx, config)
|
||||
}
|
||||
}
|
||||
}
|
||||
ConfigState::Waiting(mut conf_data, watcher, mut rx, config) => match rx.next().await {
|
||||
Some(keys) => {
|
||||
let (errors, changed) = conf_data.update_keys(&config, &keys);
|
||||
|
||||
if !changed.is_empty() {
|
||||
if errors.is_empty() {
|
||||
_ = output.send((id, Ok(conf_data.clone()))).await;
|
||||
} else {
|
||||
_ = output.send((id, Err((errors, conf_data.clone())))).await;
|
||||
}
|
||||
}
|
||||
ConfigState::Waiting(conf_data, watcher, rx, config)
|
||||
}
|
||||
None => ConfigState::Failed,
|
||||
},
|
||||
ConfigState::Failed => pending().await,
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue