Merge pull request #1742 from pop-os/mtp
Fix unsupported errors when copying large files over MTP
This commit is contained in:
commit
b895b07bb2
12 changed files with 671 additions and 216 deletions
14
Cargo.lock
generated
14
Cargo.lock
generated
|
|
@ -538,6 +538,12 @@ version = "1.1.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
|
||||
|
||||
[[package]]
|
||||
name = "atomic_float"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "628d228f918ac3b82fe590352cc719d30664a0c13ca3a60266fe02c7132d480a"
|
||||
|
||||
[[package]]
|
||||
name = "atomicwrites"
|
||||
version = "0.4.2"
|
||||
|
|
@ -1349,12 +1355,12 @@ name = "cosmic-files"
|
|||
version = "1.0.9"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"atomic_float",
|
||||
"bzip2",
|
||||
"compio",
|
||||
"cosmic-client-toolkit",
|
||||
"cosmic-mime-apps",
|
||||
"dirs 6.0.0",
|
||||
"env_logger",
|
||||
"fastrand",
|
||||
"filetime",
|
||||
"flate2",
|
||||
|
|
@ -1379,6 +1385,7 @@ dependencies = [
|
|||
"notify-debouncer-full",
|
||||
"notify-rust",
|
||||
"num_cpus",
|
||||
"num_enum",
|
||||
"open",
|
||||
"ordermap",
|
||||
"paste",
|
||||
|
|
@ -1394,8 +1401,11 @@ dependencies = [
|
|||
"tar",
|
||||
"tempfile",
|
||||
"test-log",
|
||||
"thiserror 2.0.18",
|
||||
"tikv-jemallocator",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"trash",
|
||||
"url",
|
||||
"uzers",
|
||||
|
|
@ -1963,7 +1973,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "32e90c2accc4b07a8456ea0debdc2e7587bdd890680d71173a15d4ae604f6eef"
|
||||
dependencies = [
|
||||
"log",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -1975,7 +1984,6 @@ dependencies = [
|
|||
"anstream",
|
||||
"anstyle",
|
||||
"env_filter",
|
||||
"jiff",
|
||||
"log",
|
||||
]
|
||||
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ icu = { version = "2.1.1", features = ["compiled_data"] }
|
|||
cctk = { git = "https://github.com/pop-os/cosmic-protocols", package = "cosmic-client-toolkit", rev = "160b086", optional = true }
|
||||
cosmic-mime-apps = { git = "https://github.com/pop-os/cosmic-mime-apps.git", optional = true }
|
||||
dirs = "6.0.0"
|
||||
env_logger = "0.11"
|
||||
gio = { version = "0.21", optional = true }
|
||||
glib = { version = "0.21", optional = true }
|
||||
glob = "0.3"
|
||||
|
|
@ -62,6 +61,11 @@ png = "0.18"
|
|||
jxl-oxide = { version = "0.12.5", features = ["image"] }
|
||||
num_cpus = "1.17.0"
|
||||
filetime = "0.2"
|
||||
tracing = "0.1.44"
|
||||
tracing-subscriber = { version = "0.3.22", features = ["env-filter"] }
|
||||
thiserror = "2.0.18"
|
||||
atomic_float = "1.1.0"
|
||||
num_enum = "0.7.6"
|
||||
|
||||
# Completion-based IO runtime to enable io_uring / IOCP file IO support.
|
||||
[dependencies.compio]
|
||||
|
|
|
|||
|
|
@ -9,9 +9,25 @@ use cosmic_files::dialog::{
|
|||
Dialog, DialogChoice, DialogChoiceOption, DialogFilter, DialogFilterPattern, DialogKind,
|
||||
DialogMessage, DialogResult, DialogSettings,
|
||||
};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
|
||||
let log_format = tracing_subscriber::fmt::format()
|
||||
.pretty()
|
||||
.without_time()
|
||||
.with_line_number(true)
|
||||
.with_file(true)
|
||||
.with_target(false)
|
||||
.with_thread_names(true);
|
||||
|
||||
let log_layer = tracing_subscriber::fmt::Layer::default()
|
||||
.with_writer(std::io::stderr)
|
||||
.event_format(log_format);
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::from_env("RUST_LOG"))
|
||||
.with(log_layer)
|
||||
.init();
|
||||
|
||||
let settings = Settings::default();
|
||||
app::run::<App>(settings, ())?;
|
||||
|
|
|
|||
|
|
@ -6967,8 +6967,7 @@ impl Application for App {
|
|||
|_| {
|
||||
stream::channel(
|
||||
1,
|
||||
move |msg_tx: futures::channel::mpsc::Sender<_>| async move {
|
||||
let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx));
|
||||
move |mut msg_tx: futures::channel::mpsc::Sender<_>| async move {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
match notify_rust::Notification::new()
|
||||
.summary(&fl!("notification-in-progress"))
|
||||
|
|
@ -6978,8 +6977,6 @@ impl Application for App {
|
|||
Ok(notification) => {
|
||||
let _ = futures::executor::block_on(async {
|
||||
msg_tx
|
||||
.lock()
|
||||
.await
|
||||
.send(Message::Notification(Arc::new(
|
||||
Mutex::new(notification),
|
||||
)))
|
||||
|
|
|
|||
77
src/channel.rs
Normal file
77
src/channel.rs
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
// Copyright 2025 System76 <info@system76.com>
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{
|
||||
Arc, Mutex,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
/// Create a channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque.
|
||||
pub fn channel<Message>() -> (Sender<Message>, Receiver<Message>) {
|
||||
let channel = Arc::new(Channel {
|
||||
queue: Mutex::new(VecDeque::default()),
|
||||
notify: tokio::sync::Notify::const_new(),
|
||||
closed: AtomicBool::new(false),
|
||||
});
|
||||
|
||||
(Sender(channel.clone()), Receiver(channel))
|
||||
}
|
||||
|
||||
/// A channel backed by `tokio::sync::Notify` and a sync mutex with a vec deque.
|
||||
struct Channel<Message> {
|
||||
pub(self) queue: Mutex<VecDeque<Message>>,
|
||||
/// Set when a new message has been stored.
|
||||
pub(self) notify: tokio::sync::Notify,
|
||||
/// Set when the receiver is dropped.
|
||||
pub(self) closed: AtomicBool,
|
||||
}
|
||||
|
||||
pub struct Sender<Message>(Arc<Channel<Message>>);
|
||||
|
||||
impl<Message> Sender<Message> {
|
||||
pub fn send(&self, message: Message) {
|
||||
self.0.queue.lock().unwrap().push_back(message);
|
||||
self.0.notify.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Message> Drop for Sender<Message> {
|
||||
fn drop(&mut self) {
|
||||
self.0.closed.store(true, Ordering::SeqCst);
|
||||
self.0.notify.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Receiver<Message>(Arc<Channel<Message>>);
|
||||
|
||||
impl<Message> Receiver<Message> {
|
||||
/// Returns a value until the sender is dropped.
|
||||
pub async fn recv(&self) -> Option<Message> {
|
||||
loop {
|
||||
{
|
||||
let mut queue = self.0.queue.lock().unwrap();
|
||||
if let Some(value) = queue.pop_front() {
|
||||
if queue.capacity() - queue.len() > 32 {
|
||||
let capacity = queue.len().next_power_of_two();
|
||||
queue.shrink_to(capacity);
|
||||
}
|
||||
drop(queue);
|
||||
return Some(value);
|
||||
}
|
||||
}
|
||||
|
||||
if self.0.closed.load(Ordering::SeqCst) {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.0.notify.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_recv(&self) -> Option<Message> {
|
||||
self.0.queue.lock().unwrap().pop_front()
|
||||
}
|
||||
}
|
||||
35
src/lib.rs
35
src/lib.rs
|
|
@ -3,10 +3,12 @@
|
|||
|
||||
use cosmic::{app::Settings, iced::Limits};
|
||||
use std::{env, fs, path::PathBuf, process};
|
||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||
|
||||
use app::{App, Flags};
|
||||
pub mod app;
|
||||
mod archive;
|
||||
pub mod channel;
|
||||
pub mod clipboard;
|
||||
mod context_action;
|
||||
use config::Config;
|
||||
|
|
@ -73,7 +75,22 @@ pub fn is_wayland() -> bool {
|
|||
/// Runs application in desktop mode
|
||||
#[rustfmt::skip]
|
||||
pub fn desktop() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
|
||||
let log_format = tracing_subscriber::fmt::format()
|
||||
.pretty()
|
||||
.without_time()
|
||||
.with_line_number(true)
|
||||
.with_file(true)
|
||||
.with_target(false)
|
||||
.with_thread_names(true);
|
||||
|
||||
let log_layer = tracing_subscriber::fmt::Layer::default()
|
||||
.with_writer(std::io::stderr)
|
||||
.event_format(log_format);
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::from_env("RUST_LOG"))
|
||||
.with(log_layer)
|
||||
.init();
|
||||
|
||||
localize::localize();
|
||||
|
||||
|
|
@ -108,7 +125,21 @@ pub fn desktop() -> Result<(), Box<dyn std::error::Error>> {
|
|||
/// Runs application with these settings
|
||||
#[rustfmt::skip]
|
||||
pub fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init();
|
||||
let log_format = tracing_subscriber::fmt::format()
|
||||
.pretty()
|
||||
.with_line_number(true)
|
||||
.with_file(true)
|
||||
.with_target(false)
|
||||
.with_thread_names(true);
|
||||
|
||||
let log_layer = tracing_subscriber::fmt::Layer::default()
|
||||
.with_writer(std::io::stderr)
|
||||
.event_format(log_format);
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.with(log_layer)
|
||||
.init();
|
||||
|
||||
localize::localize();
|
||||
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use cosmic::{
|
|||
};
|
||||
use gio::{glib, prelude::*};
|
||||
use std::{any::TypeId, cell::Cell, future::pending, hash::Hash, path::PathBuf, sync::Arc};
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use super::{Mounter, MounterAuth, MounterItem, MounterItems, MounterMessage};
|
||||
use crate::{
|
||||
|
|
@ -199,6 +199,7 @@ fn network_scan(uri: &str, sizes: IconSizes) -> Result<Vec<tab::Item>, String> {
|
|||
metadata,
|
||||
hidden,
|
||||
location_opt: Some(location),
|
||||
image_dimensions: None,
|
||||
mime,
|
||||
icon_handle_grid,
|
||||
icon_handle_list,
|
||||
|
|
@ -229,7 +230,10 @@ fn dir_info(uri: &str) -> Result<(String, String, Option<PathBuf>), glib::Error>
|
|||
Ok((resolved_uri, info.display_name().into(), file.path()))
|
||||
}
|
||||
|
||||
fn mount_op(uri: String, event_tx: mpsc::UnboundedSender<Event>) -> gio::MountOperation {
|
||||
fn mount_op(
|
||||
uri: String,
|
||||
event_tx: std::sync::Weak<crate::channel::Sender<Event>>,
|
||||
) -> gio::MountOperation {
|
||||
let mount_op = gio::MountOperation::new();
|
||||
mount_op.connect_ask_password(
|
||||
move |mount_op, message, default_user, default_domain, flags| {
|
||||
|
|
@ -252,9 +256,9 @@ fn mount_op(uri: String, event_tx: mpsc::UnboundedSender<Event>) -> gio::MountOp
|
|||
.then_some(false),
|
||||
};
|
||||
let (auth_tx, mut auth_rx) = mpsc::channel(1);
|
||||
event_tx
|
||||
.send(Event::NetworkAuth(uri.clone(), auth, auth_tx))
|
||||
.unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::NetworkAuth(uri.clone(), auth, auth_tx));
|
||||
}
|
||||
//TODO: async recv?
|
||||
if let Some(auth) = auth_rx.blocking_recv() {
|
||||
if auth.anonymous_opt == Some(true) {
|
||||
|
|
@ -357,37 +361,45 @@ impl Item {
|
|||
|
||||
pub struct Gvfs {
|
||||
command_tx: mpsc::UnboundedSender<Cmd>,
|
||||
event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>,
|
||||
event_rx: Arc<crate::channel::Receiver<Event>>,
|
||||
}
|
||||
|
||||
impl Gvfs {
|
||||
pub fn new() -> Self {
|
||||
//TODO: switch to using gvfs-zbus which will better integrate with async rust
|
||||
let (command_tx, mut command_rx) = mpsc::unbounded_channel();
|
||||
let (event_tx, event_rx) = mpsc::unbounded_channel();
|
||||
let (event_tx, event_rx) = crate::channel::channel();
|
||||
let event_tx = Arc::new(event_tx);
|
||||
std::thread::spawn(move || {
|
||||
let main_loop = glib::MainLoop::new(None, false);
|
||||
main_loop.context().spawn_local(async move {
|
||||
let event_tx = Arc::downgrade(&event_tx);
|
||||
let monitor = gio::VolumeMonitor::get();
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
monitor.connect_mount_changed(move |_monitor, mount| {
|
||||
log::info!("mount changed {}", MountExt::name(mount));
|
||||
event_tx.send(Event::Changed).unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::Changed);
|
||||
}
|
||||
});
|
||||
}
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
monitor.connect_mount_added(move |_monitor, mount| {
|
||||
log::info!("mount added {}", MountExt::name(mount));
|
||||
event_tx.send(Event::Changed).unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::Changed);
|
||||
}
|
||||
});
|
||||
}
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
monitor.connect_mount_removed(move |_monitor, mount| {
|
||||
log::info!("mount removed {}", MountExt::name(mount));
|
||||
event_tx.send(Event::Changed).unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::Changed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -395,21 +407,27 @@ impl Gvfs {
|
|||
let event_tx = event_tx.clone();
|
||||
monitor.connect_volume_changed(move |_monitor, volume| {
|
||||
log::info!("volume changed {}", VolumeExt::name(volume));
|
||||
event_tx.send(Event::Changed).unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::Changed);
|
||||
}
|
||||
});
|
||||
}
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
monitor.connect_volume_added(move |_monitor, volume| {
|
||||
log::info!("volume added {}", VolumeExt::name(volume));
|
||||
event_tx.send(Event::Changed).unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::Changed);
|
||||
}
|
||||
});
|
||||
}
|
||||
{
|
||||
let event_tx = event_tx.clone();
|
||||
monitor.connect_volume_removed(move |_monitor, volume| {
|
||||
log::info!("volume removed {}", VolumeExt::name(volume));
|
||||
event_tx.send(Event::Changed).unwrap();
|
||||
if let Some(event_tx) = event_tx.upgrade() {
|
||||
event_tx.send(Event::Changed);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -419,7 +437,11 @@ impl Gvfs {
|
|||
items_tx.send(items(&monitor, sizes)).await.unwrap();
|
||||
}
|
||||
Cmd::Rescan => {
|
||||
event_tx.send(Event::Items(items(&monitor, IconSizes::default()))).unwrap();
|
||||
let Some(event_tx) = event_tx.upgrade() else {
|
||||
return;
|
||||
};
|
||||
|
||||
event_tx.send(Event::Items(items(&monitor, IconSizes::default())));
|
||||
}
|
||||
Cmd::Mount(mounter_item, complete_tx) => {
|
||||
let MounterItem::Gvfs(ref item) = mounter_item else {
|
||||
|
|
@ -471,6 +493,9 @@ impl Gvfs {
|
|||
.ok().map(|info| info.boolean(gio::FILE_ATTRIBUTE_FILESYSTEM_REMOTE))
|
||||
.unwrap_or(true);
|
||||
}
|
||||
let Some(event_tx) = event_tx.upgrade() else {
|
||||
return;
|
||||
};
|
||||
event_tx.send(Event::MountResult(updated_item, match res {
|
||||
Ok(()) => {
|
||||
_ = complete_tx.send(Ok(()));
|
||||
|
|
@ -482,7 +507,7 @@ impl Gvfs {
|
|||
Some(gio::IOErrorEnum::FailedHandled) => Ok(false),
|
||||
_ => Err(format!("{err}"))
|
||||
}}
|
||||
})).unwrap();
|
||||
}));
|
||||
},
|
||||
);
|
||||
break;
|
||||
|
|
@ -498,6 +523,9 @@ impl Gvfs {
|
|||
gio::Cancellable::NONE,
|
||||
move |res| {
|
||||
log::info!("network drive {uri}: result {res:?}");
|
||||
let Some(event_tx) = event_tx.upgrade() else {
|
||||
return;
|
||||
};
|
||||
event_tx.send(Event::NetworkResult(uri, match res {
|
||||
Ok(()) => {
|
||||
_ = result_tx.send(Ok(()));
|
||||
|
|
@ -508,7 +536,7 @@ impl Gvfs {
|
|||
Some(gio::IOErrorEnum::FailedHandled) => Ok(false),
|
||||
_ => Err(format!("{err}"))
|
||||
}}
|
||||
})).unwrap();
|
||||
}));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
@ -532,6 +560,9 @@ impl Gvfs {
|
|||
// FIXME sometimes a uri can be mounted and then not recognized as mounted...
|
||||
// seems to be related to uri with a path
|
||||
items_tx.blocking_send(network_scan(&uri, sizes)).unwrap();
|
||||
let Some(event_tx) = event_tx.upgrade() else {
|
||||
return;
|
||||
};
|
||||
event_tx.send(Event::NetworkResult(resolved_uri, match res {
|
||||
Ok(()) => {
|
||||
Ok(true)
|
||||
|
|
@ -540,7 +571,7 @@ impl Gvfs {
|
|||
Some(gio::IOErrorEnum::FailedHandled) => Ok(false),
|
||||
_ => Err(format!("{err}"))
|
||||
}
|
||||
})).unwrap();
|
||||
}));
|
||||
}
|
||||
);
|
||||
} else {
|
||||
|
|
@ -596,7 +627,7 @@ impl Gvfs {
|
|||
});
|
||||
Self {
|
||||
command_tx,
|
||||
event_rx: Arc::new(Mutex::new(event_rx)),
|
||||
event_rx: Arc::new(event_rx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -670,7 +701,7 @@ impl Mounter for Gvfs {
|
|||
let event_rx = self.event_rx.clone();
|
||||
struct Wrapper {
|
||||
command_tx: mpsc::UnboundedSender<Cmd>,
|
||||
event_rx: Arc<Mutex<mpsc::UnboundedReceiver<Event>>>,
|
||||
event_rx: Arc<crate::channel::Receiver<Event>>,
|
||||
}
|
||||
impl Hash for Wrapper {
|
||||
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
||||
|
|
@ -694,7 +725,7 @@ impl Mounter for Gvfs {
|
|||
MounterMessage,
|
||||
>| async move {
|
||||
command_tx.send(Cmd::Rescan).unwrap();
|
||||
while let Some(event) = event_rx.lock().await.recv().await {
|
||||
while let Some(event) = event_rx.recv().await {
|
||||
match event {
|
||||
Event::Changed => command_tx.send(Cmd::Rescan).unwrap(),
|
||||
Event::Items(items) => {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,11 @@
|
|||
use std::sync::{Arc, Mutex};
|
||||
use atomic_float::AtomicF32;
|
||||
use num_enum::{IntoPrimitive, TryFromPrimitive};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{self, AtomicU16};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq, IntoPrimitive, TryFromPrimitive)]
|
||||
#[repr(u16)]
|
||||
pub enum ControllerState {
|
||||
Cancelled,
|
||||
Failed,
|
||||
|
|
@ -11,8 +15,8 @@ pub enum ControllerState {
|
|||
|
||||
#[derive(Debug)]
|
||||
struct ControllerInner {
|
||||
state: Mutex<ControllerState>,
|
||||
progress: Mutex<f32>,
|
||||
state: AtomicU16,
|
||||
progress: AtomicF32,
|
||||
notify: Notify,
|
||||
}
|
||||
|
||||
|
|
@ -27,8 +31,8 @@ impl Default for Controller {
|
|||
Self {
|
||||
primary: true,
|
||||
inner: Arc::new(ControllerInner {
|
||||
state: Mutex::new(ControllerState::Running),
|
||||
progress: Mutex::new(0.0),
|
||||
state: AtomicU16::new(ControllerState::Running.into()),
|
||||
progress: AtomicF32::new(0.0),
|
||||
notify: Notify::new(),
|
||||
}),
|
||||
}
|
||||
|
|
@ -50,19 +54,24 @@ impl Controller {
|
|||
}
|
||||
|
||||
pub fn progress(&self) -> f32 {
|
||||
*self.inner.progress.lock().unwrap()
|
||||
self.inner.progress.load(atomic::Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub fn set_progress(&self, progress: f32) {
|
||||
*self.inner.progress.lock().unwrap() = progress;
|
||||
self.inner
|
||||
.progress
|
||||
.swap(progress, atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn state(&self) -> ControllerState {
|
||||
*self.inner.state.lock().unwrap()
|
||||
ControllerState::try_from(self.inner.state.load(atomic::Ordering::Relaxed))
|
||||
.unwrap_or(ControllerState::Failed)
|
||||
}
|
||||
|
||||
pub fn set_state(&self, state: ControllerState) {
|
||||
*self.inner.state.lock().unwrap() = state;
|
||||
self.inner
|
||||
.state
|
||||
.store(state.into(), atomic::Ordering::Relaxed);
|
||||
self.inner.notify.notify_waiters();
|
||||
}
|
||||
|
||||
|
|
@ -86,6 +95,35 @@ impl Controller {
|
|||
self.set_state(ControllerState::Paused);
|
||||
}
|
||||
|
||||
/// Returns when the state is paused.
|
||||
///
|
||||
/// Use this to pause futures.
|
||||
pub async fn until_paused(&self) {
|
||||
loop {
|
||||
if matches!(self.state(), ControllerState::Paused) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.inner.notify.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns when state is neither paused, cancelled, nor failed.
|
||||
///
|
||||
/// Use this to resume futures.
|
||||
pub async fn until_unpaused(&self) {
|
||||
loop {
|
||||
if !matches!(
|
||||
self.state(),
|
||||
ControllerState::Paused | ControllerState::Cancelled | ControllerState::Failed
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
self.inner.notify.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unpause(&self) {
|
||||
if !self.is_cancelled() | !self.is_failed() {
|
||||
self.set_state(ControllerState::Running);
|
||||
|
|
|
|||
|
|
@ -22,6 +22,9 @@ use zip::AesMode::Aes256;
|
|||
pub use self::controller::{Controller, ControllerState};
|
||||
pub mod controller;
|
||||
|
||||
pub use notifiers::*;
|
||||
mod notifiers;
|
||||
|
||||
pub use self::reader::OpReader;
|
||||
pub mod reader;
|
||||
|
||||
|
|
@ -111,7 +114,7 @@ async fn copy_or_move(
|
|||
);
|
||||
|
||||
// Handle duplicate file names by renaming paths
|
||||
let mut from_to_pairs: Vec<(PathBuf, PathBuf)> = paths
|
||||
let from_to_pairs_iter = paths
|
||||
.into_iter()
|
||||
.zip(std::iter::repeat(to.as_path()))
|
||||
.filter_map(|(from, to)| {
|
||||
|
|
@ -129,36 +132,46 @@ async fn copy_or_move(
|
|||
//TODO: how to handle from missing file name?
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
});
|
||||
|
||||
// Attempt quick and simple renames
|
||||
//TODO: allow rename to be used for directories in recursive context?
|
||||
if matches!(method, Method::Move { .. }) {
|
||||
from_to_pairs.retain(|(from, to)| {
|
||||
//TODO: show replace dialog here?
|
||||
if to.exists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
//TODO: use compio::fs::rename?
|
||||
match fs::rename(from, to) {
|
||||
Ok(()) => {
|
||||
log::info!("renamed {} to {}", from.display(), to.display());
|
||||
false
|
||||
let from_to_pairs: Vec<(PathBuf, PathBuf)> = if matches!(method, Method::Move { .. }) {
|
||||
from_to_pairs_iter
|
||||
.map(|(from, to)| async move {
|
||||
//TODO: show replace dialog here?
|
||||
if to.exists() {
|
||||
return Some((from, to));
|
||||
}
|
||||
Err(err) => {
|
||||
log::info!(
|
||||
"failed to rename {} to {}, fallback to recursive move: {}",
|
||||
from.display(),
|
||||
to.display(),
|
||||
err
|
||||
);
|
||||
true
|
||||
|
||||
match compio::fs::rename(&from, &to).await {
|
||||
Ok(()) => {
|
||||
log::info!("renamed {} to {}", from.display(), to.display());
|
||||
None
|
||||
}
|
||||
Err(err) => {
|
||||
log::info!(
|
||||
"failed to rename {} to {}, fallback to recursive move: {}",
|
||||
from.display(),
|
||||
to.display(),
|
||||
err
|
||||
);
|
||||
Some((from, to))
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
})
|
||||
.collect::<cosmic::iced::futures::stream::FuturesOrdered<_>>()
|
||||
.fold(Vec::new(), |mut pairs, pair| async move {
|
||||
if let Some(pair) = pair {
|
||||
pairs.push(pair);
|
||||
}
|
||||
pairs
|
||||
})
|
||||
.await
|
||||
} else {
|
||||
from_to_pairs_iter.collect()
|
||||
};
|
||||
|
||||
let mut context = Context::new(controller.clone());
|
||||
|
||||
|
|
@ -216,7 +229,7 @@ pub async fn sync_to_disk(
|
|||
}
|
||||
}))
|
||||
.buffer_unordered(32)
|
||||
.collect::<Vec<_>>()
|
||||
.collect::<()>()
|
||||
.await;
|
||||
|
||||
// Sync directories to disk
|
||||
|
|
@ -226,7 +239,7 @@ pub async fn sync_to_disk(
|
|||
}
|
||||
}))
|
||||
.buffer_unordered(16)
|
||||
.collect::<Vec<_>>()
|
||||
.collect::<()>()
|
||||
.await;
|
||||
}
|
||||
|
||||
|
|
|
|||
58
src/operation/notifiers.rs
Normal file
58
src/operation/notifiers.rs
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
// Copyright 2026 System76 <info@system76.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, LazyLock, Mutex};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
/// Monitor files which are being written to.
|
||||
pub struct FileWritingNotifier {
|
||||
data: Vec<PathBuf>,
|
||||
notify: Arc<Notify>,
|
||||
}
|
||||
|
||||
static ACTIVELY_WRITING: LazyLock<Mutex<FileWritingNotifier>> = LazyLock::new(|| {
|
||||
Mutex::new(FileWritingNotifier {
|
||||
data: Vec::new(),
|
||||
notify: Arc::new(Notify::new()),
|
||||
})
|
||||
});
|
||||
|
||||
/// Append path that is being written to.
|
||||
pub fn actively_writing_add(path: PathBuf) {
|
||||
ACTIVELY_WRITING.lock().unwrap().data.push(path);
|
||||
}
|
||||
|
||||
/// Remove path to file that has finished writing and notify waiters.
|
||||
pub fn actively_writing_remove(path: &Path) {
|
||||
let mut guard = ACTIVELY_WRITING.lock().unwrap();
|
||||
guard.data.retain(|p| p != path);
|
||||
guard.notify.notify_waiters();
|
||||
}
|
||||
|
||||
/// Wait until the actively-writing queue is empty or a file has been removed.
|
||||
pub async fn actively_writing_tick() {
|
||||
let notify = (|| {
|
||||
let guard = ACTIVELY_WRITING.lock().unwrap();
|
||||
|
||||
if !guard.data.is_empty() {
|
||||
return Some(guard.notify.clone());
|
||||
}
|
||||
|
||||
None
|
||||
})();
|
||||
|
||||
if let Some(notify) = notify {
|
||||
notify.notified().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a file is being written to. Avoid thumbnail generation until after it is finished.
|
||||
pub fn is_actively_writing_to(path: &Path) -> bool {
|
||||
ACTIVELY_WRITING
|
||||
.lock()
|
||||
.unwrap()
|
||||
.data
|
||||
.iter()
|
||||
.any(|p| p == path)
|
||||
}
|
||||
|
|
@ -1,15 +1,33 @@
|
|||
use compio::BufResult;
|
||||
use compio::buf::{IntoInner, IoBuf};
|
||||
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::time::Instant;
|
||||
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf, rc::Rc};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::operation::{OperationError, sync_to_disk};
|
||||
// Copyright 2023 System76 <info@system76.com>
|
||||
// SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
use super::{Controller, OperationSelection, ReplaceResult, copy_unique_path};
|
||||
use crate::operation::{OperationError, sync_to_disk};
|
||||
use anyhow::Context as AnyhowContext;
|
||||
use compio::BufResult;
|
||||
use compio::buf::{IntoInner, IoBuf};
|
||||
use compio::driver::{ToSharedFd, op::AsyncifyFd};
|
||||
use compio::io::{AsyncReadAt, AsyncWriteAt};
|
||||
use cosmic::iced::futures;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::time::Instant;
|
||||
use std::{cell::Cell, error::Error, fs, ops::ControlFlow, path::PathBuf};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
#[cfg(feature = "gvfs")]
|
||||
use gio::prelude::FileExtManual;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum GioCopyError {
|
||||
#[error("controller state")]
|
||||
Controller(OperationError),
|
||||
#[cfg(feature = "gvfs")]
|
||||
#[error("gio copy failed")]
|
||||
GLib(#[from] glib::Error),
|
||||
}
|
||||
|
||||
pub enum Method {
|
||||
Copy,
|
||||
|
|
@ -313,136 +331,28 @@ impl Op {
|
|||
})
|
||||
}
|
||||
|
||||
async fn run(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
mut progress: Progress,
|
||||
) -> Result<bool, Box<dyn Error>> {
|
||||
async fn run(&mut self, ctx: &mut Context, progress: Progress) -> Result<bool, Box<dyn Error>> {
|
||||
if self.skipped.normal.get() || (self.is_cleanup && self.skipped.cleanup.get()) {
|
||||
return Ok(true);
|
||||
}
|
||||
match self.kind {
|
||||
OpKind::Copy => {
|
||||
// Remove `to` if overwriting and it is an existing file
|
||||
if self.to.is_file() {
|
||||
match ctx.replace(self).await? {
|
||||
ControlFlow::Continue(to) => {
|
||||
self.to = to;
|
||||
}
|
||||
ControlFlow::Break(ret) => {
|
||||
return Ok(ret);
|
||||
}
|
||||
}
|
||||
crate::operation::actively_writing_add(self.to.clone());
|
||||
let result = self.copy(ctx, progress).await;
|
||||
|
||||
if result.is_err() {
|
||||
_ = compio::fs::remove_file(&self.to).await;
|
||||
}
|
||||
|
||||
let (from_file, metadata, mut to_file) = cosmic::iced::futures::try_join!(
|
||||
async {
|
||||
compio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&self.from)
|
||||
.await
|
||||
},
|
||||
compio::fs::metadata(&self.from),
|
||||
// This is atomic and ensures `to` is not created by any other process
|
||||
async {
|
||||
compio::fs::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&self.to)
|
||||
.await
|
||||
}
|
||||
)?;
|
||||
|
||||
progress.total_bytes = Some(metadata.len());
|
||||
(ctx.on_progress)(self, &progress);
|
||||
if let Err(err) = to_file.set_permissions(metadata.permissions()).await {
|
||||
// This error is not propagated upwards as some filesystems do not support setting permissions
|
||||
log::warn!(
|
||||
"failed to set permissions for {}: {}",
|
||||
self.to.display(),
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
// Prevent spamming the progress callbacks.
|
||||
let mut last_progress_update = Instant::now();
|
||||
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
|
||||
let mut buf_in = std::mem::take(&mut ctx.buf);
|
||||
// Track where the current read/write position is at.
|
||||
let mut pos = 0;
|
||||
|
||||
loop {
|
||||
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
|
||||
|
||||
let count = match result {
|
||||
Ok(0) => {
|
||||
ctx.buf = buf_out;
|
||||
break;
|
||||
}
|
||||
Ok(count) => count,
|
||||
Err(why) => {
|
||||
ctx.buf = buf_out;
|
||||
return Err(why.into());
|
||||
}
|
||||
};
|
||||
|
||||
let BufResult(result, buf_out_slice) =
|
||||
to_file.write_at(buf_out.slice(..count), pos).await;
|
||||
let buf_out = buf_out_slice.into_inner();
|
||||
|
||||
if let Err(why) = result {
|
||||
ctx.buf = buf_out;
|
||||
return Err(why.into());
|
||||
}
|
||||
|
||||
progress.current_bytes += count as u64;
|
||||
pos += count as u64;
|
||||
|
||||
// Avoid spamming progress messages too early.
|
||||
let current = Instant::now();
|
||||
if current.duration_since(last_progress_update).as_millis() > 49 {
|
||||
last_progress_update = current;
|
||||
(ctx.on_progress)(self, &progress);
|
||||
|
||||
// Also check if the progress was cancelled.
|
||||
if let Err(state) = ctx.controller.check().await {
|
||||
ctx.buf = buf_out;
|
||||
return Err(OperationError::from_state(state, &ctx.controller).into());
|
||||
}
|
||||
}
|
||||
|
||||
buf_in = buf_out;
|
||||
}
|
||||
|
||||
let mut times = fs::FileTimes::new();
|
||||
{
|
||||
use std::os::unix::prelude::MetadataExt;
|
||||
log::info!("{}", metadata.mtime());
|
||||
}
|
||||
if let Ok(time) = metadata.modified() {
|
||||
times = times.set_modified(time);
|
||||
}
|
||||
if let Ok(time) = metadata.accessed() {
|
||||
times = times.set_accessed(time);
|
||||
}
|
||||
//TODO: upstream set_times implementation to compio?
|
||||
{
|
||||
use compio::driver::{ToSharedFd, op::AsyncifyFd};
|
||||
let op =
|
||||
AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| {
|
||||
BufResult(file.set_times(times).map(|_| 0), ())
|
||||
});
|
||||
match compio::runtime::submit(op).await.0.map(|_| ()) {
|
||||
Ok(()) => {
|
||||
log::info!("set times for {} to {:?}", self.to.display(), times);
|
||||
}
|
||||
Err(err) => {
|
||||
log::warn!("failed to set times for {}: {}", self.to.display(), err);
|
||||
}
|
||||
}
|
||||
}
|
||||
crate::operation::actively_writing_remove(&self.to);
|
||||
return result;
|
||||
}
|
||||
OpKind::Move { cross_device_copy } => {
|
||||
// Do not clean up if cross_device_copy is set
|
||||
if cross_device_copy {
|
||||
self.skipped.cleanup.set(true);
|
||||
}
|
||||
|
||||
// Remove `to` if overwriting and it is an existing file
|
||||
if self.to.is_file() {
|
||||
match ctx.replace(self).await? {
|
||||
|
|
@ -520,4 +430,268 @@ impl Op {
|
|||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn copy(
|
||||
&mut self,
|
||||
ctx: &mut Context,
|
||||
mut progress: Progress,
|
||||
) -> Result<bool, Box<dyn Error>> {
|
||||
// Remove `to` if overwriting and it is an existing file
|
||||
if self.to.is_file() {
|
||||
match ctx.replace(self).await? {
|
||||
ControlFlow::Continue(to) => {
|
||||
self.to = to;
|
||||
}
|
||||
ControlFlow::Break(ret) => {
|
||||
return Ok(ret);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (from_file, metadata, to_file) = cosmic::iced::futures::join!(
|
||||
async {
|
||||
compio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.open(&self.from)
|
||||
.await
|
||||
.with_context(|| format!("failed to open {} for reading", self.from.display(),))
|
||||
},
|
||||
async { compio::fs::metadata(&self.from).await.ok() },
|
||||
// This is atomic and ensures `to` is not created by any other process
|
||||
async {
|
||||
compio::fs::OpenOptions::new()
|
||||
.create_new(true)
|
||||
.write(true)
|
||||
.open(&self.to)
|
||||
.await
|
||||
.with_context(|| format!("failed to open {} for writing", self.to.display()))
|
||||
}
|
||||
);
|
||||
|
||||
let from_file = from_file?;
|
||||
let mut to_file = to_file?;
|
||||
progress.total_bytes = metadata.as_ref().map(|m| m.len());
|
||||
(ctx.on_progress)(self, &progress);
|
||||
|
||||
if let Some(metadata) = metadata.as_ref() {
|
||||
if let Err(why) = to_file.set_permissions(metadata.permissions()).await {
|
||||
// This error is not propagated upwards as some filesystems do not support setting permissions
|
||||
if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
|
||||
tracing::warn!(?why, "failed to set permissions for {}", self.to.display(),);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Prevent spamming the progress callbacks.
|
||||
let mut last_progress_update = Instant::now();
|
||||
// io_uring/IOCP requires transferring ownership of the buffer to the kernel.
|
||||
let mut buf_in = std::mem::take(&mut ctx.buf);
|
||||
// Track where the current read/write position is at.
|
||||
let mut pos = 0;
|
||||
|
||||
loop {
|
||||
let BufResult(result, buf_out) = from_file.read_at(buf_in, pos).await;
|
||||
|
||||
let count = match result {
|
||||
Ok(0) => {
|
||||
buf_in = buf_out;
|
||||
break;
|
||||
}
|
||||
Ok(count) => count,
|
||||
Err(why) => {
|
||||
ctx.buf = buf_out;
|
||||
tracing::error!("failed to read: {:?}", why);
|
||||
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||
return Err(why).context("failed to read")?;
|
||||
}
|
||||
};
|
||||
|
||||
let BufResult(result, buf_out_slice) =
|
||||
to_file.write_at(buf_out.slice(..count), pos).await;
|
||||
let buf_out = buf_out_slice.into_inner();
|
||||
|
||||
if let Err(why) = result {
|
||||
#[cfg(feature = "gvfs")]
|
||||
if let std::io::ErrorKind::Unsupported = why.kind() {
|
||||
ctx.buf = buf_out;
|
||||
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||
return self
|
||||
.gio_file_copy(ctx, progress)
|
||||
.await
|
||||
.map(|_| true)
|
||||
.map_err(Into::into);
|
||||
}
|
||||
|
||||
tracing::error!("failed to write: {:?}", why);
|
||||
ctx.buf = buf_out;
|
||||
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||
return Err(why).context("failed to write")?;
|
||||
}
|
||||
|
||||
progress.current_bytes += count as u64;
|
||||
pos += count as u64;
|
||||
|
||||
// Avoid spamming progress messages too early.
|
||||
let current = Instant::now();
|
||||
if current.duration_since(last_progress_update).as_millis() > 49 {
|
||||
last_progress_update = current;
|
||||
(ctx.on_progress)(self, &progress);
|
||||
|
||||
// Also check if the progress was cancelled.
|
||||
if let Err(state) = ctx.controller.check().await {
|
||||
ctx.buf = buf_out;
|
||||
tracing::warn!(
|
||||
"operation to copy from {:?} to {:?} cancelled",
|
||||
self.from,
|
||||
self.to
|
||||
);
|
||||
_ = futures::future::join(from_file.close(), to_file.close()).await;
|
||||
return Err(OperationError::from_state(state, &ctx.controller).into());
|
||||
}
|
||||
}
|
||||
|
||||
buf_in = buf_out;
|
||||
}
|
||||
|
||||
ctx.buf = buf_in;
|
||||
|
||||
if let Some(metadata) = metadata.as_ref() {
|
||||
let mut times = fs::FileTimes::new();
|
||||
if let Ok(time) = metadata.modified() {
|
||||
times = times.set_modified(time);
|
||||
}
|
||||
if let Ok(time) = metadata.accessed() {
|
||||
times = times.set_accessed(time);
|
||||
}
|
||||
//TODO: upstream set_times implementation to compio?
|
||||
let op = AsyncifyFd::new(to_file.to_shared_fd(), move |file: &std::fs::File| {
|
||||
BufResult(file.set_times(times).map(|_| 0), ())
|
||||
});
|
||||
match compio::runtime::submit(op).await.0.map(|_| ()) {
|
||||
Ok(()) => {
|
||||
tracing::info!("set times for {} to {:?}", self.to.display(), times);
|
||||
}
|
||||
Err(why) => {
|
||||
if !matches!(why.kind(), std::io::ErrorKind::Unsupported) {
|
||||
tracing::error!(?why, "failed to set times for {}", self.to.display());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = to_file.close().await;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Fallback mechanism in the event that unsupported I/O error errors occur.
|
||||
/// Fixes unsupported errors when copying large files over MTP.
|
||||
/// TODO: Find what Gio.File does to work around this.
|
||||
#[cfg(feature = "gvfs")]
|
||||
async fn gio_file_copy(
|
||||
&self,
|
||||
ctx: &mut Context,
|
||||
mut progress: Progress,
|
||||
) -> Result<(), GioCopyError> {
|
||||
_ = compio::fs::remove_file(&self.to).await;
|
||||
|
||||
let from = gio::File::for_path(&self.from);
|
||||
let to = gio::File::for_path(&self.to);
|
||||
let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
let (pause_tx, mut pause_rx) = tokio::sync::watch::channel(false);
|
||||
|
||||
let task = compio::runtime::spawn_blocking(move || {
|
||||
let glib_context = glib::MainContext::new();
|
||||
let glib_loop = glib::MainLoop::new(Some(&glib_context), false);
|
||||
glib_context.with_thread_default(move || {
|
||||
let glib_loop2 = glib_loop.clone();
|
||||
glib::MainContext::ref_thread_default().spawn_local(async move {
|
||||
// Create a future for copying the file with `gio::File`. This also creates a progress stream.
|
||||
let (gio_copy_fut, mut progress_stream) = from.copy_future(
|
||||
&to,
|
||||
gio::FileCopyFlags::OVERWRITE | gio::FileCopyFlags::ALL_METADATA,
|
||||
glib::Priority::LOW,
|
||||
);
|
||||
|
||||
let mut copy_fut = gio_copy_fut
|
||||
.map(|result| result.map_err(GioCopyError::GLib))
|
||||
.fuse();
|
||||
|
||||
let progress_fut = std::pin::pin!(async {
|
||||
while let Some((current_bytes, _)) = progress_stream.next().await {
|
||||
_ = progress_tx.send(current_bytes);
|
||||
}
|
||||
|
||||
drop(progress_tx);
|
||||
futures::future::pending::<()>().await;
|
||||
});
|
||||
|
||||
let mut progress_fut = progress_fut.fuse();
|
||||
let mut pause_rx2 = pause_rx.clone();
|
||||
|
||||
loop {
|
||||
let until_paused = std::pin::pin!(pause_rx.wait_for(|paused| *paused));
|
||||
futures::select! {
|
||||
_ = &mut progress_fut => {},
|
||||
|
||||
result = &mut copy_fut => {
|
||||
_ = tx.send(result.map(|_| ()));
|
||||
glib_loop2.quit();
|
||||
return;
|
||||
}
|
||||
|
||||
_ = until_paused.fuse() => {
|
||||
_ = pause_rx2.wait_for(|paused| !*paused).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
glib_loop.run();
|
||||
})
|
||||
});
|
||||
|
||||
let mut last_progress_update = Instant::now();
|
||||
let mut task = task.fuse();
|
||||
let mut rx = rx.fuse();
|
||||
|
||||
loop {
|
||||
let until_paused = std::pin::pin!(ctx.controller.until_paused());
|
||||
futures::select! {
|
||||
value = progress_rx.recv().fuse() => {
|
||||
if let Some(current_bytes) = value {
|
||||
progress.current_bytes = current_bytes as u64;
|
||||
let current = Instant::now();
|
||||
if current.duration_since(last_progress_update).as_millis() > 49 {
|
||||
last_progress_update = current;
|
||||
(ctx.on_progress)(self, &progress);
|
||||
// Also check if the progress was cancelled.
|
||||
if let Err(state) = ctx.controller.check().await {
|
||||
tracing::warn!(
|
||||
"operation to copy from {:?} to {:?} cancelled",
|
||||
self.from,
|
||||
self.to
|
||||
);
|
||||
return Err::<(), GioCopyError>(GioCopyError::Controller(
|
||||
OperationError::from_state(state, &ctx.controller),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result = rx => return result.unwrap(),
|
||||
|
||||
_ = task => (),
|
||||
|
||||
_ = until_paused.fuse() => {
|
||||
// Pauses an active copy while the controller state is paused.
|
||||
_ = pause_tx.send(true);
|
||||
ctx.controller.until_unpaused().await;
|
||||
_ = pause_tx.send(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
42
src/tab.rs
42
src/tab.rs
|
|
@ -708,6 +708,9 @@ pub fn item_from_gvfs_info(path: PathBuf, file_info: gio::FileInfo, sizes: IconS
|
|||
children_opt,
|
||||
},
|
||||
hidden,
|
||||
image_dimensions: (!remote && mime.type_() == mime::IMAGE)
|
||||
.then(|| image::image_dimensions(&path).ok())
|
||||
.flatten(),
|
||||
location_opt: Some(Location::Path(path)),
|
||||
mime,
|
||||
icon_handle_grid,
|
||||
|
|
@ -843,6 +846,7 @@ pub fn item_from_entry(
|
|||
},
|
||||
hidden,
|
||||
location_opt: Some(Location::Path(path)),
|
||||
image_dimensions: None,
|
||||
mime,
|
||||
icon_handle_grid,
|
||||
icon_handle_list,
|
||||
|
|
@ -896,6 +900,9 @@ pub fn item_from_trash_entry(
|
|||
metadata: ItemMetadata::Trash { metadata, entry },
|
||||
hidden: false,
|
||||
location_opt: None,
|
||||
image_dimensions: (mime.type_() == mime::IMAGE)
|
||||
.then(|| image::image_dimensions(&original_path).ok())
|
||||
.flatten(),
|
||||
mime,
|
||||
icon_handle_grid,
|
||||
icon_handle_list,
|
||||
|
|
@ -1444,6 +1451,7 @@ pub fn scan_desktop(
|
|||
metadata,
|
||||
hidden: false,
|
||||
location_opt: Some(Location::Trash),
|
||||
image_dimensions: None,
|
||||
mime,
|
||||
icon_handle_grid,
|
||||
icon_handle_list,
|
||||
|
|
@ -2319,6 +2327,7 @@ pub struct Item {
|
|||
pub hidden: bool,
|
||||
pub location_opt: Option<Location>,
|
||||
pub mime: Mime,
|
||||
pub image_dimensions: Option<(u32, u32)>,
|
||||
pub icon_handle_grid: widget::icon::Handle,
|
||||
pub icon_handle_list: widget::icon::Handle,
|
||||
pub icon_handle_list_condensed: widget::icon::Handle,
|
||||
|
|
@ -6713,13 +6722,13 @@ impl Tab {
|
|||
|
||||
// Determine effective memory budget based on image size
|
||||
let (effective_max_mb, effective_jobs) = if mime.type_() == mime::IMAGE {
|
||||
match image::image_dimensions(&path) {
|
||||
Ok((width, height)) => {
|
||||
match item.image_dimensions {
|
||||
Some((width, height)) => {
|
||||
let (_use_dedicated, eff_mb, eff_jobs) =
|
||||
should_use_dedicated_worker(width, height, max_mb, max_jobs);
|
||||
(eff_mb, eff_jobs)
|
||||
}
|
||||
Err(_) => (max_mb, max_jobs),
|
||||
None => (max_mb, max_jobs),
|
||||
}
|
||||
} else {
|
||||
(max_mb, max_jobs)
|
||||
|
|
@ -6762,6 +6771,10 @@ impl Tab {
|
|||
stream::channel(
|
||||
1,
|
||||
move |mut output: futures::channel::mpsc::Sender<_>| async move {
|
||||
while crate::operation::is_actively_writing_to(&path) {
|
||||
crate::operation::actively_writing_tick().await;
|
||||
}
|
||||
|
||||
let message = {
|
||||
let path = path.clone();
|
||||
|
||||
|
|
@ -6946,9 +6959,8 @@ impl Tab {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let output = Arc::new(tokio::sync::Mutex::new(output));
|
||||
let (watch_tx, mut watch_rx) = tokio::sync::watch::channel(true);
|
||||
{
|
||||
let output = output.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
scan_search(
|
||||
&search_location,
|
||||
|
|
@ -6976,14 +6988,7 @@ impl Tab {
|
|||
true
|
||||
} else {
|
||||
// Wake up update method
|
||||
futures::executor::block_on(async {
|
||||
output
|
||||
.lock()
|
||||
.await
|
||||
.send(Message::SearchReady(false))
|
||||
.await
|
||||
})
|
||||
.is_ok()
|
||||
watch_tx.send(false).is_ok()
|
||||
}
|
||||
}
|
||||
Err(_) => false,
|
||||
|
|
@ -6996,13 +7001,16 @@ impl Tab {
|
|||
search_location,
|
||||
start.elapsed(),
|
||||
);
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
while watch_rx.changed().await.is_ok() {
|
||||
let is_ready = *watch_rx.borrow_and_update();
|
||||
let _ = output.send(Message::SearchReady(is_ready)).await;
|
||||
}
|
||||
|
||||
// Send final ready
|
||||
let _ = output.lock().await.send(Message::SearchReady(true)).await;
|
||||
let _ = output.send(Message::SearchReady(true)).await;
|
||||
|
||||
std::future::pending().await
|
||||
},
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue