fix(bluetooth): watch devices for changes after toggled back on

This commit is contained in:
Ashley Wulber 2025-03-05 11:43:42 -05:00 committed by Ashley Wulber
parent 406cdd3627
commit f08d80a891

View file

@ -55,7 +55,7 @@ pub fn bluetooth_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
Subscription::run_with_id( Subscription::run_with_id(
id, id,
stream::channel(50, move |mut output| async move { stream::channel(50, move |mut output| async move {
let mut state = State::Ready; let mut state = State::Ready(0);
loop { loop {
state = start_listening(state, &mut output).await; state = start_listening(state, &mut output).await;
@ -65,7 +65,7 @@ pub fn bluetooth_subscription<I: 'static + Hash + Copy + Send + Sync + Debug>(
} }
pub enum State { pub enum State {
Ready, Ready(u32),
Waiting { session_state: BluerSessionState }, Waiting { session_state: BluerSessionState },
Finished, Finished,
} }
@ -75,20 +75,24 @@ async fn start_listening(
output: &mut futures::channel::mpsc::Sender<BluerEvent>, output: &mut futures::channel::mpsc::Sender<BluerEvent>,
) -> State { ) -> State {
match state { match state {
State::Ready => { State::Ready(retry_count) => {
let session = match Session::new().await { let session = match Session::new().await {
Ok(s) => s, Ok(s) => s,
Err(_) => { Err(_) => {
_ = output.send(BluerEvent::Finished).await; _ = tokio::time::sleep(Duration::from_millis(
return State::Finished; 2_u64.saturating_pow(retry_count),
));
return State::Ready(retry_count.saturating_add(1));
} }
}; };
let session_state = match BluerSessionState::new(session).await { let session_state = match BluerSessionState::new(session).await {
Ok(s) => s, Ok(s) => s,
Err(_) => { Err(_) => {
_ = output.send(BluerEvent::Finished).await; _ = tokio::time::sleep(Duration::from_millis(
return State::Finished; 2_u64.saturating_pow(retry_count),
));
return State::Ready(retry_count.saturating_add(1));
} }
}; };
@ -333,6 +337,8 @@ pub struct BluerSessionState {
pub adapter: Adapter, pub adapter: Adapter,
pub rx: Option<Receiver<BluerSessionEvent>>, pub rx: Option<Receiver<BluerSessionEvent>>,
pub req_tx: Sender<BluerRequest>, pub req_tx: Sender<BluerRequest>,
wake_up_discover_tx: Sender<()>,
wake_up_discover_rx: Option<Receiver<()>>,
tx: Sender<BluerSessionEvent>, tx: Sender<BluerSessionEvent>,
active_requests: Arc<Mutex<HashMap<BluerRequest, JoinHandle<anyhow::Result<()>>>>>, active_requests: Arc<Mutex<HashMap<BluerRequest, JoinHandle<anyhow::Result<()>>>>>,
} }
@ -517,13 +523,15 @@ impl BluerSessionState {
_non_exhaustive: (), _non_exhaustive: (),
}; };
let _agent_handle = session.register_agent(_agent).await?; let _agent_handle = session.register_agent(_agent).await?;
let (wake_up_discover_tx, wake_up_discover_rx) = channel(10);
let self_ = Self { let mut self_ = Self {
_agent_handle, _agent_handle,
_session: session, _session: session,
adapter, adapter,
rx: Some(rx), rx: Some(rx),
req_tx, req_tx,
wake_up_discover_rx: Some(wake_up_discover_rx),
wake_up_discover_tx,
tx, tx,
active_requests: Arc::new(Mutex::new(HashMap::new())), active_requests: Arc::new(Mutex::new(HashMap::new())),
}; };
@ -538,6 +546,7 @@ impl BluerSessionState {
let tx = self.tx.clone(); let tx = self.tx.clone();
let req_tx = self.req_tx.clone(); let req_tx = self.req_tx.clone();
let adapter_clone = self.adapter.clone(); let adapter_clone = self.adapter.clone();
let wake_up_discover_tx = self.wake_up_discover_tx.clone();
let _handle: JoinHandle<anyhow::Result<()>> = spawn(async move { let _handle: JoinHandle<anyhow::Result<()>> = spawn(async move {
let mut status = adapter_clone.is_powered().await.unwrap_or_default(); let mut status = adapter_clone.is_powered().await.unwrap_or_default();
loop { loop {
@ -556,34 +565,57 @@ impl BluerSessionState {
} }
} }
} }
_ = wake_up_discover_tx.send(()).await;
let _ = tx.send(BluerSessionEvent::ChangesProcessed(state)).await; let _ = tx.send(BluerSessionEvent::ChangesProcessed(state)).await;
} }
} }
}); });
} }
// Note: For some reason, this doesn't actually seem to work so well. it seems unreliable... pub(crate) fn process_changes(&mut self) {
pub(crate) fn process_changes(&self) {
let tx = self.tx.clone(); let tx = self.tx.clone();
let req_tx = self.req_tx.clone(); let req_tx = self.req_tx.clone();
let Some(mut wake_up) = self.wake_up_discover_rx.take() else {
tracing::error!("Failed to take wake up channel");
return;
};
let adapter_clone = self.adapter.clone(); let adapter_clone = self.adapter.clone();
let _monitor_devices: tokio::task::JoinHandle<Result<(), anyhow::Error>> = let _monitor_devices: tokio::task::JoinHandle<Result<(), anyhow::Error>> = spawn(
spawn(async move { async move {
let mut change_stream = adapter_clone.discover_devices_with_changes().await?; loop {
let mut changed = false; let mut milli_timeout = 10;
let mut milli_timeout = 10;
let mut devices: Vec<BluerDevice> = Vec::new(); let mut change_stream = {
'outer: loop { let mut res = adapter_clone.discover_devices_with_changes().await;
while let Ok(event) = while res.is_err() {
timeout(Duration::from_millis(milli_timeout), change_stream.next()).await _ = tokio::time::timeout(
{ Duration::from_millis(milli_timeout),
if event.is_none() { wake_up.recv(),
break 'outer; )
.await;
res = adapter_clone.discover_devices_with_changes().await;
milli_timeout = (milli_timeout.saturating_mul(2));
} }
changed = true; milli_timeout = 10;
} res.unwrap()
if changed { };
let mut devices: Vec<BluerDevice> = Vec::new();
'outer: loop {
tokio::select! {
change = timeout(Duration::from_millis(milli_timeout), change_stream.next()) => {
if let Ok(e) = change {
if e.is_none() {
break 'outer;
}
} else {
milli_timeout = (milli_timeout.saturating_mul(2));
continue;
}
}
_wake = wake_up.recv() => {
}
};
let mut new_devices = build_device_list(&adapter_clone).await; let mut new_devices = build_device_list(&adapter_clone).await;
for d in new_devices for d in new_devices
.iter() .iter()
@ -593,7 +625,6 @@ impl BluerSessionState {
} }
devices = mem::take(&mut new_devices); devices = mem::take(&mut new_devices);
changed = false;
let _ = tx let _ = tx
.send(BluerSessionEvent::ChangesProcessed(BluerState { .send(BluerSessionEvent::ChangesProcessed(BluerState {
devices: build_device_list(&adapter_clone).await, devices: build_device_list(&adapter_clone).await,
@ -605,20 +636,18 @@ impl BluerSessionState {
.await; .await;
// reset timeout // reset timeout
milli_timeout = 10; milli_timeout = 10;
} else {
// slow down if no changes occur
milli_timeout = (milli_timeout * 2).max(5120);
} }
let _ = tx.send(BluerSessionEvent::ChangeStreamEnded).await;
} }
let _ = tx.send(BluerSessionEvent::ChangeStreamEnded).await; },
Ok(()) );
});
} }
pub(crate) fn process_requests(&self, request_rx: Receiver<BluerRequest>) { pub(crate) fn process_requests(&self, request_rx: Receiver<BluerRequest>) {
let active_requests = self.active_requests.clone(); let active_requests = self.active_requests.clone();
let adapter = self.adapter.clone(); let adapter = self.adapter.clone();
let tx = self.tx.clone(); let tx = self.tx.clone();
let wake_up_tx = self.wake_up_discover_tx.clone();
let _handle: JoinHandle<anyhow::Result<()>> = spawn(async move { let _handle: JoinHandle<anyhow::Result<()>> = spawn(async move {
let mut request_rx = request_rx; let mut request_rx = request_rx;
@ -629,11 +658,16 @@ impl BluerSessionState {
let active_requests_clone = active_requests.clone(); let active_requests_clone = active_requests.clone();
let tx_clone = tx.clone(); let tx_clone = tx.clone();
let adapter_clone = adapter.clone(); let adapter_clone = adapter.clone();
let wake_up_tx = wake_up_tx.clone();
let handle = spawn(async move { let handle = spawn(async move {
let mut err_msg = None; let mut err_msg = None;
match &req_clone { match &req_clone {
BluerRequest::SetBluetoothEnabled(enabled) => { BluerRequest::SetBluetoothEnabled(enabled) => {
let res = adapter_clone.set_powered(*enabled).await; let res = adapter_clone.set_powered(*enabled).await;
if *enabled {
_ = wake_up_tx.send(()).await;
}
if let Err(e) = res { if let Err(e) = res {
err_msg = Some(e.to_string()); err_msg = Some(e.to_string());
} }