From dedee2ef08998ec3376084ee657d961bd749dd82 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 13 Sep 2024 12:58:09 +0100 Subject: [PATCH] Watching works fine --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/session.rs | 4 ++ crates/librqbit/src/torrent_state/live/mod.rs | 2 +- crates/librqbit/src/watch.rs | 52 ++++++++++++++----- 5 files changed, 46 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4f9699..00f23c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2535,6 +2535,7 @@ dependencies = [ "url", "urlencoding", "uuid", + "walkdir", ] [[package]] diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index d9870b5..68eb2a5 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -107,6 +107,7 @@ tokio-socks = "0.5.2" async-trait = "0.1.81" async-backtrace = { version = "0.2", optional = true } notify = { version = "6.1.1", optional = true } +walkdir = "2.5.0" [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 3b8c59a..b9e2d0c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1207,6 +1207,10 @@ impl Session { .context("error starting torrent")?; } + if let Some(name) = managed_torrent.shared().info.name.as_ref() { + info!(?name, id, "added torrent"); + } + Ok(AddTorrentResponse::Added(id, managed_torrent)) } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 8ed82c6..2d8d819 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1519,7 +1519,7 @@ impl PeerHandler { match state.file_ops().write_chunk(addr, piece, chunk_info) { Ok(()) => {} Err(e) => { - error!("FATAL: error writing chunk to disk: {:?}", e); + error!("FATAL: error writing chunk to disk: {e:#}"); return state.on_fatal_error(e); } }; diff --git a/crates/librqbit/src/watch.rs b/crates/librqbit/src/watch.rs index b435928..41f1fa6 100644 --- a/crates/librqbit/src/watch.rs +++ b/crates/librqbit/src/watch.rs @@ -10,6 +10,7 @@ use librqbit_core::torrent_metainfo::torrent_from_bytes; use notify::Watcher; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tracing::{debug, error, error_span, trace, warn}; +use walkdir::WalkDir; use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session}; @@ -40,10 +41,14 @@ impl ThreadCancelEvent { } } -async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver>) { +async fn watch_adder( + session_w: Weak, + mut rx: UnboundedReceiver<(AddTorrent<'static>, PathBuf)>, +) { async fn add_one( session_w: &Weak, add_torrent: AddTorrent<'static>, + path: PathBuf, ) -> anyhow::Result<()> { let session = match session_w.upgrade() { Some(s) => s, @@ -57,19 +62,20 @@ async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver {} AddTorrentResponse::AlreadyManaged(_, _) => { - debug!("already managed"); + debug!(?path, "already managed"); } AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"), } Ok(()) } - while let Some(add_torrent) = rx.recv().await { - if let Err(e) = add_one(&session_w, add_torrent).await { + while let Some((add_torrent, path)) = rx.recv().await { + if let Err(e) = add_one(&session_w, add_torrent, path).await { warn!("error adding torrent: {e:#}"); } } @@ -77,7 +83,7 @@ async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver>, + tx: UnboundedSender<(AddTorrent<'static>, PathBuf)>, cancel_event: &ThreadCancelEvent, ) -> anyhow::Result<()> { fn read_and_validate_torrent(path: &Path) -> anyhow::Result> { @@ -92,7 +98,7 @@ fn watch_thread( fn watch_cb( ev: notify::Result, - tx: &UnboundedSender>, + tx: &UnboundedSender<(AddTorrent<'static>, PathBuf)>, ) -> anyhow::Result<()> { trace!(event=?ev, "watch event"); let ev = ev.context("error event")?; @@ -111,18 +117,37 @@ fn watch_thread( let add = match read_and_validate_torrent(&path) { Ok(add) => add, Err(e) => { - debug!(?path, "error validating torrent: {e:#}"); + warn!(?path, "error validating torrent: {e:#}"); continue; } }; - if tx.send(add).is_err() { + if tx.send((add, path.to_owned())).is_err() { return Ok(()); } } Ok(()) } + for entry in WalkDir::new(&folder) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + .filter(|e| e.path().extension().and_then(|e| e.to_str()) == Some("torrent")) + { + let t = match read_and_validate_torrent(entry.path()) { + Ok(t) => t, + Err(e) => { + warn!(path=?entry.path(), "error validating torrent: {e:#}"); + continue; + } + }; + if tx.send((t, entry.path().to_owned())).is_err() { + debug!(?folder, "watcher thread done"); + return Ok(()); + } + } + let mut watcher = notify::recommended_watcher(move |ev| { if let Err(e) = watch_cb(ev, &tx) { warn!("error processing watch event: {e:#}"); @@ -164,10 +189,11 @@ impl Session { let session_span = self.rs(); std::thread::spawn(move || { let span = error_span!(parent: session_span, "watcher", folder=?watch_folder); - let _ = span.enter(); - if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { - error!("error in watcher thread: {e:#}"); - } + span.in_scope(move || { + if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { + error!("error in watcher thread: {e:#}"); + } + }) }); } }