Merge pull request #337 from ikatson/tracker-list

[feature] List of trackers from file
This commit is contained in:
Igor Katson 2025-02-27 13:04:31 +00:00 committed by GitHub
commit 94877aec6f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 72 additions and 26 deletions

1
Cargo.lock generated
View file

@ -4436,6 +4436,7 @@ dependencies = [
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"url",
] ]
[[package]] [[package]]

View file

@ -98,7 +98,9 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
], optional = true } ], optional = true }
uuid = { version = "1.2", features = ["v4"] } uuid = { version = "1.2", features = ["v4"] }
futures = "0.3" futures = "0.3"
url = { version = "=2.5.2", default-features = false } # can't upgrade yet until min version is Rust 1.81, see https://github.com/servo/rust-url/issues/992 url = { version = "=2.5.2", default-features = false, features = [
"serde",
] } # can't upgrade yet until min version is Rust 1.81, see https://github.com/servo/rust-url/issues/992
hex = "0.4" hex = "0.4"
backoff = "0.4.0" backoff = "0.4.0"
@ -119,7 +121,7 @@ notify = { version = "7", optional = true }
walkdir = "2.5.0" walkdir = "2.5.0"
arc-swap = "1.7.1" arc-swap = "1.7.1"
intervaltree = "0.2.7" intervaltree = "0.2.7"
async-compression = {version="0.4.18", features= ["tokio", "gzip"] } async-compression = { version = "0.4.18", features = ["tokio", "gzip"] }
[build-dependencies] [build-dependencies]
anyhow = "1" anyhow = "1"

View file

@ -121,6 +121,7 @@ pub struct Session {
default_storage_factory: Option<BoxStorageFactory>, default_storage_factory: Option<BoxStorageFactory>,
persistence: Option<Arc<dyn SessionPersistenceStore>>, persistence: Option<Arc<dyn SessionPersistenceStore>>,
disk_write_tx: Option<DiskWorkQueueSender>, disk_write_tx: Option<DiskWorkQueueSender>,
trackers: HashSet<url::Url>,
// Limits and throttling // Limits and throttling
pub(crate) concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>, pub(crate) concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
@ -422,6 +423,9 @@ pub struct SessionOptions {
pub blocklist_url: Option<String>, pub blocklist_url: Option<String>,
// The list of tracker URLs to always use for each torrent.
pub trackers: HashSet<url::Url>,
#[cfg(feature = "disable-upload")] #[cfg(feature = "disable-upload")]
pub disable_upload: bool, pub disable_upload: bool,
} }
@ -440,12 +444,12 @@ async fn create_tcp_listener(
bail!("no free TCP ports in range {port_range:?}"); bail!("no free TCP ports in range {port_range:?}");
} }
fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[String]) -> anyhow::Result<Bytes> { fn torrent_file_from_info_bytes(info_bytes: &[u8], trackers: &[url::Url]) -> anyhow::Result<Bytes> {
#[derive(Serialize)] #[derive(Serialize)]
struct Tmp<'a> { struct Tmp<'a> {
announce: &'a str, announce: &'a str,
#[serde(rename = "announce-list")] #[serde(rename = "announce-list")]
announce_list: &'a [&'a [String]], announce_list: &'a [&'a [url::Url]],
info: bencode::raw_value::RawValue<&'a [u8]>, info: bencode::raw_value::RawValue<&'a [u8]>,
} }
@ -469,7 +473,7 @@ pub(crate) struct CheckedIncomingConnection {
struct InternalAddResult { struct InternalAddResult {
info_hash: Id20, info_hash: Id20,
metadata: Option<TorrentMetadata>, metadata: Option<TorrentMetadata>,
trackers: Vec<String>, trackers: Vec<url::Url>,
name: Option<String>, name: Option<String>,
} }
@ -644,6 +648,7 @@ impl Session {
opts.concurrent_init_limit.unwrap_or(3), opts.concurrent_init_limit.unwrap_or(3),
)), )),
ratelimits: Limits::new(opts.ratelimits), ratelimits: Limits::new(opts.ratelimits),
trackers: opts.trackers,
#[cfg(feature = "disable-upload")] #[cfg(feature = "disable-upload")]
_disable_upload: opts.disable_upload, _disable_upload: opts.disable_upload,
blocklist, blocklist,
@ -906,7 +911,11 @@ impl Session {
InternalAddResult { InternalAddResult {
info_hash, info_hash,
trackers: magnet.trackers, trackers: magnet
.trackers
.into_iter()
.filter_map(|t| url::Url::parse(&t).ok())
.collect(),
metadata: None, metadata: None,
name: magnet.name, name: magnet.name,
} }
@ -952,7 +961,10 @@ impl Session {
torrent.torrent_bytes, torrent.torrent_bytes,
torrent.info_bytes, torrent.info_bytes,
)?), )?),
trackers, trackers: trackers
.iter()
.filter_map(|t| url::Url::parse(t).ok())
.collect(),
name: None, name: None,
} }
} }
@ -1309,7 +1321,7 @@ impl Session {
fn make_peer_rx( fn make_peer_rx(
self: &Arc<Self>, self: &Arc<Self>,
info_hash: Id20, info_hash: Id20,
mut trackers: Vec<String>, mut trackers: Vec<url::Url>,
announce: bool, announce: bool,
force_tracker_interval: Option<Duration>, force_tracker_interval: Option<Duration>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
@ -1326,7 +1338,9 @@ impl Session {
if is_private && trackers.len() > 1 { if is_private && trackers.len() > 1 {
warn!("private trackers are not fully implemented, so using only the first tracker"); warn!("private trackers are not fully implemented, so using only the first tracker");
trackers.resize_with(1, Default::default); trackers.truncate(1);
} else {
trackers.extend(self.trackers.iter().cloned());
} }
let tracker_rx_stats = PeerRxTorrentInfo { let tracker_rx_stats = PeerRxTorrentInfo {
@ -1336,7 +1350,7 @@ impl Session {
let tracker_rx = TrackerComms::start( let tracker_rx = TrackerComms::start(
info_hash, info_hash,
self.peer_id, self.peer_id,
trackers, trackers.into_iter().collect(),
Box::new(tracker_rx_stats), Box::new(tracker_rx_stats),
force_tracker_interval, force_tracker_interval,
announce_port, announce_port,
@ -1393,7 +1407,7 @@ impl Session {
self: &Arc<Self>, self: &Arc<Self>,
info_hash: Id20, info_hash: Id20,
peer_rx: PeerStream, peer_rx: PeerStream,
trackers: &[String], trackers: &[url::Url],
peer_opts: Option<PeerConnectionOptions>, peer_opts: Option<PeerConnectionOptions>,
) -> anyhow::Result<ResolveMagnetResult> { ) -> anyhow::Result<ResolveMagnetResult> {
match read_metainfo_from_peer_receiver( match read_metainfo_from_peer_receiver(
@ -1525,9 +1539,10 @@ mod tests {
#[test] #[test]
fn test_torrent_file_from_info_and_bytes() { fn test_torrent_file_from_info_and_bytes() {
fn get_trackers(info: &TorrentMetaV1<ByteBuf>) -> Vec<String> { fn get_trackers(info: &TorrentMetaV1<ByteBuf>) -> Vec<url::Url> {
info.iter_announce() info.iter_announce()
.filter_map(|t| std::str::from_utf8(t.as_ref()).ok().map(|t| t.to_owned())) .filter_map(|t| std::str::from_utf8(t.as_ref()).ok().map(|t| t.to_owned()))
.filter_map(|t| t.parse().ok())
.collect_vec() .collect_vec()
} }

View file

@ -114,7 +114,7 @@ impl SessionPersistenceStore for PostgresSessionStorage {
.shared() .shared()
.trackers .trackers
.iter() .iter()
.cloned() .map(|t| t.to_string())
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
) )
.bind( .bind(

View file

@ -188,7 +188,7 @@ pub struct ManagedTorrentShared {
pub id: TorrentId, pub id: TorrentId,
pub info_hash: Id20, pub info_hash: Id20,
pub(crate) spawner: BlockingSpawner, pub(crate) spawner: BlockingSpawner,
pub trackers: HashSet<String>, pub trackers: HashSet<url::Url>,
pub peer_id: Id20, pub peer_id: Id20,
pub span: tracing::Span, pub span: tracing::Span,
pub(crate) options: ManagedTorrentOptions, pub(crate) options: ManagedTorrentOptions,

View file

@ -52,6 +52,7 @@ libc = "0.2.158"
signal-hook = "0.3.17" signal-hook = "0.3.17"
tokio-util = "0.7.11" tokio-util = "0.7.11"
gethostname = "0.5.0" gethostname = "0.5.0"
url = "2"
[dev-dependencies] [dev-dependencies]
futures = { version = "0.3" } futures = { version = "0.3" }

View file

@ -1,4 +1,5 @@
use std::{ use std::{
collections::HashSet,
io, io,
net::SocketAddr, net::SocketAddr,
num::NonZeroU32, num::NonZeroU32,
@ -232,6 +233,10 @@ struct Opts {
/// Downloads a p2p blocklist from this url and blocks peers from it /// Downloads a p2p blocklist from this url and blocks peers from it
#[arg(long, env = "RQBIT_BLOCKLIST_URL")] #[arg(long, env = "RQBIT_BLOCKLIST_URL")]
blocklist_url: Option<String>, blocklist_url: Option<String>,
/// The filename with tracker URLs to always use for each torrent.
#[arg(long, env = "RQBIT_TRACKERS_FILENAME")]
trackers_filename: Option<String>,
} }
#[derive(Parser)] #[derive(Parser)]
@ -431,6 +436,24 @@ fn main() -> anyhow::Result<()> {
} }
} }
async fn parse_trackers_file(filename: &str) -> anyhow::Result<HashSet<url::Url>> {
let content = tokio::fs::read_to_string(filename)
.await
.with_context(|| format!("error opening {filename}"))?;
let trackers = content
.lines()
.filter_map(|s| {
let s = s.trim();
if s.is_empty() {
return None;
}
url::Url::parse(s).ok()
})
.collect::<HashSet<url::Url>>();
info!(filename, count = trackers.len(), "parsed trackers");
Ok(trackers)
}
async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> { async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()> {
let log_config = init_logging(InitLoggingOptions { let log_config = init_logging(InitLoggingOptions {
default_rust_log_value: Some(match opts.log_level.unwrap_or(LogLevel::Info) { default_rust_log_value: Some(match opts.log_level.unwrap_or(LogLevel::Info) {
@ -449,6 +472,14 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
Err(e) => warn!("failed increasing open file limit: {:#}", e), Err(e) => warn!("failed increasing open file limit: {:#}", e),
}; };
let trackers = if let Some(f) = opts.trackers_filename {
parse_trackers_file(&f)
.await
.context("error reading trackers file")?
} else {
Default::default()
};
let mut sopts = SessionOptions { let mut sopts = SessionOptions {
disable_dht: opts.disable_dht, disable_dht: opts.disable_dht,
disable_dht_persistence: opts.disable_dht_persistence, disable_dht_persistence: opts.disable_dht_persistence,
@ -499,6 +530,7 @@ async fn async_main(opts: Opts, cancel: CancellationToken) -> anyhow::Result<()>
download_bps: opts.ratelimit_download_bps, download_bps: opts.ratelimit_download_bps,
}, },
blocklist_url: opts.blocklist_url, blocklist_url: opts.blocklist_url,
trackers,
}; };
let http_api_basic_auth = if let Ok(up) = std::env::var("RQBIT_HTTP_BASIC_AUTH_USERPASS") { let http_api_basic_auth = if let Ok(up) = std::env::var("RQBIT_HTTP_BASIC_AUTH_USERPASS") {

View file

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -91,7 +92,7 @@ impl TrackerComms {
pub fn start( pub fn start(
info_hash: Id20, info_hash: Id20,
peer_id: Id20, peer_id: Id20,
trackers: Vec<String>, trackers: HashSet<Url>,
stats: Box<dyn TorrentStatsProvider>, stats: Box<dyn TorrentStatsProvider>,
force_interval: Option<Duration>, force_interval: Option<Duration>,
tcp_listen_port: Option<u16>, tcp_listen_port: Option<u16>,
@ -99,17 +100,11 @@ impl TrackerComms {
) -> Option<BoxStream<'static, SocketAddr>> { ) -> Option<BoxStream<'static, SocketAddr>> {
let trackers = trackers let trackers = trackers
.into_iter() .into_iter()
.filter_map(|t| match Url::parse(&t) { .filter_map(|t| match t.scheme() {
Ok(parsed) => match parsed.scheme() { "http" | "https" => Some(SupportedTracker::Http(t)),
"http" | "https" => Some(SupportedTracker::Http(parsed)), "udp" => Some(SupportedTracker::Udp(t)),
"udp" => Some(SupportedTracker::Udp(parsed)), _ => {
_ => { debug!("unsuppoted tracker URL: {}", t);
debug!("unsuppoted tracker URL: {}", t);
None
}
},
Err(e) => {
debug!("error parsing tracker URL {}: {}", t, e);
None None
} }
}) })