From b3515bb9ba9696d9de33a0c5d49233197e594b75 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Mon, 14 Apr 2025 17:11:55 +0200 Subject: [PATCH] improv(bluetooth): potential fixes for excess CPU usage --- cosmic-applet-bluetooth/src/bluetooth.rs | 172 ++++++++++------------- 1 file changed, 77 insertions(+), 95 deletions(-) diff --git a/cosmic-applet-bluetooth/src/bluetooth.rs b/cosmic-applet-bluetooth/src/bluetooth.rs index fe9087f8..bb28b9a7 100644 --- a/cosmic-applet-bluetooth/src/bluetooth.rs +++ b/cosmic-applet-bluetooth/src/bluetooth.rs @@ -18,7 +18,6 @@ use cosmic::{ iced_futures::stream, }; -use futures::executor::block_on; use rand::Rng; use tokio::{ spawn, @@ -56,51 +55,23 @@ pub fn bluetooth_subscription( Subscription::run_with_id( id, stream::channel(50, move |mut output| async move { - let mut state = State::Ready(0); + let mut retry_count = 0u32; - loop { - state = start_listening(state, &mut output).await; - } - }), - ) -} - -pub enum State { - Ready(u32), - Waiting { session_state: BluerSessionState }, - Finished, -} - -async fn start_listening( - state: State, - output: &mut futures::channel::mpsc::Sender, -) -> State { - match state { - State::Ready(retry_count) => { - let session = match Session::new().await { - Ok(s) => s, - Err(_) => { - _ = tokio::time::sleep(Duration::from_millis( - 2_u64.saturating_pow(retry_count), - )) - .await; - - return State::Ready(retry_count.saturating_add(1)); + // Initialize connection. + let mut session_state = loop { + if let Ok(session) = Session::new().await { + if let Ok(state) = BluerSessionState::new(session).await { + break state; + } } - }; - let session_state = match BluerSessionState::new(session).await { - Ok(s) => s, - Err(_) => { - _ = tokio::time::sleep(Duration::from_millis( - 2_u64.saturating_pow(retry_count), - )) + retry_count = retry_count.saturating_add(1); + _ = tokio::time::sleep(Duration::from_millis(2_u64.saturating_pow(retry_count))) .await; - return State::Ready(retry_count.saturating_add(1)); - } }; let state = session_state.bluer_state().await; + // reconnect to paired and trusted devices if state.bluetooth_enabled { for d in &state.devices { @@ -118,49 +89,56 @@ async fn start_listening( state: state.clone(), }) .await; - State::Waiting { session_state } - } - State::Waiting { mut session_state } => { - let mut session_rx = match session_state.rx.take() { - Some(rx) => rx, - None => { - _ = output.send(BluerEvent::Finished).await; - return State::Finished; + + let mut event_handler = async |event| match event { + BluerSessionEvent::ChangesProcessed(state) => { + _ = output.send(BluerEvent::DevicesChanged { state }).await; } + + BluerSessionEvent::RequestResponse { + req, + state, + err_msg, + } => { + _ = output + .send(BluerEvent::RequestResponse { + req, + state, + err_msg, + }) + .await; + } + BluerSessionEvent::AgentEvent(e) => { + _ = output.send(BluerEvent::AgentEvent(e)).await; + } + + _ => {} }; - if let Some(event) = session_rx.recv().await { - match event { - BluerSessionEvent::ChangesProcessed(state) => { - _ = output.send(BluerEvent::DevicesChanged { state }).await; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + loop { + let Some(mut session_rx) = session_state.rx.take() else { + break; + }; + + if let Some(event) = session_rx.recv().await { + event_handler(event).await; + while let Some(event) = session_rx.try_recv().ok() { + event_handler(event).await; } - BluerSessionEvent::RequestResponse { - req, - state, - err_msg, - } => { - _ = output - .send(BluerEvent::RequestResponse { - req, - state, - err_msg, - }) - .await; - } - BluerSessionEvent::AgentEvent(e) => { - _ = output.send(BluerEvent::AgentEvent(e)).await; - } - _ => {} - } - } else { - _ = output.send(BluerEvent::Finished).await; - return State::Finished; - }; - session_state.rx = Some(session_rx); - State::Waiting { session_state } - } - State::Finished => iced::futures::future::pending().await, - } + } else { + break; + }; + + session_state.rx = Some(session_rx); + interval.tick().await; + } + + _ = output.send(BluerEvent::Finished).await; + futures::future::pending().await + }), + ) } #[derive(Debug, Clone, Hash, Eq, PartialEq)] @@ -553,8 +531,9 @@ impl BluerSessionState { let wake_up_discover_tx = self.wake_up_discover_tx.clone(); let _handle: JoinHandle> = spawn(async move { let mut status = adapter_clone.is_powered().await.unwrap_or_default(); + let mut interval = tokio::time::interval(Duration::from_secs(3)); loop { - _ = tokio::time::sleep(Duration::from_secs(5)).await; + interval.tick().await; let new_status = adapter_clone.is_powered().await.unwrap_or_default(); if new_status != status { status = new_status; @@ -586,23 +565,25 @@ impl BluerSessionState { let adapter_clone = self.adapter.clone(); let _monitor_devices: tokio::task::JoinHandle> = spawn( async move { - loop { - let mut milli_timeout = 10; + let mut milli_timeout = 10; + let mut change_stream = { + let mut res = adapter_clone.discover_devices_with_changes().await; + while res.is_err() { + _ = tokio::time::timeout( + Duration::from_millis(milli_timeout), + wake_up.recv(), + ) + .await; + res = adapter_clone.discover_devices_with_changes().await; + milli_timeout = milli_timeout.saturating_mul(5); + } + milli_timeout = 10; + res.unwrap() + }; - let mut change_stream = { - let mut res = adapter_clone.discover_devices_with_changes().await; - while res.is_err() { - _ = tokio::time::timeout( - Duration::from_millis(milli_timeout), - wake_up.recv(), - ) - .await; - res = adapter_clone.discover_devices_with_changes().await; - milli_timeout = milli_timeout.saturating_mul(2); - } - milli_timeout = 10; - res.unwrap() - }; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + + loop { let mut devices: Vec = Vec::new(); 'outer: loop { tokio::select! { @@ -642,6 +623,7 @@ impl BluerSessionState { milli_timeout = 10; } let _ = tx.send(BluerSessionEvent::ChangeStreamEnded).await; + interval.tick().await; } }, );