improv(bluetooth): potential fixes for excess CPU usage

This commit is contained in:
Michael Aaron Murphy 2025-04-14 17:11:55 +02:00 committed by Michael Murphy
parent a49a040093
commit b3515bb9ba

View file

@ -18,7 +18,6 @@ use cosmic::{
iced_futures::stream,
};
use futures::executor::block_on;
use rand::Rng;
use tokio::{
spawn,
@ -56,51 +55,23 @@ pub fn bluetooth_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
Subscription::run_with_id(
id,
stream::channel(50, move |mut output| async move {
let mut state = State::Ready(0);
let mut retry_count = 0u32;
loop {
state = start_listening(state, &mut output).await;
}
}),
)
}
pub enum State {
Ready(u32),
Waiting { session_state: BluerSessionState },
Finished,
}
async fn start_listening(
state: State,
output: &mut futures::channel::mpsc::Sender<BluerEvent>,
) -> State {
match state {
State::Ready(retry_count) => {
let session = match Session::new().await {
Ok(s) => s,
Err(_) => {
_ = tokio::time::sleep(Duration::from_millis(
2_u64.saturating_pow(retry_count),
))
.await;
return State::Ready(retry_count.saturating_add(1));
// Initialize connection.
let mut session_state = loop {
if let Ok(session) = Session::new().await {
if let Ok(state) = BluerSessionState::new(session).await {
break state;
}
}
};
let session_state = match BluerSessionState::new(session).await {
Ok(s) => s,
Err(_) => {
_ = tokio::time::sleep(Duration::from_millis(
2_u64.saturating_pow(retry_count),
))
retry_count = retry_count.saturating_add(1);
_ = tokio::time::sleep(Duration::from_millis(2_u64.saturating_pow(retry_count)))
.await;
return State::Ready(retry_count.saturating_add(1));
}
};
let state = session_state.bluer_state().await;
// reconnect to paired and trusted devices
if state.bluetooth_enabled {
for d in &state.devices {
@ -118,49 +89,56 @@ async fn start_listening(
state: state.clone(),
})
.await;
State::Waiting { session_state }
}
State::Waiting { mut session_state } => {
let mut session_rx = match session_state.rx.take() {
Some(rx) => rx,
None => {
_ = output.send(BluerEvent::Finished).await;
return State::Finished;
let mut event_handler = async |event| match event {
BluerSessionEvent::ChangesProcessed(state) => {
_ = output.send(BluerEvent::DevicesChanged { state }).await;
}
BluerSessionEvent::RequestResponse {
req,
state,
err_msg,
} => {
_ = output
.send(BluerEvent::RequestResponse {
req,
state,
err_msg,
})
.await;
}
BluerSessionEvent::AgentEvent(e) => {
_ = output.send(BluerEvent::AgentEvent(e)).await;
}
_ => {}
};
if let Some(event) = session_rx.recv().await {
match event {
BluerSessionEvent::ChangesProcessed(state) => {
_ = output.send(BluerEvent::DevicesChanged { state }).await;
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
let Some(mut session_rx) = session_state.rx.take() else {
break;
};
if let Some(event) = session_rx.recv().await {
event_handler(event).await;
while let Some(event) = session_rx.try_recv().ok() {
event_handler(event).await;
}
BluerSessionEvent::RequestResponse {
req,
state,
err_msg,
} => {
_ = output
.send(BluerEvent::RequestResponse {
req,
state,
err_msg,
})
.await;
}
BluerSessionEvent::AgentEvent(e) => {
_ = output.send(BluerEvent::AgentEvent(e)).await;
}
_ => {}
}
} else {
_ = output.send(BluerEvent::Finished).await;
return State::Finished;
};
session_state.rx = Some(session_rx);
State::Waiting { session_state }
}
State::Finished => iced::futures::future::pending().await,
}
} else {
break;
};
session_state.rx = Some(session_rx);
interval.tick().await;
}
_ = output.send(BluerEvent::Finished).await;
futures::future::pending().await
}),
)
}
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
@ -553,8 +531,9 @@ impl BluerSessionState {
let wake_up_discover_tx = self.wake_up_discover_tx.clone();
let _handle: JoinHandle<anyhow::Result<()>> = spawn(async move {
let mut status = adapter_clone.is_powered().await.unwrap_or_default();
let mut interval = tokio::time::interval(Duration::from_secs(3));
loop {
_ = tokio::time::sleep(Duration::from_secs(5)).await;
interval.tick().await;
let new_status = adapter_clone.is_powered().await.unwrap_or_default();
if new_status != status {
status = new_status;
@ -586,23 +565,25 @@ impl BluerSessionState {
let adapter_clone = self.adapter.clone();
let _monitor_devices: tokio::task::JoinHandle<Result<(), anyhow::Error>> = spawn(
async move {
loop {
let mut milli_timeout = 10;
let mut milli_timeout = 10;
let mut change_stream = {
let mut res = adapter_clone.discover_devices_with_changes().await;
while res.is_err() {
_ = tokio::time::timeout(
Duration::from_millis(milli_timeout),
wake_up.recv(),
)
.await;
res = adapter_clone.discover_devices_with_changes().await;
milli_timeout = milli_timeout.saturating_mul(5);
}
milli_timeout = 10;
res.unwrap()
};
let mut change_stream = {
let mut res = adapter_clone.discover_devices_with_changes().await;
while res.is_err() {
_ = tokio::time::timeout(
Duration::from_millis(milli_timeout),
wake_up.recv(),
)
.await;
res = adapter_clone.discover_devices_with_changes().await;
milli_timeout = milli_timeout.saturating_mul(2);
}
milli_timeout = 10;
res.unwrap()
};
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
let mut devices: Vec<BluerDevice> = Vec::new();
'outer: loop {
tokio::select! {
@ -642,6 +623,7 @@ impl BluerSessionState {
milli_timeout = 10;
}
let _ = tx.send(BluerSessionEvent::ChangeStreamEnded).await;
interval.tick().await;
}
},
);