use std::{ collections::HashSet, fs::{File, OpenOptions}, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, time::{Duration, Instant}, }; use anyhow::Context; use bencode::from_bytes; use buffers::ByteString; use librqbit_core::{ id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; use log::{debug, info}; use parking_lot::Mutex; use reqwest::Url; use sha1w::Sha1; use size_format::SizeFormatterBinary as SF; use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, spawn_utils::{spawn, BlockingSpawner}, torrent_state::TorrentState, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, }; pub struct TorrentManagerBuilder { info: TorrentMetaV1Info, info_hash: Id20, overwrite: bool, output_folder: PathBuf, only_files: Option>, peer_id: Option, force_tracker_interval: Option, spawner: Option, } impl TorrentManagerBuilder { pub fn new>( info: TorrentMetaV1Info, info_hash: Id20, output_folder: P, ) -> Self { Self { info, info_hash, overwrite: false, output_folder: output_folder.as_ref().into(), only_files: None, peer_id: None, force_tracker_interval: None, spawner: None, } } pub fn only_files(&mut self, only_files: Vec) -> &mut Self { self.only_files = Some(only_files); self } pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { self.overwrite = overwrite; self } pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { self.force_tracker_interval = Some(force_tracker_interval); self } pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { self.spawner = Some(spawner); self } pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { self.peer_id = Some(peer_id); self } pub fn start_manager(self) -> anyhow::Result { TorrentManager::start( self.info, self.info_hash, self.output_folder, self.overwrite, self.only_files, self.force_tracker_interval, self.peer_id, self.spawner.unwrap_or_else(|| BlockingSpawner::new(true)), ) } } #[derive(Clone)] pub struct TorrentManagerHandle { manager: Arc, } impl TorrentManagerHandle { pub fn add_tracker(&self, url: Url) -> bool { let mgr = self.manager.clone(); if mgr.trackers.lock().insert(url.clone()) { spawn(format!("tracker monitor {}", url), async move { mgr.single_tracker_monitor(url).await }); true } else { false } } pub fn add_peer(&self, addr: SocketAddr) -> bool { self.manager.state.add_peer_if_not_seen(addr) } pub fn torrent_state(&self) -> &TorrentState { &self.manager.state } pub fn speed_estimator(&self) -> &Arc { &self.manager.speed_estimator } pub async fn cancel(&self) -> anyhow::Result<()> { todo!() } pub async fn wait_until_completed(&self) -> anyhow::Result<()> { loop { tokio::time::sleep(Duration::from_secs(60)).await; } } } struct TorrentManager { state: Arc, #[allow(dead_code)] speed_estimator: Arc, trackers: Mutex>, force_tracker_interval: Option, } fn make_lengths>( torrent: &TorrentMetaV1Info, ) -> anyhow::Result { let total_length = torrent.iter_file_lengths().sum(); Lengths::new(total_length, torrent.piece_length, None) } impl TorrentManager { #[allow(clippy::too_many_arguments)] fn start>( info: TorrentMetaV1Info, info_hash: Id20, out: P, overwrite: bool, only_files: Option>, force_tracker_interval: Option, peer_id: Option, spawner: BlockingSpawner, ) -> anyhow::Result { let files = { let mut files = Vec::>>::with_capacity(info.iter_file_lengths().count()); for (path_bits, _) in info.iter_filenames_and_lengths() { let mut full_path = out.as_ref().to_owned(); for bit in path_bits.iter_components() { full_path.push( bit.as_ref() .map(|b| std::str::from_utf8(b.as_ref())) .unwrap_or(Ok("output"))?, ); } std::fs::create_dir_all(full_path.parent().unwrap())?; let file = if overwrite { OpenOptions::new() .create(true) .read(true) .write(true) .open(&full_path)? } else { // TODO: create_new does not seem to work with read(true), so calling this twice. OpenOptions::new() .create_new(true) .write(true) .open(&full_path) .with_context(|| format!("error creating {:?}", &full_path))?; OpenOptions::new().read(true).write(true).open(&full_path)? }; files.push(Arc::new(Mutex::new(file))) } files }; let peer_id = peer_id.unwrap_or_else(generate_peer_id); let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?; debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); let initial_check_results = FileOps::::new(&info, &files, &lengths).initial_check(only_files.as_deref())?; info!( "Initial check results: have {}, needed {}", SF::new(initial_check_results.have_bytes), SF::new(initial_check_results.needed_bytes) ); let chunk_tracker = ChunkTracker::new( initial_check_results.needed_pieces, initial_check_results.have_pieces, lengths, ); let state = TorrentState::new( info, info_hash, peer_id, files, chunk_tracker, lengths, initial_check_results.have_bytes, initial_check_results.needed_bytes, spawner, ); let estimator = Arc::new(SpeedEstimator::new(5)); let mgr = Arc::new(Self { state, speed_estimator: estimator.clone(), trackers: Mutex::new(HashSet::new()), force_tracker_interval, }); spawn("stats printer", { let this = mgr.clone(); async move { this.stats_printer().await } }); spawn("speed estimator updater", { let state = mgr.state.clone(); async move { loop { let downloaded = state.stats_snapshot().downloaded_and_checked_bytes; let needed = state.initially_needed(); let remaining = needed - downloaded; estimator.add_snapshot(downloaded, remaining, Instant::now()); tokio::time::sleep(Duration::from_secs(1)).await; } } }); Ok(mgr.into_handle()) } async fn stats_printer(&self) -> anyhow::Result<()> { loop { let live_peer_stats = self.state.lock_read().peers.stats(); let seen_peers_count = self.state.lock_read().peers.seen().len(); let stats = self.state.stats_snapshot(); let needed = self.state.initially_needed(); let downloaded_pct = if stats.remaining_bytes == 0 { 100f64 } else { (stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64 }; info!( "Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, queued: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}", downloaded_pct, SF::new(stats.downloaded_and_checked_bytes), live_peer_stats.live, live_peer_stats.connecting, live_peer_stats.queued, seen_peers_count, SF::new(stats.fetched_bytes), SF::new(stats.remaining_bytes), SF::new(needed), SF::new(stats.uploaded_bytes), SF::new(stats.have_bytes) ); tokio::time::sleep(Duration::from_secs(1)).await; } } fn into_handle(self: Arc) -> TorrentManagerHandle { TorrentManagerHandle { manager: self } } async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { let response: reqwest::Response = reqwest::get(tracker_url).await?; if !response.status().is_success() { anyhow::bail!("tracker responded with {:?}", response.status()); } let bytes = response.bytes().await?; if let Ok(error) = from_bytes::(&bytes) { anyhow::bail!( "tracker returned failure. Failure reason: {}", error.failure_reason ) }; let response = from_bytes::(&bytes)?; for peer in response.peers.iter_sockaddrs() { self.state.add_peer_if_not_seen(peer); } Ok(response.interval) } async fn single_tracker_monitor(&self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(TrackerRequestEvent::Started); loop { let request = TrackerRequest { info_hash: self.state.info_hash(), peer_id: self.state.peer_id(), port: 6778, uploaded: self.state.get_uploaded(), downloaded: self.state.get_downloaded(), left: self.state.get_left_to_download(), compact: true, no_peer_id: false, event, ip: None, numwant: None, key: None, trackerid: None, }; let request_query = request.as_querystring(); tracker_url.set_query(Some(&request_query)); match self.tracker_one_request(tracker_url.clone()).await { Ok(interval) => { event = None; let interval = self .force_tracker_interval .unwrap_or_else(|| Duration::from_secs(interval)); debug!( "sleeping for {:?} after calling tracker {}", interval, tracker_url.host().unwrap() ); tokio::time::sleep(interval).await; } Err(e) => { debug!("error calling the tracker {}: {:#}", tracker_url, e); tokio::time::sleep(Duration::from_secs(60)).await; } }; } } }