Merge pull request #237 from ikatson/watch

[Feature] watching a directory for .torrent files and adding them automatically
This commit is contained in:
Igor Katson 2024-09-13 13:24:02 +01:00 committed by GitHub
commit 932131b18d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 333 additions and 42 deletions

84
Cargo.lock generated
View file

@ -1378,6 +1378,15 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "funty"
version = "2.0.0"
@ -2221,6 +2230,26 @@ dependencies = [
"cfb",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.13"
@ -2356,6 +2385,26 @@ dependencies = [
"serde_json",
]
[[package]]
name = "kqueue"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "kuchikiki"
version = "0.8.2"
@ -2462,6 +2511,7 @@ dependencies = [
"lru",
"memmap2",
"mime_guess",
"notify",
"parking_lot",
"rand 0.8.5",
"regex",
@ -2485,6 +2535,7 @@ dependencies = [
"url",
"urlencoding",
"uuid",
"walkdir",
]
[[package]]
@ -2842,6 +2893,18 @@ dependencies = [
"adler2",
]
[[package]]
name = "mio"
version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c"
dependencies = [
"libc",
"log",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.48.0",
]
[[package]]
name = "mio"
version = "1.0.2"
@ -2939,6 +3002,25 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "6.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d"
dependencies = [
"bitflags 2.6.0",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"log",
"mio 0.8.11",
"walkdir",
"windows-sys 0.48.0",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -5204,7 +5286,7 @@ dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"mio 1.0.2",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",

View file

@ -265,7 +265,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksFindNodes> {
let request_one = |id, addr, depth| {
req.request_one(id, addr, depth)
.map_err(|e| {
debug!("error: {e:?}");
debug!("error: {e:#}");
e
})
.instrument(error_span!(
@ -341,7 +341,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
Ok(n) if n < 8 => REQUERY_INTERVAL / 8 * (n as u32),
Ok(_) => REQUERY_INTERVAL,
Err(e) => {
error!("error in get_peers_root(): {e:?}");
error!("error in get_peers_root(): {e:#}");
return Err::<(), anyhow::Error>(e);
}
};
@ -359,7 +359,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
let (id, addr, depth) = addr.unwrap();
futs.push(
this.request_one(id, addr, depth)
.map_err(|e| debug!("error: {e:?}"))
.map_err(|e| debug!("error: {e:#}"))
.instrument(error_span!("addr", addr=addr.to_string()))
);
}
@ -996,7 +996,7 @@ impl DhtWorker {
},
Err(e) => {
self.dht.routing_table.write().mark_error(&id);
debug!("error: {e:?}");
debug!("error: {e:#}");
}
}
}.instrument(error_span!("ping", addr=addr.to_string())))
@ -1033,7 +1033,7 @@ impl DhtWorker {
)
.unwrap();
if let Err(e) = socket.send_to(&buf, addr).await {
debug!("error sending to {addr}: {e:?}");
debug!("error sending to {addr}: {e:#}");
if let Some(tid) = our_tid {
self.on_send_error(tid, addr, e.into());
}

View file

@ -36,6 +36,7 @@ storage_examples = []
tracing-subscriber-utils = ["tracing-subscriber"]
postgres = ["sqlx"]
async-bt = ["async-backtrace"]
watch = ["notify"]
[dependencies]
sqlx = { version = "0.8.2", features = [
@ -105,6 +106,8 @@ mime_guess = { version = "2.0.5", default-features = false }
tokio-socks = "0.5.2"
async-trait = "0.1.81"
async-backtrace = { version = "0.2", optional = true }
notify = { version = "6.1.1", optional = true }
walkdir = "2.5.0"
[build-dependencies]
anyhow = "1"

View file

@ -72,7 +72,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
macro_rules! visit_int {
($v:expr) => {{
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:?}")))?;
let tid: TorrentId = $v.try_into().map_err(|e| E::custom(format!("{e:#}")))?;
Ok(TorrentIdOrHash::from(tid))
}};
}
@ -118,7 +118,7 @@ impl<'de> Deserialize<'de> for TorrentIdOrHash {
{
TorrentIdOrHash::parse(v).map_err(|e| {
E::custom(format!(
"expected integer or 40 byte info hash, couldn't parse string: {e:?}"
"expected integer or 40 byte info hash, couldn't parse string: {e:#}"
))
})
}

View file

@ -20,28 +20,13 @@ pub struct CreateTorrentOptions<'a> {
}
fn walk_dir_find_paths(dir: &Path, out: &mut Vec<Cow<'_, Path>>) -> anyhow::Result<()> {
let mut stack = vec![Cow::Borrowed(dir)];
while let Some(dir) = stack.pop() {
let rd = std::fs::read_dir(&dir).with_context(|| format!("error reading {:?}", dir))?;
for element in rd {
let element =
element.with_context(|| format!("error reading DirEntry from {:?}", dir))?;
let ft = element.file_type().with_context(|| {
format!(
"error determining filetype of DirEntry {:?} while reading {:?}",
element.file_name(),
dir
)
})?;
let full_path = Cow::Owned(dir.join(element.file_name()));
if ft.is_dir() {
stack.push(full_path);
} else {
out.push(full_path);
}
}
}
out.extend(
walkdir::WalkDir::new(dir)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.map(|e| e.path().to_owned().into()),
);
Ok(())
}

View file

@ -67,6 +67,8 @@ pub mod tracing_subscriber_config_utils;
mod type_aliases;
#[cfg(all(feature = "http-api", feature = "upnp-serve-adapter"))]
pub mod upnp_server_adapter;
#[cfg(feature = "watch")]
pub mod watch;
pub use api::Api;
pub use api_error::ApiError;

View file

@ -673,7 +673,7 @@ impl Session {
tokio::select! {
Some(res) = futs.next(), if !futs.is_empty() => {
if let Err(e) = res {
error!("error adding torrent to session: {e:?}");
error!("error adding torrent to session: {e:#}");
}
}
st = ps.next(), if !added_all => {
@ -1207,6 +1207,10 @@ impl Session {
.context("error starting torrent")?;
}
if let Some(name) = managed_torrent.shared().info.name.as_ref() {
info!(?name, id, "added torrent");
}
Ok(AddTorrentResponse::Added(id, managed_torrent))
}
@ -1248,7 +1252,7 @@ impl Session {
.with_context(|| format!("torrent with id {} did not exist", id))?;
if let Err(e) = removed.pause() {
debug!("error pausing torrent before deletion: {e:?}")
debug!("error pausing torrent before deletion: {e:#}")
}
let storage = removed
@ -1259,7 +1263,7 @@ impl Session {
.pause()
// inspect_err not available in 1.75
.map_err(|e| {
warn!("error pausing torrent: {e:?}");
warn!("error pausing torrent: {e:#}");
e
})
.ok()
@ -1285,7 +1289,7 @@ impl Session {
if removed.shared().options.output_folder != self.output_folder {
if let Err(e) = storage.remove_directory_if_empty(Path::new("")) {
warn!(
"error removing {:?}: {e:?}",
"error removing {:?}: {e:#}",
removed.shared().options.output_folder
)
}
@ -1398,7 +1402,7 @@ fn remove_files_and_dirs(info: &ManagedTorrentShared, files: &dyn TorrentStorage
};
for dir in all_dirs {
if let Err(e) = files.remove_directory_if_empty(dir) {
warn!("error removing {dir:?}: {e:?}");
warn!("error removing {dir:?}: {e:#}");
} else {
debug!("removed {dir:?}")
}

View file

@ -147,7 +147,7 @@ async fn _test_e2e_download(drop_checks: &DropChecks) {
}
Ok(true)
}
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
crate::ManagedTorrentState::Error(e) => bail!("error: {e:#}"),
_ => bail!("broken state"),
})
.context("error checking for torrent liveness")?;

View file

@ -1519,7 +1519,7 @@ impl PeerHandler {
match state.file_ops().write_chunk(addr, piece, chunk_info) {
Ok(()) => {}
Err(e) => {
error!("FATAL: error writing chunk to disk: {:?}", e);
error!("FATAL: error writing chunk to disk: {e:#}");
return state.on_fatal_error(e);
}
};

View file

@ -154,7 +154,7 @@ macro_rules! poll_try_io {
match e {
Ok(r) => r,
Err(e) => {
debug!("stream error {e:?}");
debug!("stream error {e:#}");
return Poll::Ready(Err(e));
}
}

View file

@ -0,0 +1,199 @@
use std::{
io::Read,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use anyhow::{bail, Context};
use buffers::ByteBuf;
use librqbit_core::torrent_metainfo::torrent_from_bytes;
use notify::Watcher;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{debug, error, error_span, trace, warn};
use walkdir::WalkDir;
use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session};
struct ThreadCancelEvent {
mutex: parking_lot::Mutex<bool>,
condvar: parking_lot::Condvar,
}
impl ThreadCancelEvent {
fn new() -> Arc<Self> {
Arc::new(Self {
mutex: parking_lot::Mutex::new(false),
condvar: parking_lot::Condvar::new(),
})
}
fn cancel(&self) {
let mut g = self.mutex.lock();
*g = true;
self.condvar.notify_all();
}
fn wait_until_cancelled(&self) {
let mut g = self.mutex.lock();
while !*g {
self.condvar.wait(&mut g);
}
}
}
async fn watch_adder(
session_w: Weak<Session>,
mut rx: UnboundedReceiver<(AddTorrent<'static>, PathBuf)>,
) {
async fn add_one(
session_w: &Weak<Session>,
add_torrent: AddTorrent<'static>,
path: PathBuf,
) -> anyhow::Result<()> {
let session = match session_w.upgrade() {
Some(s) => s,
None => return Ok(()),
};
let res = session
.add_torrent(
add_torrent,
Some(AddTorrentOptions {
overwrite: true,
..Default::default()
}),
)
.await
.with_context(|| format!("error adding torrent from {path:?}"))?;
match res {
AddTorrentResponse::Added(_, _) => {}
AddTorrentResponse::AlreadyManaged(_, _) => {
debug!(?path, "already managed");
}
AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"),
}
Ok(())
}
while let Some((add_torrent, path)) = rx.recv().await {
if let Err(e) = add_one(&session_w, add_torrent, path).await {
warn!("error adding torrent: {e:#}");
}
}
}
fn watch_thread(
folder: PathBuf,
tx: UnboundedSender<(AddTorrent<'static>, PathBuf)>,
cancel_event: &ThreadCancelEvent,
) -> anyhow::Result<()> {
fn read_and_validate_torrent(path: &Path) -> anyhow::Result<AddTorrent<'static>> {
let mut buf = Vec::new();
std::fs::File::open(path)
.context("error opening")?
.read_to_end(&mut buf)
.context("error reading")?;
torrent_from_bytes::<ByteBuf>(&buf).context("invalid .torrent file")?;
Ok(AddTorrent::from_bytes(buf))
}
fn watch_cb(
ev: notify::Result<notify::Event>,
tx: &UnboundedSender<(AddTorrent<'static>, PathBuf)>,
) -> anyhow::Result<()> {
trace!(event=?ev, "watch event");
let ev = ev.context("error event")?;
match ev.kind {
notify::EventKind::Create(_) | notify::EventKind::Modify(_) => {}
other => {
debug!(kind=?other, paths=?ev.paths, "ignoring event");
return Ok(());
}
}
for path in ev.paths {
if path.extension().and_then(|e| e.to_str()) != Some("torrent") {
trace!(?path, "ignoring path");
continue;
}
let add = match read_and_validate_torrent(&path) {
Ok(add) => add,
Err(e) => {
warn!(?path, "error validating torrent: {e:#}");
continue;
}
};
if tx.send((add, path.to_owned())).is_err() {
return Ok(());
}
}
Ok(())
}
for entry in WalkDir::new(&folder)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.file_type().is_file())
.filter(|e| e.path().extension().and_then(|e| e.to_str()) == Some("torrent"))
{
let t = match read_and_validate_torrent(entry.path()) {
Ok(t) => t,
Err(e) => {
warn!(path=?entry.path(), "error validating torrent: {e:#}");
continue;
}
};
if tx.send((t, entry.path().to_owned())).is_err() {
debug!(?folder, "watcher thread done");
return Ok(());
}
}
let mut watcher = notify::recommended_watcher(move |ev| {
if let Err(e) = watch_cb(ev, &tx) {
warn!("error processing watch event: {e:#}");
}
})
.context("error creating watcher")?;
watcher
.watch(&folder, notify::RecursiveMode::Recursive)
.context("error watching")?;
cancel_event.wait_until_cancelled();
debug!(?folder, "watcher thread done");
Ok(())
}
impl Session {
pub fn watch_folder(self: &Arc<Self>, watch_folder: &Path) {
let session_w = Arc::downgrade(self);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
self.spawn(error_span!("watch_adder", ?watch_folder), async move {
watch_adder(session_w, rx).await;
Ok(())
});
let cancel_event = ThreadCancelEvent::new();
let cancel_event_2 = cancel_event.clone();
let cancel_token = self.cancellation_token().clone();
crate::spawn_utils::spawn(
"watch_cancel",
error_span!("watch_cancel", ?watch_folder),
async move {
cancel_token.cancelled().await;
trace!("canceling watcher");
cancel_event.cancel();
Ok(())
},
);
let watch_folder = PathBuf::from(watch_folder);
let session_span = self.rs();
std::thread::spawn(move || {
let span = error_span!(parent: session_span, "watcher", folder=?watch_folder);
span.in_scope(move || {
if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) {
error!("error in watcher thread: {e:#}");
}
})
});
}
}

View file

@ -163,7 +163,7 @@ impl std::fmt::Display for MessageDeserializeError {
len_prefix,
} => write!(
f,
"error deserializing {name} (msg_id={msg_id}, len_prefix={len_prefix}): {error:?}"
"error deserializing {name} (msg_id={msg_id}, len_prefix={len_prefix}): {error:#}"
),
MessageDeserializeError::Other(e) => write!(f, "{e}"),
}

View file

@ -27,6 +27,7 @@ librqbit = { version = "7.1.0-beta.0", path = "../librqbit", default-features =
"http-api",
"tracing-subscriber-utils",
"upnp-serve-adapter",
"watch",
] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
console-subscriber = { version = "0.4", optional = true }

View file

@ -1,4 +1,11 @@
use std::{io, net::SocketAddr, path::PathBuf, sync::Arc, thread, time::Duration};
use std::{
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
thread,
time::Duration,
};
use anyhow::{bail, Context};
use clap::{CommandFactory, Parser, ValueEnum};
@ -236,6 +243,11 @@ struct ServerStartOptions {
/// [Experimental] if set, will try to resume quickly after restart and skip checksumming.
#[arg(long = "fastresume", env = "RQBIT_FASTRESUME")]
fastresume: bool,
/// The folder to watch for added .torrent files. All files in this folder will be automatically added
/// to the session.
#[arg(long = "watch-folder", env = "RQBIT_WATCH_FOLDER")]
watch_folder: Option<String>,
}
#[derive(Parser)]
@ -580,7 +592,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
};
let api = Api::new(
session,
session.clone(),
Some(log_config.rust_log_reload_tx),
Some(log_config.line_broadcast),
);
@ -595,6 +607,10 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
let upnp_router = upnp_server.as_mut().and_then(|s| s.take_router().ok());
let http_api_fut = http_api.make_http_api_and_run(tcp_listener, upnp_router);
if let Some(watch_folder) = start_opts.watch_folder.as_ref() {
session.watch_folder(Path::new(watch_folder));
}
let res = match upnp_server {
Some(srv) => {
let upnp_fut = srv.run_ssdp_forever();
@ -609,7 +625,6 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
r = http_api_fut => r,
},
};
res.context("error running server")
}
},