feat: update pulse connection every time the audio applet popup is opened
This commit is contained in:
parent
62ec66ab4e
commit
a76353981f
5 changed files with 262 additions and 159 deletions
|
|
@ -20,40 +20,61 @@ pub fn connect() -> Subscription<Event> {
|
|||
|
||||
subscription::unfold(
|
||||
std::any::TypeId::of::<Connect>(),
|
||||
State::Disconnected,
|
||||
State::Init,
|
||||
|state| async move {
|
||||
match state {
|
||||
// if app just started, or we are re-trying match here. Returns coenncting
|
||||
// message. We should store this in our app's state, but it isn't safe to
|
||||
// send messages until we get a conencted message. Which will be received
|
||||
// by the `State::Connecting` message below
|
||||
State::Disconnected => match PulseHandle::create() {
|
||||
Ok(pulse_handle) => (None, State::Connecting(pulse_handle)),
|
||||
Err(_) => (Some(Event::Disconnected), State::Disconnected),
|
||||
},
|
||||
// Just a buffer to make sure the GUI doesn't send messages until pulse is ready
|
||||
State::Init => {
|
||||
let PulseHandle {
|
||||
to_pulse,
|
||||
from_pulse,
|
||||
} = PulseHandle::new();
|
||||
(
|
||||
Some(Event::Init(Connection(to_pulse))),
|
||||
State::Connecting(from_pulse),
|
||||
)
|
||||
}
|
||||
// Waiting for Connection to succeed
|
||||
// The GUI doesn't have to monitor this state, as it is never sent to the GUI
|
||||
State::Connecting(mut pulse_handle) => {
|
||||
match pulse_handle.from_pulse.recv().await {
|
||||
Some(Message::Connected) => {(
|
||||
Some(Event::Connected(Connection(pulse_handle.to_pulse))),
|
||||
State::Connected(pulse_handle.from_pulse),
|
||||
)}
|
||||
Some(Message::Disconnected) => (Some(Event::Disconnected), State::Disconnected),
|
||||
_ => panic!("Pulse subscription logic is faulty as the PulseServer shouldn't send unique messages until connection is successful")
|
||||
}
|
||||
State::Connecting(mut from_pulse) => match from_pulse.recv().await {
|
||||
Some(Message::Connected) => {
|
||||
(Some(Event::Connected), State::Connected(from_pulse))
|
||||
}
|
||||
Some(Message::Disconnected) => {
|
||||
(Some(Event::Disconnected), State::Connecting(from_pulse))
|
||||
}
|
||||
Some(m) => {
|
||||
log::error!("Unexpected message: {:?}", m);
|
||||
(None, State::Connecting(from_pulse))
|
||||
}
|
||||
None => {
|
||||
panic!("Pulse Sender dropped, something has gone wrong!");
|
||||
}
|
||||
},
|
||||
State::Connected(mut from_pulse) => {
|
||||
// This is where we match messages from the pulse server to pass to the gui
|
||||
match from_pulse.recv().await {
|
||||
Some(Message::SetSinks(sinks)) => (Some(Event::MessageReceived(Message::SetSinks(sinks))), State::Connected(from_pulse)),
|
||||
Some(Message::SetSources(sources)) => (Some(Event::MessageReceived(Message::SetSources(sources))), State::Connected(from_pulse)),
|
||||
Some(Message::SetDefaultSink(sink)) => (Some(Event::MessageReceived(Message::SetDefaultSink(sink))), State::Connected(from_pulse)),
|
||||
Some(Message::SetDefaultSource(source)) => (Some(Event::MessageReceived(Message::SetDefaultSource(source))), State::Connected(from_pulse)),
|
||||
Some(Message::Disconnected) => (Some(Event::Disconnected), State::Disconnected),
|
||||
None => (Some(Event::Disconnected), State::Disconnected),
|
||||
_ => (None, State::Connected(from_pulse)),
|
||||
match from_pulse.recv().await {
|
||||
Some(Message::SetSinks(sinks)) => (
|
||||
Some(Event::MessageReceived(Message::SetSinks(sinks))),
|
||||
State::Connected(from_pulse),
|
||||
),
|
||||
Some(Message::SetSources(sources)) => (
|
||||
Some(Event::MessageReceived(Message::SetSources(sources))),
|
||||
State::Connected(from_pulse),
|
||||
),
|
||||
Some(Message::SetDefaultSink(sink)) => (
|
||||
Some(Event::MessageReceived(Message::SetDefaultSink(sink))),
|
||||
State::Connected(from_pulse),
|
||||
),
|
||||
Some(Message::SetDefaultSource(source)) => (
|
||||
Some(Event::MessageReceived(Message::SetDefaultSource(source))),
|
||||
State::Connected(from_pulse),
|
||||
),
|
||||
Some(Message::Disconnected) => {
|
||||
(Some(Event::Disconnected), State::Connecting(from_pulse))
|
||||
}
|
||||
None => (Some(Event::Disconnected), State::Connecting(from_pulse)),
|
||||
_ => (None, State::Connected(from_pulse)),
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
@ -62,14 +83,15 @@ pub fn connect() -> Subscription<Event> {
|
|||
|
||||
// #[derive(Debug)]
|
||||
enum State {
|
||||
Disconnected,
|
||||
Connecting(PulseHandle),
|
||||
Init,
|
||||
Connecting(tokio::sync::mpsc::Receiver<Message>),
|
||||
Connected(tokio::sync::mpsc::Receiver<Message>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Event {
|
||||
Connected(Connection),
|
||||
Init(Connection),
|
||||
Connected,
|
||||
Disconnected,
|
||||
MessageReceived(Message),
|
||||
}
|
||||
|
|
@ -91,6 +113,7 @@ pub enum Message {
|
|||
Disconnected,
|
||||
GetSinks,
|
||||
GetSources,
|
||||
UpdateConnection,
|
||||
SetSinks(Vec<DeviceInfo>),
|
||||
SetSources(Vec<DeviceInfo>),
|
||||
GetDefaultSink,
|
||||
|
|
@ -108,34 +131,40 @@ struct PulseHandle {
|
|||
|
||||
impl PulseHandle {
|
||||
// Create pulse server thread, and bidirectional comms
|
||||
pub fn create() -> Result<PulseHandle, PAErr> {
|
||||
pub fn new() -> PulseHandle {
|
||||
let (to_pulse, mut to_pulse_recv) = tokio::sync::mpsc::channel(10);
|
||||
let (mut from_pulse_send, from_pulse) = tokio::sync::mpsc::channel(10);
|
||||
//let from_pulse = Arc::new(Mutex::new(vec![]));
|
||||
//let mut from_pulse2 = from_pulse.clone();
|
||||
// get initial connection status
|
||||
to_pulse
|
||||
.try_send(Message::UpdateConnection)
|
||||
.expect("Failed to send initial connection update message");
|
||||
// this thread should complete by pushing a completed message,
|
||||
// or fail message. This should never complete/fail without pushing
|
||||
// a message. This lets the iced subscription go to sleep while init
|
||||
// finishes. TLDR: be very careful with error handling
|
||||
thread::spawn(move || {
|
||||
if let Ok(mut server) = PulseServer::connect().and_then(|server| server.init()) {
|
||||
PulseHandle::blocking_send_connected(&mut from_pulse_send);
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
// take `PulseServer` and handle reciver into async context
|
||||
// to listen for messages that need to be passed to the pulseserver
|
||||
// this lets us put the thread to sleep, but keep hold a single
|
||||
// thread, because pulse audio's API is not multithreaded... at all
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
// take `PulseServer` and handle reciver into async context
|
||||
// to listen for messages that need to be passed to the pulseserver
|
||||
// this lets us put the thread to sleep, but keep hold a single
|
||||
// thread, because pulse audio's API is not multithreaded... at all
|
||||
rt.block_on(async {
|
||||
let mut server: Option<PulseServer> = None;
|
||||
|
||||
rt.block_on(async {
|
||||
loop {
|
||||
// This is where the we match messages from the GUI to pass to the pulse server
|
||||
if let Some(msg) = to_pulse_recv.recv().await {
|
||||
match msg {
|
||||
Message::GetDefaultSink => match server.get_default_sink() {
|
||||
loop {
|
||||
// This is where the we match messages from the GUI to pass to the pulse server
|
||||
if let Some(msg) = to_pulse_recv.recv().await {
|
||||
match msg {
|
||||
Message::GetDefaultSink => {
|
||||
let server = match server.as_mut() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
match server.get_default_sink() {
|
||||
Ok(sink) => from_pulse_send
|
||||
.send(Message::SetDefaultSink(sink))
|
||||
.await
|
||||
|
|
@ -143,18 +172,30 @@ impl PulseHandle {
|
|||
Err(_) => {
|
||||
PulseHandle::send_disconnected(&mut from_pulse_send).await
|
||||
}
|
||||
},
|
||||
Message::GetDefaultSource => match server.get_default_source() {
|
||||
}
|
||||
}
|
||||
Message::GetDefaultSource => {
|
||||
let server = match server.as_mut() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
match server.get_default_source() {
|
||||
Ok(source) => from_pulse_send
|
||||
.send(Message::SetDefaultSource(source))
|
||||
.await
|
||||
.unwrap(),
|
||||
Err(e) => {
|
||||
println!("ERROR! {:?}", e);
|
||||
log::error!("ERROR! {:?}", e);
|
||||
PulseHandle::send_disconnected(&mut from_pulse_send).await;
|
||||
}
|
||||
},
|
||||
Message::GetSinks => match server.get_sinks() {
|
||||
}
|
||||
}
|
||||
Message::GetSinks => {
|
||||
let server = match server.as_mut() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
match server.get_sinks() {
|
||||
Ok(sinks) => from_pulse_send
|
||||
.send(Message::SetSinks(sinks))
|
||||
.await
|
||||
|
|
@ -162,8 +203,14 @@ impl PulseHandle {
|
|||
Err(_) => {
|
||||
PulseHandle::send_disconnected(&mut from_pulse_send).await
|
||||
}
|
||||
},
|
||||
Message::GetSources => match server.get_sources() {
|
||||
}
|
||||
}
|
||||
Message::GetSources => {
|
||||
let server = match server.as_mut() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
match server.get_sources() {
|
||||
Ok(sinks) => from_pulse_send
|
||||
.send(Message::SetSources(sinks))
|
||||
.await
|
||||
|
|
@ -171,36 +218,51 @@ impl PulseHandle {
|
|||
Err(_) => {
|
||||
PulseHandle::send_disconnected(&mut from_pulse_send).await
|
||||
}
|
||||
},
|
||||
Message::SetSinkVolumeByName(name, channel_volumes) => {
|
||||
server.set_sink_volume_by_name(&name, &channel_volumes)
|
||||
}
|
||||
Message::SetSourceVolumeByName(name, channel_volumes) => {
|
||||
server.set_source_volume_by_name(&name, &channel_volumes)
|
||||
}
|
||||
_ => {
|
||||
println!("message doesn't match")
|
||||
}
|
||||
Message::SetSinkVolumeByName(name, channel_volumes) => {
|
||||
let server = match server.as_mut() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
server.set_sink_volume_by_name(&name, &channel_volumes)
|
||||
}
|
||||
Message::SetSourceVolumeByName(name, channel_volumes) => {
|
||||
let server = match server.as_mut() {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
server.set_source_volume_by_name(&name, &channel_volumes)
|
||||
}
|
||||
Message::UpdateConnection => {
|
||||
log::trace!("Updating Connection {:?}", server.is_some());
|
||||
if let Some(mut cur_server) = server.take() {
|
||||
log::trace!("getting server info...");
|
||||
if let Err(_) = cur_server.get_server_info() {
|
||||
PulseHandle::send_disconnected(&mut from_pulse_send).await;
|
||||
} else {
|
||||
server = Some(cur_server);
|
||||
}
|
||||
} else if let Ok(new_server) =
|
||||
PulseServer::connect().and_then(|server| server.init())
|
||||
{
|
||||
log::trace!("got new server...");
|
||||
PulseHandle::send_connected(&mut from_pulse_send).await;
|
||||
server = Some(new_server);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
log::warn!("message doesn't match")
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
// Always report that server is disconnected
|
||||
PulseHandle::blocking_send_disconnected(&mut from_pulse_send);
|
||||
}
|
||||
});
|
||||
});
|
||||
Ok(PulseHandle {
|
||||
PulseHandle {
|
||||
to_pulse,
|
||||
from_pulse,
|
||||
})
|
||||
}
|
||||
|
||||
fn blocking_send_disconnected(sender: &mut tokio::sync::mpsc::Sender<Message>) {
|
||||
sender.blocking_send(Message::Disconnected).unwrap()
|
||||
}
|
||||
|
||||
fn blocking_send_connected(sender: &mut tokio::sync::mpsc::Sender<Message>) {
|
||||
sender.blocking_send(Message::Connected).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_disconnected(sender: &mut tokio::sync::mpsc::Sender<Message>) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue