From f08d80a891d2b9ee465d25246c79b892c8df2789 Mon Sep 17 00:00:00 2001 From: Ashley Wulber Date: Wed, 5 Mar 2025 11:43:42 -0500 Subject: [PATCH] fix(bluetooth): watch devices for changes after toggled back on --- cosmic-applet-bluetooth/src/bluetooth.rs | 102 +++++++++++++++-------- 1 file changed, 68 insertions(+), 34 deletions(-) diff --git a/cosmic-applet-bluetooth/src/bluetooth.rs b/cosmic-applet-bluetooth/src/bluetooth.rs index 01ce9dc7..7ca1ec3c 100644 --- a/cosmic-applet-bluetooth/src/bluetooth.rs +++ b/cosmic-applet-bluetooth/src/bluetooth.rs @@ -55,7 +55,7 @@ pub fn bluetooth_subscription( Subscription::run_with_id( id, stream::channel(50, move |mut output| async move { - let mut state = State::Ready; + let mut state = State::Ready(0); loop { state = start_listening(state, &mut output).await; @@ -65,7 +65,7 @@ pub fn bluetooth_subscription( } pub enum State { - Ready, + Ready(u32), Waiting { session_state: BluerSessionState }, Finished, } @@ -75,20 +75,24 @@ async fn start_listening( output: &mut futures::channel::mpsc::Sender, ) -> State { match state { - State::Ready => { + State::Ready(retry_count) => { let session = match Session::new().await { Ok(s) => s, Err(_) => { - _ = output.send(BluerEvent::Finished).await; - return State::Finished; + _ = tokio::time::sleep(Duration::from_millis( + 2_u64.saturating_pow(retry_count), + )); + return State::Ready(retry_count.saturating_add(1)); } }; let session_state = match BluerSessionState::new(session).await { Ok(s) => s, Err(_) => { - _ = output.send(BluerEvent::Finished).await; - return State::Finished; + _ = tokio::time::sleep(Duration::from_millis( + 2_u64.saturating_pow(retry_count), + )); + return State::Ready(retry_count.saturating_add(1)); } }; @@ -333,6 +337,8 @@ pub struct BluerSessionState { pub adapter: Adapter, pub rx: Option>, pub req_tx: Sender, + wake_up_discover_tx: Sender<()>, + wake_up_discover_rx: Option>, tx: Sender, active_requests: Arc>>>>, } @@ -517,13 +523,15 @@ impl BluerSessionState { _non_exhaustive: (), }; let _agent_handle = session.register_agent(_agent).await?; - - let self_ = Self { + let (wake_up_discover_tx, wake_up_discover_rx) = channel(10); + let mut self_ = Self { _agent_handle, _session: session, adapter, rx: Some(rx), req_tx, + wake_up_discover_rx: Some(wake_up_discover_rx), + wake_up_discover_tx, tx, active_requests: Arc::new(Mutex::new(HashMap::new())), }; @@ -538,6 +546,7 @@ impl BluerSessionState { let tx = self.tx.clone(); let req_tx = self.req_tx.clone(); let adapter_clone = self.adapter.clone(); + let wake_up_discover_tx = self.wake_up_discover_tx.clone(); let _handle: JoinHandle> = spawn(async move { let mut status = adapter_clone.is_powered().await.unwrap_or_default(); loop { @@ -556,34 +565,57 @@ impl BluerSessionState { } } } - + _ = wake_up_discover_tx.send(()).await; let _ = tx.send(BluerSessionEvent::ChangesProcessed(state)).await; } } }); } - // Note: For some reason, this doesn't actually seem to work so well. it seems unreliable... - pub(crate) fn process_changes(&self) { + pub(crate) fn process_changes(&mut self) { let tx = self.tx.clone(); let req_tx = self.req_tx.clone(); + let Some(mut wake_up) = self.wake_up_discover_rx.take() else { + tracing::error!("Failed to take wake up channel"); + return; + }; let adapter_clone = self.adapter.clone(); - let _monitor_devices: tokio::task::JoinHandle> = - spawn(async move { - let mut change_stream = adapter_clone.discover_devices_with_changes().await?; - let mut changed = false; - let mut milli_timeout = 10; - let mut devices: Vec = Vec::new(); - 'outer: loop { - while let Ok(event) = - timeout(Duration::from_millis(milli_timeout), change_stream.next()).await - { - if event.is_none() { - break 'outer; + let _monitor_devices: tokio::task::JoinHandle> = spawn( + async move { + loop { + let mut milli_timeout = 10; + + let mut change_stream = { + let mut res = adapter_clone.discover_devices_with_changes().await; + while res.is_err() { + _ = tokio::time::timeout( + Duration::from_millis(milli_timeout), + wake_up.recv(), + ) + .await; + res = adapter_clone.discover_devices_with_changes().await; + milli_timeout = (milli_timeout.saturating_mul(2)); } - changed = true; - } - if changed { + milli_timeout = 10; + res.unwrap() + }; + let mut devices: Vec = Vec::new(); + 'outer: loop { + tokio::select! { + change = timeout(Duration::from_millis(milli_timeout), change_stream.next()) => { + if let Ok(e) = change { + if e.is_none() { + break 'outer; + } + } else { + milli_timeout = (milli_timeout.saturating_mul(2)); + continue; + } + } + _wake = wake_up.recv() => { + } + }; + let mut new_devices = build_device_list(&adapter_clone).await; for d in new_devices .iter() @@ -593,7 +625,6 @@ impl BluerSessionState { } devices = mem::take(&mut new_devices); - changed = false; let _ = tx .send(BluerSessionEvent::ChangesProcessed(BluerState { devices: build_device_list(&adapter_clone).await, @@ -605,20 +636,18 @@ impl BluerSessionState { .await; // reset timeout milli_timeout = 10; - } else { - // slow down if no changes occur - milli_timeout = (milli_timeout * 2).max(5120); } + let _ = tx.send(BluerSessionEvent::ChangeStreamEnded).await; } - let _ = tx.send(BluerSessionEvent::ChangeStreamEnded).await; - Ok(()) - }); + }, + ); } pub(crate) fn process_requests(&self, request_rx: Receiver) { let active_requests = self.active_requests.clone(); let adapter = self.adapter.clone(); let tx = self.tx.clone(); + let wake_up_tx = self.wake_up_discover_tx.clone(); let _handle: JoinHandle> = spawn(async move { let mut request_rx = request_rx; @@ -629,11 +658,16 @@ impl BluerSessionState { let active_requests_clone = active_requests.clone(); let tx_clone = tx.clone(); let adapter_clone = adapter.clone(); + let wake_up_tx = wake_up_tx.clone(); + let handle = spawn(async move { let mut err_msg = None; match &req_clone { BluerRequest::SetBluetoothEnabled(enabled) => { let res = adapter_clone.set_powered(*enabled).await; + if *enabled { + _ = wake_up_tx.send(()).await; + } if let Err(e) = res { err_msg = Some(e.to_string()); }