refactor(bluetooth): use channel subscription

This commit is contained in:
Ashley Wulber 2023-07-11 15:16:44 -04:00 committed by Jeremy Soller
parent 8d9bb40b1b
commit 7249b6af68
2 changed files with 54 additions and 46 deletions

View file

@ -523,7 +523,7 @@ impl Application for CosmicBluetoothApplet {
fn subscription(&self) -> Subscription<Message> { fn subscription(&self) -> Subscription<Message> {
Subscription::batch(vec![ Subscription::batch(vec![
self.applet_helper.theme_subscription(0).map(Message::Theme), self.applet_helper.theme_subscription(0).map(Message::Theme),
bluetooth_subscription(0).map(|(_, e)| Message::BluetoothEvent(e)), bluetooth_subscription(0).map(Message::BluetoothEvent),
]) ])
} }

View file

@ -4,9 +4,12 @@ use bluer::{
agent::{Agent, AgentHandle}, agent::{Agent, AgentHandle},
Adapter, Address, DeviceProperty, Session, Uuid, Adapter, Address, DeviceProperty, Session, Uuid,
}; };
use cosmic::iced::{self, subscription}; use cosmic::iced::{
self,
futures::{SinkExt, StreamExt},
subscription,
};
use futures::StreamExt;
use rand::Rng; use rand::Rng;
use tokio::{ use tokio::{
spawn, spawn,
@ -20,8 +23,14 @@ use tokio::{
pub fn bluetooth_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>( pub fn bluetooth_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
id: I, id: I,
) -> iced::Subscription<(I, BluerEvent)> { ) -> iced::Subscription<BluerEvent> {
subscription::unfold(id, State::Ready, move |state| start_listening_loop(id, state)) subscription::channel(id, 50, move |mut output| async move {
let mut state = State::Ready;
loop {
state = start_listening(state, &mut output).await;
}
})
} }
pub enum State { pub enum State {
@ -30,78 +39,77 @@ pub enum State {
Finished, Finished,
} }
async fn start_listening_loop<I: Copy + Debug>( async fn start_listening(
id: I, state: State,
mut state: State, output: &mut futures::channel::mpsc::Sender<BluerEvent>,
) -> ((I, BluerEvent), State) { ) -> State {
loop {
let (update, new_state) = start_listening(id, state).await;
state = new_state;
if let Some(update) = update {
return (update, state);
}
}
}
async fn start_listening<I: Copy + Debug>(id: I, state: State) -> (Option<(I, BluerEvent)>, State) {
match state { match state {
State::Ready => { State::Ready => {
let session = match Session::new().await { let session = match Session::new().await {
Ok(s) => s, Ok(s) => s,
Err(_) => return (Some((id, BluerEvent::Finished)), State::Finished), Err(_) => {
_ = output.send(BluerEvent::Finished).await;
return State::Finished;
}
}; };
let (tx, rx) = channel(100); let (tx, rx) = channel(100);
let session_state = match BluerSessionState::new(session, rx).await { let session_state = match BluerSessionState::new(session, rx).await {
Ok(s) => s, Ok(s) => s,
Err(_) => return (Some((id, BluerEvent::Finished)), State::Finished), Err(_) => {
_ = output.send(BluerEvent::Finished).await;
return State::Finished;
}
}; };
let state = session_state.bluer_state().await; let state = session_state.bluer_state().await;
return (
Some(( _ = output
id, .send(BluerEvent::Init {
BluerEvent::Init { sender: tx,
sender: tx, state: state.clone(),
state: state.clone(), })
}, .await;
)), State::Waiting { session_state }
State::Waiting { session_state },
);
} }
State::Waiting { mut session_state } => { State::Waiting { mut session_state } => {
let mut session_rx = match session_state.rx.take() { let mut session_rx = match session_state.rx.take() {
Some(rx) => rx, Some(rx) => rx,
None => { None => {
return (Some((id, BluerEvent::Finished)), State::Finished); // fail if we can't get the rx _ = output.send(BluerEvent::Finished).await;
return State::Finished;
} }
}; };
let event = if let Some(event) = session_rx.recv().await { if let Some(event) = session_rx.recv().await {
match event { match event {
BluerSessionEvent::ChangesProcessed(state) => { BluerSessionEvent::ChangesProcessed(state) => {
Some((id, BluerEvent::DevicesChanged { state })) _ = output.send(BluerEvent::DevicesChanged { state }).await;
} }
BluerSessionEvent::RequestResponse { BluerSessionEvent::RequestResponse {
req, req,
state, state,
err_msg, err_msg,
} => Some(( } => {
id, _ = output
BluerEvent::RequestResponse { .send(BluerEvent::RequestResponse {
req, req,
state, state,
err_msg, err_msg,
}, })
)), .await;
BluerSessionEvent::AgentEvent(e) => Some((id, BluerEvent::AgentEvent(e))), }
_ => None, BluerSessionEvent::AgentEvent(e) => {
_ = output.send(BluerEvent::AgentEvent(e)).await;
}
_ => {}
} }
} else { } else {
return (Some((id, BluerEvent::Finished)), State::Finished); _ = output.send(BluerEvent::Finished).await;
return State::Finished;
}; };
session_state.rx = Some(session_rx); session_state.rx = Some(session_rx);
(event, State::Waiting { session_state }) State::Waiting { session_state }
} }
State::Finished => iced::futures::future::pending().await, State::Finished => iced::futures::future::pending().await,
} }