wip rebase updates

This commit is contained in:
Ashley Wulber 2026-02-10 15:37:41 -05:00
parent 86dcf8af6c
commit e10459fb37
68 changed files with 1776 additions and 1544 deletions

View file

@ -1,11 +1,11 @@
use std::ops::Deref;
use std::{any::TypeId, ops::Deref};
use crate::{CosmicConfigEntry, Update};
use cosmic_settings_daemon::{Changed, ConfigProxy, CosmicSettingsDaemonProxy};
use futures_util::SinkExt;
use iced_futures::{
Subscription,
futures::{self, Stream, StreamExt, future::pending},
futures::{self, StreamExt, future::pending},
stream,
};
@ -57,6 +57,20 @@ impl Watcher {
}
}
#[derive(Clone)]
struct Wrapper(
TypeId,
CosmicSettingsDaemonProxy<'static>,
&'static str,
bool,
);
impl std::hash::Hash for Wrapper {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.0.hash(state);
}
}
#[allow(clippy::too_many_lines)]
pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
settings_daemon: CosmicSettingsDaemonProxy<'static>,
@ -64,166 +78,185 @@ pub fn watcher_subscription<T: CosmicConfigEntry + Send + Sync + Default + 'stat
is_state: bool,
) -> iced_futures::Subscription<Update<T>> {
let id = std::any::TypeId::of::<T>();
Subscription::run_with_id(
(id, config_id),
watcher_stream(settings_daemon, config_id, is_state),
)
}
Subscription::run_with(
Wrapper(id, settings_daemon, config_id, is_state),
|&Wrapper(_, ref settings_daemon, ref config_id, ref is_state)| {
let is_state = *is_state;
let config_id = *config_id;
let settings_daemon = settings_daemon.clone();
enum Change {
Changes(Changed),
OwnerChanged(bool),
}
stream::channel(
5,
move |mut tx: futures::channel::mpsc::Sender<Update<T>>| async move {
let version = T::VERSION;
fn watcher_stream<T: CosmicConfigEntry + Send + Sync + Default + 'static + Clone>(
settings_daemon: CosmicSettingsDaemonProxy<'static>,
config_id: &'static str,
is_state: bool,
) -> impl Stream<Item = Update<T>> {
enum Change {
Changes(Changed),
OwnerChanged(bool),
}
stream::channel(5, move |mut tx| async move {
let version = T::VERSION;
let Ok(cosmic_config) = (if is_state {
crate::Config::new_state(config_id, version)
} else {
crate::Config::new(config_id, version)
}) else {
pending::<()>().await;
unreachable!();
};
let Ok(cosmic_config) = (if is_state {
crate::Config::new_state(config_id, version)
} else {
crate::Config::new(config_id, version)
}) else {
pending::<()>().await;
unreachable!();
};
let mut attempts = 0;
let mut attempts = 0;
loop {
let watcher = if is_state {
Watcher::new_state(&settings_daemon, config_id, version).await
} else {
Watcher::new_config(&settings_daemon, config_id, version).await
};
let Ok(watcher) = watcher else {
tracing::error!("Failed to create watcher for {config_id}");
loop {
let watcher = if is_state {
Watcher::new_state(&settings_daemon, config_id, version).await
} else {
Watcher::new_config(&settings_daemon, config_id, version).await
};
let Ok(watcher) = watcher else {
tracing::error!("Failed to create watcher for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(
2_u64.pow(attempts),
))
.await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(
2_u64.pow(attempts),
))
.await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let Ok(changes) = watcher.receive_changed().await else {
tracing::error!("Failed to listen for changes for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let Ok(changes) = watcher.receive_changed().await else {
tracing::error!("Failed to listen for changes for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(
2_u64.pow(attempts),
))
.await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(
2_u64.pow(attempts),
))
.await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let mut changes = changes.map(Change::Changes).fuse();
let mut changes = changes.map(Change::Changes).fuse();
let Ok(owner_changed) = watcher.inner().receive_owner_changed().await
else {
tracing::error!("Failed to listen for owner changes for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(
2_u64.pow(attempts),
))
.await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(
2_u64.pow(attempts),
))
.await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let mut owner_changed = owner_changed
.map(|c| Change::OwnerChanged(c.is_some()))
.fuse();
let Ok(owner_changed) = watcher.inner().receive_owner_changed().await else {
tracing::error!("Failed to listen for owner changes for {config_id}");
#[cfg(feature = "tokio")]
::tokio::time::sleep(::tokio::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(feature = "async-std")]
async_std::task::sleep(std::time::Duration::from_secs(2_u64.pow(attempts))).await;
#[cfg(not(any(feature = "tokio", feature = "async-std")))]
{
pending::<()>().await;
unreachable!();
}
attempts += 1;
// The settings daemon has exited
continue;
};
let mut owner_changed = owner_changed
.map(|c| Change::OwnerChanged(c.is_some()))
.fuse();
// update now, just in case we missed changes while setting up stream
let mut config = match T::get_entry(&cosmic_config) {
Ok(config) => config,
Err((errors, default)) => {
for why in &errors {
if why.is_err() {
if let crate::Error::GetKey(_, err) = &why {
if err.kind() == std::io::ErrorKind::NotFound {
// No system default config installed; don't error
continue;
}
}
tracing::error!("error getting config: {config_id} {why}");
}
}
default
}
};
// update now, just in case we missed changes while setting up stream
let mut config = match T::get_entry(&cosmic_config) {
Ok(config) => config,
Err((errors, default)) => {
for why in &errors {
if why.is_err() {
if let crate::Error::GetKey(_, err) = &why {
if err.kind() == std::io::ErrorKind::NotFound {
// No system default config installed; don't error
continue;
if let Err(err) = tx
.send(Update {
errors: Vec::new(),
keys: Vec::new(),
config: config.clone(),
})
.await
{
tracing::error!("Failed to send config: {err}");
}
loop {
let change: Changed = futures::select! {
c = changes.next() => {
let Some(Change::Changes(c)) = c else {
break;
};
c
}
c = owner_changed.next() => {
let Some(Change::OwnerChanged(cont)) = c else {
break;
};
if cont {
continue;
} else {
// The settings daemon has exited
break;
}
},
};
// Reset the attempts counter if we received a change
attempts = 0;
let Ok(args) = change.args() else {
// The settings daemon has exited
break;
};
let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]);
if !keys.is_empty() {
if let Err(err) = tx
.send(Update {
errors,
keys,
config: config.clone(),
})
.await
{
tracing::error!("Failed to send config update: {err}");
}
}
tracing::error!("error getting config: {config_id} {why}");
}
}
default
}
};
if let Err(err) = tx
.send(Update {
errors: Vec::new(),
keys: Vec::new(),
config: config.clone(),
})
.await
{
tracing::error!("Failed to send config: {err}");
}
loop {
let change: Changed = futures::select! {
c = changes.next() => {
let Some(Change::Changes(c)) = c else {
break;
};
c
}
c = owner_changed.next() => {
let Some(Change::OwnerChanged(cont)) = c else {
break;
};
if cont {
continue;
} else {
// The settings daemon has exited
break;
}
},
};
// Reset the attempts counter if we received a change
attempts = 0;
let Ok(args) = change.args() else {
// The settings daemon has exited
break;
};
let (errors, keys) = config.update_keys(&cosmic_config, &[args.key]);
if !keys.is_empty() {
if let Err(err) = tx
.send(Update {
errors,
keys,
config: config.clone(),
})
.await
{
tracing::error!("Failed to send config update: {err}");
}
}
}
}
})
},
)
},
)
}

View file

@ -25,7 +25,24 @@ pub fn config_subscription<
config_id: Cow<'static, str>,
config_version: u64,
) -> iced_futures::Subscription<crate::Update<T>> {
iced_futures::Subscription::run_with_id(id, watcher_stream(config_id, config_version, false))
iced_futures::Subscription::run_with(
(id, config_id, config_version, false),
// FIXME there are type issues related to the 'static lifetime of the Cow if this is extracted to a named function...
|(_, config_id, config_version, is_state)| {
let config_id = config_id.clone();
let config_version = *config_version;
let is_state = *is_state;
stream::channel(100, move |mut output| async move {
let config_id = config_id.clone();
let mut state = ConfigState::Init(config_id, config_version, is_state);
loop {
state = start_listening::<T>(state, &mut output).await;
}
})
},
)
}
#[cold]
@ -37,25 +54,23 @@ pub fn config_state_subscription<
config_id: Cow<'static, str>,
config_version: u64,
) -> iced_futures::Subscription<crate::Update<T>> {
iced_futures::Subscription::run_with_id(id, watcher_stream(config_id, config_version, true))
}
fn watcher_stream<T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry>(
config_id: Cow<'static, str>,
config_version: u64,
is_state: bool,
) -> impl Stream<Item = crate::Update<T>> {
stream::channel(100, move |mut output| {
let config_id = config_id.clone();
async move {
iced_futures::Subscription::run_with(
(id, config_id, config_version, true),
|(_, config_id, config_version, is_state)| {
let config_id = config_id.clone();
let mut state = ConfigState::Init(config_id, config_version, is_state);
let config_version = *config_version;
let is_state = *is_state;
loop {
state = start_listening::<T>(state, &mut output).await;
}
}
})
stream::channel(100, move |mut output| async move {
let config_id = config_id.clone();
let mut state = ConfigState::Init(config_id, config_version, is_state);
loop {
state = start_listening::<T>(state, &mut output).await;
}
})
},
)
}
async fn start_listening<T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry>(