Partially implement copy and move, debounce events

This commit is contained in:
Jeremy Soller 2024-03-20 11:54:37 -06:00
parent a2560db6ba
commit 244291be79
No known key found for this signature in database
GPG key ID: D02FD439211AF56F
7 changed files with 311 additions and 131 deletions

32
Cargo.lock generated
View file

@ -1071,6 +1071,7 @@ dependencies = [
"fastrand 2.0.1",
"fork",
"freedesktop_entry_parser",
"fs_extra",
"i18n-embed",
"i18n-embed-fl",
"image",
@ -1079,7 +1080,7 @@ dependencies = [
"libcosmic",
"log",
"mime_guess",
"notify",
"notify-debouncer-full",
"once_cell",
"open",
"paste",
@ -1679,6 +1680,15 @@ dependencies = [
"simd-adler32",
]
[[package]]
name = "file-id"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6584280525fb2059cba3db2c04abf947a1a29a45ddae89f3870f8281704fafc9"
dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "filetime"
version = "0.2.23"
@ -1901,6 +1911,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
@ -3381,6 +3397,20 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "notify-debouncer-full"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f5dab59c348b9b50cf7f261960a20e389feb2713636399cd9082cd4b536154"
dependencies = [
"crossbeam-channel",
"file-id",
"log",
"notify",
"parking_lot 0.12.1",
"walkdir",
]
[[package]]
name = "num"
version = "0.4.1"

View file

@ -14,6 +14,7 @@ chrono = { version = "0.4", features = ["unstable-locales"] }
dirs = "5.0.1"
env_logger = "0.11"
freedesktop_entry_parser = { version = "1.3", optional = true }
fs_extra = "1.3"
image = "0.24"
once_cell = "1.19"
open = "5.0.2"
@ -21,11 +22,11 @@ lexical-sort = "0.3.1"
libc = "0.2"
log = "0.4"
mime_guess = "2"
notify = "6"
notify-debouncer-full = "0.3"
paste = "1.0"
serde = { version = "1", features = ["serde_derive"] }
shlex = { version = "1.3" }
tokio = { version = "1" }
tokio = { version = "1", features = ["sync"] }
trash = "3.2.0"
xdg = { version = "2.5.2", optional = true }
xdg-mime = "0.3"

View file

@ -61,7 +61,7 @@ dev *args:
# Run with debug logs
run *args:
cargo build --release
env RUST_LOG=cosmic_files=debug RUST_BACKTRACE=full target/release/cosmic-files {{args}}
env RUST_LOG=cosmic_files=info RUST_BACKTRACE=full target/release/cosmic-files {{args}}
# Run tests
test *args:

View file

@ -22,14 +22,20 @@ use cosmic::{
},
Application, ApplicationExt, Element,
};
use notify::Watcher;
use notify_debouncer_full::{
new_debouncer,
notify::{self, RecommendedWatcher, Watcher},
DebouncedEvent, Debouncer, FileIdMap,
};
use std::{
any::TypeId,
collections::{BTreeMap, HashMap, HashSet, VecDeque},
env, fs,
env, fmt, fs,
num::NonZeroU16,
path::PathBuf,
process, time,
process,
sync::Arc,
time,
};
use crate::{
@ -150,12 +156,12 @@ pub enum Message {
Modifiers(Modifiers),
MoveToTrash(Option<Entity>),
NewItem(Option<Entity>, bool),
NotifyEvent(notify::Event),
NotifyEvents(Vec<DebouncedEvent>),
NotifyWatcher(WatcherWrapper),
OpenTerminal(Option<Entity>),
OpenWith(PathBuf, mime_app::MimeApp),
Paste(Option<Entity>),
PasteContents(Option<Entity>, ClipboardPaste),
PasteContents(PathBuf, ClipboardPaste),
PendingComplete(u64),
PendingError(u64, String),
PendingProgress(u64, f32),
@ -212,9 +218,8 @@ pub enum DialogPage {
},
}
#[derive(Debug)]
pub struct WatcherWrapper {
watcher_opt: Option<notify::RecommendedWatcher>,
watcher_opt: Option<Debouncer<RecommendedWatcher, FileIdMap>>,
}
impl Clone for WatcherWrapper {
@ -223,6 +228,12 @@ impl Clone for WatcherWrapper {
}
}
impl fmt::Debug for WatcherWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WatcherWrapper").finish()
}
}
impl PartialEq for WatcherWrapper {
fn eq(&self, _other: &Self) -> bool {
false
@ -248,7 +259,7 @@ pub struct App {
pending_operations: BTreeMap<u64, (Operation, f32)>,
complete_operations: BTreeMap<u64, Operation>,
failed_operations: BTreeMap<u64, (Operation, String)>,
watcher_opt: Option<(notify::RecommendedWatcher, HashSet<PathBuf>)>,
watcher_opt: Option<(Debouncer<RecommendedWatcher, FileIdMap>, HashSet<PathBuf>)>,
}
impl App {
@ -351,7 +362,7 @@ impl App {
// Unwatch paths no longer used
for path in old_paths.iter() {
if !new_paths.contains(path) {
match watcher.unwatch(path) {
match watcher.watcher().unwatch(path) {
Ok(()) => {
log::debug!("unwatching {:?}", path);
}
@ -366,7 +377,10 @@ impl App {
for path in new_paths.iter() {
if !old_paths.contains(path) {
//TODO: should this be recursive?
match watcher.watch(path, notify::RecursiveMode::NonRecursive) {
match watcher
.watcher()
.watch(path, notify::RecursiveMode::NonRecursive)
{
Ok(()) => {
log::debug!("watching {:?}", path);
}
@ -898,8 +912,8 @@ impl Application for App {
}
}
}
Message::NotifyEvent(event) => {
log::debug!("{:?}", event);
Message::NotifyEvents(events) => {
log::debug!("{:?}", events);
let mut needs_reload = Vec::new();
for entity in self.tab_model.iter() {
@ -907,10 +921,12 @@ impl Application for App {
//TODO: support reloading trash, somehow
if let Location::Path(path) = &tab.location {
let mut contains_change = false;
for event_path in event.paths.iter() {
if event_path.starts_with(&path) {
contains_change = true;
break;
for event in events.iter() {
for event_path in event.paths.iter() {
if event_path.starts_with(&path) {
contains_change = true;
break;
}
}
}
if contains_change {
@ -992,17 +1008,38 @@ impl Application for App {
self.core.window.show_context = false;
}
Message::Paste(entity_opt) => {
return clipboard::read_data::<ClipboardPaste, _>(move |contents_opt| {
match contents_opt {
Some(contents) => {
message::app(Message::PasteContents(entity_opt, contents))
}
None => message::none(),
let entity = entity_opt.unwrap_or_else(|| self.tab_model.active());
if let Some(tab) = self.tab_model.data_mut::<Tab>(entity) {
if let Location::Path(path) = &tab.location {
let to = path.clone();
return clipboard::read_data::<ClipboardPaste, _>(move |contents_opt| {
match contents_opt {
Some(contents) => {
message::app(Message::PasteContents(to.clone(), contents))
}
None => message::none(),
}
});
}
});
}
}
Message::PasteContents(entity_opt, contents) => {
println!("{:?}", contents);
Message::PasteContents(to, contents) => {
if !contents.paths.is_empty() {
match contents.kind {
ClipboardKind::Copy => {
self.operation(Operation::Copy {
paths: contents.paths,
to,
});
}
ClipboardKind::Cut => {
self.operation(Operation::Move {
paths: contents.paths,
to,
});
}
}
}
}
Message::PendingComplete(id) => {
if let Some((op, _)) = self.pending_operations.remove(&id) {
@ -1539,37 +1576,48 @@ impl Application for App {
|mut output| async move {
let watcher_res = {
let mut output = output.clone();
//TODO: debounce
notify::recommended_watcher(
move |event_res: Result<notify::Event, notify::Error>| match event_res {
Ok(event) => {
match &event.kind {
notify::EventKind::Access(_) => {
// Data not mutated
return;
}
notify::EventKind::Modify(
notify::event::ModifyKind::Metadata(e),
) if (*e != notify::event::MetadataKind::Any
&& *e != notify::event::MetadataKind::WriteTime) =>
{
// Data not mutated nor modify time changed
return;
}
_ => {}
}
new_debouncer(
time::Duration::from_secs(1),
None,
move |events_res: notify_debouncer_full::DebounceEventResult| {
match events_res {
Ok(mut events) => {
events.retain(|event| {
match &event.kind {
notify::EventKind::Access(_) => {
// Data not mutated
false
}
notify::EventKind::Modify(
notify::event::ModifyKind::Metadata(e),
) if (*e != notify::event::MetadataKind::Any
&& *e
!= notify::event::MetadataKind::WriteTime) =>
{
// Data not mutated nor modify time changed
false
}
_ => true
}
});
match futures::executor::block_on(async {
output.send(Message::NotifyEvent(event)).await
}) {
Ok(()) => {}
Err(err) => {
log::warn!("failed to send notify event: {:?}", err);
if !events.is_empty() {
match futures::executor::block_on(async {
output.send(Message::NotifyEvents(events)).await
}) {
Ok(()) => {}
Err(err) => {
log::warn!(
"failed to send notify events: {:?}",
err
);
}
}
}
}
}
Err(err) => {
log::warn!("failed to watch files: {:?}", err);
Err(err) => {
log::warn!("failed to watch files: {:?}", err);
}
}
},
)
@ -1606,26 +1654,25 @@ impl Application for App {
//TODO: use recipe?
let id = *id;
let pending_operation = pending_operation.clone();
subscriptions.push(subscription::channel(
id,
16,
move |mut msg_tx| async move {
match pending_operation.perform(id, &mut msg_tx).await {
Ok(()) => {
let _ = msg_tx.send(Message::PendingComplete(id)).await;
}
Err(err) => {
let _ = msg_tx
.send(Message::PendingError(id, err.to_string()))
.await;
}
subscriptions.push(subscription::channel(id, 16, move |msg_tx| async move {
let msg_tx = Arc::new(tokio::sync::Mutex::new(msg_tx));
match pending_operation.perform(id, &msg_tx).await {
Ok(()) => {
let _ = msg_tx.lock().await.send(Message::PendingComplete(id)).await;
}
Err(err) => {
let _ = msg_tx
.lock()
.await
.send(Message::PendingError(id, err.to_string()))
.await;
}
}
loop {
tokio::time::sleep(time::Duration::new(1, 0)).await;
}
},
));
loop {
tokio::time::sleep(time::Duration::new(1, 0)).await;
}
}));
}
for entity in self.tab_model.iter() {

View file

@ -61,13 +61,13 @@ impl ClipboardCopy {
match Url::from_file_path(path) {
Ok(url) => {
let url_str = url.to_string();
let url_str = url.as_ref();
text_uri_list.push_str(&url_str);
text_uri_list.push_str(url_str);
text_uri_list.push_str(cr_nl);
x_special_gnome_copied_files.push('\n');
x_special_gnome_copied_files.push_str(&url_str);
x_special_gnome_copied_files.push_str(url_str);
}
Err(()) => {
log::warn!(

View file

@ -17,11 +17,15 @@ use cosmic::{
widget::{self, segmented_button},
Application, ApplicationExt, Element,
};
use notify::Watcher;
use notify_debouncer_full::{
new_debouncer,
notify::{self, RecommendedWatcher, Watcher},
DebouncedEvent, Debouncer, FileIdMap,
};
use std::{
any::TypeId,
collections::{HashMap, HashSet},
env, fs,
env, fmt, fs,
path::PathBuf,
time,
};
@ -180,7 +184,7 @@ enum Message {
Cancel,
Filename(String),
Modifiers(Modifiers),
NotifyEvent(notify::Event),
NotifyEvents(Vec<DebouncedEvent>),
NotifyWatcher(WatcherWrapper),
Open,
Save(bool),
@ -188,9 +192,8 @@ enum Message {
TabRescan(Vec<tab::Item>),
}
#[derive(Debug)]
struct WatcherWrapper {
watcher_opt: Option<notify::RecommendedWatcher>,
watcher_opt: Option<Debouncer<RecommendedWatcher, FileIdMap>>,
}
impl Clone for WatcherWrapper {
@ -199,6 +202,12 @@ impl Clone for WatcherWrapper {
}
}
impl fmt::Debug for WatcherWrapper {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WatcherWrapper").finish()
}
}
impl PartialEq for WatcherWrapper {
fn eq(&self, _other: &Self) -> bool {
false
@ -215,7 +224,7 @@ struct App {
result_opt: Option<DialogResult>,
replace_dialog: bool,
tab: Tab,
watcher_opt: Option<(notify::RecommendedWatcher, HashSet<PathBuf>)>,
watcher_opt: Option<(Debouncer<RecommendedWatcher, FileIdMap>, HashSet<PathBuf>)>,
}
impl App {
@ -252,7 +261,7 @@ impl App {
// Unwatch paths no longer used
for path in old_paths.iter() {
if !new_paths.contains(path) {
match watcher.unwatch(path) {
match watcher.watcher().unwatch(path) {
Ok(()) => {
log::debug!("unwatching {:?}", path);
}
@ -267,7 +276,10 @@ impl App {
for path in new_paths.iter() {
if !old_paths.contains(path) {
//TODO: should this be recursive?
match watcher.watch(path, notify::RecursiveMode::NonRecursive) {
match watcher
.watcher()
.watch(path, notify::RecursiveMode::NonRecursive)
{
Ok(()) => {
log::debug!("watching {:?}", path);
}
@ -440,15 +452,17 @@ impl Application for App {
Message::Modifiers(modifiers) => {
self.modifiers = modifiers;
}
Message::NotifyEvent(event) => {
log::debug!("{:?}", event);
Message::NotifyEvents(events) => {
log::debug!("{:?}", events);
if let Location::Path(path) = &self.tab.location {
let mut contains_change = false;
for event_path in event.paths.iter() {
if event_path.starts_with(&path) {
contains_change = true;
break;
for event in events.iter() {
for event_path in event.paths.iter() {
if event_path.starts_with(&path) {
contains_change = true;
break;
}
}
}
if contains_change {
@ -663,37 +677,48 @@ impl Application for App {
|mut output| async move {
let watcher_res = {
let mut output = output.clone();
//TODO: debounce
notify::recommended_watcher(
move |event_res: Result<notify::Event, notify::Error>| match event_res {
Ok(event) => {
match &event.kind {
notify::EventKind::Access(_) => {
// Data not mutated
return;
}
notify::EventKind::Modify(
notify::event::ModifyKind::Metadata(e),
) if (*e != notify::event::MetadataKind::Any
&& *e != notify::event::MetadataKind::WriteTime) =>
{
// Data not mutated nor modify time changed
return;
}
_ => {}
}
new_debouncer(
time::Duration::from_secs(1),
None,
move |events_res: notify_debouncer_full::DebounceEventResult| {
match events_res {
Ok(mut events) => {
events.retain(|event| {
match &event.kind {
notify::EventKind::Access(_) => {
// Data not mutated
false
}
notify::EventKind::Modify(
notify::event::ModifyKind::Metadata(e),
) if (*e != notify::event::MetadataKind::Any
&& *e
!= notify::event::MetadataKind::WriteTime) =>
{
// Data not mutated nor modify time changed
false
}
_ => true
}
});
match futures::executor::block_on(async {
output.send(Message::NotifyEvent(event)).await
}) {
Ok(()) => {}
Err(err) => {
log::warn!("failed to send notify event: {:?}", err);
if !events.is_empty() {
match futures::executor::block_on(async {
output.send(Message::NotifyEvents(events)).await
}) {
Ok(()) => {}
Err(err) => {
log::warn!(
"failed to send notify events: {:?}",
err
);
}
}
}
}
}
Err(err) => {
log::warn!("failed to watch files: {:?}", err);
Err(err) => {
log::warn!("failed to watch files: {:?}", err);
}
}
},
)

View file

@ -1,5 +1,5 @@
use cosmic::iced::futures::{channel::mpsc, SinkExt};
use std::{fs, path::PathBuf};
use cosmic::iced::futures::{channel::mpsc, executor, SinkExt};
use std::{fs, path::PathBuf, sync::Arc};
use crate::app::Message;
@ -41,12 +41,46 @@ pub enum Operation {
impl Operation {
/// Perform the operation
pub async fn perform(self, id: u64, msg_tx: &mut mpsc::Sender<Message>) -> Result<(), String> {
let _ = msg_tx.send(Message::PendingProgress(id, 0.0)).await;
pub async fn perform(
self,
id: u64,
msg_tx: &Arc<tokio::sync::Mutex<mpsc::Sender<Message>>>,
) -> Result<(), String> {
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 0.0))
.await;
//TODO: IF ERROR, RETURN AN Operation THAT CAN UNDO THE CURRENT STATE
//TODO: SAFELY HANDLE CANCEL
match self {
Self::Copy { paths, to } => {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || {
log::info!("Copy {:?} to {:?}", paths, to);
let options = fs_extra::dir::CopyOptions::default();
//TODO: set options as desired
fs_extra::copy_items_with_progress(&paths, &to, &options, |progress| {
executor::block_on(async {
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (progress.copied_bytes as f32)
/ (progress.total_bytes as f32),
))
.await;
});
//TODO: handle exceptions
fs_extra::dir::TransitProcessResult::ContinueOrAbort
})
})
.await
.map_err(err_str)?
.map_err(err_str)?;
}
Self::Delete { paths } => {
let total = paths.len();
let mut count = 0;
@ -57,6 +91,8 @@ impl Operation {
.map_err(err_str)?;
count += 1;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (count as f32) / (total as f32),
@ -64,26 +100,64 @@ impl Operation {
.await;
}
}
Self::Move { paths, to } => {
let msg_tx = msg_tx.clone();
tokio::task::spawn_blocking(move || {
log::info!("Move {:?} to {:?}", paths, to);
let options = fs_extra::dir::CopyOptions::default();
//TODO: set options as desired
fs_extra::move_items_with_progress(&paths, &to, &options, |progress| {
executor::block_on(async {
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (progress.copied_bytes as f32)
/ (progress.total_bytes as f32),
))
.await;
});
//TODO: handle exceptions
fs_extra::dir::TransitProcessResult::ContinueOrAbort
})
})
.await
.map_err(err_str)?
.map_err(err_str)?;
}
Self::NewFolder { path } => {
tokio::task::spawn_blocking(|| fs::create_dir(path))
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx.send(Message::PendingProgress(id, 100.0)).await;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
Self::NewFile { path } => {
tokio::task::spawn_blocking(|| fs::File::create(path))
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx.send(Message::PendingProgress(id, 100.0)).await;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
Self::Rename { from, to } => {
tokio::task::spawn_blocking(|| fs::rename(from, to))
.await
.map_err(err_str)?
.map_err(err_str)?;
let _ = msg_tx.send(Message::PendingProgress(id, 100.0)).await;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
}
Self::Restore { paths } => {
let total = paths.len();
@ -95,6 +169,8 @@ impl Operation {
.map_err(err_str)?;
count += 1;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(
id,
100.0 * (count as f32) / (total as f32),
@ -102,12 +178,13 @@ impl Operation {
.await;
}
}
_ => {
return Err("not implemented".to_string());
}
}
let _ = msg_tx.send(Message::PendingProgress(id, 100.0)).await;
let _ = msg_tx
.lock()
.await
.send(Message::PendingProgress(id, 100.0))
.await;
Ok(())
}