perf: avoid holding async mutex guards across await points

tokio recommends using a sync mutex with a notifier instead of the
async mutex where possible. Rust forbids holding a sync mutex guard
across await points so we can prevent a potential deadlock this way.

This adds a custom channel based on the tokio mpmc example for
handling gvfs events from callbacks to avoid the async mutex
requirement. Messages are held in a `VecDeque` behind a sync mutex
and the receiver will get notified via the notifier when a message
is added to the queue.

Weak references used in gio callbacks in case the sender is dropped
by the application.
This commit is contained in:
Michael Aaron Murphy 2026-04-14 16:38:56 +02:00
parent 971374f60b
commit e35d5123f0
No known key found for this signature in database
GPG key ID: B2732D4240C9212C
5 changed files with 139 additions and 39 deletions

View file

@ -6958,8 +6958,7 @@ impl Application for App {
|_| { |_| {
stream::channel( stream::channel(
1, 1,
move |msg_tx: futures::channel::mpsc::Sender<_>| async move { move |mut msg_tx: futures::channel::mpsc::Sender<_>| async move {
let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx));
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
match notify_rust::Notification::new() match notify_rust::Notification::new()
.summary(&fl!("notification-in-progress")) .summary(&fl!("notification-in-progress"))
@ -6969,8 +6968,6 @@ impl Application for App {
Ok(notification) => { Ok(notification) => {
let _ = futures::executor::block_on(async { let _ = futures::executor::block_on(async {
msg_tx msg_tx
.lock()
.await
.send(Message::Notification(Arc::new( .send(Message::Notification(Arc::new(
Mutex::new(notification), Mutex::new(notification),
))) )))

77
src/channel.rs Normal file
View file

@ -0,0 +1,77 @@
// Copyright 2025 System76 <info@system76.com>
// SPDX-License-Identifier: MPL-2.0
use std::{
collections::VecDeque,
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
};
/// Create a channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque.
pub fn channel<Message>() -> (Sender<Message>, Receiver<Message>) {
let channel = Arc::new(Channel {
queue: Mutex::new(VecDeque::default()),
notify: tokio::sync::Notify::const_new(),
closed: AtomicBool::new(false),
});
(Sender(channel.clone()), Receiver(channel))
}
/// A channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque.
struct Channel<Message> {
pub(self) queue: Mutex<VecDeque<Message>>,
/// Set when a new message has been stored.
pub(self) notify: tokio::sync::Notify,
/// Set when the receiver is dropped.
pub(self) closed: AtomicBool,
}
pub struct Sender<Message>(Arc<Channel<Message>>);
impl<Message> Sender<Message> {
pub fn send(&self, message: Message) {
self.0.queue.lock().unwrap().push_back(message);
self.0.notify.notify_one();
}
}
impl<Message> Drop for Sender<Message> {
fn drop(&mut self) {
self.0.closed.store(true, Ordering::SeqCst);
self.0.notify.notify_one();
}
}
pub struct Receiver<Message>(Arc<Channel<Message>>);
impl<Message> Receiver<Message> {
/// Returns a value until the sender is dropped.
pub async fn recv(&self) -> Option<Message> {
loop {
{
let mut queue = self.0.queue.lock().unwrap();
if let Some(value) = queue.pop_front() {
if queue.capacity() - queue.len() > 32 {
let capacity = queue.len().next_power_of_two();
queue.shrink_to(capacity);
}
drop(queue);
return Some(value);
}
}
if self.0.closed.load(Ordering::SeqCst) {
return None;
}
self.0.notify.notified().await;
}
}
pub fn try_recv(&self) -> Option<Message> {
self.0.queue.lock().unwrap().pop_front()
}
}

View file

@ -8,6 +8,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use app::{App, Flags}; use app::{App, Flags};
pub mod app; pub mod app;
mod archive; mod archive;
pub mod channel;
pub mod clipboard; pub mod clipboard;
mod context_action; mod context_action;
use config::Config; use config::Config;
@ -136,7 +137,7 @@ pub fn main() -> Result<(), Box<dyn std::error::Error>> {
.event_format(log_format); .event_format(log_format);
tracing_subscriber::registry() tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_env("RUST_LOG")) .with(tracing_subscriber::EnvFilter::from_default_env())
.with(log_layer) .with(log_layer)
.init(); .init();

View file

@ -5,7 +5,7 @@ use cosmic::{
}; };
use gio::{glib, prelude::*}; use gio::{glib, prelude::*};
use std::{any::TypeId, cell::Cell, future::pending, hash::Hash, path::PathBuf, sync::Arc}; use std::{any::TypeId, cell::Cell, future::pending, hash::Hash, path::PathBuf, sync::Arc};
use tokio::sync::{Mutex, mpsc}; use tokio::sync::mpsc;
use super::{Mounter, MounterAuth, MounterItem, MounterItems, MounterMessage}; use super::{Mounter, MounterAuth, MounterItem, MounterItems, MounterMessage};
use crate::{ use crate::{
@ -230,7 +230,10 @@ fn dir_info(uri: &str) -> Result<(String, String, Option<PathBuf>), glib::Error>
Ok((resolved_uri, info.display_name().into(), file.path())) Ok((resolved_uri, info.display_name().into(), file.path()))
} }
fn mount_op(uri: String, event_tx: mpsc::UnboundedSender<Event>) -> gio::MountOperation { fn mount_op(
uri: String,
event_tx: std::sync::Weak<crate::channel::Sender<Event>>,
) -> gio::MountOperation {
let mount_op = gio::MountOperation::new(); let mount_op = gio::MountOperation::new();
mount_op.connect_ask_password( mount_op.connect_ask_password(
move |mount_op, message, default_user, default_domain, flags| { move |mount_op, message, default_user, default_domain, flags| {
@ -253,9 +256,9 @@ fn mount_op(uri: String, event_tx: mpsc::UnboundedSender<Event>) -> gio::MountOp
.then_some(false), .then_some(false),
}; };
let (auth_tx, mut auth_rx) = mpsc::channel(1); let (auth_tx, mut auth_rx) = mpsc::channel(1);
event_tx if let Some(event_tx) = event_tx.upgrade() {
.send(Event::NetworkAuth(uri.clone(), auth, auth_tx)) event_tx.send(Event::NetworkAuth(uri.clone(), auth, auth_tx));
.unwrap(); }
//TODO: async recv? //TODO: async recv?
if let Some(auth) = auth_rx.blocking_recv() { if let Some(auth) = auth_rx.blocking_recv() {
if auth.anonymous_opt == Some(true) { if auth.anonymous_opt == Some(true) {
@ -358,37 +361,45 @@ impl Item {
pub struct Gvfs { pub struct Gvfs {
command_tx: mpsc::UnboundedSender<Cmd>, command_tx: mpsc::UnboundedSender<Cmd>,
event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>, event_rx: Arc<crate::channel::Receiver<Event>>,
} }
impl Gvfs { impl Gvfs {
pub fn new() -> Self { pub fn new() -> Self {
//TODO: switch to using gvfs-zbus which will better integrate with async rust //TODO: switch to using gvfs-zbus which will better integrate with async rust
let (command_tx, mut command_rx) = mpsc::unbounded_channel(); let (command_tx, mut command_rx) = mpsc::unbounded_channel();
let (event_tx, event_rx) = mpsc::unbounded_channel(); let (event_tx, event_rx) = crate::channel::channel();
let event_tx = Arc::new(event_tx);
std::thread::spawn(move || { std::thread::spawn(move || {
let main_loop = glib::MainLoop::new(None, false); let main_loop = glib::MainLoop::new(None, false);
main_loop.context().spawn_local(async move { main_loop.context().spawn_local(async move {
let event_tx = Arc::downgrade(&event_tx);
let monitor = gio::VolumeMonitor::get(); let monitor = gio::VolumeMonitor::get();
{ {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
monitor.connect_mount_changed(move |_monitor, mount| { monitor.connect_mount_changed(move |_monitor, mount| {
log::info!("mount changed {}", MountExt::name(mount)); log::info!("mount changed {}", MountExt::name(mount));
event_tx.send(Event::Changed).unwrap(); if let Some(event_tx) = event_tx.upgrade() {
event_tx.send(Event::Changed);
}
}); });
} }
{ {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
monitor.connect_mount_added(move |_monitor, mount| { monitor.connect_mount_added(move |_monitor, mount| {
log::info!("mount added {}", MountExt::name(mount)); log::info!("mount added {}", MountExt::name(mount));
event_tx.send(Event::Changed).unwrap(); if let Some(event_tx) = event_tx.upgrade() {
event_tx.send(Event::Changed);
}
}); });
} }
{ {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
monitor.connect_mount_removed(move |_monitor, mount| { monitor.connect_mount_removed(move |_monitor, mount| {
log::info!("mount removed {}", MountExt::name(mount)); log::info!("mount removed {}", MountExt::name(mount));
event_tx.send(Event::Changed).unwrap(); if let Some(event_tx) = event_tx.upgrade() {
event_tx.send(Event::Changed);
}
}); });
} }
@ -396,21 +407,27 @@ impl Gvfs {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
monitor.connect_volume_changed(move |_monitor, volume| { monitor.connect_volume_changed(move |_monitor, volume| {
log::info!("volume changed {}", VolumeExt::name(volume)); log::info!("volume changed {}", VolumeExt::name(volume));
event_tx.send(Event::Changed).unwrap(); if let Some(event_tx) = event_tx.upgrade() {
event_tx.send(Event::Changed);
}
}); });
} }
{ {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
monitor.connect_volume_added(move |_monitor, volume| { monitor.connect_volume_added(move |_monitor, volume| {
log::info!("volume added {}", VolumeExt::name(volume)); log::info!("volume added {}", VolumeExt::name(volume));
event_tx.send(Event::Changed).unwrap(); if let Some(event_tx) = event_tx.upgrade() {
event_tx.send(Event::Changed);
}
}); });
} }
{ {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
monitor.connect_volume_removed(move |_monitor, volume| { monitor.connect_volume_removed(move |_monitor, volume| {
log::info!("volume removed {}", VolumeExt::name(volume)); log::info!("volume removed {}", VolumeExt::name(volume));
event_tx.send(Event::Changed).unwrap(); if let Some(event_tx) = event_tx.upgrade() {
event_tx.send(Event::Changed);
}
}); });
} }
@ -420,7 +437,11 @@ impl Gvfs {
items_tx.send(items(&monitor, sizes)).await.unwrap(); items_tx.send(items(&monitor, sizes)).await.unwrap();
} }
Cmd::Rescan => { Cmd::Rescan => {
event_tx.send(Event::Items(items(&monitor, IconSizes::default()))).unwrap(); let Some(event_tx) = event_tx.upgrade() else {
return;
};
event_tx.send(Event::Items(items(&monitor, IconSizes::default())));
} }
Cmd::Mount(mounter_item, complete_tx) => { Cmd::Mount(mounter_item, complete_tx) => {
let MounterItem::Gvfs(ref item) = mounter_item else { let MounterItem::Gvfs(ref item) = mounter_item else {
@ -472,6 +493,9 @@ impl Gvfs {
.ok().map(|info| info.boolean(gio::FILE_ATTRIBUTE_FILESYSTEM_REMOTE)) .ok().map(|info| info.boolean(gio::FILE_ATTRIBUTE_FILESYSTEM_REMOTE))
.unwrap_or(true); .unwrap_or(true);
} }
let Some(event_tx) = event_tx.upgrade() else {
return;
};
event_tx.send(Event::MountResult(updated_item, match res { event_tx.send(Event::MountResult(updated_item, match res {
Ok(()) => { Ok(()) => {
_ = complete_tx.send(Ok(())); _ = complete_tx.send(Ok(()));
@ -483,7 +507,7 @@ impl Gvfs {
Some(gio::IOErrorEnum::FailedHandled) => Ok(false), Some(gio::IOErrorEnum::FailedHandled) => Ok(false),
_ => Err(format!("{err}")) _ => Err(format!("{err}"))
}} }}
})).unwrap(); }));
}, },
); );
break; break;
@ -499,6 +523,9 @@ impl Gvfs {
gio::Cancellable::NONE, gio::Cancellable::NONE,
move |res| { move |res| {
log::info!("network drive {uri}: result {res:?}"); log::info!("network drive {uri}: result {res:?}");
let Some(event_tx) = event_tx.upgrade() else {
return;
};
event_tx.send(Event::NetworkResult(uri, match res { event_tx.send(Event::NetworkResult(uri, match res {
Ok(()) => { Ok(()) => {
_ = result_tx.send(Ok(())); _ = result_tx.send(Ok(()));
@ -509,7 +536,7 @@ impl Gvfs {
Some(gio::IOErrorEnum::FailedHandled) => Ok(false), Some(gio::IOErrorEnum::FailedHandled) => Ok(false),
_ => Err(format!("{err}")) _ => Err(format!("{err}"))
}} }}
})).unwrap(); }));
} }
); );
} }
@ -533,6 +560,9 @@ impl Gvfs {
// FIXME sometimes a uri can be mounted and then not recognized as mounted... // FIXME sometimes a uri can be mounted and then not recognized as mounted...
// seems to be related to uri with a path // seems to be related to uri with a path
items_tx.blocking_send(network_scan(&uri, sizes)).unwrap(); items_tx.blocking_send(network_scan(&uri, sizes)).unwrap();
let Some(event_tx) = event_tx.upgrade() else {
return;
};
event_tx.send(Event::NetworkResult(resolved_uri, match res { event_tx.send(Event::NetworkResult(resolved_uri, match res {
Ok(()) => { Ok(()) => {
Ok(true) Ok(true)
@ -541,7 +571,7 @@ impl Gvfs {
Some(gio::IOErrorEnum::FailedHandled) => Ok(false), Some(gio::IOErrorEnum::FailedHandled) => Ok(false),
_ => Err(format!("{err}")) _ => Err(format!("{err}"))
} }
})).unwrap(); }));
} }
); );
} else { } else {
@ -597,7 +627,7 @@ impl Gvfs {
}); });
Self { Self {
command_tx, command_tx,
event_rx: Arc::new(Mutex::new(event_rx)), event_rx: Arc::new(event_rx),
} }
} }
} }
@ -671,7 +701,7 @@ impl Mounter for Gvfs {
let event_rx = self.event_rx.clone(); let event_rx = self.event_rx.clone();
struct Wrapper { struct Wrapper {
command_tx: mpsc::UnboundedSender<Cmd>, command_tx: mpsc::UnboundedSender<Cmd>,
event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>, event_rx: Arc<crate::channel::Receiver<Event>>,
} }
impl Hash for Wrapper { impl Hash for Wrapper {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) { fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
@ -695,7 +725,7 @@ impl Mounter for Gvfs {
MounterMessage, MounterMessage,
>| async move { >| async move {
command_tx.send(Cmd::Rescan).unwrap(); command_tx.send(Cmd::Rescan).unwrap();
while let Some(event) = event_rx.lock().await.recv().await { while let Some(event) = event_rx.recv().await {
match event { match event {
Event::Changed => command_tx.send(Cmd::Rescan).unwrap(), Event::Changed => command_tx.send(Cmd::Rescan).unwrap(),
Event::Items(items) => { Event::Items(items) => {

View file

@ -6955,9 +6955,8 @@ impl Tab {
.await .await
.unwrap(); .unwrap();
let output = Arc::new(tokio::sync::Mutex::new(output)); let (watch_tx, mut watch_rx) = tokio::sync::watch::channel(true);
{ {
let output = output.clone();
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
scan_search( scan_search(
&search_location, &search_location,
@ -6985,14 +6984,7 @@ impl Tab {
true true
} else { } else {
// Wake up update method // Wake up update method
futures::executor::block_on(async { watch_tx.send(false).is_ok()
output
.lock()
.await
.send(Message::SearchReady(false))
.await
})
.is_ok()
} }
} }
Err(_) => false, Err(_) => false,
@ -7005,13 +6997,16 @@ impl Tab {
search_location, search_location,
start.elapsed(), start.elapsed(),
); );
}) });
.await }
.unwrap();
while watch_rx.changed().await.is_ok() {
let is_ready = *watch_rx.borrow_and_update();
let _ = output.send(Message::SearchReady(is_ready)).await;
} }
// Send final ready // Send final ready
let _ = output.lock().await.send(Message::SearchReady(true)).await; let _ = output.send(Message::SearchReady(true)).await;
std::future::pending().await std::future::pending().await
}, },