refactor(battery): use channel subscription

This commit is contained in:
Ashley Wulber 2023-07-11 14:54:44 -04:00 committed by Jeremy Soller
parent ff6e9e3483
commit 8d9bb40b1b
5 changed files with 127 additions and 174 deletions

View file

@ -18,8 +18,7 @@
//!
//! …consequently `zbus-xmlgen` did not generate code for the above interfaces.
use cosmic::iced;
use cosmic::iced::subscription;
use cosmic::iced::{self, futures::SinkExt, subscription};
use std::fmt::Debug;
use std::hash::Hash;
use tokio::sync::mpsc::UnboundedReceiver;
@ -113,9 +112,13 @@ pub async fn set_power_profile(daemon: PowerDaemonProxy<'_>, power: Power) -> Re
pub fn power_profile_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
id: I,
) -> iced::Subscription<(I, PowerProfileUpdate)> {
subscription::unfold(id, State::Ready, move |state| {
start_listening_loop(id, state)
) -> iced::Subscription<PowerProfileUpdate> {
subscription::channel(id, 50, move |mut output| async move {
let mut state = State::Ready;
loop {
state = start_listening(state, &mut output).await;
}
})
}
@ -126,25 +129,18 @@ pub enum State {
Finished,
}
async fn start_listening_loop<I: Copy + Debug>(
id: I,
mut state: State,
) -> ((I, PowerProfileUpdate), State) {
loop {
let (update, new_state) = start_listening(id, state).await;
state = new_state;
if let Some(update) = update {
return (update, state);
}
}
}
async fn start_listening<I: Copy>(id: I, state: State) -> (Option<(I, PowerProfileUpdate)>, State) {
async fn start_listening(
state: State,
output: &mut futures::channel::mpsc::Sender<PowerProfileUpdate>,
) -> State {
match state {
State::Ready => {
let conn = match Connection::system().await.map_err(|e| e.to_string()) {
Ok(conn) => conn,
Err(e) => return (Some((id, PowerProfileUpdate::Error(e))), State::Finished),
Err(e) => {
_ = output.send(PowerProfileUpdate::Error(e)).await;
return State::Finished;
}
};
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
@ -154,10 +150,9 @@ async fn start_listening<I: Copy>(id: I, state: State) -> (Option<(I, PowerProfi
{
Ok(p) => p,
Err(e) => {
return (
Some((id, PowerProfileUpdate::Error(e))),
State::Waiting(conn, rx),
)
_ = output.send(PowerProfileUpdate::Error(e)).await;
return State::Waiting(conn, rx);
}
};
let profile = match get_power_profile(power_proxy)
@ -166,17 +161,12 @@ async fn start_listening<I: Copy>(id: I, state: State) -> (Option<(I, PowerProfi
{
Ok(p) => p,
Err(e) => {
return (
Some((id, PowerProfileUpdate::Error(e))),
State::Waiting(conn, rx),
)
_ = output.send(PowerProfileUpdate::Error(e)).await;
return State::Waiting(conn, rx);
}
};
(
Some((id, PowerProfileUpdate::Init(profile, tx))),
State::Waiting(conn, rx),
)
_ = output.send(PowerProfileUpdate::Init(profile, tx)).await;
State::Waiting(conn, rx)
}
State::Waiting(conn, mut rx) => {
let power_proxy = match PowerDaemonProxy::new(&conn)
@ -185,32 +175,24 @@ async fn start_listening<I: Copy>(id: I, state: State) -> (Option<(I, PowerProfi
{
Ok(p) => p,
Err(e) => {
return (
Some((id, PowerProfileUpdate::Error(e))),
State::Waiting(conn, rx),
)
_ = output.send(PowerProfileUpdate::Error(e)).await;
return State::Waiting(conn, rx);
}
};
match rx.recv().await {
Some(PowerProfileRequest::Get) => {
if let Ok(profile) = get_power_profile(power_proxy).await {
(
Some((id, PowerProfileUpdate::Update { profile })),
State::Waiting(conn, rx),
)
} else {
(None, State::Waiting(conn, rx))
_ = output.send(PowerProfileUpdate::Update { profile }).await;
}
State::Waiting(conn, rx)
}
Some(PowerProfileRequest::Set(profile)) => {
let _ = set_power_profile(power_proxy, profile).await;
(
Some((id, PowerProfileUpdate::Update { profile })),
State::Waiting(conn, rx),
)
_ = output.send(PowerProfileUpdate::Update { profile }).await;
State::Waiting(conn, rx)
}
None => (None, State::Finished),
None => State::Finished,
}
}
State::Finished => iced::futures::future::pending().await,