Implement pause for operations

This commit is contained in:
Jeremy Soller 2024-11-15 09:47:03 -07:00
parent 8068688f48
commit 2109c8c3d6
No known key found for this signature in database
GPG key ID: D02FD439211AF56F
5 changed files with 212 additions and 115 deletions

View file

@ -4,12 +4,9 @@ use std::{
fs,
io::{self, Read, Write},
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
sync::{Arc, Condvar, Mutex},
};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, Mutex as TokioMutex};
use walkdir::WalkDir;
use self::reader::OpReader;
@ -27,7 +24,7 @@ pub mod reader;
pub mod recursive;
fn handle_replace(
msg_tx: &Arc<Mutex<Sender<Message>>>,
msg_tx: &Arc<TokioMutex<Sender<Message>>>,
file_from: PathBuf,
file_to: PathBuf,
multiple: bool,
@ -81,8 +78,8 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
archive: &mut zip::ZipArchive<R>,
directory: P,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
msg_tx: Arc<TokioMutex<Sender<Message>>>,
controller: Controller,
) -> zip::result::ZipResult<()> {
use std::{ffi::OsString, fs};
use zip::result::ZipError;
@ -108,9 +105,9 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
let mut buffer = vec![0; 4 * 1024 * 1024];
let total_files = archive.len();
for i in 0..total_files {
if cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into());
}
controller
.check()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
executor::block_on(async {
let total_progress = (i as f32) / total_files as f32;
@ -179,9 +176,9 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
let mut outfile = fs::File::create(&outpath)?;
let mut current = 0;
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into());
}
controller
.check()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let count = file.read(&mut buffer)?;
if count == 0 {
@ -227,6 +224,72 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
Ok(())
}
struct ControllerInner {
state: Mutex<u32>,
condvar: Condvar,
}
#[derive(Clone)]
pub struct Controller {
inner: Arc<ControllerInner>,
}
impl Controller {
const RUNNING: u32 = 0;
const PAUSED: u32 = 1;
const CANCELLED: u32 = 2;
pub fn new() -> Self {
Self {
inner: Arc::new(ControllerInner {
state: Mutex::new(Self::RUNNING),
condvar: Condvar::new(),
}),
}
}
pub fn check(&self) -> Result<(), String> {
let mut state = self.inner.state.lock().unwrap();
loop {
if *state == Self::CANCELLED {
return Err(fl!("cancelled"));
} else if *state == Self::PAUSED {
state = self.inner.condvar.wait(state).unwrap();
} else {
return Ok(());
}
}
}
pub fn is_cancelled(&self) -> bool {
let state = self.inner.state.lock().unwrap();
*state == Self::CANCELLED
}
pub fn cancel(&self) {
let mut state = self.inner.state.lock().unwrap();
*state = Self::CANCELLED;
self.inner.condvar.notify_all();
}
pub fn is_paused(&self) -> bool {
let state = self.inner.state.lock().unwrap();
*state == Self::PAUSED
}
pub fn pause(&self) {
let mut state = self.inner.state.lock().unwrap();
*state = Self::PAUSED;
self.inner.condvar.notify_all();
}
pub fn unpause(&self) {
let mut state = self.inner.state.lock().unwrap();
*state = Self::RUNNING;
self.inner.condvar.notify_all();
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ReplaceResult {
Replace(bool),
@ -289,8 +352,8 @@ async fn copy_or_move(
to: PathBuf,
moving: bool,
id: u64,
msg_tx: &Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
msg_tx: &Arc<TokioMutex<Sender<Message>>>,
controller: Controller,
) -> Result<(), String> {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<(), String> {
@ -321,7 +384,7 @@ async fn copy_or_move(
})
.collect();
let mut context = Context::new(cancelled);
let mut context = Context::new(controller);
{
let msg_tx = msg_tx.clone();
@ -605,8 +668,8 @@ impl Operation {
pub async fn perform(
self,
id: u64,
msg_tx: &Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
msg_tx: &Arc<TokioMutex<Sender<Message>>>,
controller: Controller,
) -> Result<(), String> {
let _ = msg_tx
.lock()
@ -650,9 +713,7 @@ impl Operation {
let total_paths = paths.len();
for (i, path) in paths.iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
executor::block_on(async {
let total_progress = (i as f32) / total_paths as f32;
@ -683,9 +744,7 @@ impl Operation {
let total_paths = paths.len();
let mut buffer = vec![0; 4 * 1024 * 1024];
for (i, path) in paths.iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
executor::block_on(async {
let total_progress = (i as f32) / total_paths as f32;
@ -719,9 +778,7 @@ impl Operation {
.map_err(err_str)?;
let mut current = 0;
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
let count = file.read(&mut buffer).map_err(err_str)?;
if count == 0 {
@ -763,14 +820,12 @@ impl Operation {
.map_err(err_str)?;
}
Self::Copy { paths, to } => {
copy_or_move(paths, to, false, id, msg_tx, cancelled).await?;
copy_or_move(paths, to, false, id, msg_tx, controller).await?;
}
Self::Delete { paths } => {
let total = paths.len();
for (i, path) in paths.into_iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
let _ = msg_tx
.lock()
@ -804,9 +859,7 @@ impl Operation {
let items = trash::os_limited::list().map_err(err_str)?;
let count = items.len();
for (i, item) in items.into_iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
executor::block_on(async {
let total_progress = i as f32 / count as f32;
@ -830,9 +883,7 @@ impl Operation {
tokio::task::spawn_blocking(move || -> Result<(), String> {
let total_paths = paths.len();
for (i, path) in paths.iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
executor::block_on(async {
let total_progress = (i as f32) / total_paths as f32;
@ -856,18 +907,18 @@ impl Operation {
}
let msg_tx = msg_tx.clone();
let cancelled = cancelled.clone();
let controller = controller.clone();
let mime = mime_for_path(&path);
match mime.essence_str() {
"application/gzip" | "application/x-compressed-tar" => {
OpReader::new(path, id, msg_tx, cancelled)
OpReader::new(path, id, msg_tx, controller)
.map(io::BufReader::new)
.map(flate2::read::GzDecoder::new)
.map(tar::Archive::new)
.and_then(|mut archive| archive.unpack(&new_dir))
.map_err(err_str)?
}
"application/x-tar" => OpReader::new(path, id, msg_tx, cancelled)
"application/x-tar" => OpReader::new(path, id, msg_tx, controller)
.map(io::BufReader::new)
.map(tar::Archive::new)
.and_then(|mut archive| archive.unpack(&new_dir))
@ -877,12 +928,12 @@ impl Operation {
.map(zip::ZipArchive::new)
.map_err(err_str)?
.and_then(move |mut archive| {
zip_extract(&mut archive, &new_dir, id, msg_tx, cancelled)
zip_extract(&mut archive, &new_dir, id, msg_tx, controller)
})
.map_err(err_str)?,
#[cfg(feature = "bzip2")]
"application/x-bzip" | "application/x-bzip-compressed-tar" => {
OpReader::new(path, id, msg_tx, cancelled)
OpReader::new(path, id, msg_tx, controller)
.map(io::BufReader::new)
.map(bzip2::read::BzDecoder::new)
.map(tar::Archive::new)
@ -891,7 +942,7 @@ impl Operation {
}
#[cfg(feature = "liblzma")]
"application/x-xz" | "application/x-xz-compressed-tar" => {
OpReader::new(path, id, msg_tx, cancelled)
OpReader::new(path, id, msg_tx, controller)
.map(io::BufReader::new)
.map(liblzma::read::XzDecoder::new)
.map(tar::Archive::new)
@ -910,12 +961,10 @@ impl Operation {
.map_err(err_str)?;
}
Self::Move { paths, to } => {
copy_or_move(paths, to, true, id, msg_tx, cancelled).await?;
copy_or_move(paths, to, true, id, msg_tx, controller).await?;
}
Self::NewFolder { path } => {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
tokio::task::spawn_blocking(|| fs::create_dir(path))
.await
@ -923,9 +972,7 @@ impl Operation {
.map_err(err_str)?;
}
Self::NewFile { path } => {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
tokio::task::spawn_blocking(|| fs::File::create(path))
.await
@ -933,9 +980,7 @@ impl Operation {
.map_err(err_str)?;
}
Self::Rename { from, to } => {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
tokio::task::spawn_blocking(|| fs::rename(from, to))
.await
@ -951,9 +996,7 @@ impl Operation {
Self::Restore { paths } => {
let total = paths.len();
for (i, path) in paths.into_iter().enumerate() {
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
let _ = msg_tx
.lock()
@ -977,9 +1020,7 @@ impl Operation {
{
use std::os::unix::fs::PermissionsExt;
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
let mut perms = fs::metadata(&path).map_err(err_str)?.permissions();
let current_mode = perms.mode();
@ -988,9 +1029,7 @@ impl Operation {
fs::set_permissions(&path, perms).map_err(err_str)?;
}
if cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
controller.check()?;
let mut command = std::process::Command::new(path);
spawn_detached(&mut command).map_err(err_str)?;
@ -1052,7 +1091,7 @@ mod tests {
paths: paths_clone,
to: to_clone,
}
.perform(id, &sync::Mutex::new(tx).into())
.perform(id, &Mutex::new(tx).into())
.await
});

View file

@ -1,15 +1,9 @@
use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt};
use std::{
fs, io,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use std::{fs, io, path::Path, sync::Arc};
use tokio::sync::Mutex;
use crate::{app::Message, fl};
use super::Controller;
use crate::app::Message;
// Special reader just for operations, handling cancel and progress
pub struct OpReader {
@ -18,7 +12,7 @@ pub struct OpReader {
current: u64,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
controller: Controller,
}
impl OpReader {
@ -26,7 +20,7 @@ impl OpReader {
path: P,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
controller: Controller,
) -> io::Result<Self> {
let file = fs::File::open(&path)?;
let metadata = file.metadata()?;
@ -36,16 +30,16 @@ impl OpReader {
current: 0,
id,
msg_tx,
cancelled,
controller,
})
}
}
impl io::Read for OpReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")));
}
self.controller
.check()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
let count = self.file.read(buf)?;
self.current += count as u64;

View file

@ -4,29 +4,24 @@ use std::{
io::{Read, Write},
ops::ControlFlow,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use walkdir::WalkDir;
use super::{copy_unique_path, ReplaceResult};
use crate::fl;
use super::{copy_unique_path, Controller, ReplaceResult};
pub struct Context {
buf: Vec<u8>,
cancelled: Arc<AtomicBool>,
controller: Controller,
on_progress: Box<dyn Fn(&Op, &Progress) + 'static>,
on_replace: Box<dyn Fn(&Op) -> ReplaceResult + 'static>,
replace_result_opt: Option<ReplaceResult>,
}
impl Context {
pub fn new(cancelled: Arc<AtomicBool>) -> Self {
pub fn new(controller: Controller) -> Self {
Self {
buf: vec![0; 4 * 1024 * 1024],
cancelled,
controller,
on_progress: Box::new(|_op, _progress| {}),
on_replace: Box::new(|_op| ReplaceResult::Cancel),
replace_result_opt: None,
@ -41,9 +36,7 @@ impl Context {
let mut ops = Vec::new();
let mut cleanup_ops = Vec::new();
for (from_parent, to_parent) in from_to_pairs {
if self.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
self.controller.check()?;
if from_parent == to_parent {
// Skip matching source and destination
@ -51,9 +44,7 @@ impl Context {
}
for entry in WalkDir::new(&from_parent).into_iter() {
if self.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
self.controller.check()?;
let entry = entry.map_err(|err| {
format!("failed to walk directory {:?}: {}", from_parent, err)
@ -106,9 +97,7 @@ impl Context {
let total_ops = ops.len();
for (current_ops, mut op) in ops.into_iter().enumerate() {
if self.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled"));
}
self.controller.check()?;
let progress = Progress {
current_ops,
@ -234,9 +223,7 @@ impl Op {
.open(&self.to)?;
to_file.set_permissions(metadata.permissions())?;
loop {
if ctx.cancelled.load(Ordering::SeqCst) {
return Err(fl!("cancelled").into());
}
ctx.controller.check()?;
let count = from_file.read(&mut ctx.buf)?;
if count == 0 {