diff --git a/Cargo.lock b/Cargo.lock index 3641340..54a30fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "ab_glyph" @@ -152,6 +152,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd" +[[package]] +name = "aligned-array" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c92d086290f52938013f6242ac62bf7d401fab8ad36798a609faa65c3fd2c" +dependencies = [ + "generic-array", +] + [[package]] name = "aligned-vec" version = "0.5.0" @@ -345,7 +354,7 @@ dependencies = [ "enumflags2", "futures-channel", "futures-util", - "rand", + "rand 0.8.5", "serde", "serde_repr", "tokio", @@ -362,7 +371,7 @@ dependencies = [ "enumflags2", "futures-channel", "futures-util", - "rand", + "rand 0.8.5", "serde", "serde_repr", "tokio", @@ -1140,6 +1149,215 @@ dependencies = [ "memchr", ] +[[package]] +name = "compio" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c52183b7eefcaa6441fe810885a34fcdec7378e2883673fc3d74ca6e9ff738b" +dependencies = [ + "compio-buf", + "compio-dispatcher", + "compio-driver", + "compio-fs", + "compio-io", + "compio-log", + "compio-macros", + "compio-net", + "compio-process", + "compio-quic", + "compio-runtime", + "compio-signal", +] + +[[package]] +name = "compio-buf" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d14413106aad7dd931df3c4724110dabd731c81d52ba18edb4f2d57e7beb611b" +dependencies = [ + "arrayvec", + "bytes", + "libc", +] + +[[package]] +name = "compio-dispatcher" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ae8fab55190537c8634232f395302011ce39c18facbd4b85363df41114677ac" +dependencies = [ + "compio-driver", + "compio-runtime", + "flume", + "futures-channel", +] + +[[package]] +name = "compio-driver" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9493b0c88b90d386bb3fd9b5618260d96ba2b09cab5c4a5ba50ec9b77f0711b" +dependencies = [ + "aligned-array", + "cfg-if", + "cfg_aliases 0.2.1", + "compio-buf", + "compio-log", + "crossbeam-channel", + "crossbeam-queue", + "futures-util", + "io-uring", + "libc", + "once_cell", + "paste", + "polling 3.7.4", + "socket2 0.5.9", + "windows-sys 0.52.0", +] + +[[package]] +name = "compio-fs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dee2c5ba7c96f0caf3d62ed745278b26eebd4e9296817c4ef2ad6c359629f8ab" +dependencies = [ + "cfg-if", + "cfg_aliases 0.2.1", + "compio-buf", + "compio-driver", + "compio-io", + "compio-runtime", + "libc", + "os_pipe", + "widestring", + "windows-sys 0.52.0", +] + +[[package]] +name = "compio-io" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c18b1d7d4c058e3e92e9265d59f74981fda2693809b1e45f8ed7717d892c8ac" +dependencies = [ + "compio-buf", + "futures-util", + "paste", +] + +[[package]] +name = "compio-log" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4e560213c1996b618da369b7c9109564b41af9033802ae534465c4ee4e132f" +dependencies = [ + "tracing", +] + +[[package]] +name = "compio-macros" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f05ed201484967dc70de77a8f7a02b29aaa8e6c81cbea2e75492ee0c8d97766b" +dependencies = [ + "proc-macro-crate 3.3.0", + "proc-macro2", + "quote", + "syn 2.0.100", +] + +[[package]] +name = "compio-net" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0882a85c535c7b5d6ea3b9b37cc7421ec3f8ae8b83a09eb53f4295fb87b54995" +dependencies = [ + "cfg-if", + "compio-buf", + "compio-driver", + "compio-io", + "compio-runtime", + "either", + "libc", + "once_cell", + "socket2 0.5.9", + "widestring", + "windows-sys 0.52.0", +] + +[[package]] +name = "compio-process" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dc299e4c0a2cdc4455bb4df86c554845d1abe611a1922e4b12a8af2a0fadc35" +dependencies = [ + "cfg-if", + "compio-buf", + "compio-driver", + "compio-io", + "compio-runtime", + "futures-util", + "windows-sys 0.52.0", +] + +[[package]] +name = "compio-quic" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8853537ade322b0d5dee3dca216465c463f480e530caeabc2b2df15b986068b" +dependencies = [ + "cfg_aliases 0.2.1", + "compio-buf", + "compio-io", + "compio-log", + "compio-net", + "compio-runtime", + "flume", + "futures-util", + "libc", + "quinn-proto", + "rustc-hash 2.1.1", + "rustls", + "thiserror 2.0.12", + "windows-sys 0.52.0", +] + +[[package]] +name = "compio-runtime" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a95ef126945a166879ef37d494015be13a1e4e452419bc4e5c4c5799f441756a" +dependencies = [ + "async-task", + "cfg-if", + "compio-buf", + "compio-driver", + "compio-log", + "crossbeam-queue", + "futures-util", + "libc", + "once_cell", + "scoped-tls", + "slab", + "socket2 0.5.9", + "windows-sys 0.52.0", +] + +[[package]] +name = "compio-signal" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd30ba3a28cd73fa49a6e4f1c31c1ad4742fb33802662aadf9ed188ae8a8f0e4" +dependencies = [ + "compio-buf", + "compio-driver", + "compio-runtime", + "libc", + "once_cell", + "os_pipe", + "slab", + "windows-sys 0.52.0", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1244,6 +1462,7 @@ version = "0.1.0" dependencies = [ "bzip2", "chrono", + "compio", "cosmic-mime-apps", "dirs 6.0.0", "env_logger", @@ -1251,6 +1470,7 @@ dependencies = [ "flate2", "fork", "freedesktop_entry_parser", + "futures", "gio", "glib", "glob", @@ -1448,6 +1668,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2060,6 +2289,18 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2342,8 +2583,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -3365,6 +3608,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab01638bb6a279897b7691f87f3f3c232451711fd419a69ced980ce61074fa46" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "libc", +] + [[package]] name = "is-docker" version = "0.2.0" @@ -3999,6 +4253,15 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.15", +] + [[package]] name = "ndk" version = "0.9.0" @@ -4547,6 +4810,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "os_pipe" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ffd2b0a5634335b135d5728d84c5e0fd726954b87111f7506a61c502280d982" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "ouroboros" version = "0.18.5" @@ -4710,7 +4983,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -5007,6 +5280,25 @@ dependencies = [ "memchr", ] +[[package]] +name = "quinn-proto" +version = "0.11.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc" +dependencies = [ + "bytes", + "getrandom 0.3.2", + "rand 0.9.0", + "ring", + "rustc-hash 2.1.1", + "rustls-pki-types", + "slab", + "thiserror 2.0.12", + "tinyvec", + "tracing", + "web-time", +] + [[package]] name = "quote" version = "1.0.40" @@ -5029,8 +5321,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.24", ] [[package]] @@ -5040,7 +5343,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -5052,6 +5365,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + [[package]] name = "range-alloc" version = "0.1.4" @@ -5090,8 +5412,8 @@ dependencies = [ "once_cell", "paste", "profiling", - "rand", - "rand_chacha", + "rand 0.8.5", + "rand_chacha 0.3.1", "simd_helpers", "system-deps 6.2.2", "thiserror 1.0.69", @@ -5313,6 +5635,20 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.15", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "ron" version = "0.9.0" @@ -5424,6 +5760,39 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls" +version = "0.23.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c" +dependencies = [ + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.20" @@ -5763,6 +6132,15 @@ dependencies = [ "x11rb", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "spirv" version = "0.3.0+sdk-1.3.268.0" @@ -6478,6 +6856,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7497808a85e03f612f13e9c5061e4c81cdee86e6c00adfa1096690990ccd08e9" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.4" @@ -7525,7 +7909,7 @@ dependencies = [ [[package]] name = "xdg-mime" version = "0.4.0" -source = "git+https://github.com/ellieplayswow/xdg-mime-rs?branch=feature/get-same-as#4f8d07ceedabbe58368a8e7f5547232490860790" +source = "git+https://github.com/ellieplayswow/xdg-mime-rs?branch=feature%2Fget-same-as#4f8d07ceedabbe58368a8e7f5547232490860790" dependencies = [ "dirs-next", "glob", @@ -7645,7 +8029,7 @@ dependencies = [ "nix 0.26.4", "once_cell", "ordered-stream", - "rand", + "rand 0.8.5", "serde", "serde_repr", "sha1", @@ -7684,7 +8068,7 @@ dependencies = [ "hex", "nix 0.29.0", "ordered-stream", - "rand", + "rand 0.8.5", "serde", "serde_repr", "sha1", diff --git a/Cargo.toml b/Cargo.toml index a77b644..668e98a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,17 +4,20 @@ version = "0.1.0" authors = ["Jeremy Soller "] edition = "2021" license = "GPL-3.0-only" -rust-version = "1.71" +rust-version = "1.85.0" [build-dependencies] vergen = { version = "8", features = ["git", "gitcl"] } [dependencies] chrono = { version = "0.4", features = ["unstable-locales"] } +# Completion-based IO runtime to enable io_uring / IOCP file IO support. +compio = { version = "0.14.0", features = ["io", "macros", "runtime"] } cosmic-mime-apps = { git = "https://github.com/pop-os/cosmic-mime-apps.git", optional = true } dirs = "6.0.0" env_logger = "0.11" freedesktop_entry_parser = "1.3" +futures = "0.3.31" gio = { version = "0.20", optional = true } glib = { version = "0.20", optional = true } glob = "0.3" diff --git a/examples/copy.rs b/examples/copy.rs index bc117bc..f0a12be 100644 --- a/examples/copy.rs +++ b/examples/copy.rs @@ -1,36 +1,44 @@ use cosmic_files::operation::{recursive::Context, Controller, ReplaceResult}; use std::{error::Error, io, path::PathBuf}; -fn main() -> Result<(), Box> { +#[compio::main] +async fn main() -> Result<(), Box> { let mut context = Context::new(Controller::default()) .on_progress(|op, progress| { println!("{:?}: {:?}", op.to, progress); }) .on_replace(|op| { - println!("replace {:?}? (y/N)", op.to); - let mut line = String::new(); - match io::stdin().read_line(&mut line) { - Ok(_) => { - if line == "y" { - ReplaceResult::Replace(false) - } else { - ReplaceResult::Skip(false) + Box::pin(async move { + println!("replace {:?}? (y/N)", op.to); + let mut line = String::new(); + match io::stdin().read_line(&mut line) { + Ok(_) => { + if line == "y" { + ReplaceResult::Replace(false) + } else { + ReplaceResult::Skip(false) + } + } + Err(err) => { + eprintln!("failed to read stdin: {}", err); + ReplaceResult::Cancel } } - Err(err) => { - eprintln!("failed to read stdin: {}", err); - ReplaceResult::Cancel - } - } + }) }); - context.recursive_copy_or_move( - vec![(PathBuf::from("test/a"), PathBuf::from("test/b"))], - false, - )?; - context.recursive_copy_or_move( - vec![(PathBuf::from("test/b"), PathBuf::from("test/c"))], - true, - )?; + context + .recursive_copy_or_move( + vec![(PathBuf::from("test/a"), PathBuf::from("test/b"))], + false, + ) + .await?; + context + .recursive_copy_or_move( + vec![(PathBuf::from("test/b"), PathBuf::from("test/c"))], + true, + ) + .await?; + Ok(()) } diff --git a/src/app.rs b/src/app.rs index b96e224..a76ee8f 100644 --- a/src/app.rs +++ b/src/app.rs @@ -51,12 +51,15 @@ use slotmap::Key as SlotMapKey; use std::{ any::TypeId, collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, - env, fmt, fs, io, + env, fmt, fs, + future::Future, + io, num::NonZeroU16, path::{Path, PathBuf}, + pin::Pin, process, sync::{Arc, Mutex}, - time::{self, Instant}, + time::{self, Duration, Instant}, }; use tokio::sync::mpsc; use trash::TrashItem; @@ -539,6 +542,7 @@ pub struct App { config: Config, mode: Mode, app_themes: Vec, + compio_tx: mpsc::Sender + Send>>>, context_page: ContextPage, dialog_pages: VecDeque, dialog_text_input: widget::Id, @@ -837,6 +841,7 @@ impl App { self.margin = overlaps; } + #[must_use] fn open_tab_entity( &mut self, location: Location, @@ -883,14 +888,46 @@ impl App { self.open_tab_entity(location, activate, selection_paths).1 } - fn operation(&mut self, operation: Operation) { + #[must_use] + fn operation(&mut self, operation: Operation) -> Task { let id = self.pending_operation_id; + let controller = Controller::default(); + let compio_tx = self.compio_tx.clone(); + self.pending_operation_id += 1; if operation.show_progress_notification() { self.progress_operations.insert(id); } self.pending_operations - .insert(id, (operation, Controller::default())); + .insert(id, (operation.clone(), controller.clone())); + + // Use a task to send operations to the compio runtime thread. + cosmic::Task::stream(cosmic::iced_futures::stream::channel( + 4, + move |msg_tx| async move { + let (tx, rx) = tokio::sync::oneshot::channel(); + + let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx)); + + let msg_tx_clone = msg_tx.clone(); + + _ = compio_tx + .send(Box::pin(async move { + let msg = match operation.perform(&msg_tx_clone, controller).await { + Ok(result_paths) => Message::PendingComplete(id, result_paths), + Err(err) => Message::PendingError(id, err), + }; + + _ = tx.send(msg); + })) + .await; + + if let Ok(msg) = rx.await { + let _ = msg_tx.lock().await.send(msg).await; + } + }, + )) + .map(cosmic::Action::App) } fn remove_window(&mut self, id: &window::Id) { @@ -900,6 +937,7 @@ impl App { } } + #[must_use] fn rescan_operation_selection(&mut self, op_sel: OperationSelection) -> Task { log::info!("rescan_operation_selection {:?}", op_sel); let entity = self.tab_model.active(); @@ -1789,6 +1827,22 @@ impl Application for App { let window_id_opt = core.main_window_id(); + // Create a dedicated thread for the compio runtime to handle operations on. + // Supports io_uring on Linux, IOPC on Windows, and polling everywhere else. + let (compio_tx, mut compio_rx) = mpsc::channel(1); + let tokio_handle = tokio::runtime::Handle::current(); + std::thread::spawn(move || { + let _tokio = tokio_handle.enter(); + compio::runtime::RuntimeBuilder::new() + .build() + .unwrap() + .block_on(async move { + while let Some(task) = compio_rx.recv().await { + _ = compio::runtime::spawn(task).detach(); + } + }) + }); + let mut app = App { core, nav_bar_context_id: segmented_button::Entity::null(), @@ -1798,6 +1852,7 @@ impl Application for App { config: flags.config, mode: flags.mode, app_themes, + compio_tx, context_page: ContextPage::Preview(None, PreviewKind::Selected), dialog_pages: VecDeque::new(), dialog_text_input: widget::Id::unique(), @@ -2209,14 +2264,15 @@ impl Application for App { } } if !trash_items.is_empty() { - self.operation(Operation::DeleteTrash { items: trash_items }); + return self + .operation(Operation::DeleteTrash { items: trash_items }); } } } _ => { let paths = dbg!(self.selected_paths(entity_opt)); if !paths.is_empty() { - self.operation(Operation::Delete { paths }); + return self.operation(Operation::Delete { paths }); } } } @@ -2265,15 +2321,15 @@ impl Application for App { let extension = archive_type.extension(); let name = format!("{}{}", name, extension); let to = to.join(name); - self.operation(Operation::Compress { + return self.operation(Operation::Compress { paths, to, archive_type, password, - }) + }); } DialogPage::EmptyTrash => { - self.operation(Operation::EmptyTrash); + return self.operation(Operation::EmptyTrash); } DialogPage::FailedOperation(id) => { log::warn!("TODO: retry operation {}", id); @@ -2288,7 +2344,7 @@ impl Application for App { }, _ => unreachable!(), }; - self.operation(new_op); + return self.operation(new_op); } DialogPage::MountError { mounter_key, @@ -2326,7 +2382,7 @@ impl Application for App { } DialogPage::NewItem { parent, name, dir } => { let path = parent.join(name); - self.operation(if dir { + return self.operation(if dir { Operation::NewFolder { path } } else { Operation::NewFile { path } @@ -2375,13 +2431,13 @@ impl Application for App { from, parent, name, .. } => { let to = parent.join(name); - self.operation(Operation::Rename { from, to }); + return self.operation(Operation::Rename { from, to }); } DialogPage::Replace { .. } => { log::warn!("replace dialog should be completed with replace result"); } DialogPage::SetExecutableAndLaunch { path } => { - self.operation(Operation::SetExecutableAndLaunch { path }); + return self.operation(Operation::SetExecutableAndLaunch { path }); } DialogPage::FavoritePathError { entity, .. } => { if let Some(FavoriteIndex(favorite_i)) = @@ -2417,7 +2473,7 @@ impl Application for App { .and_then(|first| first.parent()) .map(|parent| parent.to_path_buf()) { - self.operation(Operation::Extract { + return self.operation(Operation::Extract { paths, to: destination, password: None, @@ -2458,7 +2514,8 @@ impl Application for App { } if let Some(archive_paths) = archive_paths { if !selected_paths.is_empty() { - self.operation(Operation::Extract { + self.file_dialog_opt = None; + return self.operation(Operation::Extract { paths: archive_paths, to: selected_paths[0].clone(), password: None, @@ -2912,20 +2969,16 @@ impl Application for App { Message::PasteContents(to, mut contents) => { contents.paths.retain(|p| p != &to); if !contents.paths.is_empty() { - match contents.kind { - ClipboardKind::Copy => { - self.operation(Operation::Copy { - paths: contents.paths, - to, - }); - } - ClipboardKind::Cut => { - self.operation(Operation::Move { - paths: contents.paths, - to, - }); - } - } + return match contents.kind { + ClipboardKind::Copy => self.operation(Operation::Copy { + paths: contents.paths, + to, + }), + ClipboardKind::Cut => self.operation(Operation::Move { + paths: contents.paths, + to, + }), + }; } } Message::PendingCancel(id) => { @@ -3182,7 +3235,7 @@ impl Application for App { } } if !trash_items.is_empty() { - self.operation(Operation::Restore { items: trash_items }); + return self.operation(Operation::Restore { items: trash_items }); } } Message::ScrollTab(scroll_speed) => { @@ -3353,7 +3406,7 @@ impl Application for App { ])); } tab::Command::Delete(paths) => { - self.operation(Operation::Delete { paths }); + commands.push(self.operation(Operation::Delete { paths })) } tab::Command::DropFiles(to, from) => { commands.push(self.update(Message::PasteContents(to, from))); @@ -3497,7 +3550,7 @@ impl Application for App { }); } Message::UndoTrashStart(items) => { - self.operation(Operation::Restore { items }); + return self.operation(Operation::Restore { items }); } Message::WindowClose => { if let Some(window_id) = self.window_id_opt.take() { @@ -3617,8 +3670,7 @@ impl Application for App { }, )), Location::Trash if matches!(action, DndAction::Move) => { - self.operation(Operation::Delete { paths: data.paths }); - Task::none() + self.operation(Operation::Delete { paths: data.paths }) } _ => { log::warn!("Copy to trash is not supported."); @@ -3678,8 +3730,7 @@ impl Application for App { }, )), Location::Trash if matches!(action, DndAction::Move) => { - self.operation(Operation::Delete { paths: data.paths }); - Task::none() + self.operation(Operation::Delete { paths: data.paths }) } _ => { log::warn!("Copy to trash is not supported."); @@ -5250,8 +5301,17 @@ impl Application for App { //TODO: inhibit suspend/shutdown? if self.window_id_opt.is_some() { - // Refresh progress when window is open and operations are in progress - subscriptions.push(window::frames().map(|_| Message::None)); + // Force refresh the UI every 100ms while an operation is active. + if self + .pending_operations + .values() + .any(|(_, controller)| !controller.is_paused()) + { + subscriptions.push( + cosmic::iced::time::every(Duration::from_millis(100)) + .map(|_| Message::None), + ) + } } else { // Handle notification when window is closed and operations are in progress #[cfg(feature = "notify")] @@ -5293,37 +5353,6 @@ impl Application for App { } } - for (id, (pending_operation, controller)) in self.pending_operations.iter() { - //TODO: use recipe? - let id = *id; - let pending_operation = pending_operation.clone(); - let controller = controller.clone(); - subscriptions.push(Subscription::run_with_id( - id, - stream::channel(16, move |msg_tx| async move { - let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx)); - match pending_operation.perform(&msg_tx, controller).await { - Ok(result_paths) => { - let _ = msg_tx - .lock() - .await - .send(Message::PendingComplete(id, result_paths)) - .await; - } - Err(err) => { - let _ = msg_tx - .lock() - .await - .send(Message::PendingError(id, err)) - .await; - } - } - - std::future::pending().await - }), - )); - } - let mut selected_preview = None; if self.core.window.show_context { if let ContextPage::Preview(entity_opt, PreviewKind::Selected) = self.context_page { diff --git a/src/operation/controller.rs b/src/operation/controller.rs index 1154b32..e02524e 100644 --- a/src/operation/controller.rs +++ b/src/operation/controller.rs @@ -1,6 +1,7 @@ use crate::fl; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; +use tokio::sync::Notify; #[derive(Clone, Copy, Debug)] pub enum ControllerState { @@ -13,7 +14,7 @@ pub enum ControllerState { struct ControllerInner { state: Mutex, progress: Mutex, - condvar: Condvar, + notify: Notify, } #[derive(Debug)] @@ -29,23 +30,22 @@ impl Default for Controller { inner: Arc::new(ControllerInner { state: Mutex::new(ControllerState::Running), progress: Mutex::new(0.0), - condvar: Condvar::new(), + notify: Notify::new(), }), } } } impl Controller { - pub fn check(&self) -> Result<(), String> { - let mut state = self.inner.state.lock().unwrap(); + pub async fn check(&self) -> Result<(), String> { loop { - match *state { + match self.state() { ControllerState::Cancelled => return Err(fl!("cancelled")), - ControllerState::Paused => { - state = self.inner.condvar.wait(state).unwrap(); - } + ControllerState::Paused => (), ControllerState::Running => return Ok(()), } + + self.inner.notify.notified().await; } } @@ -63,7 +63,7 @@ impl Controller { pub fn set_state(&self, state: ControllerState) { *self.inner.state.lock().unwrap() = state; - self.inner.condvar.notify_all(); + self.inner.notify.notify_waiters(); } pub fn is_cancelled(&self) -> bool { @@ -83,8 +83,9 @@ impl Controller { } pub fn unpause(&self) { - //TODO: ensure this does not override Cancel? - self.set_state(ControllerState::Running); + if !self.is_cancelled() { + self.set_state(ControllerState::Running); + } } } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index cc54544..3941142 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -6,7 +6,7 @@ use crate::{ spawn_detached::spawn_detached, tab, }; -use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt}; +use cosmic::iced::futures::{channel::mpsc::Sender, SinkExt}; use std::collections::VecDeque; use std::fmt::Formatter; use std::{ @@ -30,8 +30,8 @@ pub mod reader; use self::recursive::Context; pub mod recursive; -fn handle_replace( - msg_tx: &Arc>>, +async fn handle_replace( + msg_tx: Arc>>, file_from: PathBuf, file_to: PathBuf, multiple: bool, @@ -52,21 +52,19 @@ fn handle_replace( } }; - executor::block_on(async { - let (tx, mut rx) = mpsc::channel(1); - let _ = msg_tx - .lock() - .await - .send(Message::DialogPush(DialogPage::Replace { - from: item_from, - to: item_to, - multiple, - apply_to_all: false, - tx, - })) - .await; - rx.recv().await.unwrap_or(ReplaceResult::Cancel) - }) + let (tx, mut rx) = mpsc::channel(1); + let _ = msg_tx + .lock() + .await + .send(Message::DialogPush(DialogPage::Replace { + from: item_from, + to: item_to, + multiple, + apply_to_all: false, + tx, + })) + .await; + rx.recv().await.unwrap_or(ReplaceResult::Cancel) } fn get_directory_name(file_name: &str) -> &str { @@ -122,9 +120,12 @@ fn zip_extract>( let mut pending_directory_creates = VecDeque::new(); for i in 0..total_files { - controller - .check() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + futures::executor::block_on(async { + controller + .check() + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + })?; controller.set_progress((i as f32) / total_files as f32); @@ -210,9 +211,12 @@ fn zip_extract>( let mut outfile = fs::File::create(&outpath)?; let mut current = 0; loop { - controller - .check() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + futures::executor::block_on(async { + controller + .check() + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + })?; let count = file.read(&mut buffer)?; if count == 0 { @@ -268,7 +272,8 @@ async fn copy_or_move( controller: Controller, ) -> Result { let msg_tx = msg_tx.clone(); - tokio::task::spawn_blocking(move || -> Result { + + compio::runtime::spawn(async move { log::info!( "{} {:?} to {:?}", if moving { "Move" } else { "Copy" }, @@ -319,19 +324,21 @@ async fn copy_or_move( { let msg_tx = msg_tx.clone(); context = context.on_replace(move |op| { - handle_replace(&msg_tx, op.from.clone(), op.to.clone(), true) + let msg_tx = msg_tx.clone(); + Box::pin(handle_replace(msg_tx, op.from.clone(), op.to.clone(), true)) }); } context .recursive_copy_or_move(from_to_pairs, moving) + .await .map_err(OperationError::from_str)?; - Ok(context.op_sel) + Result::::Ok(context.op_sel) }) .await - .map_err(OperationError::from_str)? - //.map_err(OperationError::from_str) + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str) } fn copy_unique_path(from: &Path, to: &Path) -> PathBuf { @@ -689,14 +696,14 @@ impl Operation { let controller_clone = controller.clone(); //TODO: IF ERROR, RETURN AN Operation THAT CAN UNDO THE CURRENT STATE - let paths = match self { + let paths: Result = match self { Self::Compress { paths, to, archive_type, password, } => { - tokio::task::spawn_blocking( + compio::runtime::spawn_blocking( move || -> Result { let Some(relative_root) = to.parent() else { return Err(OperationError::from_str(format!( @@ -736,7 +743,9 @@ impl Operation { let total_paths = paths.len(); for (i, path) in paths.iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + futures::executor::block_on(async { + controller.check().await.map_err(OperationError::from_str) + })?; controller.set_progress((i as f32) / total_paths as f32); @@ -762,7 +771,9 @@ impl Operation { let total_paths = paths.len(); let mut buffer = vec![0; 4 * 1024 * 1024]; for (i, path) in paths.iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + futures::executor::block_on(async { + controller.check().await.map_err(OperationError::from_str) + })?; controller.set_progress((i as f32) / total_paths as f32); @@ -800,9 +811,12 @@ impl Operation { .map_err(OperationError::from_str)?; let mut current = 0; loop { - controller - .check() - .map_err(OperationError::from_str)?; + futures::executor::block_on(async { + controller + .check() + .await + .map_err(OperationError::from_str) + })?; let count = file .read(&mut buffer) @@ -836,20 +850,22 @@ impl Operation { }, ) .await - .map_err(OperationError::from_str)? - //.map_err(|e| e)? + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str) } Self::Copy { paths, to } => copy_or_move(paths, to, false, msg_tx, controller).await, Self::Delete { paths } => { let total = paths.len(); for (i, path) in paths.into_iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + futures::executor::block_on(async { + controller.check().await.map_err(OperationError::from_str) + })?; controller.set_progress((i as f32) / (total as f32)); - let _items_opt = tokio::task::spawn_blocking(|| trash::delete(path)) + let _items_opt = compio::runtime::spawn_blocking(|| trash::delete(path)) .await - .map_err(OperationError::from_str)? + .map_err(wrap_compio_spawn_error)? .map_err(OperationError::from_str)?; //TODO: items_opt allows for easy restore } @@ -866,10 +882,12 @@ impl Operation { ) ))] { - tokio::task::spawn_blocking(move || -> Result<(), OperationError> { + compio::runtime::spawn_blocking(move || -> Result<(), OperationError> { let count = items.len(); for (i, item) in items.into_iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + futures::executor::block_on(async { + controller.check().await.map_err(OperationError::from_str) + })?; controller.set_progress(i as f32 / count as f32); @@ -879,7 +897,8 @@ impl Operation { Ok(()) }) .await - .map_err(OperationError::from_str)??; + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str)?; } Ok(OperationSelection::default()) } @@ -894,11 +913,13 @@ impl Operation { ) ))] { - tokio::task::spawn_blocking(move || -> Result<(), OperationError> { + compio::runtime::spawn_blocking(move || -> Result<(), OperationError> { let items = trash::os_limited::list().map_err(OperationError::from_str)?; let count = items.len(); for (i, item) in items.into_iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + futures::executor::block_on(async { + controller.check().await.map_err(OperationError::from_str) + })?; controller.set_progress(i as f32 / count as f32); @@ -908,7 +929,8 @@ impl Operation { Ok(()) }) .await - .map_err(OperationError::from_str)??; + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str)?; } Ok(OperationSelection::default()) } @@ -916,137 +938,135 @@ impl Operation { paths, to, password, - } => { - tokio::task::spawn_blocking( - move || -> Result { - let total_paths = paths.len(); - let mut op_sel = OperationSelection::default(); - for (i, path) in paths.iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + } => compio::runtime::spawn_blocking( + move || -> Result { + let total_paths = paths.len(); + let mut op_sel = OperationSelection::default(); + for (i, path) in paths.iter().enumerate() { + futures::executor::block_on(async { + controller.check().await.map_err(OperationError::from_str) + })?; - controller.set_progress((i as f32) / total_paths as f32); + controller.set_progress((i as f32) / total_paths as f32); - if let Some(file_name) = path.file_name().and_then(|f| f.to_str()) { - let dir_name = get_directory_name(file_name); - let mut new_dir = to.join(dir_name); + if let Some(file_name) = path.file_name().and_then(|f| f.to_str()) { + let dir_name = get_directory_name(file_name); + let mut new_dir = to.join(dir_name); - if new_dir.exists() { - if let Some(new_dir_parent) = new_dir.parent() { - new_dir = copy_unique_path(&new_dir, new_dir_parent); - } - } - - op_sel.ignored.push(path.clone()); - op_sel.selected.push(new_dir.clone()); - - let controller = controller.clone(); - let mime = mime_for_path(path); - let password = password.clone(); - match mime.essence_str() { - "application/gzip" | "application/x-compressed-tar" => { - OpReader::new(path, controller) - .map(io::BufReader::new) - .map(flate2::read::GzDecoder::new) - .map(tar::Archive::new) - .and_then(|mut archive| archive.unpack(&new_dir)) - .map_err(OperationError::from_str)? - } - "application/x-tar" => OpReader::new(path, controller) - .map(io::BufReader::new) - .map(tar::Archive::new) - .and_then(|mut archive| archive.unpack(&new_dir)) - .map_err(OperationError::from_str)?, - "application/zip" => fs::File::open(path) - .map(io::BufReader::new) - .map(zip::ZipArchive::new) - .map_err(OperationError::from_str)? - .and_then(move |mut archive| { - zip_extract( - &mut archive, - &new_dir, - controller, - password, - ) - }) - .map_err(|e| match e { - ZipError::UnsupportedArchive( - ZipError::PASSWORD_REQUIRED, - ) - | ZipError::InvalidPassword => OperationError { - kind: OperationErrorType::PasswordRequired, - }, - _ => OperationError::from_str(e), - })?, - #[cfg(feature = "bzip2")] - "application/x-bzip" | "application/x-bzip-compressed-tar" => { - OpReader::new(path, controller) - .map(io::BufReader::new) - .map(bzip2::read::BzDecoder::new) - .map(tar::Archive::new) - .and_then(|mut archive| archive.unpack(&new_dir)) - .map_err(OperationError::from_str)? - } - #[cfg(feature = "xz2")] - "application/x-xz" | "application/x-xz-compressed-tar" => { - OpReader::new(path, controller) - .map(io::BufReader::new) - .map(xz2::read::XzDecoder::new) - .map(tar::Archive::new) - .and_then(|mut archive| archive.unpack(&new_dir)) - .map_err(OperationError::from_str)? - } - _ => Err(OperationError::from_str(format!( - "unsupported mime type {:?}", - mime - )))?, + if new_dir.exists() { + if let Some(new_dir_parent) = new_dir.parent() { + new_dir = copy_unique_path(&new_dir, new_dir_parent); } } - } - Ok(op_sel) - }, - ) - .await - .map_err(OperationError::from_str)? - //.map_err(OperationError::from_str)? - } + op_sel.ignored.push(path.clone()); + op_sel.selected.push(new_dir.clone()); + + let controller = controller.clone(); + let mime = mime_for_path(path); + let password = password.clone(); + match mime.essence_str() { + "application/gzip" | "application/x-compressed-tar" => { + OpReader::new(path, controller) + .map(io::BufReader::new) + .map(flate2::read::GzDecoder::new) + .map(tar::Archive::new) + .and_then(|mut archive| archive.unpack(&new_dir)) + .map_err(OperationError::from_str)? + } + "application/x-tar" => OpReader::new(path, controller) + .map(io::BufReader::new) + .map(tar::Archive::new) + .and_then(|mut archive| archive.unpack(&new_dir)) + .map_err(OperationError::from_str)?, + "application/zip" => fs::File::open(path) + .map(io::BufReader::new) + .map(zip::ZipArchive::new) + .map_err(OperationError::from_str)? + .and_then(move |mut archive| { + zip_extract(&mut archive, &new_dir, controller, password) + }) + .map_err(|e| match e { + ZipError::UnsupportedArchive( + ZipError::PASSWORD_REQUIRED, + ) + | ZipError::InvalidPassword => OperationError { + kind: OperationErrorType::PasswordRequired, + }, + _ => OperationError::from_str(e), + })?, + #[cfg(feature = "bzip2")] + "application/x-bzip" | "application/x-bzip-compressed-tar" => { + OpReader::new(path, controller) + .map(io::BufReader::new) + .map(bzip2::read::BzDecoder::new) + .map(tar::Archive::new) + .and_then(|mut archive| archive.unpack(&new_dir)) + .map_err(OperationError::from_str)? + } + #[cfg(feature = "xz2")] + "application/x-xz" | "application/x-xz-compressed-tar" => { + OpReader::new(path, controller) + .map(io::BufReader::new) + .map(xz2::read::XzDecoder::new) + .map(tar::Archive::new) + .and_then(|mut archive| archive.unpack(&new_dir)) + .map_err(OperationError::from_str)? + } + _ => Err(OperationError::from_str(format!( + "unsupported mime type {:?}", + mime + )))?, + } + } + } + + Ok(op_sel) + }, + ) + .await + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str), Self::Move { paths, to } => copy_or_move(paths, to, true, msg_tx, controller).await, - Self::NewFolder { path } => tokio::task::spawn_blocking( - move || -> Result { - controller.check().map_err(OperationError::from_str)?; - fs::create_dir(&path).map_err(OperationError::from_str)?; - Ok(OperationSelection { - ignored: Vec::new(), - selected: vec![path], - }) - }, - ) + Self::NewFolder { path } => compio::runtime::spawn(async move { + controller.check().await.map_err(OperationError::from_str)?; + compio::fs::create_dir(&path) + .await + .map_err(OperationError::from_str)?; + Result::<_, OperationError>::Ok(OperationSelection { + ignored: Vec::new(), + selected: vec![path], + }) + }) .await - .map_err(OperationError::from_str)?, - Self::NewFile { path } => tokio::task::spawn_blocking( - move || -> Result { - controller.check().map_err(OperationError::from_str)?; - fs::File::create(&path).map_err(OperationError::from_str)?; - Ok(OperationSelection { - ignored: Vec::new(), - selected: vec![path], - }) - }, - ) + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str), + Self::NewFile { path } => compio::runtime::spawn(async move { + controller.check().await.map_err(OperationError::from_str)?; + compio::fs::File::create(&path) + .await + .map_err(OperationError::from_str)?; + Result::<_, OperationError>::Ok(OperationSelection { + ignored: Vec::new(), + selected: vec![path], + }) + }) .await - .map_err(OperationError::from_str)?, - Self::Rename { from, to } => tokio::task::spawn_blocking( - move || -> Result { - controller.check().map_err(OperationError::from_str)?; - fs::rename(&from, &to).map_err(OperationError::from_str)?; - Ok(OperationSelection { - ignored: vec![from], - selected: vec![to], - }) - }, - ) + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str), + Self::Rename { from, to } => compio::runtime::spawn(async move { + controller.check().await.map_err(OperationError::from_str)?; + compio::fs::rename(&from, &to) + .await + .map_err(OperationError::from_str)?; + Result::<_, OperationError>::Ok(OperationSelection { + ignored: vec![from], + selected: vec![to], + }) + }) .await - .map_err(OperationError::from_str)?, + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str), #[cfg(target_os = "macos")] Self::Restore { .. } => { // TODO: add support for macos @@ -1057,15 +1077,15 @@ impl Operation { let total = items.len(); let mut paths = Vec::with_capacity(total); for (i, item) in items.into_iter().enumerate() { - controller.check().map_err(OperationError::from_str)?; + controller.check().await.map_err(OperationError::from_str)?; controller.set_progress((i as f32) / (total as f32)); paths.push(item.original_path()); - tokio::task::spawn_blocking(|| trash::os_limited::restore_all([item])) + compio::runtime::spawn_blocking(|| trash::os_limited::restore_all([item])) .await - .map_err(OperationError::from_str)? + .map_err(wrap_compio_spawn_error)? .map_err(OperationError::from_str)?; } Ok(OperationSelection { @@ -1074,14 +1094,14 @@ impl Operation { }) } Self::SetExecutableAndLaunch { path } => { - tokio::task::spawn_blocking(move || -> Result<(), OperationError> { + controller.check().await.map_err(OperationError::from_str)?; + + compio::runtime::spawn_blocking(move || -> Result<(), OperationError> { //TODO: what to do on non-Unix systems? #[cfg(unix)] { use std::os::unix::fs::PermissionsExt; - controller.check().map_err(OperationError::from_str)?; - let mut perms = fs::metadata(&path) .map_err(OperationError::from_str)? .permissions(); @@ -1091,16 +1111,14 @@ impl Operation { fs::set_permissions(&path, perms).map_err(OperationError::from_str)?; } - controller.check().map_err(OperationError::from_str)?; - let mut command = std::process::Command::new(path); spawn_detached(&mut command).map_err(OperationError::from_str)?; Ok(()) }) .await - .map_err(OperationError::from_str)? - .map_err(|e| e)?; + .map_err(wrap_compio_spawn_error)? + .map_err(OperationError::from_str)?; Ok(OperationSelection::default()) } }; @@ -1111,6 +1129,16 @@ impl Operation { } } +#[track_caller] +fn wrap_compio_spawn_error(_unwind: Box) -> OperationError { + log::error!( + "compio runtime spawn failed: {}", + std::backtrace::Backtrace::capture() + ); + + OperationError::from_str("compio runtime spawn failed") +} + #[cfg(test)] mod tests { use std::{ @@ -1136,42 +1164,44 @@ mod tests { fl, }; - // Tests hang with lower values - const BUF_SIZE: usize = 8; - /// Simple wrapper around `[Operation::Copy]` pub async fn operation_copy( paths: Vec, to: PathBuf, ) -> Result { let id = fastrand::u64(0..u64::MAX); - let (tx, mut rx) = mpsc::channel(BUF_SIZE); + let (tx, mut rx) = mpsc::channel(1); let paths_clone = paths.clone(); let to_clone = to.clone(); - let handle_copy = tokio::spawn(async move { + + // Wrap this into its own future so that it may be polled concurerntly with the message handler. + let handle_copy = async move { Operation::Copy { paths: paths_clone, to: to_clone, } .perform(&sync::Mutex::new(tx).into(), Controller::default()) .await - }); + }; - while let Some(msg) = rx.next().await { - match msg { - Message::DialogPush(DialogPage::Replace { tx, .. }) => { - debug!("[{id}] Replace request"); - tx.send(ReplaceResult::Cancel).await.expect("Sending a response to a replace request should succeed") + // Concurrently handling messages will prevent the mpsc channel from blocking when full. + let handle_messages = async move { + while let Some(msg) = rx.next().await { + match msg { + Message::DialogPush(DialogPage::Replace { tx, .. }) => { + debug!("[{id}] Replace request"); + tx.send(ReplaceResult::Cancel).await.expect("Sending a response to a replace request should succeed") + } + _ => unreachable!("Only [ `Message::PendingProgress`, `Message::DialogPush(DialogPage::Replace)` ] are sent from operation"), } - _ => unreachable!("Only [ `Message::PendingProgress`, `Message::DialogPush(DialogPage::Replace)` ] are sent from operation"), } - } + }; - handle_copy.await.unwrap() + futures::future::join(handle_messages, handle_copy).await.1 } - #[test(tokio::test)] + #[test(compio::test)] async fn copy_file_to_same_location() -> io::Result<()> { let fs = simple_fs(NUM_FILES, 0, 1, 0, NAME_LEN)?; let path = fs.path(); @@ -1205,7 +1235,7 @@ mod tests { Ok(()) } - #[test(tokio::test)] + #[test(compio::test)] async fn copy_file_with_extension_to_same_loc() -> io::Result<()> { let fs = empty_fs()?; let path = fs.path(); @@ -1225,7 +1255,7 @@ mod tests { Ok(()) } - #[test(tokio::test)] + #[test(compio::test)] async fn copy_dir_to_same_location() -> io::Result<()> { let fs = simple_fs(NUM_FILES, 0, NUM_DIRS, NUM_NESTED, NAME_LEN)?; let path = fs.path(); @@ -1250,7 +1280,7 @@ mod tests { Ok(()) } - #[test(tokio::test)] + #[test(compio::test)] async fn copying_file_multiple_times_to_same_location() -> io::Result<()> { let fs = empty_fs()?; let path = fs.path(); @@ -1275,7 +1305,7 @@ mod tests { Ok(()) } - #[test(tokio::test)] + #[test(compio::test)] async fn copy_to_diff_dir_doesnt_dupe_files() -> io::Result<()> { let fs = simple_fs(NUM_FILES, NUM_HIDDEN, NUM_DIRS, NUM_NESTED, NAME_LEN)?; let path = fs.path(); @@ -1303,9 +1333,11 @@ mod tests { ); operation_copy(vec![first_file.clone()], second_dir.clone()) .await - .expect( - "Copy operation should have been cancelled because we're copying to different directories without replacement", - ); + .expect(concat!( + "Copy operation should have been cancelled ", + "because we're copying to different directories ", + "without replacement" + )); assert!( first_dir.join(base_name).exists(), "First file should still exist" @@ -1318,7 +1350,7 @@ mod tests { Ok(()) } - #[test(tokio::test)] + #[test(compio::test)] async fn copy_file_with_diff_name_to_diff_dir() -> io::Result<()> { let fs = empty_fs()?; let path = fs.path(); diff --git a/src/operation/reader.rs b/src/operation/reader.rs index c059226..a037eba 100644 --- a/src/operation/reader.rs +++ b/src/operation/reader.rs @@ -25,9 +25,12 @@ impl OpReader { impl io::Read for OpReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.controller - .check() - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + futures::executor::block_on(async { + self.controller + .check() + .await + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + })?; let count = self.file.read(buf)?; self.current += count as u64; diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index a8d3c61..eadde52 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -1,12 +1,9 @@ -use std::{ - cell::Cell, - error::Error, - fs, - io::{Read, Write}, - ops::ControlFlow, - path::PathBuf, - rc::Rc, -}; +use compio::io::{AsyncReadAt, AsyncWriteAt}; +use compio::BufResult; +use std::future::Future; +use std::pin::Pin; +use std::time::Instant; +use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc}; use walkdir::WalkDir; use super::{copy_unique_path, Controller, OperationSelection, ReplaceResult}; @@ -15,7 +12,7 @@ pub struct Context { buf: Vec, controller: Controller, on_progress: Box, - on_replace: Box, + on_replace: Pin>, pub(crate) op_sel: OperationSelection, replace_result_opt: Option, } @@ -23,22 +20,29 @@ pub struct Context { pub trait OnProgress: Fn(&Op, &Progress) + 'static {} impl OnProgress for F where F: Fn(&Op, &Progress) + 'static {} -pub trait OnReplace: Fn(&Op) -> ReplaceResult + 'static {} -impl OnReplace for F where F: Fn(&Op) -> ReplaceResult + 'static {} +pub trait OnReplace: + for<'a> Fn(&'a Op) -> Pin + 'a>> + 'static +{ +} +impl OnReplace for F where + F: for<'a> Fn(&'a Op) -> Pin + 'a>> + 'static +{ +} impl Context { pub fn new(controller: Controller) -> Self { Self { - buf: vec![0; 4 * 1024 * 1024], + // 128K is the optimal upper size of a buffer. + buf: vec![0u8; 128 * 1024], controller, on_progress: Box::new(|_op, _progress| {}), - on_replace: Box::new(|_op| ReplaceResult::Cancel), + on_replace: Box::pin(|_op| Box::pin(async { ReplaceResult::Cancel })), op_sel: OperationSelection::default(), replace_result_opt: None, } } - pub fn recursive_copy_or_move( + pub async fn recursive_copy_or_move( &mut self, from_to_pairs: Vec<(PathBuf, PathBuf)>, moving: bool, @@ -46,7 +50,7 @@ impl Context { let mut ops = Vec::new(); let mut cleanup_ops = Vec::new(); for (from_parent, to_parent) in from_to_pairs { - self.controller.check()?; + self.controller.check().await?; if from_parent == to_parent { // Skip matching source and destination @@ -54,7 +58,7 @@ impl Context { } for entry in WalkDir::new(&from_parent).into_iter() { - self.controller.check()?; + self.controller.check().await?; let entry = entry.map_err(|err| { format!("failed to walk directory {:?}: {}", from_parent, err) @@ -114,7 +118,7 @@ impl Context { let total_ops = ops.len(); for (current_ops, mut op) in ops.into_iter().enumerate() { - self.controller.check()?; + self.controller.check().await?; let progress = Progress { current_ops, @@ -123,7 +127,7 @@ impl Context { total_bytes: None, }; (self.on_progress)(&op, &progress); - if op.run(self, progress).map_err(|err| { + if op.run(self, progress).await.map_err(|err| { format!( "failed to {:?} {:?} to {:?}: {}", op.kind, op.from, op.to, err @@ -148,21 +152,23 @@ impl Context { self } - pub fn on_replace(mut self, f: F) -> Self { - self.on_replace = Box::new(f); + pub fn on_replace(mut self, f: impl OnReplace + 'static) -> Self { + self.on_replace = Box::pin(f); self } - fn replace(&mut self, op: &Op) -> Result, Box> { - let replace_result = self - .replace_result_opt - .unwrap_or_else(|| (self.on_replace)(op)); + async fn replace(&mut self, op: &Op) -> Result, Box> { + let replace_result = match self.replace_result_opt { + Some(result) => result, + None => (self.on_replace)(op).await, + }; + match replace_result { ReplaceResult::Replace(apply_to_all) => { if apply_to_all { self.replace_result_opt = Some(replace_result); } - fs::remove_file(&op.to)?; + compio::fs::remove_file(&op.to).await?; Ok(ControlFlow::Continue(op.to.clone())) } ReplaceResult::KeepBoth => match op.to.parent() { @@ -223,17 +229,19 @@ impl Op { }) } - fn run(&mut self, ctx: &mut Context, mut progress: Progress) -> Result> { + async fn run( + &mut self, + ctx: &mut Context, + mut progress: Progress, + ) -> Result> { if self.skipped.get() { return Ok(true); } match self.kind { OpKind::Copy => { - let mut from_file = fs::OpenOptions::new().read(true).open(&self.from)?; - let metadata = from_file.metadata()?; // Remove `to` if overwriting and it is an existing file if self.to.is_file() { - match ctx.replace(self)? { + match ctx.replace(self).await? { ControlFlow::Continue(to) => { self.to = to; } @@ -242,31 +250,83 @@ impl Op { } } } + + let (from_file, metadata, mut to_file) = futures::try_join!( + async { + compio::fs::OpenOptions::new() + .read(true) + .open(&self.from) + .await + }, + compio::fs::metadata(&self.from), + // This is atomic and ensures `to` is not created by any other process + async { + compio::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&self.to) + .await + } + )?; + progress.total_bytes = Some(metadata.len()); (ctx.on_progress)(self, &progress); - // This is atomic and ensures `to` is not created by any other process - let mut to_file = fs::OpenOptions::new() - .create_new(true) - .write(true) - .open(&self.to)?; - to_file.set_permissions(metadata.permissions())?; - loop { - ctx.controller.check()?; + to_file.set_permissions(metadata.permissions()).await?; - let count = from_file.read(&mut ctx.buf)?; - if count == 0 { - break; + // Prevent spamming the progress callbacks. + let mut last_progress_update = Instant::now(); + // io_uring/IOCP requires transferring ownership of the buffer to the kernel. + let mut buf_in = std::mem::take(&mut ctx.buf); + // Track where the current read/write position is at. + let mut pos = 0; + + loop { + let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await; + + let count = match result { + Ok(0) => { + ctx.buf = buf_out; + break; + } + Ok(count) => count, + Err(why) => { + ctx.buf = buf_out; + return Err(why.into()); + } + }; + + let BufResult(result, buf_out) = to_file.write_at(buf_out, pos).await; + + if let Err(why) = result { + ctx.buf = buf_out; + return Err(why.into()); } - to_file.write_all(&ctx.buf[..count])?; + progress.current_bytes += count as u64; - (ctx.on_progress)(self, &progress); + pos += count as u64; + + // Avoid spamming progress messages too early. + let current = Instant::now(); + if current.duration_since(last_progress_update).as_millis() > 49 { + last_progress_update = current; + (ctx.on_progress)(self, &progress); + + // Also check if the progress was cancelled. + if let Err(why) = ctx.controller.check().await { + ctx.buf = buf_out; + return Err(why.into()); + } + } + + buf_in = buf_out; } - to_file.sync_all()?; + + to_file.sync_all().await?; } OpKind::Move => { // Remove `to` if overwriting and it is an existing file if self.to.is_file() { - match ctx.replace(self)? { + match ctx.replace(self).await? { ControlFlow::Continue(to) => { self.to = to; } @@ -276,11 +336,16 @@ impl Op { } } // This is atomic and ensures `to` is not created by any other process - match fs::hard_link(&self.from, &self.to) { + match compio::fs::hard_link(&self.from, &self.to).await { Ok(()) => {} Err(err) => { - //TODO: what is the error code on Windows? - if err.raw_os_error() == Some(libc::EXDEV) { + // https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_NOT_SAME_DEVICE.html + #[cfg(windows)] + const EXDEV: i32 = 17; + #[cfg(unix)] + const EXDEV: i32 = libc::EXDEV as _; + + if err.raw_os_error() == Some(EXDEV) { // Try standard copy if hard link fails with cross device error let mut copy_op = Op { kind: OpKind::Copy, @@ -288,7 +353,7 @@ impl Op { to: self.to.clone(), skipped: self.skipped.clone(), }; - return copy_op.run(ctx, progress); + return Box::pin(copy_op.run(ctx, progress)).await; } else { return Err(err.into()); } @@ -296,18 +361,18 @@ impl Op { } } OpKind::Mkdir => { - fs::create_dir_all(&self.to)?; + compio::fs::create_dir_all(&self.to).await?; } OpKind::Remove => { - fs::remove_file(&self.from)?; + compio::fs::remove_file(&self.from).await?; } OpKind::Rmdir => { - fs::remove_dir(&self.from)?; + compio::fs::remove_dir(&self.from).await?; } OpKind::Symlink { ref target } => { // Remove `to` if overwriting and it is an existing file if self.to.is_file() { - match ctx.replace(self)? { + match ctx.replace(self).await? { ControlFlow::Continue(to) => { self.to = to; } diff --git a/src/tab.rs b/src/tab.rs index da0bacc..68a0e3a 100644 --- a/src/tab.rs +++ b/src/tab.rs @@ -1829,10 +1829,11 @@ pub struct Tab { scroll_bounds_opt: Option, } -fn calculate_dir_size(path: &Path, controller: Controller) -> Result { +async fn calculate_dir_size(path: &Path, controller: Controller) -> Result { let mut total = 0; for entry_res in WalkDir::new(path) { - controller.check()?; + controller.check().await?; + //TODO: report more errors? if let Ok(entry) = entry_res { if let Ok(metadata) = entry.metadata() { @@ -1841,6 +1842,9 @@ fn calculate_dir_size(path: &Path, controller: Controller) -> Result { - log::debug!( - "calculated directory size of {:?} in {:?}", - path, - start.elapsed() - ); - Message::DirectorySize( - path.clone(), - DirSize::Directory(size), - ) - } - Err(err) => { - log::warn!( + let start = Instant::now(); + match calculate_dir_size(&path, controller).await { + Ok(size) => { + log::debug!( + "calculated directory size of {:?} in {:?}", + path, + start.elapsed() + ); + Message::DirectorySize( + path.clone(), + DirSize::Directory(size), + ) + } + Err(err) => { + log::warn!( "failed to calculate directory size of {:?}: {}", path, err ); - Message::DirectorySize( - path.clone(), - DirSize::Error(err), - ) - } + Message::DirectorySize( + path.clone(), + DirSize::Error(err), + ) } - }) - .await - .unwrap() + } }; match output.send(message).await {