From 1074f3f49b8822de1967b3e097a596ff1759f06f Mon Sep 17 00:00:00 2001 From: Ashley Wulber Date: Tue, 11 Jul 2023 14:12:18 -0400 Subject: [PATCH] refactor(audio) use channel subscription --- cosmic-applet-audio/src/pulse.rs | 97 +++++++++++++++++++------------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/cosmic-applet-audio/src/pulse.rs b/cosmic-applet-audio/src/pulse.rs index a86a501e..7c3c6991 100644 --- a/cosmic-applet-audio/src/pulse.rs +++ b/cosmic-applet-audio/src/pulse.rs @@ -2,7 +2,8 @@ use std::cell::RefCell; use std::{rc::Rc, thread}; extern crate libpulse_binding as pulse; -use cosmic::iced::{subscription, Subscription}; +use cosmic::iced::{self, subscription}; +use cosmic::iced_futures::futures::{self, SinkExt}; //use futures::channel::mpsc; use libpulse_binding::{ callbacks::ListResult, @@ -16,42 +17,47 @@ use libpulse_binding::{ volume::ChannelVolumes, }; -pub fn connect() -> Subscription { - struct Connect; +pub fn connect() -> iced::Subscription { + struct SomeWorker; + + subscription::channel( + std::any::TypeId::of::(), + 50, + move |mut output| async move { + let mut state = State::Init; - subscription::unfold( - std::any::TypeId::of::(), - State::Init, - |mut state| async move { loop { - let (update, new_state) = connection(state).await; - state = new_state; - if let Some(update) = update { - return (update, state); - } + state = start_listening(state, &mut output).await; } }, ) } -async fn connection(state: State) -> (Option, State) { +async fn start_listening( + state: State, + output: &mut futures::channel::mpsc::Sender, +) -> State { match state { State::Init => { let PulseHandle { to_pulse, from_pulse, } = PulseHandle::new(); - ( - Some(Event::Init(Connection(to_pulse))), - State::Connecting(from_pulse), - ) + _ = output.send(Event::Init(Connection(to_pulse))).await; + + State::Connecting(from_pulse) } // Waiting for Connection to succeed // The GUI doesn't have to monitor this state, as it is never sent to the GUI State::Connecting(mut from_pulse) => match from_pulse.recv().await { - Some(Message::Connected) => (Some(Event::Connected), State::Connected(from_pulse)), + Some(Message::Connected) => { + _ = output.send(Event::Connected).await; + State::Connected(from_pulse) + } Some(Message::Disconnected) => { - (Some(Event::Disconnected), State::Connecting(from_pulse)) + _ = output.send(Event::Disconnected).await; + + State::Connecting(from_pulse) } Some(m) => { panic!("Unexpected message: {:?}", m); @@ -63,27 +69,40 @@ async fn connection(state: State) -> (Option, State) { State::Connected(mut from_pulse) => { // This is where we match messages from the pulse server to pass to the gui match from_pulse.recv().await { - Some(Message::SetSinks(sinks)) => ( - Some(Event::MessageReceived(Message::SetSinks(sinks))), - State::Connected(from_pulse), - ), - Some(Message::SetSources(sources)) => ( - Some(Event::MessageReceived(Message::SetSources(sources))), - State::Connected(from_pulse), - ), - Some(Message::SetDefaultSink(sink)) => ( - Some(Event::MessageReceived(Message::SetDefaultSink(sink))), - State::Connected(from_pulse), - ), - Some(Message::SetDefaultSource(source)) => ( - Some(Event::MessageReceived(Message::SetDefaultSource(source))), - State::Connected(from_pulse), - ), - Some(Message::Disconnected) => { - (Some(Event::Disconnected), State::Connecting(from_pulse)) + Some(Message::SetSinks(sinks)) => { + _ = output + .send(Event::MessageReceived(Message::SetSinks(sinks))) + .await; + + State::Connected(from_pulse) } - None => (Some(Event::Disconnected), State::Connecting(from_pulse)), - _ => (None, State::Connected(from_pulse)), + Some(Message::SetSources(sources)) => { + _ = output + .send(Event::MessageReceived(Message::SetSources(sources))) + .await; + State::Connected(from_pulse) + } + Some(Message::SetDefaultSink(sink)) => { + _ = output + .send(Event::MessageReceived(Message::SetDefaultSink(sink))) + .await; + State::Connected(from_pulse) + } + Some(Message::SetDefaultSource(source)) => { + _ = output + .send(Event::MessageReceived(Message::SetDefaultSource(source))) + .await; + State::Connected(from_pulse) + } + Some(Message::Disconnected) => { + _ = output.send(Event::Disconnected).await; + State::Connecting(from_pulse) + } + None => { + _ = output.send(Event::Disconnected).await; + State::Connecting(from_pulse) + } + _ => State::Connected(from_pulse), } } }