diff --git a/Cargo.lock b/Cargo.lock index dc300fc..723eea5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4436,6 +4436,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "url", ] [[package]] diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index a2a4453..1f8c4ee 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -98,7 +98,9 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ ], optional = true } uuid = { version = "1.2", features = ["v4"] } futures = "0.3" -url = { version = "=2.5.2", default-features = false } # can't upgrade yet until min version is Rust 1.81, see https://github.com/servo/rust-url/issues/992 +url = { version = "=2.5.2", default-features = false, features = [ + "serde", +] } # can't upgrade yet until min version is Rust 1.81, see https://github.com/servo/rust-url/issues/992 hex = "0.4" backoff = "0.4.0" @@ -119,7 +121,7 @@ notify = { version = "7", optional = true } walkdir = "2.5.0" arc-swap = "1.7.1" intervaltree = "0.2.7" -async-compression = {version="0.4.18", features= ["tokio", "gzip"] } +async-compression = { version = "0.4.18", features = ["tokio", "gzip"] } [build-dependencies] anyhow = "1" diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 951e025..e65eded 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -121,6 +121,7 @@ pub struct Session { default_storage_factory: Option, persistence: Option>, disk_write_tx: Option, + trackers: HashSet, // Limits and throttling pub(crate) concurrent_initialize_semaphore: Arc, @@ -422,6 +423,9 @@ pub struct SessionOptions { pub blocklist_url: Option, + // The list of tracker URLs to always use for each torrent. + pub trackers: HashSet, + #[cfg(feature = "disable-upload")] pub disable_upload: bool, } @@ -440,12 +444,12 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } -fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[String]) -> anyhow::Result { +fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[url::Url]) -> anyhow::Result { #[derive(Serialize)] struct Tmp<'a> { announce: &'a str, #[serde(rename = "announce-list")] - announce_list: &'a [&'a [String]], + announce_list: &'a [&'a [url::Url]], info: bencode::raw_value::RawValue<&'a [u8]>, } @@ -469,7 +473,7 @@ pub(crate) struct CheckedIncomingConnection { struct InternalAddResult { info_hash: Id20, metadata: Option, - trackers: Vec, + trackers: Vec, name: Option, } @@ -644,6 +648,7 @@ impl Session { opts.concurrent_init_limit.unwrap_or(3), )), ratelimits: Limits::new(opts.ratelimits), + trackers: opts.trackers, #[cfg(feature = "disable-upload")] _disable_upload: opts.disable_upload, blocklist, @@ -906,7 +911,11 @@ impl Session { InternalAddResult { info_hash, - trackers: magnet.trackers, + trackers: magnet + .trackers + .into_iter() + .filter_map(|t| url::Url::parse(&t).ok()) + .collect(), metadata: None, name: magnet.name, } @@ -952,7 +961,10 @@ impl Session { torrent.torrent_bytes, torrent.info_bytes, )?), - trackers, + trackers: trackers + .iter() + .filter_map(|t| url::Url::parse(t).ok()) + .collect(), name: None, } } @@ -1309,7 +1321,7 @@ impl Session { fn make_peer_rx( self: &Arc, info_hash: Id20, - mut trackers: Vec, + mut trackers: Vec, announce: bool, force_tracker_interval: Option, initial_peers: Vec, @@ -1326,7 +1338,9 @@ impl Session { if is_private && trackers.len() > 1 { warn!("private trackers are not fully implemented, so using only the first tracker"); - trackers.resize_with(1, Default::default); + trackers.truncate(1); + } else { + trackers.extend(self.trackers.iter().cloned()); } let tracker_rx_stats = PeerRxTorrentInfo { @@ -1336,7 +1350,7 @@ impl Session { let tracker_rx = TrackerComms::start( info_hash, self.peer_id, - trackers, + trackers.into_iter().collect(), Box::new(tracker_rx_stats), force_tracker_interval, announce_port, @@ -1393,7 +1407,7 @@ impl Session { self: &Arc, info_hash: Id20, peer_rx: PeerStream, - trackers: &[String], + trackers: &[url::Url], peer_opts: Option, ) -> anyhow::Result { match read_metainfo_from_peer_receiver( @@ -1525,9 +1539,10 @@ mod tests { #[test] fn test_torrent_file_from_info_and_bytes() { - fn get_trackers(info: &TorrentMetaV1) -> Vec { + fn get_trackers(info: &TorrentMetaV1) -> Vec { info.iter_announce() .filter_map(|t| std::str::from_utf8(t.as_ref()).ok().map(|t| t.to_owned())) + .filter_map(|t| t.parse().ok()) .collect_vec() } diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index b70bbb1..d3c20c3 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -114,7 +114,7 @@ impl SessionPersistenceStore for PostgresSessionStorage { .shared() .trackers .iter() - .cloned() + .map(|t| t.to_string()) .collect::>(), ) .bind( diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 90b8f60..5181609 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -188,7 +188,7 @@ pub struct ManagedTorrentShared { pub id: TorrentId, pub info_hash: Id20, pub(crate) spawner: BlockingSpawner, - pub trackers: HashSet, + pub trackers: HashSet, pub peer_id: Id20, pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index e592b10..b56a351 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -52,6 +52,7 @@ libc = "0.2.158" signal-hook = "0.3.17" tokio-util = "0.7.11" gethostname = "0.5.0" +url = "2" [dev-dependencies] futures = { version = "0.3" } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 23ada49..dde6707 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, io, net::SocketAddr, num::NonZeroU32, @@ -232,6 +233,10 @@ struct Opts { /// Downloads a p2p blocklist from this url and blocks peers from it #[arg(long, env = "RQBIT_BLOCKLIST_URL")] blocklist_url: Option, + + /// The filename with tracker URLs to always use for each torrent. + #[arg(long, env = "RQBIT_TRACKERS_FILENAME")] + trackers_filename: Option, } #[derive(Parser)] @@ -431,6 +436,24 @@ fn main() -> anyhow::Result<()> { } } +async fn parse_trackers_file(filename: &str) -> anyhow::Result> { + let content = tokio::fs::read_to_string(filename) + .await + .with_context(|| format!("error opening {filename}"))?; + let trackers = content + .lines() + .filter_map(|s| { + let s = s.trim(); + if s.is_empty() { + return None; + } + url::Url::parse(s).ok() + }) + .collect::>(); + info!(filename, count = trackers.len(), "parsed trackers"); + Ok(trackers) +} + async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> { let log_config = init_logging(InitLoggingOptions { default_rust_log_value: Some(match opts.log_level.unwrap_or(LogLevel::Info) { @@ -449,6 +472,14 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> Err(e) => warn!("failed increasing open file limit: {:#}", e), }; + let trackers = if let Some(f) = opts.trackers_filename { + parse_trackers_file(&f) + .await + .context("error reading trackers file")? + } else { + Default::default() + }; + let mut sopts = SessionOptions { disable_dht: opts.disable_dht, disable_dht_persistence: opts.disable_dht_persistence, @@ -499,6 +530,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> download_bps: opts.ratelimit_download_bps, }, blocklist_url: opts.blocklist_url, + trackers, }; let http_api_basic_auth = if let Ok(up) = std::env::var("RQBIT_HTTP_BASIC_AUTH_USERPASS") { diff --git a/crates/tracker_comms/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs index c155b28..0bfc60a 100644 --- a/crates/tracker_comms/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; @@ -91,7 +92,7 @@ impl TrackerComms { pub fn start( info_hash: Id20, peer_id: Id20, - trackers: Vec, + trackers: HashSet, stats: Box, force_interval: Option, tcp_listen_port: Option, @@ -99,17 +100,11 @@ impl TrackerComms { ) -> Option> { let trackers = trackers .into_iter() - .filter_map(|t| match Url::parse(&t) { - Ok(parsed) => match parsed.scheme() { - "http" | "https" => Some(SupportedTracker::Http(parsed)), - "udp" => Some(SupportedTracker::Udp(parsed)), - _ => { - debug!("unsuppoted tracker URL: {}", t); - None - } - }, - Err(e) => { - debug!("error parsing tracker URL {}: {}", t, e); + .filter_map(|t| match t.scheme() { + "http" | "https" => Some(SupportedTracker::Http(t)), + "udp" => Some(SupportedTracker::Udp(t)), + _ => { + debug!("unsuppoted tracker URL: {}", t); None } })