feat: use io_uring / IOCP when available for async file IO (#911)

Spawns a single thread for handling async file IO on the [compio runtime](https://github.com/compio-rs/compio).
It is a completion-based IO runtime that can dynamically select a polling mechanism at runtime. It defaults to
io_uring on Linux, IOCP on Windows, and the polling crate everywhere else. On Linux systems where io_uring is
unavailable or disabled, it will fall back to the polling crate.

This eliminates most of the threads that were needed previously. It significantly reduced the amount of memory
needed in the recursive Context to get a good transfer rate for each copy operation—from a 4 MB buffer to 128 KB.
Copies on a nvme drive are somewhat faster with the async IO changes, and use less CPU than before.

Although it uses a single thread for non-blocking tasks, it still manages to 100% max out my nvme drive's
activity for the whole duration of multiple long transfers. But it would be possible to enable compio's
dispatcher to spread operations across worker threads if necessary.

All but the extract and compress operations were updated to be async. I had to switch the `CondVar` in the
`Controller` to a `tokio::sync::Notify` to prevent the IO thread from being put to sleep when an operation is paused.
Fixed a deadlock in the `operation_copy` test function that was performing an operation without concurrently pulling
from the channel in the operation. Reduced the rate that `Message::None` is sent from a subscription to trigger a UI
redraw, and fixed it to not run when operations are paused.
This commit is contained in:
Michael Murphy 2025-04-09 23:15:07 +02:00 committed by GitHub
parent 7a657c646b
commit 79aa8f887a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 924 additions and 400 deletions

View file

@ -1,12 +1,9 @@
use std::{
cell::Cell,
error::Error,
fs,
io::{Read, Write},
ops::ControlFlow,
path::PathBuf,
rc::Rc,
};
use compio::io::{AsyncReadAt, AsyncWriteAt};
use compio::BufResult;
use std::future::Future;
use std::pin::Pin;
use std::time::Instant;
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc};
use walkdir::WalkDir;
use super::{copy_unique_path, Controller, OperationSelection, ReplaceResult};
@ -15,7 +12,7 @@ pub struct Context {
buf: Vec<u8>,
controller: Controller,
on_progress: Box<dyn OnProgress>,
on_replace: Box<dyn OnReplace>,
on_replace: Pin<Box<dyn OnReplace>>,
pub(crate) op_sel: OperationSelection,
replace_result_opt: Option<ReplaceResult>,
}
@ -23,22 +20,29 @@ pub struct Context {
pub trait OnProgress: Fn(&Op, &Progress) + 'static {}
impl<F> OnProgress for F where F: Fn(&Op, &Progress) + 'static {}
pub trait OnReplace: Fn(&Op) -> ReplaceResult + 'static {}
impl<F> OnReplace for F where F: Fn(&Op) -> ReplaceResult + 'static {}
pub trait OnReplace:
for<'a> Fn(&'a Op) -> Pin<Box<dyn Future<Output = ReplaceResult> + 'a>> + 'static
{
}
impl<F> OnReplace for F where
F: for<'a> Fn(&'a Op) -> Pin<Box<dyn Future<Output = ReplaceResult> + 'a>> + 'static
{
}
impl Context {
pub fn new(controller: Controller) -> Self {
Self {
buf: vec![0; 4 * 1024 * 1024],
// 128K is the optimal upper size of a buffer.
buf: vec![0u8; 128 * 1024],
controller,
on_progress: Box::new(|_op, _progress| {}),
on_replace: Box::new(|_op| ReplaceResult::Cancel),
on_replace: Box::pin(|_op| Box::pin(async { ReplaceResult::Cancel })),
op_sel: OperationSelection::default(),
replace_result_opt: None,
}
}
pub fn recursive_copy_or_move(
pub async fn recursive_copy_or_move(
&mut self,
from_to_pairs: Vec<(PathBuf, PathBuf)>,
moving: bool,
@ -46,7 +50,7 @@ impl Context {
let mut ops = Vec::new();
let mut cleanup_ops = Vec::new();
for (from_parent, to_parent) in from_to_pairs {
self.controller.check()?;
self.controller.check().await?;
if from_parent == to_parent {
// Skip matching source and destination
@ -54,7 +58,7 @@ impl Context {
}
for entry in WalkDir::new(&from_parent).into_iter() {
self.controller.check()?;
self.controller.check().await?;
let entry = entry.map_err(|err| {
format!("failed to walk directory {:?}: {}", from_parent, err)
@ -114,7 +118,7 @@ impl Context {
let total_ops = ops.len();
for (current_ops, mut op) in ops.into_iter().enumerate() {
self.controller.check()?;
self.controller.check().await?;
let progress = Progress {
current_ops,
@ -123,7 +127,7 @@ impl Context {
total_bytes: None,
};
(self.on_progress)(&op, &progress);
if op.run(self, progress).map_err(|err| {
if op.run(self, progress).await.map_err(|err| {
format!(
"failed to {:?} {:?} to {:?}: {}",
op.kind, op.from, op.to, err
@ -148,21 +152,23 @@ impl Context {
self
}
pub fn on_replace<F: OnReplace>(mut self, f: F) -> Self {
self.on_replace = Box::new(f);
pub fn on_replace(mut self, f: impl OnReplace + 'static) -> Self {
self.on_replace = Box::pin(f);
self
}
fn replace(&mut self, op: &Op) -> Result<ControlFlow<bool, PathBuf>, Box<dyn Error>> {
let replace_result = self
.replace_result_opt
.unwrap_or_else(|| (self.on_replace)(op));
async fn replace(&mut self, op: &Op) -> Result<ControlFlow<bool, PathBuf>, Box<dyn Error>> {
let replace_result = match self.replace_result_opt {
Some(result) => result,
None => (self.on_replace)(op).await,
};
match replace_result {
ReplaceResult::Replace(apply_to_all) => {
if apply_to_all {
self.replace_result_opt = Some(replace_result);
}
fs::remove_file(&op.to)?;
compio::fs::remove_file(&op.to).await?;
Ok(ControlFlow::Continue(op.to.clone()))
}
ReplaceResult::KeepBoth => match op.to.parent() {
@ -223,17 +229,19 @@ impl Op {
})
}
fn run(&mut self, ctx: &mut Context, mut progress: Progress) -> Result<bool, Box<dyn Error>> {
async fn run(
&mut self,
ctx: &mut Context,
mut progress: Progress,
) -> Result<bool, Box<dyn Error>> {
if self.skipped.get() {
return Ok(true);
}
match self.kind {
OpKind::Copy => {
let mut from_file = fs::OpenOptions::new().read(true).open(&self.from)?;
let metadata = from_file.metadata()?;
// Remove `to` if overwriting and it is an existing file
if self.to.is_file() {
match ctx.replace(self)? {
match ctx.replace(self).await? {
ControlFlow::Continue(to) => {
self.to = to;
}
@ -242,31 +250,83 @@ impl Op {
}
}
}
let (from_file, metadata, mut to_file) = futures::try_join!(
async {
compio::fs::OpenOptions::new()
.read(true)
.open(&self.from)
.await
},
compio::fs::metadata(&self.from),
// This is atomic and ensures `to` is not created by any other process
async {
compio::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&self.to)
.await
}
)?;
progress.total_bytes = Some(metadata.len());
(ctx.on_progress)(self, &progress);
// This is atomic and ensures `to` is not created by any other process
let mut to_file = fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(&self.to)?;
to_file.set_permissions(metadata.permissions())?;
loop {
ctx.controller.check()?;
to_file.set_permissions(metadata.permissions()).await?;
let count = from_file.read(&mut ctx.buf)?;
if count == 0 {
break;
// Prevent spamming the progress callbacks.
let mut last_progress_update = Instant::now();
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
let mut buf_in = std::mem::take(&mut ctx.buf);
// Track where the current read/write position is at.
let mut pos = 0;
loop {
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
let count = match result {
Ok(0) => {
ctx.buf = buf_out;
break;
}
Ok(count) => count,
Err(why) => {
ctx.buf = buf_out;
return Err(why.into());
}
};
let BufResult(result, buf_out) = to_file.write_at(buf_out, pos).await;
if let Err(why) = result {
ctx.buf = buf_out;
return Err(why.into());
}
to_file.write_all(&ctx.buf[..count])?;
progress.current_bytes += count as u64;
(ctx.on_progress)(self, &progress);
pos += count as u64;
// Avoid spamming progress messages too early.
let current = Instant::now();
if current.duration_since(last_progress_update).as_millis() > 49 {
last_progress_update = current;
(ctx.on_progress)(self, &progress);
// Also check if the progress was cancelled.
if let Err(why) = ctx.controller.check().await {
ctx.buf = buf_out;
return Err(why.into());
}
}
buf_in = buf_out;
}
to_file.sync_all()?;
to_file.sync_all().await?;
}
OpKind::Move => {
// Remove `to` if overwriting and it is an existing file
if self.to.is_file() {
match ctx.replace(self)? {
match ctx.replace(self).await? {
ControlFlow::Continue(to) => {
self.to = to;
}
@ -276,11 +336,16 @@ impl Op {
}
}
// This is atomic and ensures `to` is not created by any other process
match fs::hard_link(&self.from, &self.to) {
match compio::fs::hard_link(&self.from, &self.to).await {
Ok(()) => {}
Err(err) => {
//TODO: what is the error code on Windows?
if err.raw_os_error() == Some(libc::EXDEV) {
// https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_NOT_SAME_DEVICE.html
#[cfg(windows)]
const EXDEV: i32 = 17;
#[cfg(unix)]
const EXDEV: i32 = libc::EXDEV as _;
if err.raw_os_error() == Some(EXDEV) {
// Try standard copy if hard link fails with cross device error
let mut copy_op = Op {
kind: OpKind::Copy,
@ -288,7 +353,7 @@ impl Op {
to: self.to.clone(),
skipped: self.skipped.clone(),
};
return copy_op.run(ctx, progress);
return Box::pin(copy_op.run(ctx, progress)).await;
} else {
return Err(err.into());
}
@ -296,18 +361,18 @@ impl Op {
}
}
OpKind::Mkdir => {
fs::create_dir_all(&self.to)?;
compio::fs::create_dir_all(&self.to).await?;
}
OpKind::Remove => {
fs::remove_file(&self.from)?;
compio::fs::remove_file(&self.from).await?;
}
OpKind::Rmdir => {
fs::remove_dir(&self.from)?;
compio::fs::remove_dir(&self.from).await?;
}
OpKind::Symlink { ref target } => {
// Remove `to` if overwriting and it is an existing file
if self.to.is_file() {
match ctx.replace(self)? {
match ctx.replace(self).await? {
ControlFlow::Continue(to) => {
self.to = to;
}