From e35d5123f05bdd0ca2c06d0a3a60a04e0cb324ee Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 16:38:56 +0200 Subject: [PATCH] perf: avoid holding async mutex guards across await points tokio recommends using a sync mutex with a notifier instead of the async mutex where possible. Rust forbids holding a sync mutex guard across await points so we can prevent a potential deadlock this way. This adds a custom channel based on the tokio mpmc example for handling gvfs events from callbacks to avoid the async mutex requirement. Messages are held in a `VecDeque` behind a sync mutex and the receiver will get notified via the notifier when a message is added to the queue. Weak references used in gio callbacks in case the sender is dropped by the application. --- src/app.rs | 5 +-- src/channel.rs | 77 +++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +- src/mounter/gvfs.rs | 70 +++++++++++++++++++++++++++++------------ src/tab.rs | 23 ++++++-------- 5 files changed, 139 insertions(+), 39 deletions(-) create mode 100644 src/channel.rs diff --git a/src/app.rs b/src/app.rs index 19dc54d..a427065 100644 --- a/src/app.rs +++ b/src/app.rs @@ -6958,8 +6958,7 @@ impl Application for App { |_| { stream::channel( 1, - move |msg_tx: futures::channel::mpsc::Sender<_>| async move { - let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx)); + move |mut msg_tx: futures::channel::mpsc::Sender<_>| async move { tokio::task::spawn_blocking(move || { match notify_rust::Notification::new() .summary(&fl!("notification-in-progress")) @@ -6969,8 +6968,6 @@ impl Application for App { Ok(notification) => { let _ = futures::executor::block_on(async { msg_tx - .lock() - .await .send(Message::Notification(Arc::new( Mutex::new(notification), ))) diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..e91bc35 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,77 @@ +// Copyright 2025 System76 +// SPDX-License-Identifier: MPL-2.0 + +use std::{ + collections::VecDeque, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + }, +}; + +/// Create a channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque. +pub fn channel() -> (Sender, Receiver) { + let channel = Arc::new(Channel { + queue: Mutex::new(VecDeque::default()), + notify: tokio::sync::Notify::const_new(), + closed: AtomicBool::new(false), + }); + + (Sender(channel.clone()), Receiver(channel)) +} + +/// A channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque. +struct Channel { + pub(self) queue: Mutex>, + /// Set when a new message has been stored. + pub(self) notify: tokio::sync::Notify, + /// Set when the receiver is dropped. + pub(self) closed: AtomicBool, +} + +pub struct Sender(Arc>); + +impl Sender { + pub fn send(&self, message: Message) { + self.0.queue.lock().unwrap().push_back(message); + self.0.notify.notify_one(); + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.0.closed.store(true, Ordering::SeqCst); + self.0.notify.notify_one(); + } +} + +pub struct Receiver(Arc>); + +impl Receiver { + /// Returns a value until the sender is dropped. + pub async fn recv(&self) -> Option { + loop { + { + let mut queue = self.0.queue.lock().unwrap(); + if let Some(value) = queue.pop_front() { + if queue.capacity() - queue.len() > 32 { + let capacity = queue.len().next_power_of_two(); + queue.shrink_to(capacity); + } + drop(queue); + return Some(value); + } + } + + if self.0.closed.load(Ordering::SeqCst) { + return None; + } + + self.0.notify.notified().await; + } + } + + pub fn try_recv(&self) -> Option { + self.0.queue.lock().unwrap().pop_front() + } +} diff --git a/src/lib.rs b/src/lib.rs index b4c85c7..9214a7f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use app::{App, Flags}; pub mod app; mod archive; +pub mod channel; pub mod clipboard; mod context_action; use config::Config; @@ -136,7 +137,7 @@ pub fn main() -> Result<(), Box> { .event_format(log_format); tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::from_env("RUST_LOG")) + .with(tracing_subscriber::EnvFilter::from_default_env()) .with(log_layer) .init(); diff --git a/src/mounter/gvfs.rs b/src/mounter/gvfs.rs index 8ac42dc..d6294b6 100644 --- a/src/mounter/gvfs.rs +++ b/src/mounter/gvfs.rs @@ -5,7 +5,7 @@ use cosmic::{ }; use gio::{glib, prelude::*}; use std::{any::TypeId, cell::Cell, future::pending, hash::Hash, path::PathBuf, sync::Arc}; -use tokio::sync::{Mutex, mpsc}; +use tokio::sync::mpsc; use super::{Mounter, MounterAuth, MounterItem, MounterItems, MounterMessage}; use crate::{ @@ -230,7 +230,10 @@ fn dir_info(uri: &str) -> Result<(String, String, Option), glib::Error> Ok((resolved_uri, info.display_name().into(), file.path())) } -fn mount_op(uri: String, event_tx: mpsc::UnboundedSender) -> gio::MountOperation { +fn mount_op( + uri: String, + event_tx: std::sync::Weak>, +) -> gio::MountOperation { let mount_op = gio::MountOperation::new(); mount_op.connect_ask_password( move |mount_op, message, default_user, default_domain, flags| { @@ -253,9 +256,9 @@ fn mount_op(uri: String, event_tx: mpsc::UnboundedSender) -> gio::MountOp .then_some(false), }; let (auth_tx, mut auth_rx) = mpsc::channel(1); - event_tx - .send(Event::NetworkAuth(uri.clone(), auth, auth_tx)) - .unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::NetworkAuth(uri.clone(), auth, auth_tx)); + } //TODO: async recv? if let Some(auth) = auth_rx.blocking_recv() { if auth.anonymous_opt == Some(true) { @@ -358,37 +361,45 @@ impl Item { pub struct Gvfs { command_tx: mpsc::UnboundedSender, - event_rx: Arc>>, + event_rx: Arc>, } impl Gvfs { pub fn new() -> Self { //TODO: switch to using gvfs-zbus which will better integrate with async rust let (command_tx, mut command_rx) = mpsc::unbounded_channel(); - let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (event_tx, event_rx) = crate::channel::channel(); + let event_tx = Arc::new(event_tx); std::thread::spawn(move || { let main_loop = glib::MainLoop::new(None, false); main_loop.context().spawn_local(async move { + let event_tx = Arc::downgrade(&event_tx); let monitor = gio::VolumeMonitor::get(); { let event_tx = event_tx.clone(); monitor.connect_mount_changed(move |_monitor, mount| { log::info!("mount changed {}", MountExt::name(mount)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_mount_added(move |_monitor, mount| { log::info!("mount added {}", MountExt::name(mount)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_mount_removed(move |_monitor, mount| { log::info!("mount removed {}", MountExt::name(mount)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } @@ -396,21 +407,27 @@ impl Gvfs { let event_tx = event_tx.clone(); monitor.connect_volume_changed(move |_monitor, volume| { log::info!("volume changed {}", VolumeExt::name(volume)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_volume_added(move |_monitor, volume| { log::info!("volume added {}", VolumeExt::name(volume)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_volume_removed(move |_monitor, volume| { log::info!("volume removed {}", VolumeExt::name(volume)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } @@ -420,7 +437,11 @@ impl Gvfs { items_tx.send(items(&monitor, sizes)).await.unwrap(); } Cmd::Rescan => { - event_tx.send(Event::Items(items(&monitor, IconSizes::default()))).unwrap(); + let Some(event_tx) = event_tx.upgrade() else { + return; + }; + + event_tx.send(Event::Items(items(&monitor, IconSizes::default()))); } Cmd::Mount(mounter_item, complete_tx) => { let MounterItem::Gvfs(ref item) = mounter_item else { @@ -472,6 +493,9 @@ impl Gvfs { .ok().map(|info| info.boolean(gio::FILE_ATTRIBUTE_FILESYSTEM_REMOTE)) .unwrap_or(true); } + let Some(event_tx) = event_tx.upgrade() else { + return; + }; event_tx.send(Event::MountResult(updated_item, match res { Ok(()) => { _ = complete_tx.send(Ok(())); @@ -483,7 +507,7 @@ impl Gvfs { Some(gio::IOErrorEnum::FailedHandled) => Ok(false), _ => Err(format!("{err}")) }} - })).unwrap(); + })); }, ); break; @@ -499,6 +523,9 @@ impl Gvfs { gio::Cancellable::NONE, move |res| { log::info!("network drive {uri}: result {res:?}"); + let Some(event_tx) = event_tx.upgrade() else { + return; + }; event_tx.send(Event::NetworkResult(uri, match res { Ok(()) => { _ = result_tx.send(Ok(())); @@ -509,7 +536,7 @@ impl Gvfs { Some(gio::IOErrorEnum::FailedHandled) => Ok(false), _ => Err(format!("{err}")) }} - })).unwrap(); + })); } ); } @@ -533,6 +560,9 @@ impl Gvfs { // FIXME sometimes a uri can be mounted and then not recognized as mounted... // seems to be related to uri with a path items_tx.blocking_send(network_scan(&uri, sizes)).unwrap(); + let Some(event_tx) = event_tx.upgrade() else { + return; + }; event_tx.send(Event::NetworkResult(resolved_uri, match res { Ok(()) => { Ok(true) @@ -541,7 +571,7 @@ impl Gvfs { Some(gio::IOErrorEnum::FailedHandled) => Ok(false), _ => Err(format!("{err}")) } - })).unwrap(); + })); } ); } else { @@ -597,7 +627,7 @@ impl Gvfs { }); Self { command_tx, - event_rx: Arc::new(Mutex::new(event_rx)), + event_rx: Arc::new(event_rx), } } } @@ -671,7 +701,7 @@ impl Mounter for Gvfs { let event_rx = self.event_rx.clone(); struct Wrapper { command_tx: mpsc::UnboundedSender, - event_rx: Arc>>, + event_rx: Arc>, } impl Hash for Wrapper { fn hash(&self, state: &mut H) { @@ -695,7 +725,7 @@ impl Mounter for Gvfs { MounterMessage, >| async move { command_tx.send(Cmd::Rescan).unwrap(); - while let Some(event) = event_rx.lock().await.recv().await { + while let Some(event) = event_rx.recv().await { match event { Event::Changed => command_tx.send(Cmd::Rescan).unwrap(), Event::Items(items) => { diff --git a/src/tab.rs b/src/tab.rs index 1e63588..a0f95a0 100644 --- a/src/tab.rs +++ b/src/tab.rs @@ -6955,9 +6955,8 @@ impl Tab { .await .unwrap(); - let output = Arc::new(tokio::sync::Mutex::new(output)); + let (watch_tx, mut watch_rx) = tokio::sync::watch::channel(true); { - let output = output.clone(); tokio::task::spawn_blocking(move || { scan_search( &search_location, @@ -6985,14 +6984,7 @@ impl Tab { true } else { // Wake up update method - futures::executor::block_on(async { - output - .lock() - .await - .send(Message::SearchReady(false)) - .await - }) - .is_ok() + watch_tx.send(false).is_ok() } } Err(_) => false, @@ -7005,13 +6997,16 @@ impl Tab { search_location, start.elapsed(), ); - }) - .await - .unwrap(); + }); + } + + while watch_rx.changed().await.is_ok() { + let is_ready = *watch_rx.borrow_and_update(); + let _ = output.send(Message::SearchReady(is_ready)).await; } // Send final ready - let _ = output.lock().await.send(Message::SearchReady(true)).await; + let _ = output.send(Message::SearchReady(true)).await; std::future::pending().await },