diff --git a/i18n/en/cosmic_files.ftl b/i18n/en/cosmic_files.ftl index a2e590a..7ba34a6 100644 --- a/i18n/en/cosmic_files.ftl +++ b/i18n/en/cosmic_files.ftl @@ -30,6 +30,8 @@ size = Size details = Details dismiss = Dismiss message operations-in-progress = {$count} actions in progress ({$percent}%)... +pause = Pause +resume = Resume # Dialogs diff --git a/src/app.rs b/src/app.rs index 515682a..55be07c 100644 --- a/src/app.rs +++ b/src/app.rs @@ -48,10 +48,7 @@ use std::{ num::NonZeroU16, path::PathBuf, process, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, time::{self, Instant}, }; use tokio::sync::mpsc; @@ -67,7 +64,7 @@ use crate::{ localize::LANGUAGE_SORTER, menu, mime_app, mime_icon, mounter::{MounterAuth, MounterItem, MounterItems, MounterKey, MounterMessage, MOUNTERS}, - operation::{Operation, ReplaceResult}, + operation::{Controller, Operation, ReplaceResult}, spawn_detached::spawn_detached, tab::{self, HeadingOptions, ItemMetadata, Location, Tab, HOVER_DURATION}, }; @@ -314,6 +311,8 @@ pub enum Message { PendingComplete(u64), PendingDismiss, PendingError(u64, String), + PendingPause(u64, bool), + PendingPauseAll(bool), PendingProgress(u64, f32), Preview(Option), RescanTrash, @@ -522,7 +521,7 @@ pub struct App { #[cfg(feature = "notify")] notification_opt: Option>>, pending_operation_id: u64, - pending_operations: BTreeMap)>, + pending_operations: BTreeMap, progress_operations: BTreeSet, complete_operations: BTreeMap, failed_operations: BTreeMap, @@ -705,7 +704,7 @@ impl App { self.progress_operations.insert(id); } self.pending_operations - .insert(id, (operation, 0.0, Arc::new(AtomicBool::new(false)))); + .insert(id, (operation, 0.0, Controller::new())); } fn remove_window(&mut self, id: &window::Id) { @@ -1226,12 +1225,35 @@ impl App { if !self.pending_operations.is_empty() { let mut section = widget::settings::section().title(fl!("pending")); - for (id, (op, progress, _)) in self.pending_operations.iter().rev() { + for (id, (op, progress, controller)) in self.pending_operations.iter().rev() { section = section.add(widget::column::with_children(vec![ widget::row::with_children(vec![ widget::progress_bar(0.0..=100.0, *progress) .height(progress_bar_height) .into(), + if controller.is_paused() { + widget::tooltip( + widget::button::icon(widget::icon::from_name( + "media-playback-start-symbolic", + )) + .on_press(Message::PendingPause(*id, false)) + .padding(8), + widget::text::body(fl!("resume")), + widget::tooltip::Position::Top, + ) + .into() + } else { + widget::tooltip( + widget::button::icon(widget::icon::from_name( + "media-playback-pause-symbolic", + )) + .on_press(Message::PendingPause(*id, true)) + .padding(8), + widget::text::body(fl!("pause")), + widget::tooltip::Position::Top, + ) + .into() + }, widget::tooltip( widget::button::icon(widget::icon::from_name("window-close-symbolic")) .on_press(Message::PendingCancel(*id)) @@ -2344,14 +2366,14 @@ impl Application for App { } } Message::PendingCancel(id) => { - if let Some((_, _, cancelled)) = self.pending_operations.get(&id) { - cancelled.store(true, Ordering::SeqCst); + if let Some((_, _, controller)) = self.pending_operations.get(&id) { + controller.cancel(); self.progress_operations.remove(&id); } } Message::PendingCancelAll => { - for (id, (_, _, cancelled)) in self.pending_operations.iter() { - cancelled.store(true, Ordering::SeqCst); + for (id, (_, _, controller)) in self.pending_operations.iter() { + controller.cancel(); self.progress_operations.remove(&id); } } @@ -2396,17 +2418,43 @@ impl Application for App { self.progress_operations.clear(); } Message::PendingError(id, err) => { - if let Some((op, progress, cancelled)) = self.pending_operations.remove(&id) { + if let Some((op, progress, controller)) = self.pending_operations.remove(&id) { self.failed_operations.insert(id, (op, progress, err)); // Only show dialog if not cancelled - if !cancelled.load(Ordering::SeqCst) { + if !controller.is_cancelled() { self.dialog_pages.push_back(DialogPage::FailedOperation(id)); } self.progress_operations.remove(&id); } + // Close progress notification if all relavent operations are finished + if !self + .pending_operations + .iter() + .any(|(_id, (op, _, _))| op.show_progress_notification()) + { + self.progress_operations.clear(); + } // Manually rescan any trash tabs after any operation is completed return self.rescan_trash(); } + Message::PendingPause(id, pause) => { + if let Some((_, _, controller)) = self.pending_operations.get(&id) { + if pause { + controller.pause(); + } else { + controller.unpause(); + } + } + } + Message::PendingPauseAll(pause) => { + for (id, (_, _, controller)) in self.pending_operations.iter() { + if pause { + controller.pause(); + } else { + controller.unpause(); + } + } + } Message::PendingProgress(id, new_progress) => { if let Some((_, progress, _)) = self.pending_operations.get_mut(&id) { *progress = new_progress; @@ -3819,7 +3867,11 @@ impl Application for App { let mut title = String::new(); let mut total_progress = 0.0; let mut count = 0; - for (_id, (op, progress, _)) in self.pending_operations.iter() { + let mut all_paused = true; + for (_id, (op, progress, controller)) in self.pending_operations.iter() { + if !controller.is_paused() { + all_paused = false; + } if op.show_progress_notification() { if title.is_empty() { title = op.pending_text(*progress as i32); @@ -3851,6 +3903,29 @@ impl Application for App { let container = widget::layer_container(widget::column::with_children(vec![ widget::row::with_children(vec![ progress_bar.into(), + if all_paused { + widget::tooltip( + widget::button::icon(widget::icon::from_name( + "media-playback-start-symbolic", + )) + .on_press(Message::PendingPauseAll(false)) + .padding(8), + widget::text::body(fl!("resume")), + widget::tooltip::Position::Top, + ) + .into() + } else { + widget::tooltip( + widget::button::icon(widget::icon::from_name( + "media-playback-pause-symbolic", + )) + .on_press(Message::PendingPauseAll(true)) + .padding(8), + widget::text::body(fl!("pause")), + widget::tooltip::Position::Top, + ) + .into() + }, widget::tooltip( widget::button::icon(widget::icon::from_name("window-close-symbolic")) .on_press(Message::PendingCancelAll) diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 3fb5cb7..f9c7b75 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -4,12 +4,9 @@ use std::{ fs, io::{self, Read, Write}, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::{Arc, Condvar, Mutex}, }; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, Mutex as TokioMutex}; use walkdir::WalkDir; use self::reader::OpReader; @@ -27,7 +24,7 @@ pub mod reader; pub mod recursive; fn handle_replace( - msg_tx: &Arc>>, + msg_tx: &Arc>>, file_from: PathBuf, file_to: PathBuf, multiple: bool, @@ -81,8 +78,8 @@ fn zip_extract>( archive: &mut zip::ZipArchive, directory: P, id: u64, - msg_tx: Arc>>, - cancelled: Arc, + msg_tx: Arc>>, + controller: Controller, ) -> zip::result::ZipResult<()> { use std::{ffi::OsString, fs}; use zip::result::ZipError; @@ -108,9 +105,9 @@ fn zip_extract>( let mut buffer = vec![0; 4 * 1024 * 1024]; let total_files = archive.len(); for i in 0..total_files { - if cancelled.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into()); - } + controller + .check() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; executor::block_on(async { let total_progress = (i as f32) / total_files as f32; @@ -179,9 +176,9 @@ fn zip_extract>( let mut outfile = fs::File::create(&outpath)?; let mut current = 0; loop { - if cancelled.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into()); - } + controller + .check() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; let count = file.read(&mut buffer)?; if count == 0 { @@ -227,6 +224,72 @@ fn zip_extract>( Ok(()) } +struct ControllerInner { + state: Mutex, + condvar: Condvar, +} + +#[derive(Clone)] +pub struct Controller { + inner: Arc, +} + +impl Controller { + const RUNNING: u32 = 0; + const PAUSED: u32 = 1; + const CANCELLED: u32 = 2; + + pub fn new() -> Self { + Self { + inner: Arc::new(ControllerInner { + state: Mutex::new(Self::RUNNING), + condvar: Condvar::new(), + }), + } + } + + pub fn check(&self) -> Result<(), String> { + let mut state = self.inner.state.lock().unwrap(); + loop { + if *state == Self::CANCELLED { + return Err(fl!("cancelled")); + } else if *state == Self::PAUSED { + state = self.inner.condvar.wait(state).unwrap(); + } else { + return Ok(()); + } + } + } + + pub fn is_cancelled(&self) -> bool { + let state = self.inner.state.lock().unwrap(); + *state == Self::CANCELLED + } + + pub fn cancel(&self) { + let mut state = self.inner.state.lock().unwrap(); + *state = Self::CANCELLED; + self.inner.condvar.notify_all(); + } + + pub fn is_paused(&self) -> bool { + let state = self.inner.state.lock().unwrap(); + *state == Self::PAUSED + } + + pub fn pause(&self) { + let mut state = self.inner.state.lock().unwrap(); + *state = Self::PAUSED; + self.inner.condvar.notify_all(); + } + + pub fn unpause(&self) { + let mut state = self.inner.state.lock().unwrap(); + *state = Self::RUNNING; + self.inner.condvar.notify_all(); + } +} + #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub enum ReplaceResult { Replace(bool), @@ -289,8 +352,8 @@ async fn copy_or_move( to: PathBuf, moving: bool, id: u64, - msg_tx: &Arc>>, - cancelled: Arc, + msg_tx: &Arc>>, + controller: Controller, ) -> Result<(), String> { let msg_tx = msg_tx.clone(); tokio::task::spawn_blocking(move || -> Result<(), String> { @@ -321,7 +384,7 @@ async fn copy_or_move( }) .collect(); - let mut context = Context::new(cancelled); + let mut context = Context::new(controller); { let msg_tx = msg_tx.clone(); @@ -605,8 +668,8 @@ impl Operation { pub async fn perform( self, id: u64, - msg_tx: &Arc>>, - cancelled: Arc, + msg_tx: &Arc>>, + controller: Controller, ) -> Result<(), String> { let _ = msg_tx .lock() @@ -650,9 +713,7 @@ impl Operation { let total_paths = paths.len(); for (i, path) in paths.iter().enumerate() { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; executor::block_on(async { let total_progress = (i as f32) / total_paths as f32; @@ -683,9 +744,7 @@ impl Operation { let total_paths = paths.len(); let mut buffer = vec![0; 4 * 1024 * 1024]; for (i, path) in paths.iter().enumerate() { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; executor::block_on(async { let total_progress = (i as f32) / total_paths as f32; @@ -719,9 +778,7 @@ impl Operation { .map_err(err_str)?; let mut current = 0; loop { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; let count = file.read(&mut buffer).map_err(err_str)?; if count == 0 { @@ -763,14 +820,12 @@ impl Operation { .map_err(err_str)?; } Self::Copy { paths, to } => { - copy_or_move(paths, to, false, id, msg_tx, cancelled).await?; + copy_or_move(paths, to, false, id, msg_tx, controller).await?; } Self::Delete { paths } => { let total = paths.len(); for (i, path) in paths.into_iter().enumerate() { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; let _ = msg_tx .lock() @@ -804,9 +859,7 @@ impl Operation { let items = trash::os_limited::list().map_err(err_str)?; let count = items.len(); for (i, item) in items.into_iter().enumerate() { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; executor::block_on(async { let total_progress = i as f32 / count as f32; @@ -830,9 +883,7 @@ impl Operation { tokio::task::spawn_blocking(move || -> Result<(), String> { let total_paths = paths.len(); for (i, path) in paths.iter().enumerate() { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; executor::block_on(async { let total_progress = (i as f32) / total_paths as f32; @@ -856,18 +907,18 @@ impl Operation { } let msg_tx = msg_tx.clone(); - let cancelled = cancelled.clone(); + let controller = controller.clone(); let mime = mime_for_path(&path); match mime.essence_str() { "application/gzip" | "application/x-compressed-tar" => { - OpReader::new(path, id, msg_tx, cancelled) + OpReader::new(path, id, msg_tx, controller) .map(io::BufReader::new) .map(flate2::read::GzDecoder::new) .map(tar::Archive::new) .and_then(|mut archive| archive.unpack(&new_dir)) .map_err(err_str)? } - "application/x-tar" => OpReader::new(path, id, msg_tx, cancelled) + "application/x-tar" => OpReader::new(path, id, msg_tx, controller) .map(io::BufReader::new) .map(tar::Archive::new) .and_then(|mut archive| archive.unpack(&new_dir)) @@ -877,12 +928,12 @@ impl Operation { .map(zip::ZipArchive::new) .map_err(err_str)? .and_then(move |mut archive| { - zip_extract(&mut archive, &new_dir, id, msg_tx, cancelled) + zip_extract(&mut archive, &new_dir, id, msg_tx, controller) }) .map_err(err_str)?, #[cfg(feature = "bzip2")] "application/x-bzip" | "application/x-bzip-compressed-tar" => { - OpReader::new(path, id, msg_tx, cancelled) + OpReader::new(path, id, msg_tx, controller) .map(io::BufReader::new) .map(bzip2::read::BzDecoder::new) .map(tar::Archive::new) @@ -891,7 +942,7 @@ impl Operation { } #[cfg(feature = "liblzma")] "application/x-xz" | "application/x-xz-compressed-tar" => { - OpReader::new(path, id, msg_tx, cancelled) + OpReader::new(path, id, msg_tx, controller) .map(io::BufReader::new) .map(liblzma::read::XzDecoder::new) .map(tar::Archive::new) @@ -910,12 +961,10 @@ impl Operation { .map_err(err_str)?; } Self::Move { paths, to } => { - copy_or_move(paths, to, true, id, msg_tx, cancelled).await?; + copy_or_move(paths, to, true, id, msg_tx, controller).await?; } Self::NewFolder { path } => { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; tokio::task::spawn_blocking(|| fs::create_dir(path)) .await @@ -923,9 +972,7 @@ impl Operation { .map_err(err_str)?; } Self::NewFile { path } => { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; tokio::task::spawn_blocking(|| fs::File::create(path)) .await @@ -933,9 +980,7 @@ impl Operation { .map_err(err_str)?; } Self::Rename { from, to } => { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; tokio::task::spawn_blocking(|| fs::rename(from, to)) .await @@ -951,9 +996,7 @@ impl Operation { Self::Restore { paths } => { let total = paths.len(); for (i, path) in paths.into_iter().enumerate() { - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; let _ = msg_tx .lock() @@ -977,9 +1020,7 @@ impl Operation { { use std::os::unix::fs::PermissionsExt; - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; let mut perms = fs::metadata(&path).map_err(err_str)?.permissions(); let current_mode = perms.mode(); @@ -988,9 +1029,7 @@ impl Operation { fs::set_permissions(&path, perms).map_err(err_str)?; } - if cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + controller.check()?; let mut command = std::process::Command::new(path); spawn_detached(&mut command).map_err(err_str)?; @@ -1052,7 +1091,7 @@ mod tests { paths: paths_clone, to: to_clone, } - .perform(id, &sync::Mutex::new(tx).into()) + .perform(id, &Mutex::new(tx).into()) .await }); diff --git a/src/operation/reader.rs b/src/operation/reader.rs index 3190964..fdb9071 100644 --- a/src/operation/reader.rs +++ b/src/operation/reader.rs @@ -1,15 +1,9 @@ use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt}; -use std::{ - fs, io, - path::Path, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; +use std::{fs, io, path::Path, sync::Arc}; use tokio::sync::Mutex; -use crate::{app::Message, fl}; +use super::Controller; +use crate::app::Message; // Special reader just for operations, handling cancel and progress pub struct OpReader { @@ -18,7 +12,7 @@ pub struct OpReader { current: u64, id: u64, msg_tx: Arc>>, - cancelled: Arc, + controller: Controller, } impl OpReader { @@ -26,7 +20,7 @@ impl OpReader { path: P, id: u64, msg_tx: Arc>>, - cancelled: Arc, + controller: Controller, ) -> io::Result { let file = fs::File::open(&path)?; let metadata = file.metadata()?; @@ -36,16 +30,16 @@ impl OpReader { current: 0, id, msg_tx, - cancelled, + controller, }) } } impl io::Read for OpReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { - if self.cancelled.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled"))); - } + self.controller + .check() + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; let count = self.file.read(buf)?; self.current += count as u64; diff --git a/src/operation/recursive.rs b/src/operation/recursive.rs index 95d25a5..ef1e7bf 100644 --- a/src/operation/recursive.rs +++ b/src/operation/recursive.rs @@ -4,29 +4,24 @@ use std::{ io::{Read, Write}, ops::ControlFlow, path::PathBuf, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, }; use walkdir::WalkDir; -use super::{copy_unique_path, ReplaceResult}; -use crate::fl; +use super::{copy_unique_path, Controller, ReplaceResult}; pub struct Context { buf: Vec, - cancelled: Arc, + controller: Controller, on_progress: Box, on_replace: Box ReplaceResult + 'static>, replace_result_opt: Option, } impl Context { - pub fn new(cancelled: Arc) -> Self { + pub fn new(controller: Controller) -> Self { Self { buf: vec![0; 4 * 1024 * 1024], - cancelled, + controller, on_progress: Box::new(|_op, _progress| {}), on_replace: Box::new(|_op| ReplaceResult::Cancel), replace_result_opt: None, @@ -41,9 +36,7 @@ impl Context { let mut ops = Vec::new(); let mut cleanup_ops = Vec::new(); for (from_parent, to_parent) in from_to_pairs { - if self.cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + self.controller.check()?; if from_parent == to_parent { // Skip matching source and destination @@ -51,9 +44,7 @@ impl Context { } for entry in WalkDir::new(&from_parent).into_iter() { - if self.cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + self.controller.check()?; let entry = entry.map_err(|err| { format!("failed to walk directory {:?}: {}", from_parent, err) @@ -106,9 +97,7 @@ impl Context { let total_ops = ops.len(); for (current_ops, mut op) in ops.into_iter().enumerate() { - if self.cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled")); - } + self.controller.check()?; let progress = Progress { current_ops, @@ -234,9 +223,7 @@ impl Op { .open(&self.to)?; to_file.set_permissions(metadata.permissions())?; loop { - if ctx.cancelled.load(Ordering::SeqCst) { - return Err(fl!("cancelled").into()); - } + ctx.controller.check()?; let count = from_file.read(&mut ctx.buf)?; if count == 0 {