From b4b5e78abe82a6be41fab320469a371719a71d53 Mon Sep 17 00:00:00 2001 From: Jeremy Soller Date: Wed, 20 Nov 2024 08:15:31 -0700 Subject: [PATCH] Move progress into controller to make it not block --- src/app.rs | 67 +++++----- src/operation/controller.rs | 105 +++++++++++++++ src/operation/mod.rs | 251 +++++------------------------------- src/operation/reader.rs | 25 +--- 4 files changed, 176 insertions(+), 272 deletions(-) create mode 100644 src/operation/controller.rs diff --git a/src/app.rs b/src/app.rs index 3d44d78..572b6f6 100644 --- a/src/app.rs +++ b/src/app.rs @@ -313,7 +313,6 @@ pub enum Message { PendingError(u64, String), PendingPause(u64, bool), PendingPauseAll(bool), - PendingProgress(u64, f32), Preview(Option), RescanTrash, Rename(Option), @@ -521,10 +520,10 @@ pub struct App { #[cfg(feature = "notify")] notification_opt: Option>>, pending_operation_id: u64, - pending_operations: BTreeMap, + pending_operations: BTreeMap, progress_operations: BTreeSet, complete_operations: BTreeMap, - failed_operations: BTreeMap, + failed_operations: BTreeMap, search_id: widget::Id, #[cfg(feature = "wayland")] surface_ids: HashMap, @@ -704,7 +703,7 @@ impl App { self.progress_operations.insert(id); } self.pending_operations - .insert(id, (operation, 0.0, Controller::new())); + .insert(id, (operation, Controller::new())); } fn remove_window(&mut self, id: &window::Id) { @@ -1252,10 +1251,11 @@ impl App { if !self.pending_operations.is_empty() { let mut section = widget::settings::section().title(fl!("pending")); - for (id, (op, progress, controller)) in self.pending_operations.iter().rev() { + for (id, (op, controller)) in self.pending_operations.iter().rev() { + let progress = controller.progress(); section = section.add(widget::column::with_children(vec![ widget::row::with_children(vec![ - widget::progress_bar(0.0..=100.0, *progress) + widget::progress_bar(0.0..=1.0, progress) .height(progress_bar_height) .into(), if controller.is_paused() { @@ -1292,7 +1292,7 @@ impl App { ]) .align_y(Alignment::Center) .into(), - widget::text(op.pending_text(*progress as i32, controller.state())).into(), + widget::text(op.pending_text(progress, controller.state())).into(), ])); } children.push(section.into()); @@ -1300,9 +1300,10 @@ impl App { if !self.failed_operations.is_empty() { let mut section = widget::settings::section().title(fl!("failed")); - for (_id, (op, progress, controller, error)) in self.failed_operations.iter().rev() { + for (_id, (op, controller, error)) in self.failed_operations.iter().rev() { + let progress = controller.progress(); section = section.add(widget::column::with_children(vec![ - widget::text(op.pending_text(*progress as i32, controller.state())).into(), + widget::text(op.pending_text(progress, controller.state())).into(), widget::text(error).into(), ])); } @@ -2395,13 +2396,13 @@ impl Application for App { } } Message::PendingCancel(id) => { - if let Some((_, _, controller)) = self.pending_operations.get(&id) { + if let Some((_, controller)) = self.pending_operations.get(&id) { controller.cancel(); self.progress_operations.remove(&id); } } Message::PendingCancelAll => { - for (id, (_, _, controller)) in self.pending_operations.iter() { + for (id, (_, controller)) in self.pending_operations.iter() { controller.cancel(); self.progress_operations.remove(&id); } @@ -2409,7 +2410,7 @@ impl Application for App { Message::PendingComplete(id, op_sel) => { let mut commands = Vec::with_capacity(4); // Show toast for some operations - if let Some((op, _, _)) = self.pending_operations.remove(&id) { + if let Some((op, _)) = self.pending_operations.remove(&id) { if let Some(description) = op.toast() { if let Operation::Delete { ref paths } = op { let paths: Arc<[PathBuf]> = Arc::from(paths.as_slice()); @@ -2431,7 +2432,7 @@ impl Application for App { if !self .pending_operations .iter() - .any(|(_id, (op, _, _))| op.show_progress_notification()) + .any(|(_id, (op, _))| op.show_progress_notification()) { self.progress_operations.clear(); } @@ -2449,21 +2450,20 @@ impl Application for App { self.progress_operations.clear(); } Message::PendingError(id, err) => { - if let Some((op, progress, controller)) = self.pending_operations.remove(&id) { + if let Some((op, controller)) = self.pending_operations.remove(&id) { // Only show dialog if not cancelled if !controller.is_cancelled() { self.dialog_pages.push_back(DialogPage::FailedOperation(id)); } // Remove from progress self.progress_operations.remove(&id); - self.failed_operations - .insert(id, (op, progress, controller, err)); + self.failed_operations.insert(id, (op, controller, err)); } // Close progress notification if all relavent operations are finished if !self .pending_operations .iter() - .any(|(_id, (op, _, _))| op.show_progress_notification()) + .any(|(_id, (op, _))| op.show_progress_notification()) { self.progress_operations.clear(); } @@ -2471,7 +2471,7 @@ impl Application for App { return self.rescan_trash(); } Message::PendingPause(id, pause) => { - if let Some((_, _, controller)) = self.pending_operations.get(&id) { + if let Some((_, controller)) = self.pending_operations.get(&id) { if pause { controller.pause(); } else { @@ -2480,7 +2480,7 @@ impl Application for App { } } Message::PendingPauseAll(pause) => { - for (_id, (_, _, controller)) in self.pending_operations.iter() { + for (_id, (_, controller)) in self.pending_operations.iter() { if pause { controller.pause(); } else { @@ -2488,12 +2488,6 @@ impl Application for App { } } } - Message::PendingProgress(id, new_progress) => { - if let Some((_, progress, _)) = self.pending_operations.get_mut(&id) { - *progress = new_progress; - } - return self.update_notification(); - } Message::Preview(entity_opt) => { match self.mode { Mode::App => { @@ -3454,7 +3448,7 @@ impl Application for App { ), DialogPage::FailedOperation(id) => { //TODO: try next dialog page (making sure index is used by Dialog messages)? - let (operation, _, _, err) = self.failed_operations.get(id)?; + let (operation, _, err) = self.failed_operations.get(id)?; //TODO: nice description of error widget::dialog() @@ -3901,13 +3895,14 @@ impl Application for App { let mut total_progress = 0.0; let mut count = 0; let mut all_paused = true; - for (_id, (op, progress, controller)) in self.pending_operations.iter() { + for (_id, (op, controller)) in self.pending_operations.iter() { if !controller.is_paused() { all_paused = false; } if op.show_progress_notification() { + let progress = controller.progress(); if title.is_empty() { - title = op.pending_text(*progress as i32, controller.state()); + title = op.pending_text(progress, controller.state()); } total_progress += progress; count += 1; @@ -3917,7 +3912,7 @@ impl Application for App { // Adjust the progress bar so it does not jump around when operations finish for id in self.progress_operations.iter() { if self.complete_operations.contains_key(&id) { - total_progress += 100.0; + total_progress += 1.0; count += 1; } } @@ -3943,7 +3938,7 @@ impl Application for App { //TODO: get height from theme? let progress_bar_height = Length::Fixed(4.0); let progress_bar = - widget::progress_bar(0.0..=100.0, total_progress).height(progress_bar_height); + widget::progress_bar(0.0..=1.0, total_progress).height(progress_bar_height); let container = widget::layer_container(widget::column::with_children(vec![ widget::row::with_children(vec![ @@ -4397,7 +4392,11 @@ impl Application for App { if !self.pending_operations.is_empty() { //TODO: inhibit suspend/shutdown? - if self.window_id_opt.is_none() { + if self.window_id_opt.is_some() { + // Refresh progress when window is open and operations are in progress + subscriptions.push(window::frames().map(|_| Message::None)); + } else { + // Handle notification when window is closed and operations are in progress #[cfg(feature = "notify")] { struct NotificationSubscription; @@ -4437,16 +4436,16 @@ impl Application for App { } } - for (id, (pending_operation, _, cancelled)) in self.pending_operations.iter() { + for (id, (pending_operation, controller)) in self.pending_operations.iter() { //TODO: use recipe? let id = *id; let pending_operation = pending_operation.clone(); - let cancelled = cancelled.clone(); + let controller = controller.clone(); subscriptions.push(Subscription::run_with_id( id, stream::channel(16, move |msg_tx| async move { let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx)); - match pending_operation.perform(id, &msg_tx, cancelled).await { + match pending_operation.perform(&msg_tx, controller).await { Ok(result_paths) => { let _ = msg_tx .lock() diff --git a/src/operation/controller.rs b/src/operation/controller.rs new file mode 100644 index 0000000..4b33b42 --- /dev/null +++ b/src/operation/controller.rs @@ -0,0 +1,105 @@ +use crate::fl; + +use std::sync::{Arc, Condvar, Mutex}; + +#[derive(Clone, Copy, Debug)] +pub enum ControllerState { + Cancelled, + Paused, + Running, +} + +#[derive(Debug)] +struct ControllerInner { + state: Mutex, + progress: Mutex, + condvar: Condvar, +} + +#[derive(Debug)] +pub struct Controller { + primary: bool, + inner: Arc, +} + +impl Controller { + pub fn new() -> Self { + Self { + primary: true, + inner: Arc::new(ControllerInner { + state: Mutex::new(ControllerState::Running), + progress: Mutex::new(0.0), + condvar: Condvar::new(), + }), + } + } + + pub fn check(&self) -> Result<(), String> { + let mut state = self.inner.state.lock().unwrap(); + loop { + match *state { + ControllerState::Cancelled => return Err(fl!("cancelled")), + ControllerState::Paused => { + state = self.inner.condvar.wait(state).unwrap(); + } + ControllerState::Running => return Ok(()), + } + } + } + + pub fn progress(&self) -> f32 { + *self.inner.progress.lock().unwrap() + } + + pub fn set_progress(&self, progress: f32) { + *self.inner.progress.lock().unwrap() = progress; + } + + pub fn state(&self) -> ControllerState { + *self.inner.state.lock().unwrap() + } + + pub fn set_state(&self, state: ControllerState) { + *self.inner.state.lock().unwrap() = state; + self.inner.condvar.notify_all(); + } + + pub fn is_cancelled(&self) -> bool { + matches!(self.state(), ControllerState::Cancelled) + } + + pub fn cancel(&self) { + self.set_state(ControllerState::Cancelled); + } + + pub fn is_paused(&self) -> bool { + matches!(self.state(), ControllerState::Paused) + } + + pub fn pause(&self) { + self.set_state(ControllerState::Paused); + } + + pub fn unpause(&self) { + //TODO: ensure this does not override Cancel? + self.set_state(ControllerState::Running); + } +} + +impl Clone for Controller { + fn clone(&self) -> Self { + Self { + primary: false, + inner: self.inner.clone(), + } + } +} + +impl Drop for Controller { + fn drop(&mut self) { + // Cancel operations if primary controller is dropped + if self.primary { + self.cancel(); + } + } +} diff --git a/src/operation/mod.rs b/src/operation/mod.rs index 37f7788..dceefb2 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -4,13 +4,11 @@ use std::{ fs, io::{self, Read, Write}, path::{Path, PathBuf}, - sync::{Arc, Condvar, Mutex}, + sync::Arc, }; use tokio::sync::{mpsc, Mutex as TokioMutex}; use walkdir::WalkDir; -use self::reader::OpReader; -use self::recursive::Context; use crate::{ app::{ArchiveType, DialogPage, Message}, config::IconSizes, @@ -20,7 +18,13 @@ use crate::{ tab, }; +pub use self::controller::{Controller, ControllerState}; +pub mod controller; + +use self::reader::OpReader; pub mod reader; + +use self::recursive::Context; pub mod recursive; fn handle_replace( @@ -77,8 +81,6 @@ fn get_directory_name(file_name: &str) -> &str { fn zip_extract>( archive: &mut zip::ZipArchive, directory: P, - id: u64, - msg_tx: Arc>>, controller: Controller, ) -> zip::result::ZipResult<()> { use std::{ffi::OsString, fs}; @@ -109,14 +111,7 @@ fn zip_extract>( .check() .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - executor::block_on(async { - let total_progress = (i as f32) / total_files as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }); + controller.set_progress((i as f32) / total_files as f32); let mut file = archive.by_index(i)?; let filepath = file @@ -188,15 +183,9 @@ fn zip_extract>( current += count as u64; if current < total { - executor::block_on(async { - let file_progress = current as f32 / total as f32; - let total_progress = (i as f32 + file_progress) / total_files as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }); + let file_progress = current as f32 / total as f32; + let total_progress = (i as f32 + file_progress) / total_files as f32; + controller.set_progress(total_progress); } } outfile.sync_all()?; @@ -224,98 +213,6 @@ fn zip_extract>( Ok(()) } -#[derive(Clone, Copy, Debug)] -pub enum ControllerState { - Cancelled, - Paused, - Running, -} - -#[derive(Debug)] -struct ControllerInner { - state: Mutex, - condvar: Condvar, -} - -#[derive(Debug)] -pub struct Controller { - primary: bool, - inner: Arc, -} - -impl Controller { - pub fn new() -> Self { - Self { - primary: true, - inner: Arc::new(ControllerInner { - state: Mutex::new(ControllerState::Running), - condvar: Condvar::new(), - }), - } - } - - pub fn check(&self) -> Result<(), String> { - let mut state = self.inner.state.lock().unwrap(); - loop { - match *state { - ControllerState::Cancelled => return Err(fl!("cancelled")), - ControllerState::Paused => { - state = self.inner.condvar.wait(state).unwrap(); - } - ControllerState::Running => return Ok(()), - } - } - } - - pub fn state(&self) -> ControllerState { - *self.inner.state.lock().unwrap() - } - - pub fn set_state(&self, state: ControllerState) { - *self.inner.state.lock().unwrap() = state; - self.inner.condvar.notify_all(); - } - - pub fn is_cancelled(&self) -> bool { - matches!(self.state(), ControllerState::Cancelled) - } - - pub fn cancel(&self) { - self.set_state(ControllerState::Cancelled); - } - - pub fn is_paused(&self) -> bool { - matches!(self.state(), ControllerState::Paused) - } - - pub fn pause(&self) { - self.set_state(ControllerState::Paused); - } - - pub fn unpause(&self) { - //TODO: ensure this does not override Cancel? - self.set_state(ControllerState::Running); - } -} - -impl Clone for Controller { - fn clone(&self) -> Self { - Self { - primary: false, - inner: self.inner.clone(), - } - } -} - -impl Drop for Controller { - fn drop(&mut self) { - // Cancel operations if primary controller is dropped - if self.primary { - self.cancel(); - } - } -} - #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub enum ReplaceResult { Replace(bool), @@ -328,7 +225,6 @@ async fn copy_or_move( paths: Vec, to: PathBuf, moving: bool, - id: u64, msg_tx: &Arc>>, controller: Controller, ) -> Result { @@ -361,10 +257,9 @@ async fn copy_or_move( }) .collect(); - let mut context = Context::new(controller); + let mut context = Context::new(controller.clone()); { - let msg_tx = msg_tx.clone(); context = context.on_progress(move |_op, progress| { let item_progress = match progress.total_bytes { Some(total_bytes) => { @@ -378,13 +273,7 @@ async fn copy_or_move( }; let total_progress = (item_progress + progress.current_ops as f32) / progress.total_ops as f32; - executor::block_on(async { - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }) + controller.set_progress(total_progress); }); } @@ -562,7 +451,8 @@ pub enum Operation { } impl Operation { - pub fn pending_text(&self, percent: i32, state: ControllerState) -> String { + pub fn pending_text(&self, ratio: f32, state: ControllerState) -> String { + let percent = (ratio * 100.0) as i32; let progress = || match state { ControllerState::Running => fl!("progress", percent = percent), ControllerState::Paused => fl!("progress-paused", percent = percent), @@ -706,15 +596,10 @@ impl Operation { /// Perform the operation pub async fn perform( self, - id: u64, msg_tx: &Arc>>, controller: Controller, ) -> Result { - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 0.0)) - .await; + let controller_clone = controller.clone(); //TODO: IF ERROR, RETURN AN Operation THAT CAN UNDO THE CURRENT STATE let paths = match self { @@ -723,7 +608,6 @@ impl Operation { to, archive_type, } => { - let msg_tx = msg_tx.clone(); tokio::task::spawn_blocking(move || -> Result { let Some(relative_root) = to.parent() else { return Err(format!("path {:?} has no parent directory", to)); @@ -759,14 +643,7 @@ impl Operation { for (i, path) in paths.iter().enumerate() { controller.check()?; - executor::block_on(async { - let total_progress = (i as f32) / total_paths as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }); + controller.set_progress((i as f32) / total_paths as f32); if let Some(relative_path) = path.strip_prefix(relative_root).map_err(err_str)?.to_str() @@ -790,14 +667,7 @@ impl Operation { for (i, path) in paths.iter().enumerate() { controller.check()?; - executor::block_on(async { - let total_progress = (i as f32) / total_paths as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }); + controller.set_progress((i as f32) / total_paths as f32); let mut zip_options = zip::write::SimpleFileOptions::default(); if let Some(relative_path) = @@ -831,19 +701,10 @@ impl Operation { archive.write_all(&buffer[..count]).map_err(err_str)?; current += count; - executor::block_on(async { - let file_progress = current as f32 / total as f32; - let total_progress = - (i as f32 + file_progress) / total_paths as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress( - id, - 100.0 * total_progress, - )) - .await; - }); + let file_progress = current as f32 / total as f32; + let total_progress = + (i as f32 + file_progress) / total_paths as f32; + controller.set_progress(total_progress); } } else { archive @@ -863,22 +724,13 @@ impl Operation { .map_err(err_str)? .map_err(err_str)? } - Self::Copy { paths, to } => { - copy_or_move(paths, to, false, id, msg_tx, controller).await? - } + Self::Copy { paths, to } => copy_or_move(paths, to, false, msg_tx, controller).await?, Self::Delete { paths } => { let total = paths.len(); for (i, path) in paths.into_iter().enumerate() { controller.check()?; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress( - id, - 100.0 * (i as f32) / (total as f32), - )) - .await; + controller.set_progress((i as f32) / (total as f32)); let _items_opt = tokio::task::spawn_blocking(|| trash::delete(path)) .await @@ -899,21 +751,13 @@ impl Operation { ) ))] { - let msg_tx = msg_tx.clone(); tokio::task::spawn_blocking(move || -> Result<(), String> { let items = trash::os_limited::list().map_err(err_str)?; let count = items.len(); for (i, item) in items.into_iter().enumerate() { controller.check()?; - executor::block_on(async { - let total_progress = i as f32 / count as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }); + controller.set_progress(i as f32 / count as f32); trash::os_limited::purge_all([item]).map_err(err_str)?; } @@ -925,21 +769,13 @@ impl Operation { OperationSelection::default() } Self::Extract { paths, to } => { - let msg_tx = msg_tx.clone(); tokio::task::spawn_blocking(move || -> Result { let total_paths = paths.len(); let mut op_sel = OperationSelection::default(); for (i, path) in paths.iter().enumerate() { controller.check()?; - executor::block_on(async { - let total_progress = (i as f32) / total_paths as f32; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0 * total_progress)) - .await; - }); + controller.set_progress((i as f32) / total_paths as f32); let to = to.to_owned(); @@ -956,19 +792,18 @@ impl Operation { op_sel.ignored.push(path.clone()); op_sel.selected.push(new_dir.clone()); - let msg_tx = msg_tx.clone(); let controller = controller.clone(); let mime = mime_for_path(&path); match mime.essence_str() { "application/gzip" | "application/x-compressed-tar" => { - OpReader::new(path, id, msg_tx, controller) + OpReader::new(path, controller) .map(io::BufReader::new) .map(flate2::read::GzDecoder::new) .map(tar::Archive::new) .and_then(|mut archive| archive.unpack(&new_dir)) .map_err(err_str)? } - "application/x-tar" => OpReader::new(path, id, msg_tx, controller) + "application/x-tar" => OpReader::new(path, controller) .map(io::BufReader::new) .map(tar::Archive::new) .and_then(|mut archive| archive.unpack(&new_dir)) @@ -978,12 +813,12 @@ impl Operation { .map(zip::ZipArchive::new) .map_err(err_str)? .and_then(move |mut archive| { - zip_extract(&mut archive, &new_dir, id, msg_tx, controller) + zip_extract(&mut archive, &new_dir, controller) }) .map_err(err_str)?, #[cfg(feature = "bzip2")] "application/x-bzip" | "application/x-bzip-compressed-tar" => { - OpReader::new(path, id, msg_tx, controller) + OpReader::new(path, controller) .map(io::BufReader::new) .map(bzip2::read::BzDecoder::new) .map(tar::Archive::new) @@ -992,7 +827,7 @@ impl Operation { } #[cfg(feature = "liblzma")] "application/x-xz" | "application/x-xz-compressed-tar" => { - OpReader::new(path, id, msg_tx, controller) + OpReader::new(path, controller) .map(io::BufReader::new) .map(liblzma::read::XzDecoder::new) .map(tar::Archive::new) @@ -1010,9 +845,7 @@ impl Operation { .map_err(err_str)? .map_err(err_str)? } - Self::Move { paths, to } => { - copy_or_move(paths, to, true, id, msg_tx, controller).await? - } + Self::Move { paths, to } => copy_or_move(paths, to, true, msg_tx, controller).await?, Self::NewFolder { path } => { tokio::task::spawn_blocking(move || -> Result { controller.check()?; @@ -1061,14 +894,7 @@ impl Operation { for (i, item) in items.into_iter().enumerate() { controller.check()?; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress( - id, - 100.0 * (i as f32) / (total as f32), - )) - .await; + controller.set_progress((i as f32) / (total as f32)); paths.push(item.original_path()); @@ -1112,11 +938,7 @@ impl Operation { } }; - let _ = msg_tx - .lock() - .await - .send(Message::PendingProgress(id, 100.0)) - .await; + controller_clone.set_progress(100.0); Ok(paths) } @@ -1164,15 +986,12 @@ mod tests { paths: paths_clone, to: to_clone, } - .perform(id, &sync::Mutex::new(tx).into(), Controller::new()) + .perform(&sync::Mutex::new(tx).into(), Controller::new()) .await }); while let Some(msg) = rx.next().await { match msg { - Message::PendingProgress(id, progress) => { - trace!("({id}) [ {paths:?} => {to:?} ] {progress}% complete)") - } Message::DialogPush(DialogPage::Replace { tx, .. }) => { debug!("[{id}] Replace request"); tx.send(ReplaceResult::Cancel).await.expect("Sending a response to a replace request should succeed") diff --git a/src/operation/reader.rs b/src/operation/reader.rs index fdb9071..c059226 100644 --- a/src/operation/reader.rs +++ b/src/operation/reader.rs @@ -1,35 +1,23 @@ -use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt}; -use std::{fs, io, path::Path, sync::Arc}; -use tokio::sync::Mutex; +use std::{fs, io, path::Path}; use super::Controller; -use crate::app::Message; // Special reader just for operations, handling cancel and progress pub struct OpReader { file: fs::File, metadata: fs::Metadata, current: u64, - id: u64, - msg_tx: Arc>>, controller: Controller, } impl OpReader { - pub fn new>( - path: P, - id: u64, - msg_tx: Arc>>, - controller: Controller, - ) -> io::Result { + pub fn new>(path: P, controller: Controller) -> io::Result { let file = fs::File::open(&path)?; let metadata = file.metadata()?; Ok(Self { file, metadata, current: 0, - id, - msg_tx, controller, }) } @@ -45,14 +33,7 @@ impl io::Read for OpReader { self.current += count as u64; let progress = self.current as f32 / self.metadata.len() as f32; - executor::block_on(async { - let _ = self - .msg_tx - .lock() - .await - .send(Message::PendingProgress(self.id, 100.0 * progress)) - .await; - }); + self.controller.set_progress(progress); Ok(count) }