Implement progress and cancellation for extraction

This commit is contained in:
Jeremy Soller 2024-11-14 16:00:35 -07:00
parent 00ed3115cc
commit 6a79e8178f
No known key found for this signature in database
GPG key ID: D02FD439211AF56F
3 changed files with 230 additions and 8 deletions

View file

@ -500,7 +500,7 @@ fn update<Message: Clone>(
shell.publish(message())
}
}
(Some(new), None) => {
(Some(_), None) => {
if let Some(message) = widget.on_enter.as_ref() {
shell.publish(message())
}

View file

@ -12,6 +12,7 @@ use std::{
use tokio::sync::{mpsc, Mutex};
use walkdir::WalkDir;
use self::reader::OpReader;
use self::recursive::Context;
use crate::{
app::{ArchiveType, DialogPage, Message},
@ -22,6 +23,7 @@ use crate::{
tab,
};
pub mod reader;
pub mod recursive;
fn handle_replace(
@ -74,6 +76,157 @@ fn get_directory_name(file_name: &str) -> &str {
file_name
}
// From https://docs.rs/zip/latest/zip/read/struct.ZipArchive.html#method.extract, with cancellation and progress added
fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
archive: &mut zip::ZipArchive<R>,
directory: P,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
) -> zip::result::ZipResult<()> {
use std::{ffi::OsString, fs};
use zip::result::ZipError;
fn make_writable_dir_all<T: AsRef<Path>>(outpath: T) -> Result<(), ZipError> {
fs::create_dir_all(outpath.as_ref())?;
#[cfg(unix)]
{
// Dirs must be writable until all normal files are extracted
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(
outpath.as_ref(),
std::fs::Permissions::from_mode(
0o700 | std::fs::metadata(outpath.as_ref())?.permissions().mode(),
),
)?;
}
Ok(())
}
#[cfg(unix)]
let mut files_by_unix_mode = Vec::new();
let mut buffer = vec![0; 4 * 1024 * 1024];
let total_files = archive.len();
for i in 0..total_files {
if cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into());
}
executor::block_on(async {
let total_progress = (i as f32) / total_files as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
let mut file = archive.by_index(i)?;
let filepath = file
.enclosed_name()
.ok_or(ZipError::InvalidArchive("Invalid file path"))?;
let outpath = directory.as_ref().join(filepath);
if file.is_dir() {
make_writable_dir_all(&outpath)?;
continue;
}
let symlink_target = if file.is_symlink() && (cfg!(unix) || cfg!(windows)) {
let mut target = Vec::with_capacity(file.size() as usize);
file.read_to_end(&mut target)?;
Some(target)
} else {
None
};
drop(file);
if let Some(p) = outpath.parent() {
make_writable_dir_all(p)?;
}
if let Some(target) = symlink_target {
#[cfg(unix)]
{
use std::os::unix::ffi::OsStringExt;
let target = OsString::from_vec(target);
std::os::unix::fs::symlink(&target, outpath.as_path())?;
}
#[cfg(windows)]
{
let Ok(target) = String::from_utf8(target) else {
return Err(ZipError::InvalidArchive("Invalid UTF-8 as symlink target"));
};
let target = target.into_boxed_str();
let target_is_dir_from_archive =
archive.shared.files.contains_key(&target) && is_dir(&target);
let target_path = directory.as_ref().join(OsString::from(target.to_string()));
let target_is_dir = if target_is_dir_from_archive {
true
} else if let Ok(meta) = std::fs::metadata(&target_path) {
meta.is_dir()
} else {
false
};
if target_is_dir {
std::os::windows::fs::symlink_dir(target_path, outpath.as_path())?;
} else {
std::os::windows::fs::symlink_file(target_path, outpath.as_path())?;
}
}
continue;
}
let mut file = archive.by_index(i)?;
let total = file.size();
let mut outfile = fs::File::create(&outpath)?;
let mut current = 0;
loop {
if cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")).into());
}
let count = file.read(&mut buffer)?;
if count == 0 {
break;
}
outfile.write_all(&buffer[..count])?;
current += count as u64;
if current < total {
executor::block_on(async {
let file_progress = current as f32 / total as f32;
let total_progress = (i as f32 + file_progress) / total_files as f32;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0 * total_progress))
.await;
});
}
}
outfile.sync_all()?;
#[cfg(unix)]
{
// Check for real permissions, which we'll set in a second pass
if let Some(mode) = file.unix_mode() {
files_by_unix_mode.push((outpath.clone(), mode));
}
}
}
#[cfg(unix)]
{
use std::cmp::Reverse;
use std::os::unix::fs::PermissionsExt;
if files_by_unix_mode.len() > 1 {
// Ensure we update children's permissions before making a parent unwritable
files_by_unix_mode.sort_by_key(|(path, _)| Reverse(path.clone()));
}
for (path, mode) in files_by_unix_mode.into_iter() {
fs::set_permissions(&path, fs::Permissions::from_mode(mode))?;
}
}
Ok(())
}
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ReplaceResult {
Replace(bool),
@ -172,7 +325,7 @@ async fn copy_or_move(
{
let msg_tx = msg_tx.clone();
context = context.on_progress(move |op, progress| {
context = context.on_progress(move |_op, progress| {
let item_progress = match progress.total_bytes {
Some(total_bytes) => {
if total_bytes == 0 {
@ -523,6 +676,7 @@ impl Operation {
}
ArchiveType::Zip => {
let mut archive = fs::File::create(&to)
.map(io::BufWriter::new)
.map(zip::ZipWriter::new)
.map_err(err_str)?;
@ -701,18 +855,19 @@ impl Operation {
}
}
//TODO: support cancellation while extracting!
let msg_tx = msg_tx.clone();
let cancelled = cancelled.clone();
let mime = mime_for_path(&path);
match mime.essence_str() {
"application/gzip" | "application/x-compressed-tar" => {
fs::File::open(path)
OpReader::new(path, id, msg_tx, cancelled)
.map(io::BufReader::new)
.map(flate2::read::GzDecoder::new)
.map(tar::Archive::new)
.and_then(|mut archive| archive.unpack(&new_dir))
.map_err(err_str)?
}
"application/x-tar" => fs::File::open(path)
"application/x-tar" => OpReader::new(path, id, msg_tx, cancelled)
.map(io::BufReader::new)
.map(tar::Archive::new)
.and_then(|mut archive| archive.unpack(&new_dir))
@ -721,11 +876,13 @@ impl Operation {
.map(io::BufReader::new)
.map(zip::ZipArchive::new)
.map_err(err_str)?
.and_then(|mut archive| archive.extract(&new_dir))
.and_then(move |mut archive| {
zip_extract(&mut archive, &new_dir, id, msg_tx, cancelled)
})
.map_err(err_str)?,
#[cfg(feature = "bzip2")]
"application/x-bzip" | "application/x-bzip-compressed-tar" => {
fs::File::open(path)
OpReader::new(path, id, msg_tx, cancelled)
.map(io::BufReader::new)
.map(bzip2::read::BzDecoder::new)
.map(tar::Archive::new)
@ -734,7 +891,7 @@ impl Operation {
}
#[cfg(feature = "liblzma")]
"application/x-xz" | "application/x-xz-compressed-tar" => {
fs::File::open(path)
OpReader::new(path, id, msg_tx, cancelled)
.map(io::BufReader::new)
.map(liblzma::read::XzDecoder::new)
.map(tar::Archive::new)

65
src/operation/reader.rs Normal file
View file

@ -0,0 +1,65 @@
use cosmic::iced::futures::{channel::mpsc::Sender, executor, SinkExt};
use std::{
fs, io,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::sync::Mutex;
use crate::{app::Message, fl};
// Special reader just for operations, handling cancel and progress
pub struct OpReader {
file: fs::File,
metadata: fs::Metadata,
current: u64,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
}
impl OpReader {
pub fn new<P: AsRef<Path>>(
path: P,
id: u64,
msg_tx: Arc<Mutex<Sender<Message>>>,
cancelled: Arc<AtomicBool>,
) -> io::Result<Self> {
let file = fs::File::open(&path)?;
let metadata = file.metadata()?;
Ok(Self {
file,
metadata,
current: 0,
id,
msg_tx,
cancelled,
})
}
}
impl io::Read for OpReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.cancelled.load(Ordering::SeqCst) {
return Err(io::Error::new(io::ErrorKind::Other, fl!("cancelled")));
}
let count = self.file.read(buf)?;
self.current += count as u64;
let progress = self.current as f32 / self.metadata.len() as f32;
executor::block_on(async {
let _ = self
.msg_tx
.lock()
.await
.send(Message::PendingProgress(self.id, 100.0 * progress))
.await;
});
Ok(count)
}
}