improv(zip_extract): flush files to disk
This ensures the files are fully on the disk when the operation is done.
This commit is contained in:
parent
1963e58560
commit
d9f654ffe3
3 changed files with 79 additions and 72 deletions
108
src/archive.rs
108
src/archive.rs
|
|
@ -1,15 +1,14 @@
|
||||||
use std::{
|
|
||||||
collections::VecDeque,
|
|
||||||
fs,
|
|
||||||
io::{self, Read, Write},
|
|
||||||
path::Path,
|
|
||||||
};
|
|
||||||
use zip::result::ZipError;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
mime_icon::mime_for_path,
|
mime_icon::mime_for_path,
|
||||||
operation::{Controller, OpReader, OperationError, OperationErrorType},
|
operation::{Controller, OpReader, OperationError, OperationErrorType, sync_to_disk},
|
||||||
};
|
};
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
fs,
|
||||||
|
io::{self, Read, Write},
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
};
|
||||||
|
use zip::result::ZipError;
|
||||||
|
|
||||||
pub const SUPPORTED_ARCHIVE_TYPES: &[&str] = &[
|
pub const SUPPORTED_ARCHIVE_TYPES: &[&str] = &[
|
||||||
"application/gzip",
|
"application/gzip",
|
||||||
|
|
@ -113,27 +112,36 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
|
||||||
use std::{ffi::OsString, fs};
|
use std::{ffi::OsString, fs};
|
||||||
use zip::result::ZipError;
|
use zip::result::ZipError;
|
||||||
|
|
||||||
fn make_writable_dir_all<T: AsRef<Path>>(outpath: T) -> Result<(), ZipError> {
|
fn make_writable_dir_all<T: AsRef<Path>>(
|
||||||
fs::create_dir_all(outpath.as_ref())?;
|
outpath: T,
|
||||||
|
target_dirs: &mut HashSet<PathBuf>,
|
||||||
|
) -> Result<(), ZipError> {
|
||||||
|
let path = outpath.as_ref();
|
||||||
|
if !path.exists() {
|
||||||
|
fs::create_dir_all(path)?;
|
||||||
|
}
|
||||||
|
if !target_dirs.contains(path) {
|
||||||
|
target_dirs.insert(path.to_path_buf());
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
{
|
||||||
// Dirs must be writable until all normal files are extracted
|
// Dirs must be writable until all normal files are extracted
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
std::fs::set_permissions(
|
fs::set_permissions(
|
||||||
outpath.as_ref(),
|
path,
|
||||||
std::fs::Permissions::from_mode(
|
fs::Permissions::from_mode(0o700 | fs::metadata(path)?.permissions().mode()),
|
||||||
0o700 | std::fs::metadata(outpath.as_ref())?.permissions().mode(),
|
|
||||||
),
|
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
|
||||||
let mut files_by_unix_mode = Vec::new();
|
|
||||||
let mut buffer = vec![0; 4 * 1024 * 1024];
|
let mut buffer = vec![0; 4 * 1024 * 1024];
|
||||||
let total_files = archive.len();
|
let total_files = archive.len();
|
||||||
let mut pending_directory_creates = VecDeque::new();
|
let mut written_files = Vec::with_capacity(total_files);
|
||||||
|
let mut target_dirs = HashSet::new();
|
||||||
|
#[cfg(unix)]
|
||||||
|
let mut files_by_unix_mode = Vec::with_capacity(total_files);
|
||||||
|
|
||||||
for i in 0..total_files {
|
for i in 0..total_files {
|
||||||
futures::executor::block_on(async {
|
futures::executor::block_on(async {
|
||||||
|
|
@ -143,7 +151,7 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
|
||||||
.map_err(|s| io::Error::other(OperationError::from_state(s, &controller)))
|
.map_err(|s| io::Error::other(OperationError::from_state(s, &controller)))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
controller.set_progress((i as f32) / total_files as f32);
|
controller.set_progress(i as f32 / total_files as f32);
|
||||||
|
|
||||||
let mut file = match password {
|
let mut file = match password {
|
||||||
None => archive.by_index(i),
|
None => archive.by_index(i),
|
||||||
|
|
@ -156,26 +164,22 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
|
||||||
let outpath = directory.as_ref().join(filepath);
|
let outpath = directory.as_ref().join(filepath);
|
||||||
|
|
||||||
if file.is_dir() {
|
if file.is_dir() {
|
||||||
pending_directory_creates.push_back(outpath.clone());
|
make_writable_dir_all(&outpath, &mut target_dirs)?;
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
if let Some(mode) = file.unix_mode() {
|
||||||
|
files_by_unix_mode.push((outpath, mode));
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let symlink_target = if file.is_symlink() && (cfg!(unix) || cfg!(windows)) {
|
|
||||||
|
if let Some(parent) = outpath.parent() {
|
||||||
|
make_writable_dir_all(parent, &mut target_dirs)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if file.is_symlink() && (cfg!(unix) || cfg!(windows)) {
|
||||||
let mut target = Vec::with_capacity(file.size() as usize);
|
let mut target = Vec::with_capacity(file.size() as usize);
|
||||||
file.read_to_end(&mut target)?;
|
file.read_to_end(&mut target)?;
|
||||||
Some(target)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
drop(file);
|
|
||||||
if let Some(target) = symlink_target {
|
|
||||||
// create all pending dirs
|
|
||||||
while let Some(pending_dir) = pending_directory_creates.pop_front() {
|
|
||||||
make_writable_dir_all(pending_dir)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(p) = outpath.parent() {
|
|
||||||
make_writable_dir_all(p)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
{
|
||||||
|
|
@ -205,21 +209,10 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
|
||||||
std::os::windows::fs::symlink_file(target_path, outpath.as_path())?;
|
std::os::windows::fs::symlink_file(target_path, outpath.as_path())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
written_files.push(outpath);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let mut file = match password {
|
|
||||||
None => archive.by_index(i),
|
|
||||||
Some(pwd) => archive.by_index_decrypt(i, pwd.as_bytes()),
|
|
||||||
}?;
|
|
||||||
|
|
||||||
// create all pending dirs
|
|
||||||
while let Some(pending_dir) = pending_directory_creates.pop_front() {
|
|
||||||
make_writable_dir_all(pending_dir)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(p) = outpath.parent() {
|
|
||||||
make_writable_dir_all(p)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let total = file.size();
|
let total = file.size();
|
||||||
let mut outfile = fs::File::create(&outpath)?;
|
let mut outfile = fs::File::create(&outpath)?;
|
||||||
|
|
@ -245,13 +238,14 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
|
||||||
controller.set_progress(total_progress);
|
controller.set_progress(total_progress);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check for real permissions, which we'll set in a second pass
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
if let Some(mode) = file.unix_mode() {
|
||||||
// Check for real permissions, which we'll set in a second pass
|
files_by_unix_mode.push((outpath.clone(), mode));
|
||||||
if let Some(mode) = file.unix_mode() {
|
|
||||||
files_by_unix_mode.push((outpath.clone(), mode));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
written_files.push(outpath);
|
||||||
}
|
}
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
{
|
{
|
||||||
|
|
@ -260,11 +254,15 @@ fn zip_extract<R: io::Read + io::Seek, P: AsRef<Path>>(
|
||||||
|
|
||||||
if files_by_unix_mode.len() > 1 {
|
if files_by_unix_mode.len() > 1 {
|
||||||
// Ensure we update children's permissions before making a parent unwritable
|
// Ensure we update children's permissions before making a parent unwritable
|
||||||
files_by_unix_mode.sort_by_key(|(path, _)| Reverse(path.clone()));
|
files_by_unix_mode.sort_by_key(|(path, _)| Reverse(path.components().count()));
|
||||||
}
|
}
|
||||||
for (path, mode) in files_by_unix_mode {
|
for (path, mode) in files_by_unix_mode {
|
||||||
fs::set_permissions(&path, fs::Permissions::from_mode(mode))?;
|
fs::set_permissions(&path, fs::Permissions::from_mode(mode))?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Flush files to disk
|
||||||
|
futures::executor::block_on(async { sync_to_disk(written_files, target_dirs).await });
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,29 @@ async fn copy_or_move(
|
||||||
.map_err(wrap_compio_spawn_error)?
|
.map_err(wrap_compio_spawn_error)?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn sync_to_disk(
|
||||||
|
written_files: Vec<PathBuf>,
|
||||||
|
target_dirs: std::collections::HashSet<PathBuf>,
|
||||||
|
) {
|
||||||
|
use futures::{StreamExt, stream};
|
||||||
|
|
||||||
|
// Sync files to disk
|
||||||
|
let file_stream = stream::iter(written_files.into_iter().map(|path| async move {
|
||||||
|
if let Ok(file) = compio::fs::OpenOptions::new().write(true).open(&path).await {
|
||||||
|
let _ = file.sync_all().await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
file_stream.buffer_unordered(32).collect::<Vec<_>>().await;
|
||||||
|
|
||||||
|
// Sync directories to disk
|
||||||
|
let dir_stream = stream::iter(target_dirs.into_iter().map(|path| async move {
|
||||||
|
if let Ok(dir) = compio::fs::OpenOptions::new().read(true).open(&path).await {
|
||||||
|
let _ = dir.sync_all().await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
dir_stream.buffer_unordered(16).collect::<Vec<_>>().await;
|
||||||
|
}
|
||||||
|
|
||||||
fn copy_unique_path(from: &Path, to: &Path) -> PathBuf {
|
fn copy_unique_path(from: &Path, to: &Path) -> PathBuf {
|
||||||
// List of compound extensions to check
|
// List of compound extensions to check
|
||||||
const COMPOUND_EXTENSIONS: &[&str] = &[
|
const COMPOUND_EXTENSIONS: &[&str] = &[
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,13 @@
|
||||||
use compio::BufResult;
|
use compio::BufResult;
|
||||||
use compio::buf::{IntoInner, IoBuf};
|
use compio::buf::{IntoInner, IoBuf};
|
||||||
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
||||||
use futures::{StreamExt, stream};
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc};
|
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc};
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::operation::OperationError;
|
use crate::operation::{OperationError, sync_to_disk};
|
||||||
|
|
||||||
use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
|
use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
|
||||||
|
|
||||||
|
|
@ -203,21 +202,8 @@ impl Context {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync files to disk
|
// Flush files to disk
|
||||||
let file_stream = stream::iter(written_files.into_iter().map(|path| async move {
|
sync_to_disk(written_files, target_dirs).await;
|
||||||
if let Ok(file) = compio::fs::OpenOptions::new().write(true).open(&path).await {
|
|
||||||
let _ = file.sync_all().await;
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
file_stream.buffer_unordered(32).collect::<Vec<_>>().await;
|
|
||||||
|
|
||||||
// Sync directories to disk
|
|
||||||
let dir_stream = stream::iter(target_dirs.into_iter().map(|path| async move {
|
|
||||||
if let Ok(dir) = compio::fs::OpenOptions::new().read(true).open(&path).await {
|
|
||||||
let _ = dir.sync_all().await;
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
dir_stream.buffer_unordered(16).collect::<Vec<_>>().await;
|
|
||||||
|
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue