refactor: use channel subscription for config subscriptions

This commit is contained in:
Ashley Wulber 2023-08-04 15:45:31 -04:00 committed by Ashley Wulber
parent 68225c78cd
commit 40efcbbe31

View file

@ -1,7 +1,6 @@
use iced_futures::futures::SinkExt;
#[cfg(feature = "subscription")] #[cfg(feature = "subscription")]
use iced_futures::futures::channel::mpsc; use iced_futures::{futures::channel::mpsc, subscription};
#[cfg(feature = "subscription")]
use iced_futures::subscription;
use notify::{ use notify::{
event::{EventKind, ModifyKind}, event::{EventKind, ModifyKind},
RecommendedWatcher, Watcher, RecommendedWatcher, Watcher,
@ -319,24 +318,27 @@ pub fn config_subscription<
config_id: Cow<'static, str>, config_id: Cow<'static, str>,
config_version: u64, config_version: u64,
) -> iced_futures::Subscription<(I, Result<T, (Vec<crate::Error>, T)>)> { ) -> iced_futures::Subscription<(I, Result<T, (Vec<crate::Error>, T)>)> {
subscription::unfold( subscription::channel(id, 100, move |mut output| {
id, let config_id = config_id.clone();
ConfigState::Init(config_id, config_version), async move {
move |state| start_listening_loop(id, state), let config_id = config_id.clone();
) let mut state = ConfigState::Init(config_id, config_version);
loop {
state = start_listening(state, &mut output, id).await;
}
}
})
} }
#[cfg(feature = "subscription")]
async fn start_listening< async fn start_listening<
I: Copy, I: Copy,
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry, T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
>( >(
id: I,
state: ConfigState<T>, state: ConfigState<T>,
) -> ( output: &mut mpsc::Sender<(I, Result<T, (Vec<crate::Error>, T)>)>,
Option<(I, Result<T, (Vec<crate::Error>, T)>)>, id: I,
ConfigState<T>, ) -> ConfigState<T> {
) {
use iced_futures::futures::{future::pending, StreamExt}; use iced_futures::futures::{future::pending, StreamExt};
match state { match state {
@ -344,66 +346,49 @@ async fn start_listening<
let (tx, rx) = mpsc::channel(100); let (tx, rx) = mpsc::channel(100);
let config = match Config::new(&config_id, version) { let config = match Config::new(&config_id, version) {
Ok(c) => c, Ok(c) => c,
Err(_) => return (None, ConfigState::Failed), Err(_) => return ConfigState::Failed,
}; };
let watcher = match config.watch(move |_helper, _keys| { let watcher = match config.watch(move |_helper, _keys| {
let mut tx = tx.clone(); let mut tx = tx.clone();
let _ = tx.try_send(()); let _ = tx.try_send(());
}) { }) {
Ok(w) => w, Ok(w) => w,
Err(_) => return (None, ConfigState::Failed), Err(_) => return ConfigState::Failed,
}; };
let msg = T::get_entry(&config);
_ = output.send((id, msg)).await;
match T::get_entry(&config) { match T::get_entry(&config) {
Ok(t) => ( Ok(t) => {
Some((id, Ok(t.clone()))), _ = output.send((id, Ok(t.clone()))).await;
ConfigState::Waiting(t, watcher, rx, config), ConfigState::Waiting(t, watcher, rx, config)
), }
Err((errors, t)) => ( Err((errors, t)) => {
Some((id, Err((errors, t.clone())))), _ = output.send((id, Err((errors, t.clone())))).await;
ConfigState::Waiting(t, watcher, rx, config), ConfigState::Waiting(t, watcher, rx, config)
), }
} }
} }
ConfigState::Waiting(old, watcher, mut rx, config) => match rx.next().await { ConfigState::Waiting(mut old, watcher, mut rx, config) => match rx.next().await {
Some(_) => match T::get_entry(&config) { Some(_) => match T::get_entry(&config) {
Ok(t) => ( Ok(t) => {
if t != old { if t != old {
Some((id, Ok(t.clone()))) old = t;
} else { _ = output.send((id, Ok(old.clone()))).await;
None }
}, ConfigState::Waiting(old, watcher, rx, config)
ConfigState::Waiting(t, watcher, rx, config), }
), Err((errors, t)) => {
Err((errors, t)) => (
if t != old { if t != old {
Some((id, Err((errors, t.clone())))) old = t;
} else { _ = output.send((id, Err((errors, old.clone())))).await;
None }
}, ConfigState::Waiting(old, watcher, rx, config)
ConfigState::Waiting(t, watcher, rx, config), }
),
}, },
None => (None, ConfigState::Failed), None => ConfigState::Failed,
}, },
ConfigState::Failed => pending().await, ConfigState::Failed => pending().await,
} }
} }
#[cfg(feature = "subscription")]
async fn start_listening_loop<
I: Copy,
T: 'static + Send + Sync + PartialEq + Clone + CosmicConfigEntry,
>(
id: I,
mut state: ConfigState<T>,
) -> ((I, Result<T, (Vec<crate::Error>, T)>), ConfigState<T>) {
loop {
let (update, new_state) = start_listening(id, state).await;
state = new_state;
if let Some(update) = update {
return (update, state);
}
}
}