diff --git a/cosmic-config/src/lib.rs b/cosmic-config/src/lib.rs index e3d82138..27f3a13e 100644 --- a/cosmic-config/src/lib.rs +++ b/cosmic-config/src/lib.rs @@ -1,7 +1,6 @@ +use iced_futures::futures::SinkExt; #[cfg(feature = "subscription")] -use iced_futures::futures::channel::mpsc; -#[cfg(feature = "subscription")] -use iced_futures::subscription; +use iced_futures::{futures::channel::mpsc, subscription}; use notify::{ event::{EventKind, ModifyKind}, RecommendedWatcher, Watcher, @@ -319,24 +318,27 @@ pub fn config_subscription< config_id: Cow<'static, str>, config_version: u64, ) -> iced_futures::Subscription<(I, Result, T)>)> { - subscription::unfold( - id, - ConfigState::Init(config_id, config_version), - move |state| start_listening_loop(id, state), - ) + subscription::channel(id, 100, move |mut output| { + let config_id = config_id.clone(); + async move { + let config_id = config_id.clone(); + let mut state = ConfigState::Init(config_id, config_version); + + loop { + state = start_listening(state, &mut output, id).await; + } + } + }) } -#[cfg(feature = "subscription")] async fn start_listening< I: Copy, T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry, >( - id: I, state: ConfigState, -) -> ( - Option<(I, Result, T)>)>, - ConfigState, -) { + output: &mut mpsc::Sender<(I, Result, T)>)>, + id: I, +) -> ConfigState { use iced_futures::futures::{future::pending, StreamExt}; match state { @@ -344,66 +346,49 @@ async fn start_listening< let (tx, rx) = mpsc::channel(100); let config = match Config::new(&config_id, version) { Ok(c) => c, - Err(_) => return (None, ConfigState::Failed), + Err(_) => return ConfigState::Failed, }; let watcher = match config.watch(move |_helper, _keys| { let mut tx = tx.clone(); let _ = tx.try_send(()); }) { Ok(w) => w, - Err(_) => return (None, ConfigState::Failed), + Err(_) => return ConfigState::Failed, }; + let msg = T::get_entry(&config); + _ = output.send((id, msg)).await; match T::get_entry(&config) { - Ok(t) => ( - Some((id, Ok(t.clone()))), - ConfigState::Waiting(t, watcher, rx, config), - ), - Err((errors, t)) => ( - Some((id, Err((errors, t.clone())))), - ConfigState::Waiting(t, watcher, rx, config), - ), + Ok(t) => { + _ = output.send((id, Ok(t.clone()))).await; + ConfigState::Waiting(t, watcher, rx, config) + } + Err((errors, t)) => { + _ = output.send((id, Err((errors, t.clone())))).await; + ConfigState::Waiting(t, watcher, rx, config) + } } } - ConfigState::Waiting(old, watcher, mut rx, config) => match rx.next().await { + ConfigState::Waiting(mut old, watcher, mut rx, config) => match rx.next().await { Some(_) => match T::get_entry(&config) { - Ok(t) => ( + Ok(t) => { if t != old { - Some((id, Ok(t.clone()))) - } else { - None - }, - ConfigState::Waiting(t, watcher, rx, config), - ), - Err((errors, t)) => ( + old = t; + _ = output.send((id, Ok(old.clone()))).await; + } + ConfigState::Waiting(old, watcher, rx, config) + } + Err((errors, t)) => { if t != old { - Some((id, Err((errors, t.clone())))) - } else { - None - }, - ConfigState::Waiting(t, watcher, rx, config), - ), + old = t; + _ = output.send((id, Err((errors, old.clone())))).await; + } + ConfigState::Waiting(old, watcher, rx, config) + } }, - None => (None, ConfigState::Failed), + None => ConfigState::Failed, }, ConfigState::Failed => pending().await, } } - -#[cfg(feature = "subscription")] -async fn start_listening_loop< - I: Copy, - T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry, ->( - id: I, - mut state: ConfigState, -) -> ((I, Result, T)>), ConfigState) { - loop { - let (update, new_state) = start_listening(id, state).await; - state = new_state; - if let Some(update) = update { - return (update, state); - } - } -}