diff --git a/Cargo.lock b/Cargo.lock index 366bb64..c4f9699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -2221,6 +2230,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "instant" version = "0.1.13" @@ -2356,6 +2385,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kuchikiki" version = "0.8.2" @@ -2462,6 +2511,7 @@ dependencies = [ "lru", "memmap2", "mime_guess", + "notify", "parking_lot", "rand 0.8.5", "regex", @@ -2842,6 +2892,18 @@ dependencies = [ "adler2", ] +[[package]] +name = "mio" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.48.0", +] + [[package]] name = "mio" version = "1.0.2" @@ -2939,6 +3001,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "6.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" +dependencies = [ + "bitflags 2.6.0", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 0.8.11", + "walkdir", + "windows-sys 0.48.0", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -5204,7 +5285,7 @@ dependencies = [ "backtrace", "bytes", "libc", - "mio", + "mio 1.0.2", "parking_lot", "pin-project-lite", "signal-hook-registry", diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 2c1a6ab..2ebfdaf 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -265,7 +265,7 @@ impl RecursiveRequest { let request_one = |id, addr, depth| { req.request_one(id, addr, depth) .map_err(|e| { - debug!("error: {e:?}"); + debug!("error: {e:#}"); e }) .instrument(error_span!( @@ -341,7 +341,7 @@ impl RecursiveRequest { Ok(n) if n < 8 => REQUERY_INTERVAL / 8 * (n as u32), Ok(_) => REQUERY_INTERVAL, Err(e) => { - error!("error in get_peers_root(): {e:?}"); + error!("error in get_peers_root(): {e:#}"); return Err::<(), anyhow::Error>(e); } }; @@ -359,7 +359,7 @@ impl RecursiveRequest { let (id, addr, depth) = addr.unwrap(); futs.push( this.request_one(id, addr, depth) - .map_err(|e| debug!("error: {e:?}")) + .map_err(|e| debug!("error: {e:#}")) .instrument(error_span!("addr", addr=addr.to_string())) ); } @@ -996,7 +996,7 @@ impl DhtWorker { }, Err(e) => { self.dht.routing_table.write().mark_error(&id); - debug!("error: {e:?}"); + debug!("error: {e:#}"); } } }.instrument(error_span!("ping", addr=addr.to_string()))) @@ -1033,7 +1033,7 @@ impl DhtWorker { ) .unwrap(); if let Err(e) = socket.send_to(&buf, addr).await { - debug!("error sending to {addr}: {e:?}"); + debug!("error sending to {addr}: {e:#}"); if let Some(tid) = our_tid { self.on_send_error(tid, addr, e.into()); } diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index fc2874c..d9870b5 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -36,6 +36,7 @@ storage_examples = [] tracing-subscriber-utils = ["tracing-subscriber"] postgres = ["sqlx"] async-bt = ["async-backtrace"] +watch = ["notify"] [dependencies] sqlx = { version = "0.8.2", features = [ @@ -105,6 +106,7 @@ mime_guess = { version = "2.0.5", default-features = false } tokio-socks = "0.5.2" async-trait = "0.1.81" async-backtrace = { version = "0.2", optional = true } +notify = { version = "6.1.1", optional = true } [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index f8543c9..455381c 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -67,6 +67,8 @@ pub mod tracing_subscriber_config_utils; mod type_aliases; #[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))] pub mod upnp_server_adapter; +#[cfg(feature = "watch")] +pub mod watch; pub use api::Api; pub use api_error::ApiError; diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 67b6fcb..3b8eda0 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -147,7 +147,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) { } Ok(true) } - crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), + crate::ManagedTorrentState::Error(e) => bail!("error: {e:#}"), _ => bail!("broken state"), }) .context("error checking for torrent liveness")?; diff --git a/crates/librqbit/src/watch.rs b/crates/librqbit/src/watch.rs new file mode 100644 index 0000000..b435928 --- /dev/null +++ b/crates/librqbit/src/watch.rs @@ -0,0 +1,173 @@ +use std::{ + io::Read, + path::{Path, PathBuf}, + sync::{Arc, Weak}, +}; + +use anyhow::{bail, Context}; +use buffers::ByteBuf; +use librqbit_core::torrent_metainfo::torrent_from_bytes; +use notify::Watcher; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing::{debug, error, error_span, trace, warn}; + +use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session}; + +struct ThreadCancelEvent { + mutex: parking_lot::Mutex, + condvar: parking_lot::Condvar, +} + +impl ThreadCancelEvent { + fn new() -> Arc { + Arc::new(Self { + mutex: parking_lot::Mutex::new(false), + condvar: parking_lot::Condvar::new(), + }) + } + + fn cancel(&self) { + let mut g = self.mutex.lock(); + *g = true; + self.condvar.notify_all(); + } + + fn wait_until_cancelled(&self) { + let mut g = self.mutex.lock(); + while !*g { + self.condvar.wait(&mut g); + } + } +} + +async fn watch_adder(session_w: Weak, mut rx: UnboundedReceiver>) { + async fn add_one( + session_w: &Weak, + add_torrent: AddTorrent<'static>, + ) -> anyhow::Result<()> { + let session = match session_w.upgrade() { + Some(s) => s, + None => return Ok(()), + }; + let res = session + .add_torrent( + add_torrent, + Some(AddTorrentOptions { + overwrite: true, + ..Default::default() + }), + ) + .await?; + match res { + AddTorrentResponse::Added(_, _) => {} + AddTorrentResponse::AlreadyManaged(_, _) => { + debug!("already managed"); + } + AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"), + } + Ok(()) + } + + while let Some(add_torrent) = rx.recv().await { + if let Err(e) = add_one(&session_w, add_torrent).await { + warn!("error adding torrent: {e:#}"); + } + } +} + +fn watch_thread( + folder: PathBuf, + tx: UnboundedSender>, + cancel_event: &ThreadCancelEvent, +) -> anyhow::Result<()> { + fn read_and_validate_torrent(path: &Path) -> anyhow::Result> { + let mut buf = Vec::new(); + std::fs::File::open(path) + .context("error opening")? + .read_to_end(&mut buf) + .context("error reading")?; + torrent_from_bytes::(&buf).context("invalid .torrent file")?; + Ok(AddTorrent::from_bytes(buf)) + } + + fn watch_cb( + ev: notify::Result, + tx: &UnboundedSender>, + ) -> anyhow::Result<()> { + trace!(event=?ev, "watch event"); + let ev = ev.context("error event")?; + match ev.kind { + notify::EventKind::Create(_) | notify::EventKind::Modify(_) => {} + other => { + debug!(kind=?other, paths=?ev.paths, "ignoring event"); + return Ok(()); + } + } + for path in ev.paths { + if path.extension().and_then(|e| e.to_str()) != Some("torrent") { + trace!(?path, "ignoring path"); + continue; + } + let add = match read_and_validate_torrent(&path) { + Ok(add) => add, + Err(e) => { + debug!(?path, "error validating torrent: {e:#}"); + continue; + } + }; + + if tx.send(add).is_err() { + return Ok(()); + } + } + Ok(()) + } + + let mut watcher = notify::recommended_watcher(move |ev| { + if let Err(e) = watch_cb(ev, &tx) { + warn!("error processing watch event: {e:#}"); + } + }) + .context("error creating watcher")?; + watcher + .watch(&folder, notify::RecursiveMode::Recursive) + .context("error watching")?; + cancel_event.wait_until_cancelled(); + debug!(?folder, "watcher thread done"); + Ok(()) +} + +impl Session { + pub fn watch_folder(self: &Arc, watch_folder: &Path) { + let session_w = Arc::downgrade(self); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + self.spawn(error_span!("watch_adder", ?watch_folder), async move { + watch_adder(session_w, rx).await; + Ok(()) + }); + + let cancel_event = ThreadCancelEvent::new(); + let cancel_event_2 = cancel_event.clone(); + let cancel_token = self.cancellation_token().clone(); + crate::spawn_utils::spawn( + "watch_cancel", + error_span!("watch_cancel", ?watch_folder), + async move { + cancel_token.cancelled().await; + trace!("canceling watcher"); + cancel_event.cancel(); + Ok(()) + }, + ); + + let watch_folder = PathBuf::from(watch_folder); + let session_span = self.rs(); + std::thread::spawn(move || { + let span = error_span!(parent: session_span, "watcher", folder=?watch_folder); + let _ = span.enter(); + if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) { + error!("error in watcher thread: {e:#}"); + } + }); + } +} diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 8865888..1a0c046 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -27,6 +27,7 @@ librqbit = { version = "7.1.0-beta.0", path = "../librqbit", default-features = "http-api", "tracing-subscriber-utils", "upnp-serve-adapter", + "watch", ] } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } console-subscriber = { version = "0.4", optional = true } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4e37de2..561d52d 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,4 +1,11 @@ -use std::{io, net::SocketAddr, path::PathBuf, sync::Arc, thread, time::Duration}; +use std::{ + io, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + thread, + time::Duration, +}; use anyhow::{bail, Context}; use clap::{CommandFactory, Parser, ValueEnum}; @@ -236,6 +243,11 @@ struct ServerStartOptions { /// [Experimental] if set, will try to resume quickly after restart and skip checksumming. #[arg(long = "fastresume", env = "RQBIT_FASTRESUME")] fastresume: bool, + + /// The folder to watch for added .torrent files. All files in this folder will be automatically added + /// to the session. + #[arg(long = "watch-folder", env = "RQBIT_WATCH_FOLDER")] + watch_folder: Option, } #[derive(Parser)] @@ -580,7 +592,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> }; let api = Api::new( - session, + session.clone(), Some(log_config.rust_log_reload_tx), Some(log_config.line_broadcast), ); @@ -595,6 +607,10 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> let upnp_router = upnp_server.as_mut().and_then(|s| s.take_router().ok()); let http_api_fut = http_api.make_http_api_and_run(tcp_listener, upnp_router); + if let Some(watch_folder) = start_opts.watch_folder.as_ref() { + session.watch_folder(Path::new(watch_folder)); + } + let res = match upnp_server { Some(srv) => { let upnp_fut = srv.run_ssdp_forever(); @@ -609,7 +625,6 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> r = http_api_fut => r, }, }; - res.context("error running server") } },