diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 30bafa4..ce9daea 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -1,6 +1,7 @@ use std::{ collections::HashSet, fs::{File, OpenOptions}, + ops::Deref, path::{Path, PathBuf}, sync::{ atomic::{AtomicU64, Ordering}, @@ -17,6 +18,7 @@ use reqwest::Url; use size_format::SizeFormatterBinary as SF; use crate::{ + buffers::{ByteBuf, ByteString}, chunk_tracker::ChunkTracker, file_ops::FileOps, http_api::make_and_run_http_api, @@ -24,13 +26,14 @@ use crate::{ peer_id::generate_peer_id, spawn_utils::{spawn, BlockingSpawner}, speed_estimator::SpeedEstimator, - torrent_metainfo::TorrentMetaV1Owned, + torrent_metainfo::{TorrentMetaV1Info, TorrentMetaV1Owned}, torrent_state::{AtomicStats, TorrentState, TorrentStateLocked}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, type_aliases::Sha1, }; pub struct TorrentManagerBuilder { - torrent: TorrentMetaV1Owned, + info: TorrentMetaV1Info, + info_hash: [u8; 20], overwrite: bool, output_folder: PathBuf, only_files: Option>, @@ -39,9 +42,14 @@ pub struct TorrentManagerBuilder { } impl TorrentManagerBuilder { - pub fn new>(torrent: TorrentMetaV1Owned, output_folder: P) -> Self { + pub fn new>( + info: TorrentMetaV1Info, + info_hash: [u8; 20], + output_folder: P, + ) -> Self { Self { - torrent, + info, + info_hash, overwrite: false, output_folder: output_folder.as_ref().into(), only_files: None, @@ -70,9 +78,10 @@ impl TorrentManagerBuilder { self } - pub async fn start_manager(self) -> anyhow::Result { + pub fn start_manager(self) -> anyhow::Result { TorrentManager::start( - self.torrent, + self.info, + self.info_hash, self.output_folder, self.overwrite, self.only_files, @@ -84,10 +93,21 @@ impl TorrentManagerBuilder { #[derive(Clone)] pub struct TorrentManagerHandle { - manager: TorrentManager, + manager: Arc, } impl TorrentManagerHandle { + pub fn add_tracker(&self, url: Url) -> bool { + let mgr = self.manager.clone(); + if mgr.trackers.lock().insert(url.clone()) { + spawn(format!("tracker monitor {}", url), async move { + mgr.single_tracker_monitor(url).await + }); + true + } else { + false + } + } pub async fn cancel(&self) -> anyhow::Result<()> { todo!() } @@ -98,21 +118,24 @@ impl TorrentManagerHandle { } } -#[derive(Clone)] struct TorrentManager { state: Arc, speed_estimator: Arc, + trackers: Mutex>, force_tracker_interval: Option, } -fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result { - let total_length = torrent.info.iter_file_lengths().sum(); - Lengths::new(total_length, torrent.info.piece_length, None) +fn make_lengths>( + torrent: &TorrentMetaV1Info, +) -> anyhow::Result { + let total_length = torrent.iter_file_lengths().sum(); + Lengths::new(total_length, torrent.piece_length, None) } impl TorrentManager { pub fn start>( - torrent: TorrentMetaV1Owned, + info: TorrentMetaV1Info, + info_hash: [u8; 20], out: P, overwrite: bool, only_files: Option>, @@ -121,9 +144,9 @@ impl TorrentManager { ) -> anyhow::Result { let files = { let mut files = - Vec::>>::with_capacity(torrent.info.iter_file_lengths().count()); + Vec::>>::with_capacity(info.iter_file_lengths().count()); - for (path_bits, _) in torrent.info.iter_filenames_and_lengths() { + for (path_bits, _) in info.iter_filenames_and_lengths() { let mut full_path = out.as_ref().to_owned(); for bit in path_bits.iter_components() { full_path.push( @@ -155,12 +178,12 @@ impl TorrentManager { }; let peer_id = generate_peer_id(); - let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?; + let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?; debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = FileOps::::new(&torrent.info, &files, &lengths) - .initial_check(only_files.as_deref())?; + let initial_check_results = + FileOps::::new(&info, &files, &lengths).initial_check(only_files.as_deref())?; info!( "Initial check results: have {}, needed {}", @@ -175,8 +198,8 @@ impl TorrentManager { ); let state = Arc::new(TorrentState { - info_hash: torrent.info_hash, - torrent: torrent.info, + info_hash, + torrent: info, peer_id, locked: Arc::new(RwLock::new(TorrentStateLocked { peers: Default::default(), @@ -193,14 +216,17 @@ impl TorrentManager { }); let estimator = Arc::new(SpeedEstimator::new(5)); - let mgr = Self { + let mgr = Arc::new(Self { state, speed_estimator: estimator.clone(), + trackers: Mutex::new(HashSet::new()), force_tracker_interval, - }; + }); - spawn("tracker monitor", mgr.clone().task_tracker_monitor()); - spawn("stats printer", mgr.clone().stats_printer()); + spawn("stats printer", { + let this = mgr.clone(); + async move { this.stats_printer().await } + }); spawn( "http api", make_and_run_http_api(mgr.state.clone(), estimator.clone()), @@ -221,7 +247,7 @@ impl TorrentManager { Ok(mgr.into_handle()) } - async fn stats_printer(self) -> anyhow::Result<()> { + async fn stats_printer(&self) -> anyhow::Result<()> { loop { let live_peer_stats = self.state.locked.read().peers.stats(); let seen_peers_count = self.state.locked.read().peers.seen().len(); @@ -257,34 +283,7 @@ impl TorrentManager { } } - async fn task_tracker_monitor(self) -> anyhow::Result<()> { - let mut seen_trackers = HashSet::new(); - let mut tracker_futures = FuturesUnordered::new(); - let parse_url = |url: &[u8]| -> anyhow::Result { - let url = std::str::from_utf8(url).context("error parsing tracker URL")?; - let url = Url::parse(url).context("error parsing tracker URL")?; - Ok(url) - }; - for tracker in self.state.torrent.iter_announce() { - if seen_trackers.contains(&tracker) { - continue; - } - seen_trackers.insert(tracker); - let tracker_url = match parse_url(tracker) { - Ok(url) => url, - Err(e) => { - warn!("ignoring tracker: {:#}", e); - continue; - } - }; - tracker_futures.push(self.clone().single_tracker_monitor(tracker_url)); - } - - while tracker_futures.next().await.is_some() {} - Ok(()) - } - - fn into_handle(self) -> TorrentManagerHandle { + fn into_handle(self: Arc) -> TorrentManagerHandle { TorrentManagerHandle { manager: self } } @@ -308,7 +307,7 @@ impl TorrentManager { Ok(response.interval) } - async fn single_tracker_monitor(self, mut tracker_url: Url) -> anyhow::Result<()> { + async fn single_tracker_monitor(&self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(TrackerRequestEvent::Started); loop { let request = TrackerRequest { @@ -330,8 +329,7 @@ impl TorrentManager { let request_query = request.as_querystring(); tracker_url.set_query(Some(&request_query)); - let this = self.clone(); - match this.tracker_one_request(tracker_url.clone()).await { + match self.tracker_one_request(tracker_url.clone()).await { Ok(interval) => { event = None; let interval = self diff --git a/src/main.rs b/src/main.rs index 52ecab6..e9aff44 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,8 @@ use librqbit::{ torrent_manager::TorrentManagerBuilder, torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned}, }; -use log::info; +use log::{info, warn}; +use reqwest::Url; async fn torrent_from_url(url: &str) -> anyhow::Result { let response = reqwest::get(url) @@ -178,7 +179,28 @@ fn main() -> anyhow::Result<()> { None }; - let mut builder = TorrentManagerBuilder::new(torrent, opts.output_folder); + let trackers = torrent + .iter_announce() + .filter_map(|tracker| { + let url = match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => url, + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; + } + }; + match Url::parse(url) { + Ok(url) => Some(url), + Err(e) => { + warn!("cannot parse tracker URL {}: {}", url, e); + None + } + } + }) + .collect::>(); + + let mut builder = + TorrentManagerBuilder::new(torrent.info, torrent.info_hash, opts.output_folder); builder.overwrite(opts.overwrite).spawner(spawner); if let Some(only_files) = only_files { builder.only_files(only_files); @@ -188,8 +210,13 @@ fn main() -> anyhow::Result<()> { builder.force_tracker_interval(Duration::from_secs(interval)); } - let manager_handle = builder.start_manager().await?; - manager_handle.wait_until_completed().await?; + let handle = builder.start_manager()?; + + for url in trackers { + handle.add_tracker(url); + } + + handle.wait_until_completed().await?; Ok(()) }) }