perf(copy): async batch file flushes
Instead of calling `sync_all()` on every file individually during the Copy operation, the flushing is now batched and done at the end. Flushing now also happens for Move.
This commit is contained in:
parent
34a33df5fc
commit
1963e58560
1 changed files with 32 additions and 3 deletions
|
|
@ -1,6 +1,7 @@
|
||||||
use compio::BufResult;
|
use compio::BufResult;
|
||||||
use compio::buf::{IntoInner, IoBuf};
|
use compio::buf::{IntoInner, IoBuf};
|
||||||
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
||||||
|
use futures::{StreamExt, stream};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
@ -57,6 +58,8 @@ impl Context {
|
||||||
) -> Result<bool, OperationError> {
|
) -> Result<bool, OperationError> {
|
||||||
let mut ops = Vec::new();
|
let mut ops = Vec::new();
|
||||||
let mut cleanup_ops = Vec::new();
|
let mut cleanup_ops = Vec::new();
|
||||||
|
let mut written_files = Vec::new();
|
||||||
|
let mut target_dirs = std::collections::HashSet::new();
|
||||||
for (from_parent, to_parent) in from_to_pairs {
|
for (from_parent, to_parent) in from_to_pairs {
|
||||||
self.controller
|
self.controller
|
||||||
.check()
|
.check()
|
||||||
|
|
@ -141,6 +144,9 @@ impl Context {
|
||||||
cleanup_ops.push(cleanup_op);
|
cleanup_ops.push(cleanup_op);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if let Some(parent) = op.to.parent() {
|
||||||
|
target_dirs.insert(parent.to_path_buf());
|
||||||
|
}
|
||||||
ops.push(op);
|
ops.push(op);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -177,10 +183,19 @@ impl Context {
|
||||||
&self.controller,
|
&self.controller,
|
||||||
)
|
)
|
||||||
})? {
|
})? {
|
||||||
|
if matches!(
|
||||||
|
op.kind,
|
||||||
|
OpKind::Copy
|
||||||
|
| OpKind::Move {
|
||||||
|
cross_device_copy: true
|
||||||
|
}
|
||||||
|
) {
|
||||||
|
written_files.push(op.to.clone());
|
||||||
|
}
|
||||||
// The from path is ignored in the operation selection if it is a top level item
|
// The from path is ignored in the operation selection if it is a top level item
|
||||||
if self.op_sel.ignored.contains(&op.from) {
|
if self.op_sel.ignored.contains(&op.from) {
|
||||||
// So add the to path to the selection
|
// So add the to path to the selection
|
||||||
self.op_sel.selected.push(op.to.clone());
|
self.op_sel.selected.push(op.to);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Cancelled
|
// Cancelled
|
||||||
|
|
@ -188,6 +203,22 @@ impl Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sync files to disk
|
||||||
|
let file_stream = stream::iter(written_files.into_iter().map(|path| async move {
|
||||||
|
if let Ok(file) = compio::fs::OpenOptions::new().write(true).open(&path).await {
|
||||||
|
let _ = file.sync_all().await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
file_stream.buffer_unordered(32).collect::<Vec<_>>().await;
|
||||||
|
|
||||||
|
// Sync directories to disk
|
||||||
|
let dir_stream = stream::iter(target_dirs.into_iter().map(|path| async move {
|
||||||
|
if let Ok(dir) = compio::fs::OpenOptions::new().read(true).open(&path).await {
|
||||||
|
let _ = dir.sync_all().await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
dir_stream.buffer_unordered(16).collect::<Vec<_>>().await;
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -411,8 +442,6 @@ impl Op {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
to_file.sync_all().await?;
|
|
||||||
}
|
}
|
||||||
OpKind::Move { cross_device_copy } => {
|
OpKind::Move { cross_device_copy } => {
|
||||||
// Remove `to` if overwriting and it is an existing file
|
// Remove `to` if overwriting and it is an existing file
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue