fix: make all subscriptions resistant to being restarted

many of the errors we've been seeing the last few days are because of subscriptions which are restarting
This commit is contained in:
Ashley Wulber 2024-01-18 21:02:35 -05:00 committed by Ashley Wulber
parent ebe688c747
commit d4e0dd8fb8
12 changed files with 184 additions and 138 deletions

View file

@ -43,8 +43,7 @@ mod pulse;
const VERSION: &str = env!("CARGO_PKG_VERSION");
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> cosmic::iced::Result {
pub fn main() -> cosmic::iced::Result {
tracing_subscriber::fmt::init();
let _ = tracing_log::LogTracer::init();

View file

@ -4,6 +4,7 @@ use std::{rc::Rc, thread};
extern crate libpulse_binding as pulse;
use cosmic::iced::{self, subscription};
use cosmic::iced_futures::futures::{self, SinkExt};
use cosmic_time::once_cell::sync::Lazy;
//use futures::channel::mpsc;
use libpulse_binding::{
callbacks::ListResult,
@ -16,6 +17,10 @@ use libpulse_binding::{
proplist::Proplist,
volume::ChannelVolumes,
};
use tokio::sync::{mpsc, Mutex};
pub static FROM_PULSE: Lazy<Mutex<Option<(mpsc::Receiver<Message>, mpsc::Sender<Message>)>>> =
Lazy::new(|| Mutex::new(None));
pub fn connect() -> iced::Subscription<Event> {
struct SomeWorker;
@ -24,7 +29,7 @@ pub fn connect() -> iced::Subscription<Event> {
std::any::TypeId::of::<SomeWorker>(),
50,
move |mut output| async move {
let mut state = State::Init;
let mut state = State::Connecting;
loop {
state = start_listening(state, &mut output).await;
@ -38,34 +43,50 @@ async fn start_listening(
output: &mut futures::channel::mpsc::Sender<Event>,
) -> State {
match state {
State::Init => {
let PulseHandle {
to_pulse,
from_pulse,
} = PulseHandle::new();
_ = output.send(Event::Init(Connection(to_pulse))).await;
State::Connecting(from_pulse)
}
// Waiting for Connection to succeed
State::Connecting(mut from_pulse) => match from_pulse.recv().await {
Some(Message::Connected) => {
_ = output.send(Event::Connected).await;
State::Connected(from_pulse)
}
Some(Message::Disconnected) => {
_ = output.send(Event::Disconnected).await;
State::Connecting => {
let mut guard = FROM_PULSE.lock().await;
let (from_pulse, to_pulse) = {
if guard.is_none() {
let PulseHandle {
to_pulse,
from_pulse,
} = PulseHandle::new();
_ = output.send(Event::Init(Connection(to_pulse.clone()))).await;
State::Connecting(from_pulse)
*guard = Some((from_pulse, to_pulse));
}
guard.as_mut().unwrap()
};
to_pulse
.send(Message::UpdateConnection)
.await
.expect("Failed to request connection update");
match from_pulse.recv().await {
Some(Message::Connected) => {
_ = output.send(Event::Connected).await;
State::Connected
}
Some(Message::Disconnected) => {
_ = output.send(Event::Disconnected).await;
State::Connecting
}
Some(m) => {
tracing::error!("Unexpected message: {:?}", m);
State::Connecting
}
None => {
panic!("Pulse Sender dropped, something has gone wrong!");
}
}
Some(m) => {
panic!("Unexpected message: {:?}", m);
}
None => {
panic!("Pulse Sender dropped, something has gone wrong!");
}
},
State::Connected(mut from_pulse) => {
}
State::Connected => {
let mut guard = FROM_PULSE.lock().await;
let Some((from_pulse, _)) = guard.as_mut() else {
return State::Connecting;
};
// This is where we match messages from the pulse server to pass to the gui
match from_pulse.recv().await {
Some(Message::SetSinks(sinks)) => {
@ -73,35 +94,35 @@ async fn start_listening(
.send(Event::MessageReceived(Message::SetSinks(sinks)))
.await;
State::Connected(from_pulse)
State::Connected
}
Some(Message::SetSources(sources)) => {
_ = output
.send(Event::MessageReceived(Message::SetSources(sources)))
.await;
State::Connected(from_pulse)
State::Connected
}
Some(Message::SetDefaultSink(sink)) => {
_ = output
.send(Event::MessageReceived(Message::SetDefaultSink(sink)))
.await;
State::Connected(from_pulse)
State::Connected
}
Some(Message::SetDefaultSource(source)) => {
_ = output
.send(Event::MessageReceived(Message::SetDefaultSource(source)))
.await;
State::Connected(from_pulse)
State::Connected
}
Some(Message::Disconnected) => {
_ = output.send(Event::Disconnected).await;
State::Connecting(from_pulse)
State::Connecting
}
None => {
_ = output.send(Event::Disconnected).await;
State::Connecting(from_pulse)
State::Connecting
}
_ => State::Connected(from_pulse),
_ => State::Connected,
}
}
}
@ -109,9 +130,8 @@ async fn start_listening(
// #[derive(Debug)]
enum State {
Init,
Connecting(tokio::sync::mpsc::Receiver<Message>),
Connected(tokio::sync::mpsc::Receiver<Message>),
Connecting,
Connected,
}
#[derive(Debug, Clone)]
@ -123,7 +143,7 @@ pub enum Event {
}
#[derive(Debug, Clone)]
pub struct Connection(tokio::sync::mpsc::Sender<Message>);
pub struct Connection(mpsc::Sender<Message>);
impl Connection {
pub fn send(&mut self, message: Message) {
@ -160,10 +180,7 @@ impl PulseHandle {
pub fn new() -> Self {
let (to_pulse, mut to_pulse_recv) = tokio::sync::mpsc::channel(10);
let (from_pulse_send, from_pulse) = tokio::sync::mpsc::channel(10);
// get initial connection status
to_pulse
.try_send(Message::UpdateConnection)
.expect("Failed to send initial connection update message");
// this thread should complete by pushing a completed message,
// or fail message. This should never complete/fail without pushing
// a message. This lets the iced subscription go to sleep while init
@ -197,7 +214,6 @@ impl PulseHandle {
.await
{
tracing::error!("ERROR! {}", err);
break;
}
}
Err(_) => Self::send_disconnected(&from_pulse_send).await,
@ -215,7 +231,6 @@ impl PulseHandle {
.await
{
tracing::error!("ERROR! {}", err);
break;
}
}
Err(e) => {
@ -235,7 +250,6 @@ impl PulseHandle {
from_pulse_send.send(Message::SetSinks(sinks)).await
{
tracing::error!("ERROR! {}", err);
break;
}
}
Err(_) => Self::send_disconnected(&from_pulse_send).await,
@ -252,7 +266,6 @@ impl PulseHandle {
from_pulse_send.send(Message::SetSources(sinks)).await
{
tracing::error!("ERROR! {}", err);
break;
}
}
Err(_) => Self::send_disconnected(&from_pulse_send).await,
@ -278,13 +291,13 @@ impl PulseHandle {
server.is_some()
);
if let Some(mut cur_server) = server.take() {
tracing::trace!("getting server info...");
if cur_server.get_server_info().is_err() {
tracing::warn!("got error, server must be disconnected...");
Self::send_disconnected(&from_pulse_send).await;
} else {
tracing::trace!("got server info, still connected...");
tracing::info!("got server info, still connected...");
server = Some(cur_server);
Self::send_connected(&from_pulse_send).await;
}
} else {
match PulseServer::connect().and_then(|server| server.init()) {
@ -298,6 +311,7 @@ impl PulseHandle {
"Failed to connect to server: {:?}",
err
);
Self::send_disconnected(&from_pulse_send).await;
}
}
}