refactor(audio) use channel subscription

This commit is contained in:
Ashley Wulber 2023-07-11 14:12:18 -04:00 committed by Jeremy Soller
parent aaf0aa674c
commit 1074f3f49b

View file

@ -2,7 +2,8 @@ use std::cell::RefCell;
use std::{rc::Rc, thread}; use std::{rc::Rc, thread};
extern crate libpulse_binding as pulse; extern crate libpulse_binding as pulse;
use cosmic::iced::{subscription, Subscription}; use cosmic::iced::{self, subscription};
use cosmic::iced_futures::futures::{self, SinkExt};
//use futures::channel::mpsc; //use futures::channel::mpsc;
use libpulse_binding::{ use libpulse_binding::{
callbacks::ListResult, callbacks::ListResult,
@ -16,42 +17,47 @@ use libpulse_binding::{
volume::ChannelVolumes, volume::ChannelVolumes,
}; };
pub fn connect() -> Subscription<Event> { pub fn connect() -> iced::Subscription<Event> {
struct Connect; struct SomeWorker;
subscription::channel(
std::any::TypeId::of::<SomeWorker>(),
50,
move |mut output| async move {
let mut state = State::Init;
subscription::unfold(
std::any::TypeId::of::<Connect>(),
State::Init,
|mut state| async move {
loop { loop {
let (update, new_state) = connection(state).await; state = start_listening(state, &mut output).await;
state = new_state;
if let Some(update) = update {
return (update, state);
}
} }
}, },
) )
} }
async fn connection(state: State) -> (Option<Event>, State) { async fn start_listening(
state: State,
output: &mut futures::channel::mpsc::Sender<Event>,
) -> State {
match state { match state {
State::Init => { State::Init => {
let PulseHandle { let PulseHandle {
to_pulse, to_pulse,
from_pulse, from_pulse,
} = PulseHandle::new(); } = PulseHandle::new();
( _ = output.send(Event::Init(Connection(to_pulse))).await;
Some(Event::Init(Connection(to_pulse))),
State::Connecting(from_pulse), State::Connecting(from_pulse)
)
} }
// Waiting for Connection to succeed // Waiting for Connection to succeed
// The GUI doesn't have to monitor this state, as it is never sent to the GUI // The GUI doesn't have to monitor this state, as it is never sent to the GUI
State::Connecting(mut from_pulse) => match from_pulse.recv().await { State::Connecting(mut from_pulse) => match from_pulse.recv().await {
Some(Message::Connected) => (Some(Event::Connected), State::Connected(from_pulse)), Some(Message::Connected) => {
_ = output.send(Event::Connected).await;
State::Connected(from_pulse)
}
Some(Message::Disconnected) => { Some(Message::Disconnected) => {
(Some(Event::Disconnected), State::Connecting(from_pulse)) _ = output.send(Event::Disconnected).await;
State::Connecting(from_pulse)
} }
Some(m) => { Some(m) => {
panic!("Unexpected message: {:?}", m); panic!("Unexpected message: {:?}", m);
@ -63,27 +69,40 @@ async fn connection(state: State) -> (Option<Event>, State) {
State::Connected(mut from_pulse) => { State::Connected(mut from_pulse) => {
// This is where we match messages from the pulse server to pass to the gui // This is where we match messages from the pulse server to pass to the gui
match from_pulse.recv().await { match from_pulse.recv().await {
Some(Message::SetSinks(sinks)) => ( Some(Message::SetSinks(sinks)) => {
Some(Event::MessageReceived(Message::SetSinks(sinks))), _ = output
State::Connected(from_pulse), .send(Event::MessageReceived(Message::SetSinks(sinks)))
), .await;
Some(Message::SetSources(sources)) => (
Some(Event::MessageReceived(Message::SetSources(sources))), State::Connected(from_pulse)
State::Connected(from_pulse),
),
Some(Message::SetDefaultSink(sink)) => (
Some(Event::MessageReceived(Message::SetDefaultSink(sink))),
State::Connected(from_pulse),
),
Some(Message::SetDefaultSource(source)) => (
Some(Event::MessageReceived(Message::SetDefaultSource(source))),
State::Connected(from_pulse),
),
Some(Message::Disconnected) => {
(Some(Event::Disconnected), State::Connecting(from_pulse))
} }
None => (Some(Event::Disconnected), State::Connecting(from_pulse)), Some(Message::SetSources(sources)) => {
_ => (None, State::Connected(from_pulse)), _ = output
.send(Event::MessageReceived(Message::SetSources(sources)))
.await;
State::Connected(from_pulse)
}
Some(Message::SetDefaultSink(sink)) => {
_ = output
.send(Event::MessageReceived(Message::SetDefaultSink(sink)))
.await;
State::Connected(from_pulse)
}
Some(Message::SetDefaultSource(source)) => {
_ = output
.send(Event::MessageReceived(Message::SetDefaultSource(source)))
.await;
State::Connected(from_pulse)
}
Some(Message::Disconnected) => {
_ = output.send(Event::Disconnected).await;
State::Connecting(from_pulse)
}
None => {
_ = output.send(Event::Disconnected).await;
State::Connecting(from_pulse)
}
_ => State::Connected(from_pulse),
} }
} }
} }