From c114759c9ed4164939914c6eae61b4567377a73c Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Fri, 10 Apr 2026 05:48:58 +0200 Subject: [PATCH 01/12] chore: use tracing for structured logs --- Cargo.toml | 3 ++- src/lib.rs | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8be8b9a..1b90a4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ icu = { version = "2.1.1", features = ["compiled_data"] } cctk = { git = "https://github.com/pop-os/cosmic-protocols", package = "cosmic-client-toolkit", rev = "160b086", optional = true } cosmic-mime-apps = { git = "https://github.com/pop-os/cosmic-mime-apps.git", optional = true } dirs = "6.0.0" -env_logger = "0.11" gio = { version = "0.21", optional = true } glib = { version = "0.21", optional = true } glob = "0.3" @@ -62,6 +61,8 @@ png = "0.18" jxl-oxide = { version = "0.12.5", features = ["image"] } num_cpus = "1.17.0" filetime = "0.2" +tracing = "0.1.44" +tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } # Completion-based IO runtime to enable io_uring / IOCP file IO support. [dependencies.compio] diff --git a/src/lib.rs b/src/lib.rs index cc0bcc1..b4c85c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ use cosmic::{app::Settings, iced::Limits}; use std::{env, fs, path::PathBuf, process}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use app::{App, Flags}; pub mod app; @@ -73,7 +74,22 @@ pub fn is_wayland() -> bool { /// Runs application in desktop mode #[rustfmt::skip] pub fn desktop() -> Result<(), Box> { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); + let log_format = tracing_subscriber::fmt::format() + .pretty() + .without_time() + .with_line_number(true) + .with_file(true) + .with_target(false) + .with_thread_names(true); + + let log_layer = tracing_subscriber::fmt::Layer::default() + .with_writer(std::io::stderr) + .event_format(log_format); + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_env("RUST_LOG")) + .with(log_layer) + .init(); localize::localize(); @@ -108,7 +124,21 @@ pub fn desktop() -> Result<(), Box> { /// Runs application with these settings #[rustfmt::skip] pub fn main() -> Result<(), Box> { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); + let log_format = tracing_subscriber::fmt::format() + .pretty() + .with_line_number(true) + .with_file(true) + .with_target(false) + .with_thread_names(true); + + let log_layer = tracing_subscriber::fmt::Layer::default() + .with_writer(std::io::stderr) + .event_format(log_format); + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_env("RUST_LOG")) + .with(log_layer) + .init(); localize::localize(); From b299f1a172388e73f0c3405f82b08ca4d3cc8884 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Fri, 10 Apr 2026 05:57:34 +0200 Subject: [PATCH 02/12] chore: add methods to `Controller` to pause and unpause futures - Use `Controller::until_paused` as a signal in a select to pause futures - Use `Controller::until_unpaused` to block futures in a select loop --- src/operation/controller.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/operation/controller.rs b/src/operation/controller.rs index f4b26fe..960a28d 100644 --- a/src/operation/controller.rs +++ b/src/operation/controller.rs @@ -86,6 +86,35 @@ impl Controller { self.set_state(ControllerState::Paused); } + /// Returns when the state is paused. + /// + /// Use this to pause futures. + pub async fn until_paused(&self) { + loop { + if matches!(self.state(), ControllerState::Paused) { + return; + } + + self.inner.notify.notified().await; + } + } + + /// Returns when state is neither paused, cancelled, nor failed. + /// + /// Use this to resume futures. + pub async fn until_unpaused(&self) { + loop { + if !matches!( + self.state(), + ControllerState::Paused | ControllerState::Cancelled | ControllerState::Failed + ) { + return; + } + + self.inner.notify.notified().await; + } + } + pub fn unpause(&self) { if !self.is_cancelled() | !self.is_failed() { self.set_state(ControllerState::Running); From e2bdcf8da4df1cdb2a4d707ed52a14b04eb4c0ff Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Fri, 10 Apr 2026 06:03:16 +0200 Subject: [PATCH 03/12] fix: use gio file copy fallback to fix MTP unsupported I/O errors --- Cargo.toml | 1 + src/operation/recursive.rs | 441 ++++++++++++++++++++++++++----------- 2 files changed, 311 insertions(+), 131 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1b90a4a..9447640 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ num_cpus = "1.17.0" filetime = "0.2" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } +thiserror = "2.0.18" # Completion-based IO runtime to enable io_uring / IOCP file IO support. [dependencies.compio] diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index 61c4841..befe69c 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -1,15 +1,31 @@ -use compio::BufResult; -use compio::buf::{IntoInner, IoBuf}; -use compio::io::{AsyncReadAt, AsyncWriteAt}; -use std::future::Future; -use std::pin::Pin; -use std::time::Instant; -use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc}; -use walkdir::WalkDir; - -use crate::operation::{OperationError, sync_to_disk}; +// Copyright 2023 System76 +// SPDX-License-Identifier: GPL-3.0-only use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path}; +use crate::operation::{OperationError, sync_to_disk}; +use anyhow::Context as AnyhowContext; +use compio::BufResult; +use compio::buf::{IntoInner, IoBuf}; +use compio::driver::{ToSharedFd, op::AsyncifyFd}; +use compio::io::{AsyncReadAt, AsyncWriteAt}; +use cosmic::iced::futures; +use futures::future::Either; +use futures::{FutureExt, StreamExt}; +use gio::prelude::FileExtManual; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::time::Instant; +use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf}; +use walkdir::WalkDir; + +#[derive(thiserror::Error, Debug)] +pub enum GioCopyError { + #[error("controller state")] + Controller(OperationError), + #[error("gio copy failed")] + GLib(#[from] glib::Error), +} pub enum Method { Copy, @@ -313,136 +329,26 @@ impl Op { }) } - async fn run( - &mut self, - ctx: &mut Context, - mut progress: Progress, - ) -> Result> { + async fn run(&mut self, ctx: &mut Context, progress: Progress) -> Result> { if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) { return Ok(true); } match self.kind { OpKind::Copy => { - // Remove `to` if overwriting and it is an existing file - if self.to.is_file() { - match ctx.replace(self).await? { - ControlFlow::Continue(to) => { - self.to = to; - } - ControlFlow::Break(ret) => { - return Ok(ret); - } - } + let result = self.copy(ctx, progress).await; + + if result.is_err() { + _ = compio::fs::remove_file(&self.to).await; } - let (from_file, metadata, mut to_file) = cosmic::iced::futures::try_join!( - async { - compio::fs::OpenOptions::new() - .read(true) - .open(&self.from) - .await - }, - compio::fs::metadata(&self.from), - // This is atomic and ensures `to` is not created by any other process - async { - compio::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&self.to) - .await - } - )?; - - progress.total_bytes = Some(metadata.len()); - (ctx.on_progress)(self, &progress); - if let Err(err) = to_file.set_permissions(metadata.permissions()).await { - // This error is not propagated upwards as some filesystems do not support setting permissions - log::warn!( - "failed to set permissions for {}: {}", - self.to.display(), - err - ); - } - - // Prevent spamming the progress callbacks. - let mut last_progress_update = Instant::now(); - // io_uring/IOCP requires transferring ownership of the buffer to the kernel. - let mut buf_in = std::mem::take(&mut ctx.buf); - // Track where the current read/write position is at. - let mut pos = 0; - - loop { - let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; - - let count = match result { - Ok(0) => { - ctx.buf = buf_out; - break; - } - Ok(count) => count, - Err(why) => { - ctx.buf = buf_out; - return Err(why.into()); - } - }; - - let BufResult(result, buf_out_slice) = - to_file.write_at(buf_out.slice(..count), pos).await; - let buf_out = buf_out_slice.into_inner(); - - if let Err(why) = result { - ctx.buf = buf_out; - return Err(why.into()); - } - - progress.current_bytes += count as u64; - pos += count as u64; - - // Avoid spamming progress messages too early. - let current = Instant::now(); - if current.duration_since(last_progress_update).as_millis() > 49 { - last_progress_update = current; - (ctx.on_progress)(self, &progress); - - // Also check if the progress was cancelled. - if let Err(state) = ctx.controller.check().await { - ctx.buf = buf_out; - return Err(OperationError::from_state(state, &ctx.controller).into()); - } - } - - buf_in = buf_out; - } - - let mut times = fs::FileTimes::new(); - { - use std::os::unix::prelude::MetadataExt; - log::info!("{}", metadata.mtime()); - } - if let Ok(time) = metadata.modified() { - times = times.set_modified(time); - } - if let Ok(time) = metadata.accessed() { - times = times.set_accessed(time); - } - //TODO: upstream set_times implementation to compio? - { - use compio::driver::{ToSharedFd, op::AsyncifyFd}; - let op = - AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| { - BufResult(file.set_times(times).map(|_| 0), ()) - }); - match compio::runtime::submit(op).await.0.map(|_| ()) { - Ok(()) => { - log::info!("set times for {} to {:?}", self.to.display(), times); - } - Err(err) => { - log::warn!("failed to set times for {}: {}", self.to.display(), err); - } - } - } + return result; } OpKind::Move { cross_device_copy } => { + // Do not clean up if cross_device_copy is set + if cross_device_copy { + self.skipped.cleanup.set(true); + } + // Remove `to` if overwriting and it is an existing file if self.to.is_file() { match ctx.replace(self).await? { @@ -520,4 +426,277 @@ impl Op { } Ok(true) } + + async fn copy( + &mut self, + ctx: &mut Context, + mut progress: Progress, + ) -> Result> { + // Remove `to` if overwriting and it is an existing file + if self.to.is_file() { + match ctx.replace(self).await? { + ControlFlow::Continue(to) => { + self.to = to; + } + ControlFlow::Break(ret) => { + return Ok(ret); + } + } + } + + let (from_file, metadata, to_file) = cosmic::iced::futures::join!( + async { + compio::fs::OpenOptions::new() + .read(true) + .open(&self.from) + .await + .with_context(|| format!("failed to open {} for reading", self.from.display(),)) + }, + async { compio::fs::metadata(&self.from).await.ok() }, + // This is atomic and ensures `to` is not created by any other process + async { + compio::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&self.to) + .await + .with_context(|| format!("failed to open {} for writing", self.to.display())) + } + ); + + let from_file = from_file?; + let mut to_file = to_file?; + progress.total_bytes = metadata.as_ref().map(|m| m.len()); + (ctx.on_progress)(self, &progress); + + if let Some(metadata) = metadata.as_ref() { + if let Err(why) = to_file.set_permissions(metadata.permissions()).await { + // This error is not propagated upwards as some filesystems do not support setting permissions + if !matches!(why.kind(), std::io::ErrorKind::Unsupported) { + tracing::warn!(?why, "failed to set permissions for {}", self.to.display(),); + } + } + } + + // Prevent spamming the progress callbacks. + let mut last_progress_update = Instant::now(); + // io_uring/IOCP requires transferring ownership of the buffer to the kernel. + let mut buf_in = std::mem::take(&mut ctx.buf); + // Track where the current read/write position is at. + let mut pos = 0; + + loop { + let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; + + let count = match result { + Ok(0) => { + buf_in = buf_out; + break; + } + Ok(count) => count, + Err(why) => { + ctx.buf = buf_out; + tracing::error!("failed to read: {:?}", why); + _ = futures::future::join(from_file.close(), to_file.close()).await; + return Err(why).context("failed to read")?; + } + }; + + let BufResult(result, buf_out_slice) = + to_file.write_at(buf_out.slice(..count), pos).await; + let buf_out = buf_out_slice.into_inner(); + + if let Err(why) = result { + if let std::io::ErrorKind::Unsupported = why.kind() { + ctx.buf = buf_out; + _ = futures::future::join(from_file.close(), to_file.close()).await; + return self + .gio_file_copy(ctx, progress) + .await + .map(|_| true) + .map_err(Into::into); + } + + tracing::error!("failed to write: {:?}", why); + ctx.buf = buf_out; + _ = futures::future::join(from_file.close(), to_file.close()).await; + return Err(why).context("failed to write")?; + } + + progress.current_bytes += count as u64; + pos += count as u64; + + // Avoid spamming progress messages too early. + let current = Instant::now(); + if current.duration_since(last_progress_update).as_millis() > 49 { + last_progress_update = current; + (ctx.on_progress)(self, &progress); + + // Also check if the progress was cancelled. + if let Err(state) = ctx.controller.check().await { + ctx.buf = buf_out; + tracing::warn!( + "operation to copy from {:?} to {:?} cancelled", + self.from, + self.to + ); + _ = futures::future::join(from_file.close(), to_file.close()).await; + return Err(OperationError::from_state(state, &ctx.controller).into()); + } + } + + buf_in = buf_out; + } + + ctx.buf = buf_in; + + if let Some(metadata) = metadata.as_ref() { + let mut times = fs::FileTimes::new(); + if let Ok(time) = metadata.modified() { + times = times.set_modified(time); + } + if let Ok(time) = metadata.accessed() { + times = times.set_accessed(time); + } + //TODO: upstream set_times implementation to compio? + let op = AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| { + BufResult(file.set_times(times).map(|_| 0), ()) + }); + match compio::runtime::submit(op).await.0.map(|_| ()) { + Ok(()) => { + tracing::info!("set times for {} to {:?}", self.to.display(), times); + } + Err(why) => { + if !matches!(why.kind(), std::io::ErrorKind::Unsupported) { + tracing::error!(?why, "failed to set times for {}", self.to.display()); + } + } + } + } + + _ = to_file.close().await; + + Ok(true) + } + + /// Fallback mechanism in the event that unsupported I/O error errors occur. + /// Fixes unsupported errors when copying large files over MTP. + /// TODO: Find what Gio.File does to work around this. + async fn gio_file_copy( + &self, + ctx: &mut Context, + mut progress: Progress, + ) -> Result<(), GioCopyError> { + _ = compio::fs::remove_file(&self.to).await; + + let from = gio::File::for_path(&self.from); + let to = gio::File::for_path(&self.to); + let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let (pause_tx, mut pause_rx) = tokio::sync::watch::channel(false); + + let task = compio::runtime::spawn_blocking(move || { + let glib_context = glib::MainContext::new(); + let glib_loop = glib::MainLoop::new(Some(&glib_context), false); + glib_context.with_thread_default(move || { + let glib_loop2 = glib_loop.clone(); + glib::MainContext::ref_thread_default().spawn_local(async move { + // Create a future for copying the file with `gio::File`. This also creates a progress stream. + let (gio_copy_fut, mut progress_stream) = from.copy_future( + &to, + gio::FileCopyFlags::OVERWRITE | gio::FileCopyFlags::ALL_METADATA, + glib::Priority::LOW, + ); + + let mut copy_fut = + gio_copy_fut.map(|result| result.map_err(GioCopyError::GLib)); + + let mut progress_fut = std::pin::pin!(async { + while let Some((current_bytes, _)) = progress_stream.next().await { + _ = progress_tx.send(current_bytes); + } + + drop(progress_tx); + futures::future::pending::<()>().await; + }); + + // Poll the progress future while waiting for the copy future to finish. + let mut main_fut = std::pin::pin!(async { + loop { + if let Either::Right((result, _)) = + futures::future::select(&mut progress_fut, &mut copy_fut).await + { + return result; + } + } + }); + + let mut pause_rx2 = pause_rx.clone(); + + loop { + let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused)); + match futures::future::select(until_paused, &mut main_fut).await { + Either::Left(_) => { + _ = pause_rx2.wait_for(|paused| !*paused).await; + } + + Either::Right((result, _)) => { + _ = tx.send(result.map(|_| ())); + glib_loop2.quit(); + return; + } + } + } + }); + + glib_loop.run(); + }) + }); + + let mut last_progress_update = Instant::now(); + let mut task = task.fuse(); + let mut rx = rx.fuse(); + + loop { + let until_paused = std::pin::pin!(ctx.controller.until_paused()); + + futures::select! { + value = progress_rx.recv().fuse() => { + if let Some(current_bytes) = value { + progress.current_bytes = current_bytes as u64; + let current = Instant::now(); + if current.duration_since(last_progress_update).as_millis() > 49 { + last_progress_update = current; + (ctx.on_progress)(self, &progress); + + tracing::info!("checking controller"); + + // Also check if the progress was cancelled. + if let Err(state) = ctx.controller.check().await { + tracing::warn!( + "operation to copy from {:?} to {:?} cancelled", + self.from, + self.to + ); + return Err::<(), GioCopyError>(GioCopyError::Controller( + OperationError::from_state(state, &ctx.controller), + )); + } + } + } + } + + result = rx => return result.unwrap(), + + _ = task => (), + + _ = until_paused.fuse() => { + // Pauses an active copy while the controller state is paused. + _ = pause_tx.send(true); + ctx.controller.until_unpaused().await; + _ = pause_tx.send(false); + } + } + } + } } From bb15f30fe5adcea4df8418adf867ba8bea876ae2 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Fri, 10 Apr 2026 15:49:04 +0200 Subject: [PATCH 04/12] chore: update Cargo.lock --- Cargo.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f89c7f4..55dead6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1354,7 +1354,6 @@ dependencies = [ "cosmic-client-toolkit", "cosmic-mime-apps", "dirs 6.0.0", - "env_logger", "fastrand", "filetime", "flate2", @@ -1394,8 +1393,11 @@ dependencies = [ "tar", "tempfile", "test-log", + "thiserror 2.0.18", "tikv-jemallocator", "tokio", + "tracing", + "tracing-subscriber", "trash", "url", "uzers", @@ -1963,7 +1965,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef" dependencies = [ "log", - "regex", ] [[package]] @@ -1975,7 +1976,6 @@ dependencies = [ "anstream", "anstyle", "env_filter", - "jiff", "log", ] From 93dd775f3c6452ec8b82998d002b7351085c70a5 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 16:23:10 +0200 Subject: [PATCH 05/12] perf: get image dimensions from background thread This caused the tab subscription to block the tokio executor. Instead store the image dimensions in the `Item`, which is created on a background thread. --- src/mounter/gvfs.rs | 1 + src/tab.rs | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/mounter/gvfs.rs b/src/mounter/gvfs.rs index cd4ea68..8ac42dc 100644 --- a/src/mounter/gvfs.rs +++ b/src/mounter/gvfs.rs @@ -199,6 +199,7 @@ fn network_scan(uri: &str, sizes: IconSizes) -> Result, String> { metadata, hidden, location_opt: Some(location), + image_dimensions: None, mime, icon_handle_grid, icon_handle_list, diff --git a/src/tab.rs b/src/tab.rs index b0b3a34..1e63588 100644 --- a/src/tab.rs +++ b/src/tab.rs @@ -708,6 +708,9 @@ pub fn item_from_gvfs_info(path: PathBuf, file_info: gio::FileInfo, sizes: IconS children_opt, }, hidden, + image_dimensions: (!remote && mime.type_() == mime::IMAGE) + .then(|| image::image_dimensions(&path).ok()) + .flatten(), location_opt: Some(Location::Path(path)), mime, icon_handle_grid, @@ -843,6 +846,7 @@ pub fn item_from_entry( }, hidden, location_opt: Some(Location::Path(path)), + image_dimensions: None, mime, icon_handle_grid, icon_handle_list, @@ -896,6 +900,9 @@ pub fn item_from_trash_entry( metadata: ItemMetadata::Trash { metadata, entry }, hidden: false, location_opt: None, + image_dimensions: (mime.type_() == mime::IMAGE) + .then(|| image::image_dimensions(&original_path).ok()) + .flatten(), mime, icon_handle_grid, icon_handle_list, @@ -1444,6 +1451,7 @@ pub fn scan_desktop( metadata, hidden: false, location_opt: Some(Location::Trash), + image_dimensions: None, mime, icon_handle_grid, icon_handle_list, @@ -2319,6 +2327,7 @@ pub struct Item { pub hidden: bool, pub location_opt: Option, pub mime: Mime, + pub image_dimensions: Option<(u32, u32)>, pub icon_handle_grid: widget::icon::Handle, pub icon_handle_list: widget::icon::Handle, pub icon_handle_list_condensed: widget::icon::Handle, @@ -6713,13 +6722,13 @@ impl Tab { // Determine effective memory budget based on image size let (effective_max_mb, effective_jobs) = if mime.type_() == mime::IMAGE { - match image::image_dimensions(&path) { - Ok((width, height)) => { + match item.image_dimensions { + Some((width, height)) => { let (_use_dedicated, eff_mb, eff_jobs) = should_use_dedicated_worker(width, height, max_mb, max_jobs); (eff_mb, eff_jobs) } - Err(_) => (max_mb, max_jobs), + None => (max_mb, max_jobs), } } else { (max_mb, max_jobs) From 971374f60ba701cb2a8728985c9d2742c590b036 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 16:27:53 +0200 Subject: [PATCH 06/12] perf: use atomics for controller progress and state --- Cargo.lock | 8 ++++++++ Cargo.toml | 2 ++ src/operation/controller.rs | 29 +++++++++++++++++++---------- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 55dead6..9a0d3f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -538,6 +538,12 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "atomic_float" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "628d228f918ac3b82fe590352cc719d30664a0c13ca3a60266fe02c7132d480a" + [[package]] name = "atomicwrites" version = "0.4.2" @@ -1349,6 +1355,7 @@ name = "cosmic-files" version = "1.0.9" dependencies = [ "anyhow", + "atomic_float", "bzip2", "compio", "cosmic-client-toolkit", @@ -1378,6 +1385,7 @@ dependencies = [ "notify-debouncer-full", "notify-rust", "num_cpus", + "num_enum", "open", "ordermap", "paste", diff --git a/Cargo.toml b/Cargo.toml index 9447640..74e554b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,8 @@ filetime = "0.2" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } thiserror = "2.0.18" +atomic_float = "1.1.0" +num_enum = "0.7.6" # Completion-based IO runtime to enable io_uring / IOCP file IO support. [dependencies.compio] diff --git a/src/operation/controller.rs b/src/operation/controller.rs index 960a28d..691316c 100644 --- a/src/operation/controller.rs +++ b/src/operation/controller.rs @@ -1,7 +1,11 @@ -use std::sync::{Arc, Mutex}; +use atomic_float::AtomicF32; +use num_enum::{IntoPrimitive, TryFromPrimitive}; +use std::sync::Arc; +use std::sync::atomic::{self, AtomicU16}; use tokio::sync::Notify; -#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)] +#[repr(u16)] pub enum ControllerState { Cancelled, Failed, @@ -11,8 +15,8 @@ pub enum ControllerState { #[derive(Debug)] struct ControllerInner { - state: Mutex, - progress: Mutex, + state: AtomicU16, + progress: AtomicF32, notify: Notify, } @@ -27,8 +31,8 @@ impl Default for Controller { Self { primary: true, inner: Arc::new(ControllerInner { - state: Mutex::new(ControllerState::Running), - progress: Mutex::new(0.0), + state: AtomicU16::new(ControllerState::Running.into()), + progress: AtomicF32::new(0.0), notify: Notify::new(), }), } @@ -50,19 +54,24 @@ impl Controller { } pub fn progress(&self) -> f32 { - *self.inner.progress.lock().unwrap() + self.inner.progress.load(atomic::Ordering::Relaxed) } pub fn set_progress(&self, progress: f32) { - *self.inner.progress.lock().unwrap() = progress; + self.inner + .progress + .swap(progress, atomic::Ordering::Relaxed); } pub fn state(&self) -> ControllerState { - *self.inner.state.lock().unwrap() + ControllerState::try_from(self.inner.state.load(atomic::Ordering::Relaxed)) + .unwrap_or(ControllerState::Failed) } pub fn set_state(&self, state: ControllerState) { - *self.inner.state.lock().unwrap() = state; + self.inner + .state + .store(state.into(), atomic::Ordering::Relaxed); self.inner.notify.notify_waiters(); } From e35d5123f05bdd0ca2c06d0a3a60a04e0cb324ee Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 16:38:56 +0200 Subject: [PATCH 07/12] perf: avoid holding async mutex guards across await points tokio recommends using a sync mutex with a notifier instead of the async mutex where possible. Rust forbids holding a sync mutex guard across await points so we can prevent a potential deadlock this way. This adds a custom channel based on the tokio mpmc example for handling gvfs events from callbacks to avoid the async mutex requirement. Messages are held in a `VecDeque` behind a sync mutex and the receiver will get notified via the notifier when a message is added to the queue. Weak references used in gio callbacks in case the sender is dropped by the application. --- src/app.rs | 5 +-- src/channel.rs | 77 +++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +- src/mounter/gvfs.rs | 70 +++++++++++++++++++++++++++++------------ src/tab.rs | 23 ++++++-------- 5 files changed, 139 insertions(+), 39 deletions(-) create mode 100644 src/channel.rs diff --git a/src/app.rs b/src/app.rs index 19dc54d..a427065 100644 --- a/src/app.rs +++ b/src/app.rs @@ -6958,8 +6958,7 @@ impl Application for App { |_| { stream::channel( 1, - move |msg_tx: futures::channel::mpsc::Sender<_>| async move { - let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx)); + move |mut msg_tx: futures::channel::mpsc::Sender<_>| async move { tokio::task::spawn_blocking(move || { match notify_rust::Notification::new() .summary(&fl!("notification-in-progress")) @@ -6969,8 +6968,6 @@ impl Application for App { Ok(notification) => { let _ = futures::executor::block_on(async { msg_tx - .lock() - .await .send(Message::Notification(Arc::new( Mutex::new(notification), ))) diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..e91bc35 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,77 @@ +// Copyright 2025 System76 +// SPDX-License-Identifier: MPL-2.0 + +use std::{ + collections::VecDeque, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + }, +}; + +/// Create a channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque. +pub fn channel() -> (Sender, Receiver) { + let channel = Arc::new(Channel { + queue: Mutex::new(VecDeque::default()), + notify: tokio::sync::Notify::const_new(), + closed: AtomicBool::new(false), + }); + + (Sender(channel.clone()), Receiver(channel)) +} + +/// A channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque. +struct Channel { + pub(self) queue: Mutex>, + /// Set when a new message has been stored. + pub(self) notify: tokio::sync::Notify, + /// Set when the receiver is dropped. + pub(self) closed: AtomicBool, +} + +pub struct Sender(Arc>); + +impl Sender { + pub fn send(&self, message: Message) { + self.0.queue.lock().unwrap().push_back(message); + self.0.notify.notify_one(); + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.0.closed.store(true, Ordering::SeqCst); + self.0.notify.notify_one(); + } +} + +pub struct Receiver(Arc>); + +impl Receiver { + /// Returns a value until the sender is dropped. + pub async fn recv(&self) -> Option { + loop { + { + let mut queue = self.0.queue.lock().unwrap(); + if let Some(value) = queue.pop_front() { + if queue.capacity() - queue.len() > 32 { + let capacity = queue.len().next_power_of_two(); + queue.shrink_to(capacity); + } + drop(queue); + return Some(value); + } + } + + if self.0.closed.load(Ordering::SeqCst) { + return None; + } + + self.0.notify.notified().await; + } + } + + pub fn try_recv(&self) -> Option { + self.0.queue.lock().unwrap().pop_front() + } +} diff --git a/src/lib.rs b/src/lib.rs index b4c85c7..9214a7f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use app::{App, Flags}; pub mod app; mod archive; +pub mod channel; pub mod clipboard; mod context_action; use config::Config; @@ -136,7 +137,7 @@ pub fn main() -> Result<(), Box> { .event_format(log_format); tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::from_env("RUST_LOG")) + .with(tracing_subscriber::EnvFilter::from_default_env()) .with(log_layer) .init(); diff --git a/src/mounter/gvfs.rs b/src/mounter/gvfs.rs index 8ac42dc..d6294b6 100644 --- a/src/mounter/gvfs.rs +++ b/src/mounter/gvfs.rs @@ -5,7 +5,7 @@ use cosmic::{ }; use gio::{glib, prelude::*}; use std::{any::TypeId, cell::Cell, future::pending, hash::Hash, path::PathBuf, sync::Arc}; -use tokio::sync::{Mutex, mpsc}; +use tokio::sync::mpsc; use super::{Mounter, MounterAuth, MounterItem, MounterItems, MounterMessage}; use crate::{ @@ -230,7 +230,10 @@ fn dir_info(uri: &str) -> Result<(String, String, Option), glib::Error> Ok((resolved_uri, info.display_name().into(), file.path())) } -fn mount_op(uri: String, event_tx: mpsc::UnboundedSender) -> gio::MountOperation { +fn mount_op( + uri: String, + event_tx: std::sync::Weak>, +) -> gio::MountOperation { let mount_op = gio::MountOperation::new(); mount_op.connect_ask_password( move |mount_op, message, default_user, default_domain, flags| { @@ -253,9 +256,9 @@ fn mount_op(uri: String, event_tx: mpsc::UnboundedSender) -> gio::MountOp .then_some(false), }; let (auth_tx, mut auth_rx) = mpsc::channel(1); - event_tx - .send(Event::NetworkAuth(uri.clone(), auth, auth_tx)) - .unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::NetworkAuth(uri.clone(), auth, auth_tx)); + } //TODO: async recv? if let Some(auth) = auth_rx.blocking_recv() { if auth.anonymous_opt == Some(true) { @@ -358,37 +361,45 @@ impl Item { pub struct Gvfs { command_tx: mpsc::UnboundedSender, - event_rx: Arc>>, + event_rx: Arc>, } impl Gvfs { pub fn new() -> Self { //TODO: switch to using gvfs-zbus which will better integrate with async rust let (command_tx, mut command_rx) = mpsc::unbounded_channel(); - let (event_tx, event_rx) = mpsc::unbounded_channel(); + let (event_tx, event_rx) = crate::channel::channel(); + let event_tx = Arc::new(event_tx); std::thread::spawn(move || { let main_loop = glib::MainLoop::new(None, false); main_loop.context().spawn_local(async move { + let event_tx = Arc::downgrade(&event_tx); let monitor = gio::VolumeMonitor::get(); { let event_tx = event_tx.clone(); monitor.connect_mount_changed(move |_monitor, mount| { log::info!("mount changed {}", MountExt::name(mount)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_mount_added(move |_monitor, mount| { log::info!("mount added {}", MountExt::name(mount)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_mount_removed(move |_monitor, mount| { log::info!("mount removed {}", MountExt::name(mount)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } @@ -396,21 +407,27 @@ impl Gvfs { let event_tx = event_tx.clone(); monitor.connect_volume_changed(move |_monitor, volume| { log::info!("volume changed {}", VolumeExt::name(volume)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_volume_added(move |_monitor, volume| { log::info!("volume added {}", VolumeExt::name(volume)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } { let event_tx = event_tx.clone(); monitor.connect_volume_removed(move |_monitor, volume| { log::info!("volume removed {}", VolumeExt::name(volume)); - event_tx.send(Event::Changed).unwrap(); + if let Some(event_tx) = event_tx.upgrade() { + event_tx.send(Event::Changed); + } }); } @@ -420,7 +437,11 @@ impl Gvfs { items_tx.send(items(&monitor, sizes)).await.unwrap(); } Cmd::Rescan => { - event_tx.send(Event::Items(items(&monitor, IconSizes::default()))).unwrap(); + let Some(event_tx) = event_tx.upgrade() else { + return; + }; + + event_tx.send(Event::Items(items(&monitor, IconSizes::default()))); } Cmd::Mount(mounter_item, complete_tx) => { let MounterItem::Gvfs(ref item) = mounter_item else { @@ -472,6 +493,9 @@ impl Gvfs { .ok().map(|info| info.boolean(gio::FILE_ATTRIBUTE_FILESYSTEM_REMOTE)) .unwrap_or(true); } + let Some(event_tx) = event_tx.upgrade() else { + return; + }; event_tx.send(Event::MountResult(updated_item, match res { Ok(()) => { _ = complete_tx.send(Ok(())); @@ -483,7 +507,7 @@ impl Gvfs { Some(gio::IOErrorEnum::FailedHandled) => Ok(false), _ => Err(format!("{err}")) }} - })).unwrap(); + })); }, ); break; @@ -499,6 +523,9 @@ impl Gvfs { gio::Cancellable::NONE, move |res| { log::info!("network drive {uri}: result {res:?}"); + let Some(event_tx) = event_tx.upgrade() else { + return; + }; event_tx.send(Event::NetworkResult(uri, match res { Ok(()) => { _ = result_tx.send(Ok(())); @@ -509,7 +536,7 @@ impl Gvfs { Some(gio::IOErrorEnum::FailedHandled) => Ok(false), _ => Err(format!("{err}")) }} - })).unwrap(); + })); } ); } @@ -533,6 +560,9 @@ impl Gvfs { // FIXME sometimes a uri can be mounted and then not recognized as mounted... // seems to be related to uri with a path items_tx.blocking_send(network_scan(&uri, sizes)).unwrap(); + let Some(event_tx) = event_tx.upgrade() else { + return; + }; event_tx.send(Event::NetworkResult(resolved_uri, match res { Ok(()) => { Ok(true) @@ -541,7 +571,7 @@ impl Gvfs { Some(gio::IOErrorEnum::FailedHandled) => Ok(false), _ => Err(format!("{err}")) } - })).unwrap(); + })); } ); } else { @@ -597,7 +627,7 @@ impl Gvfs { }); Self { command_tx, - event_rx: Arc::new(Mutex::new(event_rx)), + event_rx: Arc::new(event_rx), } } } @@ -671,7 +701,7 @@ impl Mounter for Gvfs { let event_rx = self.event_rx.clone(); struct Wrapper { command_tx: mpsc::UnboundedSender, - event_rx: Arc>>, + event_rx: Arc>, } impl Hash for Wrapper { fn hash(&self, state: &mut H) { @@ -695,7 +725,7 @@ impl Mounter for Gvfs { MounterMessage, >| async move { command_tx.send(Cmd::Rescan).unwrap(); - while let Some(event) = event_rx.lock().await.recv().await { + while let Some(event) = event_rx.recv().await { match event { Event::Changed => command_tx.send(Cmd::Rescan).unwrap(), Event::Items(items) => { diff --git a/src/tab.rs b/src/tab.rs index 1e63588..a0f95a0 100644 --- a/src/tab.rs +++ b/src/tab.rs @@ -6955,9 +6955,8 @@ impl Tab { .await .unwrap(); - let output = Arc::new(tokio::sync::Mutex::new(output)); + let (watch_tx, mut watch_rx) = tokio::sync::watch::channel(true); { - let output = output.clone(); tokio::task::spawn_blocking(move || { scan_search( &search_location, @@ -6985,14 +6984,7 @@ impl Tab { true } else { // Wake up update method - futures::executor::block_on(async { - output - .lock() - .await - .send(Message::SearchReady(false)) - .await - }) - .is_ok() + watch_tx.send(false).is_ok() } } Err(_) => false, @@ -7005,13 +6997,16 @@ impl Tab { search_location, start.elapsed(), ); - }) - .await - .unwrap(); + }); + } + + while watch_rx.changed().await.is_ok() { + let is_ready = *watch_rx.borrow_and_update(); + let _ = output.send(Message::SearchReady(is_ready)).await; } // Send final ready - let _ = output.lock().await.send(Message::SearchReady(true)).await; + let _ = output.send(Message::SearchReady(true)).await; std::future::pending().await }, From 0bd20e57e7d9023f4372f5fb65ac77710b2fdae3 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 16:51:21 +0200 Subject: [PATCH 08/12] refactor: use select macro for gio copy futures --- src/operation/recursive.rs | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index befe69c..bbe89d1 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -9,7 +9,6 @@ use compio::buf::{IntoInner, IoBuf}; use compio::driver::{ToSharedFd, op::AsyncifyFd}; use compio::io::{AsyncReadAt, AsyncWriteAt}; use cosmic::iced::futures; -use futures::future::Either; use futures::{FutureExt, StreamExt}; use gio::prelude::FileExtManual; use std::future::Future; @@ -608,10 +607,11 @@ impl Op { glib::Priority::LOW, ); - let mut copy_fut = - gio_copy_fut.map(|result| result.map_err(GioCopyError::GLib)); + let mut copy_fut = gio_copy_fut + .map(|result| result.map_err(GioCopyError::GLib)) + .fuse(); - let mut progress_fut = std::pin::pin!(async { + let progress_fut = std::pin::pin!(async { while let Some((current_bytes, _)) = progress_stream.next().await { _ = progress_tx.send(current_bytes); } @@ -620,31 +620,23 @@ impl Op { futures::future::pending::<()>().await; }); - // Poll the progress future while waiting for the copy future to finish. - let mut main_fut = std::pin::pin!(async { - loop { - if let Either::Right((result, _)) = - futures::future::select(&mut progress_fut, &mut copy_fut).await - { - return result; - } - } - }); - + let mut progress_fut = progress_fut.fuse(); let mut pause_rx2 = pause_rx.clone(); loop { let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused)); - match futures::future::select(until_paused, &mut main_fut).await { - Either::Left(_) => { - _ = pause_rx2.wait_for(|paused| !*paused).await; - } + futures::select! { + _ = &mut progress_fut => {}, - Either::Right((result, _)) => { + result = &mut copy_fut => { _ = tx.send(result.map(|_| ())); glib_loop2.quit(); return; } + + _ = until_paused.fuse() => { + _ = pause_rx2.wait_for(|paused| !*paused).await; + } } } }); @@ -659,7 +651,6 @@ impl Op { loop { let until_paused = std::pin::pin!(ctx.controller.until_paused()); - futures::select! { value = progress_rx.recv().fuse() => { if let Some(current_bytes) = value { @@ -668,9 +659,6 @@ impl Op { if current.duration_since(last_progress_update).as_millis() > 49 { last_progress_update = current; (ctx.on_progress)(self, &progress); - - tracing::info!("checking controller"); - // Also check if the progress was cancelled. if let Err(state) = ctx.controller.check().await { tracing::warn!( From 15e40461e545a70871e21bcd1e698748abd523ed Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Fri, 10 Apr 2026 06:08:21 +0200 Subject: [PATCH 09/12] fix: do not generate thumbnail if file is being written --- src/operation/mod.rs | 3 ++ src/operation/notifiers.rs | 58 ++++++++++++++++++++++++++++++++++++++ src/operation/recursive.rs | 2 ++ src/tab.rs | 4 +++ 4 files changed, 67 insertions(+) create mode 100644 src/operation/notifiers.rs diff --git a/src/operation/mod.rs b/src/operation/mod.rs index da4a83b..1daed41 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -22,6 +22,9 @@ use zip::AesMode::Aes256; pub use self::controller::{Controller, ControllerState}; pub mod controller; +pub use notifiers::*; +mod notifiers; + pub use self::reader::OpReader; pub mod reader; diff --git a/src/operation/notifiers.rs b/src/operation/notifiers.rs new file mode 100644 index 0000000..0e27ce9 --- /dev/null +++ b/src/operation/notifiers.rs @@ -0,0 +1,58 @@ +// Copyright 2026 System76 +// SPDX-License-Identifier: GPL-3.0-only + +use std::path::{Path, PathBuf}; +use std::sync::{Arc, LazyLock, Mutex}; +use tokio::sync::Notify; + +/// Monitor files which are being written to. +pub struct FileWritingNotifier { + data: Vec, + notify: Arc, +} + +static ACTIVELY_WRITING: LazyLock> = LazyLock::new(|| { + Mutex::new(FileWritingNotifier { + data: Vec::new(), + notify: Arc::new(Notify::new()), + }) +}); + +/// Append path that is being written to. +pub fn actively_writing_add(path: PathBuf) { + ACTIVELY_WRITING.lock().unwrap().data.push(path); +} + +/// Remove path to file that has finished writing and notify waiters. +pub fn actively_writing_remove(path: &Path) { + let mut guard = ACTIVELY_WRITING.lock().unwrap(); + guard.data.retain(|p| p != path); + guard.notify.notify_waiters(); +} + +/// Wait until the actively-writing queue is empty or a file has been removed. +pub async fn actively_writing_tick() { + let notify = (|| { + let guard = ACTIVELY_WRITING.lock().unwrap(); + + if !guard.data.is_empty() { + return Some(guard.notify.clone()); + } + + None + })(); + + if let Some(notify) = notify { + notify.notified().await + } +} + +/// Check if a file is being written to. Avoid thumbnail generation until after it is finished. +pub fn is_actively_writing_to(path: &Path) -> bool { + ACTIVELY_WRITING + .lock() + .unwrap() + .data + .iter() + .any(|p| p == path) +} diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index bbe89d1..a38ea74 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -334,12 +334,14 @@ impl Op { } match self.kind { OpKind::Copy => { + crate::operation::actively_writing_add(self.to.clone()); let result = self.copy(ctx, progress).await; if result.is_err() { _ = compio::fs::remove_file(&self.to).await; } + crate::operation::actively_writing_remove(&self.to); return result; } OpKind::Move { cross_device_copy } => { diff --git a/src/tab.rs b/src/tab.rs index a0f95a0..9d9d2a2 100644 --- a/src/tab.rs +++ b/src/tab.rs @@ -6771,6 +6771,10 @@ impl Tab { stream::channel( 1, move |mut output: futures::channel::mpsc::Sender<_>| async move { + while crate::operation::is_actively_writing_to(&path) { + crate::operation::actively_writing_tick().await; + } + let message = { let path = path.clone(); From e21989aaa31a830005bb912e275b30815d5639ac Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Fri, 10 Apr 2026 06:09:51 +0200 Subject: [PATCH 10/12] chore: use compio::fs::rename for quick renames --- src/operation/mod.rs | 64 +++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 1daed41..f9fae0d 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -114,7 +114,7 @@ async fn copy_or_move( ); // Handle duplicate file names by renaming paths - let mut from_to_pairs: Vec<(PathBuf, PathBuf)> = paths + let from_to_pairs_iter = paths .into_iter() .zip(std::iter::repeat(to.as_path())) .filter_map(|(from, to)| { @@ -132,36 +132,46 @@ async fn copy_or_move( //TODO: how to handle from missing file name? None } - }) - .collect(); + }); // Attempt quick and simple renames //TODO: allow rename to be used for directories in recursive context? - if matches!(method, Method::Move { .. }) { - from_to_pairs.retain(|(from, to)| { - //TODO: show replace dialog here? - if to.exists() { - return true; - } - //TODO: use compio::fs::rename? - match fs::rename(from, to) { - Ok(()) => { - log::info!("renamed {} to {}", from.display(), to.display()); - false + let from_to_pairs: Vec<(PathBuf, PathBuf)> = if matches!(method, Method::Move { .. }) { + from_to_pairs_iter + .map(|(from, to)| async move { + //TODO: show replace dialog here? + if to.exists() { + return Some((from, to)); } - Err(err) => { - log::info!( - "failed to rename {} to {}, fallback to recursive move: {}", - from.display(), - to.display(), - err - ); - true + + match compio::fs::rename(&from, &to).await { + Ok(()) => { + log::info!("renamed {} to {}", from.display(), to.display()); + None + } + Err(err) => { + log::info!( + "failed to rename {} to {}, fallback to recursive move: {}", + from.display(), + to.display(), + err + ); + Some((from, to)) + } } - } - }); - } + }) + .collect::>() + .fold(Vec::new(), |mut pairs, pair| async move { + if let Some(pair) = pair { + pairs.push(pair); + } + pairs + }) + .await + } else { + from_to_pairs_iter.collect() + }; let mut context = Context::new(controller.clone()); @@ -219,7 +229,7 @@ pub async fn sync_to_disk( } })) .buffer_unordered(32) - .collect::>() + .collect::<()>() .await; // Sync directories to disk @@ -229,7 +239,7 @@ pub async fn sync_to_disk( } })) .buffer_unordered(16) - .collect::>() + .collect::<()>() .await; } From 1c4ab758143244c4b7b0c612831fb5346b03d291 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 17:15:19 +0200 Subject: [PATCH 11/12] fix: gate gio file copy fallback with gvfs feature --- src/operation/recursive.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index a38ea74..9a33ebd 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -10,7 +10,6 @@ use compio::driver::{ToSharedFd, op::AsyncifyFd}; use compio::io::{AsyncReadAt, AsyncWriteAt}; use cosmic::iced::futures; use futures::{FutureExt, StreamExt}; -use gio::prelude::FileExtManual; use std::future::Future; use std::pin::Pin; use std::rc::Rc; @@ -18,10 +17,14 @@ use std::time::Instant; use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf}; use walkdir::WalkDir; +#[cfg(feature = "gvfs")] +use gio::prelude::FileExtManual; + #[derive(thiserror::Error, Debug)] pub enum GioCopyError { #[error("controller state")] Controller(OperationError), + #[cfg(feature = "gvfs")] #[error("gio copy failed")] GLib(#[from] glib::Error), } @@ -508,6 +511,7 @@ impl Op { let buf_out = buf_out_slice.into_inner(); if let Err(why) = result { + #[cfg(feature = "gvfs")] if let std::io::ErrorKind::Unsupported = why.kind() { ctx.buf = buf_out; _ = futures::future::join(from_file.close(), to_file.close()).await; @@ -583,6 +587,7 @@ impl Op { /// Fallback mechanism in the event that unsupported I/O error errors occur. /// Fixes unsupported errors when copying large files over MTP. /// TODO: Find what Gio.File does to work around this. + #[cfg(feature = "gvfs")] async fn gio_file_copy( &self, ctx: &mut Context, From 908f30a571b496a3658ad60a1c7f6a10e92fde50 Mon Sep 17 00:00:00 2001 From: Michael Aaron Murphy Date: Tue, 14 Apr 2026 17:47:50 +0200 Subject: [PATCH 12/12] example(dialog): switch logger to fix build --- examples/dialog.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/examples/dialog.rs b/examples/dialog.rs index d1d5515..be93576 100644 --- a/examples/dialog.rs +++ b/examples/dialog.rs @@ -9,9 +9,25 @@ use cosmic_files::dialog::{ Dialog, DialogChoice, DialogChoiceOption, DialogFilter, DialogFilterPattern, DialogKind, DialogMessage, DialogResult, DialogSettings, }; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; fn main() -> Result<(), Box> { - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); + let log_format = tracing_subscriber::fmt::format() + .pretty() + .without_time() + .with_line_number(true) + .with_file(true) + .with_target(false) + .with_thread_names(true); + + let log_layer = tracing_subscriber::fmt::Layer::default() + .with_writer(std::io::stderr) + .event_format(log_format); + + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_env("RUST_LOG")) + .with(log_layer) + .init(); let settings = Settings::default(); app::run::(settings, ())?;