Watching works fine
This commit is contained in:
parent
a73f921c5f
commit
dedee2ef08
5 changed files with 46 additions and 14 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -2535,6 +2535,7 @@ dependencies = [
|
||||||
"url",
|
"url",
|
||||||
"urlencoding",
|
"urlencoding",
|
||||||
"uuid",
|
"uuid",
|
||||||
|
"walkdir",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
||||||
|
|
@ -107,6 +107,7 @@ tokio-socks = "0.5.2"
|
||||||
async-trait = "0.1.81"
|
async-trait = "0.1.81"
|
||||||
async-backtrace = { version = "0.2", optional = true }
|
async-backtrace = { version = "0.2", optional = true }
|
||||||
notify = { version = "6.1.1", optional = true }
|
notify = { version = "6.1.1", optional = true }
|
||||||
|
walkdir = "2.5.0"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
anyhow = "1"
|
anyhow = "1"
|
||||||
|
|
|
||||||
|
|
@ -1207,6 +1207,10 @@ impl Session {
|
||||||
.context("error starting torrent")?;
|
.context("error starting torrent")?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(name) = managed_torrent.shared().info.name.as_ref() {
|
||||||
|
info!(?name, id, "added torrent");
|
||||||
|
}
|
||||||
|
|
||||||
Ok(AddTorrentResponse::Added(id, managed_torrent))
|
Ok(AddTorrentResponse::Added(id, managed_torrent))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1519,7 +1519,7 @@ impl PeerHandler {
|
||||||
match state.file_ops().write_chunk(addr, piece, chunk_info) {
|
match state.file_ops().write_chunk(addr, piece, chunk_info) {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("FATAL: error writing chunk to disk: {:?}", e);
|
error!("FATAL: error writing chunk to disk: {e:#}");
|
||||||
return state.on_fatal_error(e);
|
return state.on_fatal_error(e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ use librqbit_core::torrent_metainfo::torrent_from_bytes;
|
||||||
use notify::Watcher;
|
use notify::Watcher;
|
||||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||||
use tracing::{debug, error, error_span, trace, warn};
|
use tracing::{debug, error, error_span, trace, warn};
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session};
|
use crate::{AddTorrent, AddTorrentOptions, AddTorrentResponse, Session};
|
||||||
|
|
||||||
|
|
@ -40,10 +41,14 @@ impl ThreadCancelEvent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn watch_adder(session_w: Weak<Session>, mut rx: UnboundedReceiver<AddTorrent<'static>>) {
|
async fn watch_adder(
|
||||||
|
session_w: Weak<Session>,
|
||||||
|
mut rx: UnboundedReceiver<(AddTorrent<'static>, PathBuf)>,
|
||||||
|
) {
|
||||||
async fn add_one(
|
async fn add_one(
|
||||||
session_w: &Weak<Session>,
|
session_w: &Weak<Session>,
|
||||||
add_torrent: AddTorrent<'static>,
|
add_torrent: AddTorrent<'static>,
|
||||||
|
path: PathBuf,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let session = match session_w.upgrade() {
|
let session = match session_w.upgrade() {
|
||||||
Some(s) => s,
|
Some(s) => s,
|
||||||
|
|
@ -57,19 +62,20 @@ async fn watch_adder(session_w: Weak<Session>, mut rx: UnboundedReceiver<AddTorr
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.with_context(|| format!("error adding torrent from {path:?}"))?;
|
||||||
match res {
|
match res {
|
||||||
AddTorrentResponse::Added(_, _) => {}
|
AddTorrentResponse::Added(_, _) => {}
|
||||||
AddTorrentResponse::AlreadyManaged(_, _) => {
|
AddTorrentResponse::AlreadyManaged(_, _) => {
|
||||||
debug!("already managed");
|
debug!(?path, "already managed");
|
||||||
}
|
}
|
||||||
AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"),
|
AddTorrentResponse::ListOnly(..) => bail!("bug: unexpected list only"),
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(add_torrent) = rx.recv().await {
|
while let Some((add_torrent, path)) = rx.recv().await {
|
||||||
if let Err(e) = add_one(&session_w, add_torrent).await {
|
if let Err(e) = add_one(&session_w, add_torrent, path).await {
|
||||||
warn!("error adding torrent: {e:#}");
|
warn!("error adding torrent: {e:#}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -77,7 +83,7 @@ async fn watch_adder(session_w: Weak<Session>, mut rx: UnboundedReceiver<AddTorr
|
||||||
|
|
||||||
fn watch_thread(
|
fn watch_thread(
|
||||||
folder: PathBuf,
|
folder: PathBuf,
|
||||||
tx: UnboundedSender<AddTorrent<'static>>,
|
tx: UnboundedSender<(AddTorrent<'static>, PathBuf)>,
|
||||||
cancel_event: &ThreadCancelEvent,
|
cancel_event: &ThreadCancelEvent,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
fn read_and_validate_torrent(path: &Path) -> anyhow::Result<AddTorrent<'static>> {
|
fn read_and_validate_torrent(path: &Path) -> anyhow::Result<AddTorrent<'static>> {
|
||||||
|
|
@ -92,7 +98,7 @@ fn watch_thread(
|
||||||
|
|
||||||
fn watch_cb(
|
fn watch_cb(
|
||||||
ev: notify::Result<notify::Event>,
|
ev: notify::Result<notify::Event>,
|
||||||
tx: &UnboundedSender<AddTorrent<'static>>,
|
tx: &UnboundedSender<(AddTorrent<'static>, PathBuf)>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
trace!(event=?ev, "watch event");
|
trace!(event=?ev, "watch event");
|
||||||
let ev = ev.context("error event")?;
|
let ev = ev.context("error event")?;
|
||||||
|
|
@ -111,18 +117,37 @@ fn watch_thread(
|
||||||
let add = match read_and_validate_torrent(&path) {
|
let add = match read_and_validate_torrent(&path) {
|
||||||
Ok(add) => add,
|
Ok(add) => add,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!(?path, "error validating torrent: {e:#}");
|
warn!(?path, "error validating torrent: {e:#}");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if tx.send(add).is_err() {
|
if tx.send((add, path.to_owned())).is_err() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for entry in WalkDir::new(&folder)
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|e| e.ok())
|
||||||
|
.filter(|e| e.file_type().is_file())
|
||||||
|
.filter(|e| e.path().extension().and_then(|e| e.to_str()) == Some("torrent"))
|
||||||
|
{
|
||||||
|
let t = match read_and_validate_torrent(entry.path()) {
|
||||||
|
Ok(t) => t,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(path=?entry.path(), "error validating torrent: {e:#}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if tx.send((t, entry.path().to_owned())).is_err() {
|
||||||
|
debug!(?folder, "watcher thread done");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut watcher = notify::recommended_watcher(move |ev| {
|
let mut watcher = notify::recommended_watcher(move |ev| {
|
||||||
if let Err(e) = watch_cb(ev, &tx) {
|
if let Err(e) = watch_cb(ev, &tx) {
|
||||||
warn!("error processing watch event: {e:#}");
|
warn!("error processing watch event: {e:#}");
|
||||||
|
|
@ -164,10 +189,11 @@ impl Session {
|
||||||
let session_span = self.rs();
|
let session_span = self.rs();
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let span = error_span!(parent: session_span, "watcher", folder=?watch_folder);
|
let span = error_span!(parent: session_span, "watcher", folder=?watch_folder);
|
||||||
let _ = span.enter();
|
span.in_scope(move || {
|
||||||
if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) {
|
if let Err(e) = watch_thread(watch_folder, tx, &cancel_event_2) {
|
||||||
error!("error in watcher thread: {e:#}");
|
error!("error in watcher thread: {e:#}");
|
||||||
}
|
}
|
||||||
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue