Move progress into controller to make it not block

This commit is contained in:
Jeremy Soller 2024-11-20 08:15:31 -07:00
parent c4a08ead84
commit b4b5e78abe
No known key found for this signature in database
GPG key ID: D02FD439211AF56F
4 changed files with 176 additions and 272 deletions

View file

@ -313,7 +313,6 @@ pub enum Message {
PendingError(u64, String), PendingError(u64, String),
PendingPause(u64, bool), PendingPause(u64, bool),
PendingPauseAll(bool), PendingPauseAll(bool),
PendingProgress(u64, f32),
Preview(Option<Entity>), Preview(Option<Entity>),
RescanTrash, RescanTrash,
Rename(Option<Entity>), Rename(Option<Entity>),
@ -521,10 +520,10 @@ pub struct App {
#[cfg(feature = "notify")] #[cfg(feature = "notify")]
notification_opt: Option<Arc<Mutex<notify_rust::NotificationHandle>>>, notification_opt: Option<Arc<Mutex<notify_rust::NotificationHandle>>>,
pending_operation_id: u64, pending_operation_id: u64,
pending_operations: BTreeMap<u64, (Operation, f32, Controller)>, pending_operations: BTreeMap<u64, (Operation, Controller)>,
progress_operations: BTreeSet<u64>, progress_operations: BTreeSet<u64>,
complete_operations: BTreeMap<u64, Operation>, complete_operations: BTreeMap<u64, Operation>,
failed_operations: BTreeMap<u64, (Operation, f32, Controller, String)>, failed_operations: BTreeMap<u64, (Operation, Controller, String)>,
search_id: widget::Id, search_id: widget::Id,
#[cfg(feature = "wayland")] #[cfg(feature = "wayland")]
surface_ids: HashMap<WlOutput, WindowId>, surface_ids: HashMap<WlOutput, WindowId>,
@ -704,7 +703,7 @@ impl App {
self.progress_operations.insert(id); self.progress_operations.insert(id);
} }
self.pending_operations self.pending_operations
.insert(id, (operation, 0.0, Controller::new())); .insert(id, (operation, Controller::new()));
} }
fn remove_window(&mut self, id: &window::Id) { fn remove_window(&mut self, id: &window::Id) {
@ -1252,10 +1251,11 @@ impl App {
if !self.pending_operations.is_empty() { if !self.pending_operations.is_empty() {
let mut section = widget::settings::section().title(fl!("pending")); let mut section = widget::settings::section().title(fl!("pending"));
for (id, (op, progress, controller)) in self.pending_operations.iter().rev() { for (id, (op, controller)) in self.pending_operations.iter().rev() {
let progress = controller.progress();
section = section.add(widget::column::with_children(vec![ section = section.add(widget::column::with_children(vec![
widget::row::with_children(vec![ widget::row::with_children(vec![
widget::progress_bar(0.0..=100.0, *progress) widget::progress_bar(0.0..=1.0, progress)
.height(progress_bar_height) .height(progress_bar_height)
.into(), .into(),
if controller.is_paused() { if controller.is_paused() {
@ -1292,7 +1292,7 @@ impl App {
]) ])
.align_y(Alignment::Center) .align_y(Alignment::Center)
.into(), .into(),
widget::text(op.pending_text(*progress as i32, controller.state())).into(), widget::text(op.pending_text(progress, controller.state())).into(),
])); ]));
} }
children.push(section.into()); children.push(section.into());
@ -1300,9 +1300,10 @@ impl App {
if !self.failed_operations.is_empty() { if !self.failed_operations.is_empty() {
let mut section = widget::settings::section().title(fl!("failed")); let mut section = widget::settings::section().title(fl!("failed"));
for (_id, (op, progress, controller, error)) in self.failed_operations.iter().rev() { for (_id, (op, controller, error)) in self.failed_operations.iter().rev() {
let progress = controller.progress();
section = section.add(widget::column::with_children(vec![ section = section.add(widget::column::with_children(vec![
widget::text(op.pending_text(*progress as i32, controller.state())).into(), widget::text(op.pending_text(progress, controller.state())).into(),
widget::text(error).into(), widget::text(error).into(),
])); ]));
} }
@ -2395,13 +2396,13 @@ impl Application for App {
} }
} }
Message::PendingCancel(id) => { Message::PendingCancel(id) => {
if let Some((_, _, controller)) = self.pending_operations.get(&id) { if let Some((_, controller)) = self.pending_operations.get(&id) {
controller.cancel(); controller.cancel();
self.progress_operations.remove(&id); self.progress_operations.remove(&id);
} }
} }
Message::PendingCancelAll => { Message::PendingCancelAll => {
for (id, (_, _, controller)) in self.pending_operations.iter() { for (id, (_, controller)) in self.pending_operations.iter() {
controller.cancel(); controller.cancel();
self.progress_operations.remove(&id); self.progress_operations.remove(&id);
} }
@ -2409,7 +2410,7 @@ impl Application for App {
Message::PendingComplete(id, op_sel) => { Message::PendingComplete(id, op_sel) => {
let mut commands = Vec::with_capacity(4); let mut commands = Vec::with_capacity(4);
// Show toast for some operations // Show toast for some operations
if let Some((op, _, _)) = self.pending_operations.remove(&id) { if let Some((op, _)) = self.pending_operations.remove(&id) {
if let Some(description) = op.toast() { if let Some(description) = op.toast() {
if let Operation::Delete { ref paths } = op { if let Operation::Delete { ref paths } = op {
let paths: Arc<[PathBuf]> = Arc::from(paths.as_slice()); let paths: Arc<[PathBuf]> = Arc::from(paths.as_slice());
@ -2431,7 +2432,7 @@ impl Application for App {
if !self if !self
.pending_operations .pending_operations
.iter() .iter()
.any(|(_id, (op, _, _))| op.show_progress_notification()) .any(|(_id, (op, _))| op.show_progress_notification())
{ {
self.progress_operations.clear(); self.progress_operations.clear();
} }
@ -2449,21 +2450,20 @@ impl Application for App {
self.progress_operations.clear(); self.progress_operations.clear();
} }
Message::PendingError(id, err) => { Message::PendingError(id, err) => {
if let Some((op, progress, controller)) = self.pending_operations.remove(&id) { if let Some((op, controller)) = self.pending_operations.remove(&id) {
// Only show dialog if not cancelled // Only show dialog if not cancelled
if !controller.is_cancelled() { if !controller.is_cancelled() {
self.dialog_pages.push_back(DialogPage::FailedOperation(id)); self.dialog_pages.push_back(DialogPage::FailedOperation(id));
} }
// Remove from progress // Remove from progress
self.progress_operations.remove(&id); self.progress_operations.remove(&id);
self.failed_operations self.failed_operations.insert(id, (op, controller, err));
.insert(id, (op, progress, controller, err));
} }
// Close progress notification if all relavent operations are finished // Close progress notification if all relavent operations are finished
if !self if !self
.pending_operations .pending_operations
.iter() .iter()
.any(|(_id, (op, _, _))| op.show_progress_notification()) .any(|(_id, (op, _))| op.show_progress_notification())
{ {
self.progress_operations.clear(); self.progress_operations.clear();
} }
@ -2471,7 +2471,7 @@ impl Application for App {
return self.rescan_trash(); return self.rescan_trash();
} }
Message::PendingPause(id, pause) => { Message::PendingPause(id, pause) => {
if let Some((_, _, controller)) = self.pending_operations.get(&id) { if let Some((_, controller)) = self.pending_operations.get(&id) {
if pause { if pause {
controller.pause(); controller.pause();
} else { } else {
@ -2480,7 +2480,7 @@ impl Application for App {
} }
} }
Message::PendingPauseAll(pause) => { Message::PendingPauseAll(pause) => {
for (_id, (_, _, controller)) in self.pending_operations.iter() { for (_id, (_, controller)) in self.pending_operations.iter() {
if pause { if pause {
controller.pause(); controller.pause();
} else { } else {
@ -2488,12 +2488,6 @@ impl Application for App {
} }
} }
} }
Message::PendingProgress(id, new_progress) => {
if let Some((_, progress, _)) = self.pending_operations.get_mut(&id) {
*progress = new_progress;
}
return self.update_notification();
}
Message::Preview(entity_opt) => { Message::Preview(entity_opt) => {
match self.mode { match self.mode {
Mode::App => { Mode::App => {
@ -3454,7 +3448,7 @@ impl Application for App {
), ),
DialogPage::FailedOperation(id) => { DialogPage::FailedOperation(id) => {
//TODO: try next dialog page (making sure index is used by Dialog messages)? //TODO: try next dialog page (making sure index is used by Dialog messages)?
let (operation, _, _, err) = self.failed_operations.get(id)?; let (operation, _, err) = self.failed_operations.get(id)?;
//TODO: nice description of error //TODO: nice description of error
widget::dialog() widget::dialog()
@ -3901,13 +3895,14 @@ impl Application for App {
let mut total_progress = 0.0; let mut total_progress = 0.0;
let mut count = 0; let mut count = 0;
let mut all_paused = true; let mut all_paused = true;
for (_id, (op, progress, controller)) in self.pending_operations.iter() { for (_id, (op, controller)) in self.pending_operations.iter() {
if !controller.is_paused() { if !controller.is_paused() {
all_paused = false; all_paused = false;
} }
if op.show_progress_notification() { if op.show_progress_notification() {
let progress = controller.progress();
if title.is_empty() { if title.is_empty() {
title = op.pending_text(*progress as i32, controller.state()); title = op.pending_text(progress, controller.state());
} }
total_progress += progress; total_progress += progress;
count += 1; count += 1;
@ -3917,7 +3912,7 @@ impl Application for App {
// Adjust the progress bar so it does not jump around when operations finish // Adjust the progress bar so it does not jump around when operations finish
for id in self.progress_operations.iter() { for id in self.progress_operations.iter() {
if self.complete_operations.contains_key(&id) { if self.complete_operations.contains_key(&id) {
total_progress += 100.0; total_progress += 1.0;
count += 1; count += 1;
} }
} }
@ -3943,7 +3938,7 @@ impl Application for App {
//TODO: get height from theme? //TODO: get height from theme?
let progress_bar_height = Length::Fixed(4.0); let progress_bar_height = Length::Fixed(4.0);
let progress_bar = let progress_bar =
widget::progress_bar(0.0..=100.0, total_progress).height(progress_bar_height); widget::progress_bar(0.0..=1.0, total_progress).height(progress_bar_height);
let container = widget::layer_container(widget::column::with_children(vec![ let container = widget::layer_container(widget::column::with_children(vec![
widget::row::with_children(vec![ widget::row::with_children(vec![
@ -4397,7 +4392,11 @@ impl Application for App {
if !self.pending_operations.is_empty() { if !self.pending_operations.is_empty() {
//TODO: inhibit suspend/shutdown? //TODO: inhibit suspend/shutdown?
if self.window_id_opt.is_none() { if self.window_id_opt.is_some() {
// Refresh progress when window is open and operations are in progress
subscriptions.push(window::frames().map(|_| Message::None));
} else {
// Handle notification when window is closed and operations are in progress
#[cfg(feature = "notify")] #[cfg(feature = "notify")]
{ {
struct NotificationSubscription; struct NotificationSubscription;
@ -4437,16 +4436,16 @@ impl Application for App {
} }
} }
for (id, (pending_operation, _, cancelled)) in self.pending_operations.iter() { for (id, (pending_operation, controller)) in self.pending_operations.iter() {
//TODO: use recipe? //TODO: use recipe?
let id = *id; let id = *id;
let pending_operation = pending_operation.clone(); let pending_operation = pending_operation.clone();
let cancelled = cancelled.clone(); let controller = controller.clone();
subscriptions.push(Subscription::run_with_id( subscriptions.push(Subscription::run_with_id(
id, id,
stream::channel(16, move |msg_tx| async move { stream::channel(16, move |msg_tx| async move {
let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx)); let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx));
match pending_operation.perform(id, &msg_tx, cancelled).await { match pending_operation.perform(&msg_tx, controller).await {
Ok(result_paths) => { Ok(result_paths) => {
let _ = msg_tx let _ = msg_tx
.lock() .lock()

105
src/operation/controller.rs Normal file
View file

@ -0,0 +1,105 @@
use crate::fl;
use std::sync::{Arc, Condvar, Mutex};
#[derive(Clone, Copy, Debug)]
pub enum ControllerState {
Cancelled,
Paused,
Running,
}
#[derive(Debug)]
struct ControllerInner {
state: Mutex<ControllerState>,
progress: Mutex<f32>,
condvar: Condvar,
}
#[derive(Debug)]
pub struct Controller {
primary: bool,
inner: Arc<ControllerInner>,
}
impl Controller {
pub fn new() -> Self {
Self {
primary: true,
inner: Arc::new(ControllerInner {
state: Mutex::new(ControllerState::Running),
progress: Mutex::new(0.0),
condvar: Condvar::new(),
}),
}
}
pub fn check(&self) -> Result<(), String> {
let mut state = self.inner.state.lock().unwrap();
loop {
match *state {
ControllerState::Cancelled => return Err(fl!("cancelled")),
ControllerState::Paused => {
state = self.inner.condvar.wait(state).unwrap();
}
ControllerState::Running => return Ok(()),
}
}
}
pub fn progress(&self) -> f32 {
*self.inner.progress.lock().unwrap()
}
pub fn set_progress(&self, progress: f32) {
*self.inner.progress.lock().unwrap() = progress;
}
pub fn state(&self) -> ControllerState {
*self.inner.state.lock().unwrap()
}
pub fn set_state(&self, state: ControllerState) {
*self.inner.state.lock().unwrap() = state;
self.inner.condvar.notify_all();
}
pub fn is_cancelled(&self) -> bool {
matches!(self.state(), ControllerState::Cancelled)
}
pub fn cancel(&self) {
self.set_state(ControllerState::Cancelled);
}
pub fn is_paused(&self) -> bool {
matches!(self.state(), ControllerState::Paused)
}
pub fn pause(&self) {
self.set_state(ControllerState::Paused);
}
pub fn unpause(&self) {
//TODO: ensure this does not override Cancel?
self.set_state(ControllerState::Running);
}
}
impl Clone for Controller {
fn clone(&self) -> Self {
Self {
primary: false,
inner: self.inner.clone(),
}
}
}
impl Drop for Controller {
fn drop(&mut self) {
// Cancel operations if primary controller is dropped
if self.primary {
self.cancel();
}
}
}

View file

@ -4,13 +4,11 @@ use std::{
fs, fs,
io::{self, Read, Write}, io::{self, Read, Write},
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Condvar, Mutex}, sync::Arc,
}; };
use tokio::sync::{mpsc, Mutex as TokioMutex}; use tokio::sync::{mpsc, Mutex as TokioMutex};
use walkdir::WalkDir; use walkdir::WalkDir;
use self::reader::OpReader;
use self::recursive::Context;
use crate::{ use crate::{
app::{ArchiveType, DialogPage, Message}, app::{ArchiveType, DialogPage, Message},
config::IconSizes, config::IconSizes,
@ -20,7 +18,13 @@ use crate::{
tab, tab,
}; };
pub use self::controller::{Controller, ControllerState};
pub mod controller;
use self::reader::OpReader;
pub mod reader; pub mod reader;
use self::recursive::Context;
pub mod recursive; pub mod recursive;
fn handle_replace( fn handle_replace(
@ -77,8 +81,6 @@ fn get_directory_name(file_name: &str) -> &str {
fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>( fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
archive: &mut zip::ZipArchive<R>, archive: &mut zip::ZipArchive<R>,
directory: P, directory: P,
id: u64,
msg_tx: Arc<TokioMutex<Sender<Message>>>,
controller: Controller, controller: Controller,
) -> zip::result::ZipResult<()> { ) -> zip::result::ZipResult<()> {
use std::{ffi::OsString, fs}; use std::{ffi::OsString, fs};
@ -109,14 +111,7 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
.check() .check()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
executor::block_on(async { controller.set_progress((i as f32) / total_files as f32);
let total_progress = (i as f32) / total_files as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
let mut file = archive.by_index(i)?; let mut file = archive.by_index(i)?;
let filepath = file let filepath = file
@ -188,15 +183,9 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
current += count as u64; current += count as u64;
if current < total { if current < total {
executor::block_on(async { let file_progress = current as f32 / total as f32;
let file_progress = current as f32 / total as f32; let total_progress = (i as f32 + file_progress) / total_files as f32;
let total_progress = (i as f32 + file_progress) / total_files as f32; controller.set_progress(total_progress);
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
} }
} }
outfile.sync_all()?; outfile.sync_all()?;
@ -224,98 +213,6 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
Ok(()) Ok(())
} }
#[derive(Clone, Copy, Debug)]
pub enum ControllerState {
Cancelled,
Paused,
Running,
}
#[derive(Debug)]
struct ControllerInner {
state: Mutex<ControllerState>,
condvar: Condvar,
}
#[derive(Debug)]
pub struct Controller {
primary: bool,
inner: Arc<ControllerInner>,
}
impl Controller {
pub fn new() -> Self {
Self {
primary: true,
inner: Arc::new(ControllerInner {
state: Mutex::new(ControllerState::Running),
condvar: Condvar::new(),
}),
}
}
pub fn check(&self) -> Result<(), String> {
let mut state = self.inner.state.lock().unwrap();
loop {
match *state {
ControllerState::Cancelled => return Err(fl!("cancelled")),
ControllerState::Paused => {
state = self.inner.condvar.wait(state).unwrap();
}
ControllerState::Running => return Ok(()),
}
}
}
pub fn state(&self) -> ControllerState {
*self.inner.state.lock().unwrap()
}
pub fn set_state(&self, state: ControllerState) {
*self.inner.state.lock().unwrap() = state;
self.inner.condvar.notify_all();
}
pub fn is_cancelled(&self) -> bool {
matches!(self.state(), ControllerState::Cancelled)
}
pub fn cancel(&self) {
self.set_state(ControllerState::Cancelled);
}
pub fn is_paused(&self) -> bool {
matches!(self.state(), ControllerState::Paused)
}
pub fn pause(&self) {
self.set_state(ControllerState::Paused);
}
pub fn unpause(&self) {
//TODO: ensure this does not override Cancel?
self.set_state(ControllerState::Running);
}
}
impl Clone for Controller {
fn clone(&self) -> Self {
Self {
primary: false,
inner: self.inner.clone(),
}
}
}
impl Drop for Controller {
fn drop(&mut self) {
// Cancel operations if primary controller is dropped
if self.primary {
self.cancel();
}
}
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ReplaceResult { pub enum ReplaceResult {
Replace(bool), Replace(bool),
@ -328,7 +225,6 @@ async fn copy_or_move(
paths: Vec<PathBuf>, paths: Vec<PathBuf>,
to: PathBuf, to: PathBuf,
moving: bool, moving: bool,
id: u64,
msg_tx: &Arc<TokioMutex<Sender<Message>>>, msg_tx: &Arc<TokioMutex<Sender<Message>>>,
controller: Controller, controller: Controller,
) -> Result<OperationSelection, String> { ) -> Result<OperationSelection, String> {
@ -361,10 +257,9 @@ async fn copy_or_move(
}) })
.collect(); .collect();
let mut context = Context::new(controller); let mut context = Context::new(controller.clone());
{ {
let msg_tx = msg_tx.clone();
context = context.on_progress(move |_op, progress| { context = context.on_progress(move |_op, progress| {
let item_progress = match progress.total_bytes { let item_progress = match progress.total_bytes {
Some(total_bytes) => { Some(total_bytes) => {
@ -378,13 +273,7 @@ async fn copy_or_move(
}; };
let total_progress = let total_progress =
(item_progress + progress.current_ops as f32) / progress.total_ops as f32; (item_progress + progress.current_ops as f32) / progress.total_ops as f32;
executor::block_on(async { controller.set_progress(total_progress);
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
})
}); });
} }
@ -562,7 +451,8 @@ pub enum Operation {
} }
impl Operation { impl Operation {
pub fn pending_text(&self, percent: i32, state: ControllerState) -> String { pub fn pending_text(&self, ratio: f32, state: ControllerState) -> String {
let percent = (ratio * 100.0) as i32;
let progress = || match state { let progress = || match state {
ControllerState::Running => fl!("progress", percent = percent), ControllerState::Running => fl!("progress", percent = percent),
ControllerState::Paused => fl!("progress-paused", percent = percent), ControllerState::Paused => fl!("progress-paused", percent = percent),
@ -706,15 +596,10 @@ impl Operation {
/// Perform the operation /// Perform the operation
pub async fn perform( pub async fn perform(
self, self,
id: u64,
msg_tx: &Arc<TokioMutex<Sender<Message>>>, msg_tx: &Arc<TokioMutex<Sender<Message>>>,
controller: Controller, controller: Controller,
) -> Result<OperationSelection, String> { ) -> Result<OperationSelection, String> {
let _ = msg_tx let controller_clone = controller.clone();
.lock()
.await
.send(Message::PendingProgress(id, 0.0))
.await;
//TODO: IF ERROR, RETURN AN Operation THAT CAN UNDO THE CURRENT STATE //TODO: IF ERROR, RETURN AN Operation THAT CAN UNDO THE CURRENT STATE
let paths = match self { let paths = match self {
@ -723,7 +608,6 @@ impl Operation {
to, to,
archive_type, archive_type,
} => { } => {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<OperationSelection, String> { tokio::task::spawn_blocking(move || -> Result<OperationSelection, String> {
let Some(relative_root) = to.parent() else { let Some(relative_root) = to.parent() else {
return Err(format!("path {:?} has no parent directory", to)); return Err(format!("path {:?} has no parent directory", to));
@ -759,14 +643,7 @@ impl Operation {
for (i, path) in paths.iter().enumerate() { for (i, path) in paths.iter().enumerate() {
controller.check()?; controller.check()?;
executor::block_on(async { controller.set_progress((i as f32) / total_paths as f32);
let total_progress = (i as f32) / total_paths as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
if let Some(relative_path) = if let Some(relative_path) =
path.strip_prefix(relative_root).map_err(err_str)?.to_str() path.strip_prefix(relative_root).map_err(err_str)?.to_str()
@ -790,14 +667,7 @@ impl Operation {
for (i, path) in paths.iter().enumerate() { for (i, path) in paths.iter().enumerate() {
controller.check()?; controller.check()?;
executor::block_on(async { controller.set_progress((i as f32) / total_paths as f32);
let total_progress = (i as f32) / total_paths as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
let mut zip_options = zip::write::SimpleFileOptions::default(); let mut zip_options = zip::write::SimpleFileOptions::default();
if let Some(relative_path) = if let Some(relative_path) =
@ -831,19 +701,10 @@ impl Operation {
archive.write_all(&buffer[..count]).map_err(err_str)?; archive.write_all(&buffer[..count]).map_err(err_str)?;
current += count; current += count;
executor::block_on(async { let file_progress = current as f32 / total as f32;
let file_progress = current as f32 / total as f32; let total_progress =
let total_progress = (i as f32 + file_progress) / total_paths as f32;
(i as f32 + file_progress) / total_paths as f32; controller.set_progress(total_progress);
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * total_progress,
))
.await;
});
} }
} else { } else {
archive archive
@ -863,22 +724,13 @@ impl Operation {
.map_err(err_str)? .map_err(err_str)?
.map_err(err_str)? .map_err(err_str)?
} }
Self::Copy { paths, to } => { Self::Copy { paths, to } => copy_or_move(paths, to, false, msg_tx, controller).await?,
copy_or_move(paths, to, false, id, msg_tx, controller).await?
}
Self::Delete { paths } => { Self::Delete { paths } => {
let total = paths.len(); let total = paths.len();
for (i, path) in paths.into_iter().enumerate() { for (i, path) in paths.into_iter().enumerate() {
controller.check()?; controller.check()?;
let _ = msg_tx controller.set_progress((i as f32) / (total as f32));
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (i as f32) / (total as f32),
))
.await;
let _items_opt = tokio::task::spawn_blocking(|| trash::delete(path)) let _items_opt = tokio::task::spawn_blocking(|| trash::delete(path))
.await .await
@ -899,21 +751,13 @@ impl Operation {
) )
))] ))]
{ {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<(), String> { tokio::task::spawn_blocking(move || -> Result<(), String> {
let items = trash::os_limited::list().map_err(err_str)?; let items = trash::os_limited::list().map_err(err_str)?;
let count = items.len(); let count = items.len();
for (i, item) in items.into_iter().enumerate() { for (i, item) in items.into_iter().enumerate() {
controller.check()?; controller.check()?;
executor::block_on(async { controller.set_progress(i as f32 / count as f32);
let total_progress = i as f32 / count as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
trash::os_limited::purge_all([item]).map_err(err_str)?; trash::os_limited::purge_all([item]).map_err(err_str)?;
} }
@ -925,21 +769,13 @@ impl Operation {
OperationSelection::default() OperationSelection::default()
} }
Self::Extract { paths, to } => { Self::Extract { paths, to } => {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || -> Result<OperationSelection, String> { tokio::task::spawn_blocking(move || -> Result<OperationSelection, String> {
let total_paths = paths.len(); let total_paths = paths.len();
let mut op_sel = OperationSelection::default(); let mut op_sel = OperationSelection::default();
for (i, path) in paths.iter().enumerate() { for (i, path) in paths.iter().enumerate() {
controller.check()?; controller.check()?;
executor::block_on(async { controller.set_progress((i as f32) / total_paths as f32);
let total_progress = (i as f32) / total_paths as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
let to = to.to_owned(); let to = to.to_owned();
@ -956,19 +792,18 @@ impl Operation {
op_sel.ignored.push(path.clone()); op_sel.ignored.push(path.clone());
op_sel.selected.push(new_dir.clone()); op_sel.selected.push(new_dir.clone());
let msg_tx = msg_tx.clone();
let controller = controller.clone(); let controller = controller.clone();
let mime = mime_for_path(&path); let mime = mime_for_path(&path);
match mime.essence_str() { match mime.essence_str() {
"application/gzip" | "application/x-compressed-tar" => { "application/gzip" | "application/x-compressed-tar" => {
OpReader::new(path, id, msg_tx, controller) OpReader::new(path, controller)
.map(io::BufReader::new) .map(io::BufReader::new)
.map(flate2::read::GzDecoder::new) .map(flate2::read::GzDecoder::new)
.map(tar::Archive::new) .map(tar::Archive::new)
.and_then(|mut archive| archive.unpack(&new_dir)) .and_then(|mut archive| archive.unpack(&new_dir))
.map_err(err_str)? .map_err(err_str)?
} }
"application/x-tar" => OpReader::new(path, id, msg_tx, controller) "application/x-tar" => OpReader::new(path, controller)
.map(io::BufReader::new) .map(io::BufReader::new)
.map(tar::Archive::new) .map(tar::Archive::new)
.and_then(|mut archive| archive.unpack(&new_dir)) .and_then(|mut archive| archive.unpack(&new_dir))
@ -978,12 +813,12 @@ impl Operation {
.map(zip::ZipArchive::new) .map(zip::ZipArchive::new)
.map_err(err_str)? .map_err(err_str)?
.and_then(move |mut archive| { .and_then(move |mut archive| {
zip_extract(&mut archive, &new_dir, id, msg_tx, controller) zip_extract(&mut archive, &new_dir, controller)
}) })
.map_err(err_str)?, .map_err(err_str)?,
#[cfg(feature = "bzip2")] #[cfg(feature = "bzip2")]
"application/x-bzip" | "application/x-bzip-compressed-tar" => { "application/x-bzip" | "application/x-bzip-compressed-tar" => {
OpReader::new(path, id, msg_tx, controller) OpReader::new(path, controller)
.map(io::BufReader::new) .map(io::BufReader::new)
.map(bzip2::read::BzDecoder::new) .map(bzip2::read::BzDecoder::new)
.map(tar::Archive::new) .map(tar::Archive::new)
@ -992,7 +827,7 @@ impl Operation {
} }
#[cfg(feature = "liblzma")] #[cfg(feature = "liblzma")]
"application/x-xz" | "application/x-xz-compressed-tar" => { "application/x-xz" | "application/x-xz-compressed-tar" => {
OpReader::new(path, id, msg_tx, controller) OpReader::new(path, controller)
.map(io::BufReader::new) .map(io::BufReader::new)
.map(liblzma::read::XzDecoder::new) .map(liblzma::read::XzDecoder::new)
.map(tar::Archive::new) .map(tar::Archive::new)
@ -1010,9 +845,7 @@ impl Operation {
.map_err(err_str)? .map_err(err_str)?
.map_err(err_str)? .map_err(err_str)?
} }
Self::Move { paths, to } => { Self::Move { paths, to } => copy_or_move(paths, to, true, msg_tx, controller).await?,
copy_or_move(paths, to, true, id, msg_tx, controller).await?
}
Self::NewFolder { path } => { Self::NewFolder { path } => {
tokio::task::spawn_blocking(move || -> Result<OperationSelection, String> { tokio::task::spawn_blocking(move || -> Result<OperationSelection, String> {
controller.check()?; controller.check()?;
@ -1061,14 +894,7 @@ impl Operation {
for (i, item) in items.into_iter().enumerate() { for (i, item) in items.into_iter().enumerate() {
controller.check()?; controller.check()?;
let _ = msg_tx controller.set_progress((i as f32) / (total as f32));
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (i as f32) / (total as f32),
))
.await;
paths.push(item.original_path()); paths.push(item.original_path());
@ -1112,11 +938,7 @@ impl Operation {
} }
}; };
let _ = msg_tx controller_clone.set_progress(100.0);
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
Ok(paths) Ok(paths)
} }
@ -1164,15 +986,12 @@ mod tests {
paths: paths_clone, paths: paths_clone,
to: to_clone, to: to_clone,
} }
.perform(id, &sync::Mutex::new(tx).into(), Controller::new()) .perform(&sync::Mutex::new(tx).into(), Controller::new())
.await .await
}); });
while let Some(msg) = rx.next().await { while let Some(msg) = rx.next().await {
match msg { match msg {
Message::PendingProgress(id, progress) => {
trace!("({id}) [ {paths:?} => {to:?} ] {progress}% complete)")
}
Message::DialogPush(DialogPage::Replace { tx, .. }) => { Message::DialogPush(DialogPage::Replace { tx, .. }) => {
debug!("[{id}] Replace request"); debug!("[{id}] Replace request");
tx.send(ReplaceResult::Cancel).await.expect("Sending a response to a replace request should succeed") tx.send(ReplaceResult::Cancel).await.expect("Sending a response to a replace request should succeed")

View file

@ -1,35 +1,23 @@
use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt}; use std::{fs, io, path::Path};
use std::{fs, io, path::Path, sync::Arc};
use tokio::sync::Mutex;
use super::Controller; use super::Controller;
use crate::app::Message;
// Special reader just for operations, handling cancel and progress // Special reader just for operations, handling cancel and progress
pub struct OpReader { pub struct OpReader {
file: fs::File, file: fs::File,
metadata: fs::Metadata, metadata: fs::Metadata,
current: u64, current: u64,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
controller: Controller, controller: Controller,
} }
impl OpReader { impl OpReader {
pub fn new<P: AsRef<Path>>( pub fn new<P: AsRef<Path>>(path: P, controller: Controller) -> io::Result<Self> {
path: P,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
controller: Controller,
) -> io::Result<Self> {
let file = fs::File::open(&path)?; let file = fs::File::open(&path)?;
let metadata = file.metadata()?; let metadata = file.metadata()?;
Ok(Self { Ok(Self {
file, file,
metadata, metadata,
current: 0, current: 0,
id,
msg_tx,
controller, controller,
}) })
} }
@ -45,14 +33,7 @@ impl io::Read for OpReader {
self.current += count as u64; self.current += count as u64;
let progress = self.current as f32 / self.metadata.len() as f32; let progress = self.current as f32 / self.metadata.len() as f32;
executor::block_on(async { self.controller.set_progress(progress);
let _ = self
.msg_tx
.lock()
.await
.send(Message::PendingProgress(self.id, 100.0 * progress))
.await;
});
Ok(count) Ok(count)
} }