fix: use gio file copy fallback to fix MTP unsupported I/O errors
This commit is contained in:
parent
b299f1a172
commit
e2bdcf8da4
2 changed files with 311 additions and 131 deletions
|
|
@ -63,6 +63,7 @@ num_cpus = "1.17.0"
|
||||||
filetime = "0.2"
|
filetime = "0.2"
|
||||||
tracing = "0.1.44"
|
tracing = "0.1.44"
|
||||||
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
|
||||||
|
thiserror = "2.0.18"
|
||||||
|
|
||||||
# Completion-based IO runtime to enable io_uring / IOCP file IO support.
|
# Completion-based IO runtime to enable io_uring / IOCP file IO support.
|
||||||
[dependencies.compio]
|
[dependencies.compio]
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,31 @@
|
||||||
use compio::BufResult;
|
// Copyright 2023 System76 <info@system76.com>
|
||||||
use compio::buf::{IntoInner, IoBuf};
|
// SPDX-License-Identifier: GPL-3.0-only
|
||||||
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
|
||||||
use std::future::Future;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::time::Instant;
|
|
||||||
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc};
|
|
||||||
use walkdir::WalkDir;
|
|
||||||
|
|
||||||
use crate::operation::{OperationError, sync_to_disk};
|
|
||||||
|
|
||||||
use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
|
use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
|
||||||
|
use crate::operation::{OperationError, sync_to_disk};
|
||||||
|
use anyhow::Context as AnyhowContext;
|
||||||
|
use compio::BufResult;
|
||||||
|
use compio::buf::{IntoInner, IoBuf};
|
||||||
|
use compio::driver::{ToSharedFd, op::AsyncifyFd};
|
||||||
|
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
||||||
|
use cosmic::iced::futures;
|
||||||
|
use futures::future::Either;
|
||||||
|
use futures::{FutureExt, StreamExt};
|
||||||
|
use gio::prelude::FileExtManual;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::rc::Rc;
|
||||||
|
use std::time::Instant;
|
||||||
|
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf};
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
|
#[derive(thiserror::Error, Debug)]
|
||||||
|
pub enum GioCopyError {
|
||||||
|
#[error("controller state")]
|
||||||
|
Controller(OperationError),
|
||||||
|
#[error("gio copy failed")]
|
||||||
|
GLib(#[from] glib::Error),
|
||||||
|
}
|
||||||
|
|
||||||
pub enum Method {
|
pub enum Method {
|
||||||
Copy,
|
Copy,
|
||||||
|
|
@ -313,136 +329,26 @@ impl Op {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run(
|
async fn run(&mut self, ctx: &mut Context, progress: Progress) -> Result<bool, Box<dyn Error>> {
|
||||||
&mut self,
|
|
||||||
ctx: &mut Context,
|
|
||||||
mut progress: Progress,
|
|
||||||
) -> Result<bool, Box<dyn Error>> {
|
|
||||||
if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) {
|
if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) {
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
match self.kind {
|
match self.kind {
|
||||||
OpKind::Copy => {
|
OpKind::Copy => {
|
||||||
// Remove `to` if overwriting and it is an existing file
|
let result = self.copy(ctx, progress).await;
|
||||||
if self.to.is_file() {
|
|
||||||
match ctx.replace(self).await? {
|
if result.is_err() {
|
||||||
ControlFlow::Continue(to) => {
|
_ = compio::fs::remove_file(&self.to).await;
|
||||||
self.to = to;
|
|
||||||
}
|
|
||||||
ControlFlow::Break(ret) => {
|
|
||||||
return Ok(ret);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let (from_file, metadata, mut to_file) = cosmic::iced::futures::try_join!(
|
return result;
|
||||||
async {
|
|
||||||
compio::fs::OpenOptions::new()
|
|
||||||
.read(true)
|
|
||||||
.open(&self.from)
|
|
||||||
.await
|
|
||||||
},
|
|
||||||
compio::fs::metadata(&self.from),
|
|
||||||
// This is atomic and ensures `to` is not created by any other process
|
|
||||||
async {
|
|
||||||
compio::fs::OpenOptions::new()
|
|
||||||
.create_new(true)
|
|
||||||
.write(true)
|
|
||||||
.open(&self.to)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
)?;
|
|
||||||
|
|
||||||
progress.total_bytes = Some(metadata.len());
|
|
||||||
(ctx.on_progress)(self, &progress);
|
|
||||||
if let Err(err) = to_file.set_permissions(metadata.permissions()).await {
|
|
||||||
// This error is not propagated upwards as some filesystems do not support setting permissions
|
|
||||||
log::warn!(
|
|
||||||
"failed to set permissions for {}: {}",
|
|
||||||
self.to.display(),
|
|
||||||
err
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prevent spamming the progress callbacks.
|
|
||||||
let mut last_progress_update = Instant::now();
|
|
||||||
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
|
|
||||||
let mut buf_in = std::mem::take(&mut ctx.buf);
|
|
||||||
// Track where the current read/write position is at.
|
|
||||||
let mut pos = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
|
|
||||||
|
|
||||||
let count = match result {
|
|
||||||
Ok(0) => {
|
|
||||||
ctx.buf = buf_out;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(count) => count,
|
|
||||||
Err(why) => {
|
|
||||||
ctx.buf = buf_out;
|
|
||||||
return Err(why.into());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let BufResult(result, buf_out_slice) =
|
|
||||||
to_file.write_at(buf_out.slice(..count), pos).await;
|
|
||||||
let buf_out = buf_out_slice.into_inner();
|
|
||||||
|
|
||||||
if let Err(why) = result {
|
|
||||||
ctx.buf = buf_out;
|
|
||||||
return Err(why.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
progress.current_bytes += count as u64;
|
|
||||||
pos += count as u64;
|
|
||||||
|
|
||||||
// Avoid spamming progress messages too early.
|
|
||||||
let current = Instant::now();
|
|
||||||
if current.duration_since(last_progress_update).as_millis() > 49 {
|
|
||||||
last_progress_update = current;
|
|
||||||
(ctx.on_progress)(self, &progress);
|
|
||||||
|
|
||||||
// Also check if the progress was cancelled.
|
|
||||||
if let Err(state) = ctx.controller.check().await {
|
|
||||||
ctx.buf = buf_out;
|
|
||||||
return Err(OperationError::from_state(state, &ctx.controller).into());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
buf_in = buf_out;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut times = fs::FileTimes::new();
|
|
||||||
{
|
|
||||||
use std::os::unix::prelude::MetadataExt;
|
|
||||||
log::info!("{}", metadata.mtime());
|
|
||||||
}
|
|
||||||
if let Ok(time) = metadata.modified() {
|
|
||||||
times = times.set_modified(time);
|
|
||||||
}
|
|
||||||
if let Ok(time) = metadata.accessed() {
|
|
||||||
times = times.set_accessed(time);
|
|
||||||
}
|
|
||||||
//TODO: upstream set_times implementation to compio?
|
|
||||||
{
|
|
||||||
use compio::driver::{ToSharedFd, op::AsyncifyFd};
|
|
||||||
let op =
|
|
||||||
AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| {
|
|
||||||
BufResult(file.set_times(times).map(|_| 0), ())
|
|
||||||
});
|
|
||||||
match compio::runtime::submit(op).await.0.map(|_| ()) {
|
|
||||||
Ok(()) => {
|
|
||||||
log::info!("set times for {} to {:?}", self.to.display(), times);
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("failed to set times for {}: {}", self.to.display(), err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
OpKind::Move { cross_device_copy } => {
|
OpKind::Move { cross_device_copy } => {
|
||||||
|
// Do not clean up if cross_device_copy is set
|
||||||
|
if cross_device_copy {
|
||||||
|
self.skipped.cleanup.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
// Remove `to` if overwriting and it is an existing file
|
// Remove `to` if overwriting and it is an existing file
|
||||||
if self.to.is_file() {
|
if self.to.is_file() {
|
||||||
match ctx.replace(self).await? {
|
match ctx.replace(self).await? {
|
||||||
|
|
@ -520,4 +426,277 @@ impl Op {
|
||||||
}
|
}
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn copy(
|
||||||
|
&mut self,
|
||||||
|
ctx: &mut Context,
|
||||||
|
mut progress: Progress,
|
||||||
|
) -> Result<bool, Box<dyn Error>> {
|
||||||
|
// Remove `to` if overwriting and it is an existing file
|
||||||
|
if self.to.is_file() {
|
||||||
|
match ctx.replace(self).await? {
|
||||||
|
ControlFlow::Continue(to) => {
|
||||||
|
self.to = to;
|
||||||
|
}
|
||||||
|
ControlFlow::Break(ret) => {
|
||||||
|
return Ok(ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (from_file, metadata, to_file) = cosmic::iced::futures::join!(
|
||||||
|
async {
|
||||||
|
compio::fs::OpenOptions::new()
|
||||||
|
.read(true)
|
||||||
|
.open(&self.from)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("failed to open {} for reading", self.from.display(),))
|
||||||
|
},
|
||||||
|
async { compio::fs::metadata(&self.from).await.ok() },
|
||||||
|
// This is atomic and ensures `to` is not created by any other process
|
||||||
|
async {
|
||||||
|
compio::fs::OpenOptions::new()
|
||||||
|
.create_new(true)
|
||||||
|
.write(true)
|
||||||
|
.open(&self.to)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("failed to open {} for writing", self.to.display()))
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
let from_file = from_file?;
|
||||||
|
let mut to_file = to_file?;
|
||||||
|
progress.total_bytes = metadata.as_ref().map(|m| m.len());
|
||||||
|
(ctx.on_progress)(self, &progress);
|
||||||
|
|
||||||
|
if let Some(metadata) = metadata.as_ref() {
|
||||||
|
if let Err(why) = to_file.set_permissions(metadata.permissions()).await {
|
||||||
|
// This error is not propagated upwards as some filesystems do not support setting permissions
|
||||||
|
if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
|
||||||
|
tracing::warn!(?why, "failed to set permissions for {}", self.to.display(),);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prevent spamming the progress callbacks.
|
||||||
|
let mut last_progress_update = Instant::now();
|
||||||
|
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
|
||||||
|
let mut buf_in = std::mem::take(&mut ctx.buf);
|
||||||
|
// Track where the current read/write position is at.
|
||||||
|
let mut pos = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
|
||||||
|
|
||||||
|
let count = match result {
|
||||||
|
Ok(0) => {
|
||||||
|
buf_in = buf_out;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(count) => count,
|
||||||
|
Err(why) => {
|
||||||
|
ctx.buf = buf_out;
|
||||||
|
tracing::error!("failed to read: {:?}", why);
|
||||||
|
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||||
|
return Err(why).context("failed to read")?;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let BufResult(result, buf_out_slice) =
|
||||||
|
to_file.write_at(buf_out.slice(..count), pos).await;
|
||||||
|
let buf_out = buf_out_slice.into_inner();
|
||||||
|
|
||||||
|
if let Err(why) = result {
|
||||||
|
if let std::io::ErrorKind::Unsupported = why.kind() {
|
||||||
|
ctx.buf = buf_out;
|
||||||
|
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||||
|
return self
|
||||||
|
.gio_file_copy(ctx, progress)
|
||||||
|
.await
|
||||||
|
.map(|_| true)
|
||||||
|
.map_err(Into::into);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::error!("failed to write: {:?}", why);
|
||||||
|
ctx.buf = buf_out;
|
||||||
|
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||||
|
return Err(why).context("failed to write")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
progress.current_bytes += count as u64;
|
||||||
|
pos += count as u64;
|
||||||
|
|
||||||
|
// Avoid spamming progress messages too early.
|
||||||
|
let current = Instant::now();
|
||||||
|
if current.duration_since(last_progress_update).as_millis() > 49 {
|
||||||
|
last_progress_update = current;
|
||||||
|
(ctx.on_progress)(self, &progress);
|
||||||
|
|
||||||
|
// Also check if the progress was cancelled.
|
||||||
|
if let Err(state) = ctx.controller.check().await {
|
||||||
|
ctx.buf = buf_out;
|
||||||
|
tracing::warn!(
|
||||||
|
"operation to copy from {:?} to {:?} cancelled",
|
||||||
|
self.from,
|
||||||
|
self.to
|
||||||
|
);
|
||||||
|
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||||
|
return Err(OperationError::from_state(state, &ctx.controller).into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buf_in = buf_out;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.buf = buf_in;
|
||||||
|
|
||||||
|
if let Some(metadata) = metadata.as_ref() {
|
||||||
|
let mut times = fs::FileTimes::new();
|
||||||
|
if let Ok(time) = metadata.modified() {
|
||||||
|
times = times.set_modified(time);
|
||||||
|
}
|
||||||
|
if let Ok(time) = metadata.accessed() {
|
||||||
|
times = times.set_accessed(time);
|
||||||
|
}
|
||||||
|
//TODO: upstream set_times implementation to compio?
|
||||||
|
let op = AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| {
|
||||||
|
BufResult(file.set_times(times).map(|_| 0), ())
|
||||||
|
});
|
||||||
|
match compio::runtime::submit(op).await.0.map(|_| ()) {
|
||||||
|
Ok(()) => {
|
||||||
|
tracing::info!("set times for {} to {:?}", self.to.display(), times);
|
||||||
|
}
|
||||||
|
Err(why) => {
|
||||||
|
if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
|
||||||
|
tracing::error!(?why, "failed to set times for {}", self.to.display());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = to_file.close().await;
|
||||||
|
|
||||||
|
Ok(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fallback mechanism in the event that unsupported I/O error errors occur.
|
||||||
|
/// Fixes unsupported errors when copying large files over MTP.
|
||||||
|
/// TODO: Find what Gio.File does to work around this.
|
||||||
|
async fn gio_file_copy(
|
||||||
|
&self,
|
||||||
|
ctx: &mut Context,
|
||||||
|
mut progress: Progress,
|
||||||
|
) -> Result<(), GioCopyError> {
|
||||||
|
_ = compio::fs::remove_file(&self.to).await;
|
||||||
|
|
||||||
|
let from = gio::File::for_path(&self.from);
|
||||||
|
let to = gio::File::for_path(&self.to);
|
||||||
|
let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
let (pause_tx, mut pause_rx) = tokio::sync::watch::channel(false);
|
||||||
|
|
||||||
|
let task = compio::runtime::spawn_blocking(move || {
|
||||||
|
let glib_context = glib::MainContext::new();
|
||||||
|
let glib_loop = glib::MainLoop::new(Some(&glib_context), false);
|
||||||
|
glib_context.with_thread_default(move || {
|
||||||
|
let glib_loop2 = glib_loop.clone();
|
||||||
|
glib::MainContext::ref_thread_default().spawn_local(async move {
|
||||||
|
// Create a future for copying the file with `gio::File`. This also creates a progress stream.
|
||||||
|
let (gio_copy_fut, mut progress_stream) = from.copy_future(
|
||||||
|
&to,
|
||||||
|
gio::FileCopyFlags::OVERWRITE | gio::FileCopyFlags::ALL_METADATA,
|
||||||
|
glib::Priority::LOW,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut copy_fut =
|
||||||
|
gio_copy_fut.map(|result| result.map_err(GioCopyError::GLib));
|
||||||
|
|
||||||
|
let mut progress_fut = std::pin::pin!(async {
|
||||||
|
while let Some((current_bytes, _)) = progress_stream.next().await {
|
||||||
|
_ = progress_tx.send(current_bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(progress_tx);
|
||||||
|
futures::future::pending::<()>().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Poll the progress future while waiting for the copy future to finish.
|
||||||
|
let mut main_fut = std::pin::pin!(async {
|
||||||
|
loop {
|
||||||
|
if let Either::Right((result, _)) =
|
||||||
|
futures::future::select(&mut progress_fut, &mut copy_fut).await
|
||||||
|
{
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut pause_rx2 = pause_rx.clone();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused));
|
||||||
|
match futures::future::select(until_paused, &mut main_fut).await {
|
||||||
|
Either::Left(_) => {
|
||||||
|
_ = pause_rx2.wait_for(|paused| !*paused).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Either::Right((result, _)) => {
|
||||||
|
_ = tx.send(result.map(|_| ()));
|
||||||
|
glib_loop2.quit();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
glib_loop.run();
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut last_progress_update = Instant::now();
|
||||||
|
let mut task = task.fuse();
|
||||||
|
let mut rx = rx.fuse();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let until_paused = std::pin::pin!(ctx.controller.until_paused());
|
||||||
|
|
||||||
|
futures::select! {
|
||||||
|
value = progress_rx.recv().fuse() => {
|
||||||
|
if let Some(current_bytes) = value {
|
||||||
|
progress.current_bytes = current_bytes as u64;
|
||||||
|
let current = Instant::now();
|
||||||
|
if current.duration_since(last_progress_update).as_millis() > 49 {
|
||||||
|
last_progress_update = current;
|
||||||
|
(ctx.on_progress)(self, &progress);
|
||||||
|
|
||||||
|
tracing::info!("checking controller");
|
||||||
|
|
||||||
|
// Also check if the progress was cancelled.
|
||||||
|
if let Err(state) = ctx.controller.check().await {
|
||||||
|
tracing::warn!(
|
||||||
|
"operation to copy from {:?} to {:?} cancelled",
|
||||||
|
self.from,
|
||||||
|
self.to
|
||||||
|
);
|
||||||
|
return Err::<(), GioCopyError>(GioCopyError::Controller(
|
||||||
|
OperationError::from_state(state, &ctx.controller),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = rx => return result.unwrap(),
|
||||||
|
|
||||||
|
_ = task => (),
|
||||||
|
|
||||||
|
_ = until_paused.fuse() => {
|
||||||
|
// Pauses an active copy while the controller state is paused.
|
||||||
|
_ = pause_tx.send(true);
|
||||||
|
ctx.controller.until_unpaused().await;
|
||||||
|
_ = pause_tx.send(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue