diff --git a/Cargo.toml b/Cargo.toml index 1b90a4a..9447640 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ num_cpus = "1.17.0" filetime = "0.2" tracing = "0.1.44" tracing-subscriber = { version = "0.3.22", features = ["env-filter"] } +thiserror = "2.0.18" # Completion-based IO runtime to enable io_uring / IOCP file IO support. [dependencies.compio] diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index 61c4841..befe69c 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -1,15 +1,31 @@ -use compio::BufResult; -use compio::buf::{IntoInner, IoBuf}; -use compio::io::{AsyncReadAt, AsyncWriteAt}; -use std::future::Future; -use std::pin::Pin; -use std::time::Instant; -use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc}; -use walkdir::WalkDir; - -use crate::operation::{OperationError, sync_to_disk}; +// Copyright 2023 System76 +// SPDX-License-Identifier: GPL-3.0-only use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path}; +use crate::operation::{OperationError, sync_to_disk}; +use anyhow::Context as AnyhowContext; +use compio::BufResult; +use compio::buf::{IntoInner, IoBuf}; +use compio::driver::{ToSharedFd, op::AsyncifyFd}; +use compio::io::{AsyncReadAt, AsyncWriteAt}; +use cosmic::iced::futures; +use futures::future::Either; +use futures::{FutureExt, StreamExt}; +use gio::prelude::FileExtManual; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::time::Instant; +use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf}; +use walkdir::WalkDir; + +#[derive(thiserror::Error, Debug)] +pub enum GioCopyError { + #[error("controller state")] + Controller(OperationError), + #[error("gio copy failed")] + GLib(#[from] glib::Error), +} pub enum Method { Copy, @@ -313,136 +329,26 @@ impl Op { }) } - async fn run( - &mut self, - ctx: &mut Context, - mut progress: Progress, - ) -> Result> { + async fn run(&mut self, ctx: &mut Context, progress: Progress) -> Result> { if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) { return Ok(true); } match self.kind { OpKind::Copy => { - // Remove `to` if overwriting and it is an existing file - if self.to.is_file() { - match ctx.replace(self).await? { - ControlFlow::Continue(to) => { - self.to = to; - } - ControlFlow::Break(ret) => { - return Ok(ret); - } - } + let result = self.copy(ctx, progress).await; + + if result.is_err() { + _ = compio::fs::remove_file(&self.to).await; } - let (from_file, metadata, mut to_file) = cosmic::iced::futures::try_join!( - async { - compio::fs::OpenOptions::new() - .read(true) - .open(&self.from) - .await - }, - compio::fs::metadata(&self.from), - // This is atomic and ensures `to` is not created by any other process - async { - compio::fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&self.to) - .await - } - )?; - - progress.total_bytes = Some(metadata.len()); - (ctx.on_progress)(self, &progress); - if let Err(err) = to_file.set_permissions(metadata.permissions()).await { - // This error is not propagated upwards as some filesystems do not support setting permissions - log::warn!( - "failed to set permissions for {}: {}", - self.to.display(), - err - ); - } - - // Prevent spamming the progress callbacks. - let mut last_progress_update = Instant::now(); - // io_uring/IOCP requires transferring ownership of the buffer to the kernel. - let mut buf_in = std::mem::take(&mut ctx.buf); - // Track where the current read/write position is at. - let mut pos = 0; - - loop { - let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; - - let count = match result { - Ok(0) => { - ctx.buf = buf_out; - break; - } - Ok(count) => count, - Err(why) => { - ctx.buf = buf_out; - return Err(why.into()); - } - }; - - let BufResult(result, buf_out_slice) = - to_file.write_at(buf_out.slice(..count), pos).await; - let buf_out = buf_out_slice.into_inner(); - - if let Err(why) = result { - ctx.buf = buf_out; - return Err(why.into()); - } - - progress.current_bytes += count as u64; - pos += count as u64; - - // Avoid spamming progress messages too early. - let current = Instant::now(); - if current.duration_since(last_progress_update).as_millis() > 49 { - last_progress_update = current; - (ctx.on_progress)(self, &progress); - - // Also check if the progress was cancelled. - if let Err(state) = ctx.controller.check().await { - ctx.buf = buf_out; - return Err(OperationError::from_state(state, &ctx.controller).into()); - } - } - - buf_in = buf_out; - } - - let mut times = fs::FileTimes::new(); - { - use std::os::unix::prelude::MetadataExt; - log::info!("{}", metadata.mtime()); - } - if let Ok(time) = metadata.modified() { - times = times.set_modified(time); - } - if let Ok(time) = metadata.accessed() { - times = times.set_accessed(time); - } - //TODO: upstream set_times implementation to compio? - { - use compio::driver::{ToSharedFd, op::AsyncifyFd}; - let op = - AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| { - BufResult(file.set_times(times).map(|_| 0), ()) - }); - match compio::runtime::submit(op).await.0.map(|_| ()) { - Ok(()) => { - log::info!("set times for {} to {:?}", self.to.display(), times); - } - Err(err) => { - log::warn!("failed to set times for {}: {}", self.to.display(), err); - } - } - } + return result; } OpKind::Move { cross_device_copy } => { + // Do not clean up if cross_device_copy is set + if cross_device_copy { + self.skipped.cleanup.set(true); + } + // Remove `to` if overwriting and it is an existing file if self.to.is_file() { match ctx.replace(self).await? { @@ -520,4 +426,277 @@ impl Op { } Ok(true) } + + async fn copy( + &mut self, + ctx: &mut Context, + mut progress: Progress, + ) -> Result> { + // Remove `to` if overwriting and it is an existing file + if self.to.is_file() { + match ctx.replace(self).await? { + ControlFlow::Continue(to) => { + self.to = to; + } + ControlFlow::Break(ret) => { + return Ok(ret); + } + } + } + + let (from_file, metadata, to_file) = cosmic::iced::futures::join!( + async { + compio::fs::OpenOptions::new() + .read(true) + .open(&self.from) + .await + .with_context(|| format!("failed to open {} for reading", self.from.display(),)) + }, + async { compio::fs::metadata(&self.from).await.ok() }, + // This is atomic and ensures `to` is not created by any other process + async { + compio::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&self.to) + .await + .with_context(|| format!("failed to open {} for writing", self.to.display())) + } + ); + + let from_file = from_file?; + let mut to_file = to_file?; + progress.total_bytes = metadata.as_ref().map(|m| m.len()); + (ctx.on_progress)(self, &progress); + + if let Some(metadata) = metadata.as_ref() { + if let Err(why) = to_file.set_permissions(metadata.permissions()).await { + // This error is not propagated upwards as some filesystems do not support setting permissions + if !matches!(why.kind(), std::io::ErrorKind::Unsupported) { + tracing::warn!(?why, "failed to set permissions for {}", self.to.display(),); + } + } + } + + // Prevent spamming the progress callbacks. + let mut last_progress_update = Instant::now(); + // io_uring/IOCP requires transferring ownership of the buffer to the kernel. + let mut buf_in = std::mem::take(&mut ctx.buf); + // Track where the current read/write position is at. + let mut pos = 0; + + loop { + let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; + + let count = match result { + Ok(0) => { + buf_in = buf_out; + break; + } + Ok(count) => count, + Err(why) => { + ctx.buf = buf_out; + tracing::error!("failed to read: {:?}", why); + _ = futures::future::join(from_file.close(), to_file.close()).await; + return Err(why).context("failed to read")?; + } + }; + + let BufResult(result, buf_out_slice) = + to_file.write_at(buf_out.slice(..count), pos).await; + let buf_out = buf_out_slice.into_inner(); + + if let Err(why) = result { + if let std::io::ErrorKind::Unsupported = why.kind() { + ctx.buf = buf_out; + _ = futures::future::join(from_file.close(), to_file.close()).await; + return self + .gio_file_copy(ctx, progress) + .await + .map(|_| true) + .map_err(Into::into); + } + + tracing::error!("failed to write: {:?}", why); + ctx.buf = buf_out; + _ = futures::future::join(from_file.close(), to_file.close()).await; + return Err(why).context("failed to write")?; + } + + progress.current_bytes += count as u64; + pos += count as u64; + + // Avoid spamming progress messages too early. + let current = Instant::now(); + if current.duration_since(last_progress_update).as_millis() > 49 { + last_progress_update = current; + (ctx.on_progress)(self, &progress); + + // Also check if the progress was cancelled. + if let Err(state) = ctx.controller.check().await { + ctx.buf = buf_out; + tracing::warn!( + "operation to copy from {:?} to {:?} cancelled", + self.from, + self.to + ); + _ = futures::future::join(from_file.close(), to_file.close()).await; + return Err(OperationError::from_state(state, &ctx.controller).into()); + } + } + + buf_in = buf_out; + } + + ctx.buf = buf_in; + + if let Some(metadata) = metadata.as_ref() { + let mut times = fs::FileTimes::new(); + if let Ok(time) = metadata.modified() { + times = times.set_modified(time); + } + if let Ok(time) = metadata.accessed() { + times = times.set_accessed(time); + } + //TODO: upstream set_times implementation to compio? + let op = AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| { + BufResult(file.set_times(times).map(|_| 0), ()) + }); + match compio::runtime::submit(op).await.0.map(|_| ()) { + Ok(()) => { + tracing::info!("set times for {} to {:?}", self.to.display(), times); + } + Err(why) => { + if !matches!(why.kind(), std::io::ErrorKind::Unsupported) { + tracing::error!(?why, "failed to set times for {}", self.to.display()); + } + } + } + } + + _ = to_file.close().await; + + Ok(true) + } + + /// Fallback mechanism in the event that unsupported I/O error errors occur. + /// Fixes unsupported errors when copying large files over MTP. + /// TODO: Find what Gio.File does to work around this. + async fn gio_file_copy( + &self, + ctx: &mut Context, + mut progress: Progress, + ) -> Result<(), GioCopyError> { + _ = compio::fs::remove_file(&self.to).await; + + let from = gio::File::for_path(&self.from); + let to = gio::File::for_path(&self.to); + let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = tokio::sync::oneshot::channel(); + let (pause_tx, mut pause_rx) = tokio::sync::watch::channel(false); + + let task = compio::runtime::spawn_blocking(move || { + let glib_context = glib::MainContext::new(); + let glib_loop = glib::MainLoop::new(Some(&glib_context), false); + glib_context.with_thread_default(move || { + let glib_loop2 = glib_loop.clone(); + glib::MainContext::ref_thread_default().spawn_local(async move { + // Create a future for copying the file with `gio::File`. This also creates a progress stream. + let (gio_copy_fut, mut progress_stream) = from.copy_future( + &to, + gio::FileCopyFlags::OVERWRITE | gio::FileCopyFlags::ALL_METADATA, + glib::Priority::LOW, + ); + + let mut copy_fut = + gio_copy_fut.map(|result| result.map_err(GioCopyError::GLib)); + + let mut progress_fut = std::pin::pin!(async { + while let Some((current_bytes, _)) = progress_stream.next().await { + _ = progress_tx.send(current_bytes); + } + + drop(progress_tx); + futures::future::pending::<()>().await; + }); + + // Poll the progress future while waiting for the copy future to finish. + let mut main_fut = std::pin::pin!(async { + loop { + if let Either::Right((result, _)) = + futures::future::select(&mut progress_fut, &mut copy_fut).await + { + return result; + } + } + }); + + let mut pause_rx2 = pause_rx.clone(); + + loop { + let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused)); + match futures::future::select(until_paused, &mut main_fut).await { + Either::Left(_) => { + _ = pause_rx2.wait_for(|paused| !*paused).await; + } + + Either::Right((result, _)) => { + _ = tx.send(result.map(|_| ())); + glib_loop2.quit(); + return; + } + } + } + }); + + glib_loop.run(); + }) + }); + + let mut last_progress_update = Instant::now(); + let mut task = task.fuse(); + let mut rx = rx.fuse(); + + loop { + let until_paused = std::pin::pin!(ctx.controller.until_paused()); + + futures::select! { + value = progress_rx.recv().fuse() => { + if let Some(current_bytes) = value { + progress.current_bytes = current_bytes as u64; + let current = Instant::now(); + if current.duration_since(last_progress_update).as_millis() > 49 { + last_progress_update = current; + (ctx.on_progress)(self, &progress); + + tracing::info!("checking controller"); + + // Also check if the progress was cancelled. + if let Err(state) = ctx.controller.check().await { + tracing::warn!( + "operation to copy from {:?} to {:?} cancelled", + self.from, + self.to + ); + return Err::<(), GioCopyError>(GioCopyError::Controller( + OperationError::from_state(state, &ctx.controller), + )); + } + } + } + } + + result = rx => return result.unwrap(), + + _ = task => (), + + _ = until_paused.fuse() => { + // Pauses an active copy while the controller state is paused. + _ = pause_tx.send(true); + ctx.controller.until_unpaused().await; + _ = pause_tx.send(false); + } + } + } + } }