From ff6e9e34830b4e4ab4d1ae5077e38656c3a5a419 Mon Sep 17 00:00:00 2001 From: Ashley Wulber Date: Tue, 11 Jul 2023 14:25:45 -0400 Subject: [PATCH] refactor(app-list): use channel subscription --- cosmic-app-list/src/app.rs | 2 +- cosmic-app-list/src/toplevel_subscription.rs | 76 ++++++++++---------- 2 files changed, 41 insertions(+), 37 deletions(-) diff --git a/cosmic-app-list/src/app.rs b/cosmic-app-list/src/app.rs index 7eda706b..2241d1d2 100755 --- a/cosmic-app-list/src/app.rs +++ b/cosmic-app-list/src/app.rs @@ -1045,7 +1045,7 @@ impl Application for CosmicAppList { fn subscription(&self) -> Subscription { Subscription::batch(vec![ self.applet_helper.theme_subscription(0).map(Message::Theme), - toplevel_subscription(self.subscription_ctr).map(|e| Message::Toplevel(e.1)), + toplevel_subscription(self.subscription_ctr).map(Message::Toplevel), events_with(|e, _| match e { cosmic::iced_runtime::core::Event::PlatformSpecific( event::PlatformSpecific::Wayland(event::wayland::Event::Seat(e, seat)), diff --git a/cosmic-app-list/src/toplevel_subscription.rs b/cosmic-app-list/src/toplevel_subscription.rs index 6aa7d316..b77b3bb0 100644 --- a/cosmic-app-list/src/toplevel_subscription.rs +++ b/cosmic-app-list/src/toplevel_subscription.rs @@ -9,7 +9,7 @@ use cosmic::iced::subscription; use cosmic_protocols::toplevel_info::v1::client::zcosmic_toplevel_handle_v1::ZcosmicToplevelHandleV1; use futures::{ channel::mpsc::{unbounded, UnboundedReceiver}, - StreamExt, + SinkExt, StreamExt, }; use std::{fmt::Debug, hash::Hash, thread::JoinHandle}; @@ -17,8 +17,14 @@ use crate::toplevel_handler::toplevel_handler; pub fn toplevel_subscription( id: I, -) -> iced::Subscription<(I, ToplevelUpdate)> { - subscription::unfold(id, State::Ready, move |state| start_listening(id, state)) +) -> iced::Subscription { + subscription::channel(id, 50, move |mut output| async move { + let mut state = State::Ready; + + loop { + state = start_listening(state, &mut output).await; + } + }) } pub enum State { @@ -31,40 +37,38 @@ pub enum State { Finished, } -async fn start_listening(id: I, mut state: State) -> ((I, ToplevelUpdate), State) { - loop { - let (update, new_state) = match state { - State::Ready => { - let (calloop_tx, calloop_rx) = calloop::channel::channel(); - let (toplevel_tx, toplevel_rx) = unbounded(); - let handle = std::thread::spawn(move || { - toplevel_handler(toplevel_tx, calloop_rx); - }); - ( - Some((id, ToplevelUpdate::Init(calloop_tx.clone()))), - State::Waiting(toplevel_rx, calloop_tx, handle), - ) - } - State::Waiting(mut rx, tx, handle) => { - if handle.is_finished() { - return ((id, ToplevelUpdate::Finished), State::Finished); - } - match rx.next().await { - Some(u) => (Some((id, u)), State::Waiting(rx, tx, handle)), - None => { - let _ = tx.send(ToplevelRequest::Exit); - (Some((id, ToplevelUpdate::Finished)), State::Finished) - } - } - } - State::Finished => iced::futures::future::pending().await, - }; - - if let Some(update) = update { - return (update, new_state); - } else { - state = new_state; +async fn start_listening( + state: State, + output: &mut futures::channel::mpsc::Sender, +) -> State { + match state { + State::Ready => { + let (calloop_tx, calloop_rx) = calloop::channel::channel(); + let (toplevel_tx, toplevel_rx) = unbounded(); + let handle = std::thread::spawn(move || { + toplevel_handler(toplevel_tx, calloop_rx); + }); + let tx = calloop_tx.clone(); + _ = output.send(ToplevelUpdate::Init(tx)).await; + State::Waiting(toplevel_rx, calloop_tx, handle) } + State::Waiting(mut rx, tx, handle) => { + if handle.is_finished() { + _ = output.send(ToplevelUpdate::Finished).await; + return State::Finished; + } + match rx.next().await { + Some(u) => { + _ = output.send(u).await; + State::Waiting(rx, tx, handle) + } + None => { + _ = output.send(ToplevelUpdate::Finished).await; + return State::Finished; + } + } + } + State::Finished => iced::futures::future::pending().await, } }