diff --git a/src/mouse_area.rs b/src/mouse_area.rs index 6e3c6b7..33291eb 100644 --- a/src/mouse_area.rs +++ b/src/mouse_area.rs @@ -500,7 +500,7 @@ fn update( shell.publish(message()) } } - (Some(new), None) => { + (Some(_), None) => { if let Some(message) = widget.on_enter.as_ref() { shell.publish(message()) } diff --git a/src/operation/mod.rs b/src/operation/mod.rs index c73fb0c..3fb5cb7 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -12,6 +12,7 @@ use std::{ use tokio::sync::{mpsc, Mutex}; use walkdir::WalkDir; +use self::reader::OpReader; use self::recursive::Context; use crate::{ app::{ArchiveType, DialogPage, Message}, @@ -22,6 +23,7 @@ use crate::{ tab, }; +pub mod reader; pub mod recursive; fn handle_replace( @@ -74,6 +76,157 @@ fn get_directory_name(file_name: &str) -> &str { file_name } +// From https://docs.rs/zip/latest/zip/read/struct.ZipArchive.html#method.extract, with cancellation and progress added +fn zip_extract>( + archive: &mut zip::ZipArchive, + directory: P, + id: u64, + msg_tx: Arc>>, + cancelled: Arc, +) -> zip::result::ZipResult<()> { + use std::{ffi::OsString, fs}; + use zip::result::ZipError; + + fn make_writable_dir_all>(outpath: T) -> Result<(), ZipError> { + fs::create_dir_all(outpath.as_ref())?; + #[cfg(unix)] + { + // Dirs must be writable until all normal files are extracted + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions( + outpath.as_ref(), + std::fs::Permissions::from_mode( + 0o700 | std::fs::metadata(outpath.as_ref())?.permissions().mode(), + ), + )?; + } + Ok(()) + } + + #[cfg(unix)] + let mut files_by_unix_mode = Vec::new(); + let mut buffer = vec![0; 4 * 1024 * 1024]; + let total_files = archive.len(); + for i in 0..total_files { + if cancelled.load(Ordering::SeqCst) { + return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into()); + } + + executor::block_on(async { + let total_progress = (i as f32) / total_files as f32; + let _ = msg_tx + .lock() + .await + .send(Message::PendingProgress(id, 100.0 * total_progress)) + .await; + }); + + let mut file = archive.by_index(i)?; + let filepath = file + .enclosed_name() + .ok_or(ZipError::InvalidArchive("Invalid file path"))?; + + let outpath = directory.as_ref().join(filepath); + + if file.is_dir() { + make_writable_dir_all(&outpath)?; + continue; + } + let symlink_target = if file.is_symlink() && (cfg!(unix) || cfg!(windows)) { + let mut target = Vec::with_capacity(file.size() as usize); + file.read_to_end(&mut target)?; + Some(target) + } else { + None + }; + drop(file); + if let Some(p) = outpath.parent() { + make_writable_dir_all(p)?; + } + if let Some(target) = symlink_target { + #[cfg(unix)] + { + use std::os::unix::ffi::OsStringExt; + let target = OsString::from_vec(target); + std::os::unix::fs::symlink(&target, outpath.as_path())?; + } + #[cfg(windows)] + { + let Ok(target) = String::from_utf8(target) else { + return Err(ZipError::InvalidArchive("Invalid UTF-8 as symlink target")); + }; + let target = target.into_boxed_str(); + let target_is_dir_from_archive = + archive.shared.files.contains_key(&target) && is_dir(&target); + let target_path = directory.as_ref().join(OsString::from(target.to_string())); + let target_is_dir = if target_is_dir_from_archive { + true + } else if let Ok(meta) = std::fs::metadata(&target_path) { + meta.is_dir() + } else { + false + }; + if target_is_dir { + std::os::windows::fs::symlink_dir(target_path, outpath.as_path())?; + } else { + std::os::windows::fs::symlink_file(target_path, outpath.as_path())?; + } + } + continue; + } + let mut file = archive.by_index(i)?; + let total = file.size(); + let mut outfile = fs::File::create(&outpath)?; + let mut current = 0; + loop { + if cancelled.load(Ordering::SeqCst) { + return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into()); + } + + let count = file.read(&mut buffer)?; + if count == 0 { + break; + } + outfile.write_all(&buffer[..count])?; + current += count as u64; + + if current < total { + executor::block_on(async { + let file_progress = current as f32 / total as f32; + let total_progress = (i as f32 + file_progress) / total_files as f32; + let _ = msg_tx + .lock() + .await + .send(Message::PendingProgress(id, 100.0 * total_progress)) + .await; + }); + } + } + outfile.sync_all()?; + #[cfg(unix)] + { + // Check for real permissions, which we'll set in a second pass + if let Some(mode) = file.unix_mode() { + files_by_unix_mode.push((outpath.clone(), mode)); + } + } + } + #[cfg(unix)] + { + use std::cmp::Reverse; + use std::os::unix::fs::PermissionsExt; + + if files_by_unix_mode.len() > 1 { + // Ensure we update children's permissions before making a parent unwritable + files_by_unix_mode.sort_by_key(|(path, _)| Reverse(path.clone())); + } + for (path, mode) in files_by_unix_mode.into_iter() { + fs::set_permissions(&path, fs::Permissions::from_mode(mode))?; + } + } + Ok(()) +} + #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub enum ReplaceResult { Replace(bool), @@ -172,7 +325,7 @@ async fn copy_or_move( { let msg_tx = msg_tx.clone(); - context = context.on_progress(move |op, progress| { + context = context.on_progress(move |_op, progress| { let item_progress = match progress.total_bytes { Some(total_bytes) => { if total_bytes == 0 { @@ -523,6 +676,7 @@ impl Operation { } ArchiveType::Zip => { let mut archive = fs::File::create(&to) + .map(io::BufWriter::new) .map(zip::ZipWriter::new) .map_err(err_str)?; @@ -701,18 +855,19 @@ impl Operation { } } - //TODO: support cancellation while extracting! + let msg_tx = msg_tx.clone(); + let cancelled = cancelled.clone(); let mime = mime_for_path(&path); match mime.essence_str() { "application/gzip" | "application/x-compressed-tar" => { - fs::File::open(path) + OpReader::new(path, id, msg_tx, cancelled) .map(io::BufReader::new) .map(flate2::read::GzDecoder::new) .map(tar::Archive::new) .and_then(|mut archive| archive.unpack(&new_dir)) .map_err(err_str)? } - "application/x-tar" => fs::File::open(path) + "application/x-tar" => OpReader::new(path, id, msg_tx, cancelled) .map(io::BufReader::new) .map(tar::Archive::new) .and_then(|mut archive| archive.unpack(&new_dir)) @@ -721,11 +876,13 @@ impl Operation { .map(io::BufReader::new) .map(zip::ZipArchive::new) .map_err(err_str)? - .and_then(|mut archive| archive.extract(&new_dir)) + .and_then(move |mut archive| { + zip_extract(&mut archive, &new_dir, id, msg_tx, cancelled) + }) .map_err(err_str)?, #[cfg(feature = "bzip2")] "application/x-bzip" | "application/x-bzip-compressed-tar" => { - fs::File::open(path) + OpReader::new(path, id, msg_tx, cancelled) .map(io::BufReader::new) .map(bzip2::read::BzDecoder::new) .map(tar::Archive::new) @@ -734,7 +891,7 @@ impl Operation { } #[cfg(feature = "liblzma")] "application/x-xz" | "application/x-xz-compressed-tar" => { - fs::File::open(path) + OpReader::new(path, id, msg_tx, cancelled) .map(io::BufReader::new) .map(liblzma::read::XzDecoder::new) .map(tar::Archive::new) diff --git a/src/operation/reader.rs b/src/operation/reader.rs new file mode 100644 index 0000000..3190964 --- /dev/null +++ b/src/operation/reader.rs @@ -0,0 +1,65 @@ +use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt}; +use std::{ + fs, io, + path::Path, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use tokio::sync::Mutex; + +use crate::{app::Message, fl}; + +// Special reader just for operations, handling cancel and progress +pub struct OpReader { + file: fs::File, + metadata: fs::Metadata, + current: u64, + id: u64, + msg_tx: Arc>>, + cancelled: Arc, +} + +impl OpReader { + pub fn new>( + path: P, + id: u64, + msg_tx: Arc>>, + cancelled: Arc, + ) -> io::Result { + let file = fs::File::open(&path)?; + let metadata = file.metadata()?; + Ok(Self { + file, + metadata, + current: 0, + id, + msg_tx, + cancelled, + }) + } +} + +impl io::Read for OpReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if self.cancelled.load(Ordering::SeqCst) { + return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled"))); + } + + let count = self.file.read(buf)?; + self.current += count as u64; + + let progress = self.current as f32 / self.metadata.len() as f32; + executor::block_on(async { + let _ = self + .msg_tx + .lock() + .await + .send(Message::PendingProgress(self.id, 100.0 * progress)) + .await; + }); + + Ok(count) + } +}