Implement progress notification and cancellation

This commit is contained in:
Jeremy Soller 2024-11-14 14:43:45 -07:00
parent 5c7cf52b25
commit 00ed3115cc
No known key found for this signature in database
GPG key ID: D02FD439211AF56F
4 changed files with 336 additions and 108 deletions

View file

@ -4,7 +4,10 @@ use std::{
fs,
io::{self, Read, Write},
path::{Path, PathBuf},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::{mpsc, Mutex};
use walkdir::WalkDir;
@ -134,6 +137,7 @@ async fn copy_or_move(
moving: bool,
id: u64,
msg_tx: &Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
) -> Result<(), String> {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<(), String> {
@ -164,7 +168,7 @@ async fn copy_or_move(
})
.collect();
let mut context = Context::new();
let mut context = Context::new(cancelled);
{
let msg_tx = msg_tx.clone();
@ -308,38 +312,43 @@ fn paths_parent_name<'a>(paths: &'a Vec<PathBuf>) -> Cow<'a, str> {
}
impl Operation {
pub fn pending_text(&self) -> String {
pub fn pending_text(&self, percent: i32) -> String {
match self {
Self::Compress { paths, to, .. } => fl!(
"compressing",
items = paths.len(),
from = paths_parent_name(paths),
to = file_name(to)
to = file_name(to),
percent = percent
),
Self::Copy { paths, to } => fl!(
"copying",
items = paths.len(),
from = paths_parent_name(paths),
to = file_name(to)
to = file_name(to),
percent = percent
),
Self::Delete { paths } => fl!(
"moving",
items = paths.len(),
from = paths_parent_name(paths),
to = fl!("trash")
to = fl!("trash"),
percent = percent
),
Self::EmptyTrash => fl!("emptying-trash"),
Self::EmptyTrash => fl!("emptying-trash", percent = percent),
Self::Extract { paths, to } => fl!(
"extracting",
items = paths.len(),
from = paths_parent_name(paths),
to = file_name(to)
to = file_name(to),
percent = percent
),
Self::Move { paths, to } => fl!(
"moving",
items = paths.len(),
from = paths_parent_name(paths),
to = file_name(to)
to = file_name(to),
percent = percent
),
Self::NewFile { path } => fl!(
"creating",
@ -354,7 +363,7 @@ impl Operation {
Self::Rename { from, to } => {
fl!("renaming", from = file_name(from), to = file_name(to))
}
Self::Restore { paths } => fl!("restoring", items = paths.len()),
Self::Restore { paths } => fl!("restoring", items = paths.len(), percent = percent),
Self::SetExecutableAndLaunch { path } => {
fl!("setting-executable-and-launching", name = file_name(path))
}
@ -412,6 +421,23 @@ impl Operation {
}
}
pub fn show_progress_notification(&self) -> bool {
// Long running operations show a progress notification
match self {
Self::Compress { .. }
| Self::Copy { .. }
| Self::Delete { .. }
| Self::EmptyTrash
| Self::Extract { .. }
| Self::Move { .. }
| Self::Restore { .. } => true,
Self::NewFile { .. }
| Self::NewFolder { .. }
| Self::Rename { .. }
| Self::SetExecutableAndLaunch { .. } => false,
}
}
pub fn toast(&self) -> Option<String> {
match self {
Self::Compress { .. } => Some(self.completed_text()),
@ -427,6 +453,7 @@ impl Operation {
self,
id: u64,
msg_tx: &Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
) -> Result<(), String> {
let _ = msg_tx
.lock()
@ -435,7 +462,6 @@ impl Operation {
.await;
//TODO: IF ERROR, RETURN AN Operation THAT CAN UNDO THE CURRENT STATE
//TODO: SAFELY HANDLE CANCEL
match self {
Self::Compress {
paths,
@ -471,6 +497,10 @@ impl Operation {
let total_paths = paths.len();
for (i, path) in paths.iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
executor::block_on(async {
let total_progress = (i as f32) / total_paths as f32;
let _ = msg_tx
@ -493,15 +523,16 @@ impl Operation {
}
ArchiveType::Zip => {
let mut archive = fs::File::create(&to)
.map(io::BufWriter::new)
.map(zip::ZipWriter::new)
.map_err(err_str)?;
//TODO: set unix_permissions per file?
let zip_options = zip::write::SimpleFileOptions::default();
let total_paths = paths.len();
let mut buffer = vec![0; 4 * 1024 * 1024];
for (i, path) in paths.iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
executor::block_on(async {
let total_progress = (i as f32) / total_paths as f32;
let _ = msg_tx
@ -511,21 +542,54 @@ impl Operation {
.await;
});
let mut zip_options = zip::write::SimpleFileOptions::default();
if let Some(relative_path) =
path.strip_prefix(relative_root).map_err(err_str)?.to_str()
{
if path.is_file() {
let mut file = fs::File::open(&path).map_err(err_str)?;
let metadata = file.metadata().map_err(err_str)?;
let total = metadata.len();
if total >= 4 * 1024 * 1024 * 1024 {
// The large file option must be enabled for files above 4 GiB
zip_options = zip_options.large_file(true);
}
#[cfg(unix)]
{
use std::os::unix::fs::MetadataExt;
let mode = metadata.mode();
zip_options = zip_options.unix_permissions(mode);
}
archive
.start_file(relative_path, zip_options)
.map_err(err_str)?;
let mut current = 0;
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let mut buffer = Vec::new();
let mut file = fs::File::open(&path)
.map(io::BufReader::new)
.map_err(err_str)?;
let count = file.read(&mut buffer).map_err(err_str)?;
if count == 0 {
break;
}
archive.write_all(&buffer[..count]).map_err(err_str)?;
current += count;
file.read_to_end(&mut buffer).map_err(err_str)?;
archive.write_all(&buffer).map_err(err_str)?;
executor::block_on(async {
let file_progress = current as f32 / total as f32;
let total_progress =
(i as f32 + file_progress) / total_paths as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * total_progress,
))
.await;
});
}
} else {
archive
.add_directory(relative_path, zip_options)
@ -545,26 +609,29 @@ impl Operation {
.map_err(err_str)?;
}
Self::Copy { paths, to } => {
copy_or_move(paths, to, false, id, msg_tx).await?;
copy_or_move(paths, to, false, id, msg_tx, cancelled).await?;
}
Self::Delete { paths } => {
let total = paths.len();
let mut count = 0;
for path in paths {
let items_opt = tokio::task::spawn_blocking(|| trash::delete(path))
.await
.map_err(err_str)?
.map_err(err_str)?;
//TODO: items_opt allows for easy restore
count += 1;
for (i, path) in paths.into_iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (count as f32) / (total as f32),
100.0 * (i as f32) / (total as f32),
))
.await;
let _items_opt = tokio::task::spawn_blocking(|| trash::delete(path))
.await
.map_err(err_str)?
.map_err(err_str)?;
//TODO: items_opt allows for easy restore
}
}
Self::EmptyTrash => {
@ -578,25 +645,41 @@ impl Operation {
)
))]
{
tokio::task::spawn_blocking(|| {
let items = trash::os_limited::list()?;
trash::os_limited::purge_all(items)
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<(), String> {
let items = trash::os_limited::list().map_err(err_str)?;
let count = items.len();
for (i, item) in items.into_iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
executor::block_on(async {
let total_progress = i as f32 / count as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
trash::os_limited::purge_all([item]).map_err(err_str)?;
}
Ok(())
})
.await
.map_err(err_str)?
.map_err(err_str)?;
.map_err(err_str)??;
}
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
Self::Extract { paths, to } => {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<(), String> {
let total_paths = paths.len();
for (i, path) in paths.iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
executor::block_on(async {
let total_progress = (i as f32) / total_paths as f32;
let _ = msg_tx
@ -618,6 +701,7 @@ impl Operation {
}
}
//TODO: support cancellation while extracting!
let mime = mime_for_path(&path);
match mime.essence_str() {
"application/gzip" | "application/x-compressed-tar" => {
@ -669,40 +753,37 @@ impl Operation {
.map_err(err_str)?;
}
Self::Move { paths, to } => {
copy_or_move(paths, to, true, id, msg_tx).await?;
copy_or_move(paths, to, true, id, msg_tx, cancelled).await?;
}
Self::NewFolder { path } => {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
tokio::task::spawn_blocking(|| fs::create_dir(path))
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
Self::NewFile { path } => {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
tokio::task::spawn_blocking(|| fs::File::create(path))
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
Self::Rename { from, to } => {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
tokio::task::spawn_blocking(|| fs::rename(from, to))
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
#[cfg(target_os = "macos")]
Self::Restore { .. } => {
@ -712,49 +793,56 @@ impl Operation {
#[cfg(not(target_os = "macos"))]
Self::Restore { paths } => {
let total = paths.len();
let mut count = 0;
for path in paths {
tokio::task::spawn_blocking(|| trash::os_limited::restore_all([path]))
.await
.map_err(err_str)?
.map_err(err_str)?;
count += 1;
for (i, path) in paths.into_iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (count as f32) / (total as f32),
100.0 * (i as f32) / (total as f32),
))
.await;
tokio::task::spawn_blocking(|| trash::os_limited::restore_all([path]))
.await
.map_err(err_str)?
.map_err(err_str)?;
}
}
Self::SetExecutableAndLaunch { path } => {
tokio::task::spawn_blocking(move || -> io::Result<()> {
tokio::task::spawn_blocking(move || -> Result<(), String> {
//TODO: what to do on non-Unix systems?
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = fs::metadata(&path)?.permissions();
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let mut perms = fs::metadata(&path).map_err(err_str)?.permissions();
let current_mode = perms.mode();
let new_mode = current_mode | 0o111;
perms.set_mode(new_mode);
fs::set_permissions(&path, perms)?;
fs::set_permissions(&path, perms).map_err(err_str)?;
}
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let mut command = std::process::Command::new(path);
spawn_detached(&mut command)?;
spawn_detached(&mut command).map_err(err_str)?;
Ok(())
})
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
}

View file

@ -4,22 +4,29 @@ use std::{
io::{Read, Write},
ops::ControlFlow,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use walkdir::WalkDir;
use super::{copy_unique_path, ReplaceResult};
use crate::fl;
pub struct Context {
buf: Vec<u8>,
cancelled: Arc<AtomicBool>,
on_progress: Box<dyn Fn(&Op, &Progress) + 'static>,
on_replace: Box<dyn Fn(&Op) -> ReplaceResult + 'static>,
replace_result_opt: Option<ReplaceResult>,
}
impl Context {
pub fn new() -> Self {
pub fn new(cancelled: Arc<AtomicBool>) -> Self {
Self {
buf: vec![0; 4 * 1024 * 1024],
cancelled,
on_progress: Box::new(|_op, _progress| {}),
on_replace: Box::new(|_op| ReplaceResult::Cancel),
replace_result_opt: None,
@ -34,12 +41,20 @@ impl Context {
let mut ops = Vec::new();
let mut cleanup_ops = Vec::new();
for (from_parent, to_parent) in from_to_pairs {
if self.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
if from_parent == to_parent {
// Skip matching source and destination
continue;
}
for entry in WalkDir::new(&from_parent).into_iter() {
if self.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let entry = entry.map_err(|err| {
format!("failed to walk directory {:?}: {}", from_parent, err)
})?;
@ -91,6 +106,10 @@ impl Context {
let total_ops = ops.len();
for (current_ops, mut op) in ops.into_iter().enumerate() {
if self.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
let progress = Progress {
current_ops,
total_ops,
@ -215,6 +234,10 @@ impl Op {
.open(&self.to)?;
to_file.set_permissions(metadata.permissions())?;
loop {
if ctx.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled").into());
}
let count = from_file.read(&mut ctx.buf)?;
if count == 0 {
break;