use std::{ collections::HashSet, fs::{File, OpenOptions}, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, time::{Duration, Instant}, }; use anyhow::Context; use bencode::from_bytes; use buffers::ByteString; use librqbit_core::{ id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; use log::{debug, info}; use parking_lot::Mutex; use reqwest::Url; use sha1w::Sha1; use size_format::SizeFormatterBinary as SF; use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, spawn_utils::{spawn, BlockingSpawner}, torrent_state::{TorrentState, TorrentStateOptions}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, }; #[derive(Default)] struct TorrentManagerOptions { force_tracker_interval: Option, peer_connect_timeout: Option, only_files: Option>, peer_id: Option, overwrite: bool, } pub struct TorrentManagerBuilder { info: TorrentMetaV1Info, info_hash: Id20, output_folder: PathBuf, options: TorrentManagerOptions, spawner: Option, } impl TorrentManagerBuilder { pub fn new>( info: TorrentMetaV1Info, info_hash: Id20, output_folder: P, ) -> Self { Self { info, info_hash, output_folder: output_folder.as_ref().into(), spawner: None, options: TorrentManagerOptions::default(), } } pub fn only_files(&mut self, only_files: Vec) -> &mut Self { self.options.only_files = Some(only_files); self } pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { self.options.overwrite = overwrite; self } pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { self.options.force_tracker_interval = Some(force_tracker_interval); self } pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self { self.spawner = Some(spawner); self } pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { self.options.peer_id = Some(peer_id); self } pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { self.options.peer_connect_timeout = Some(timeout); self } pub fn start_manager(self) -> anyhow::Result { TorrentManager::start( self.info, self.info_hash, self.output_folder, self.spawner.unwrap_or_else(|| BlockingSpawner::new(true)), Some(self.options), ) } } #[derive(Clone)] pub struct TorrentManagerHandle { manager: Arc, } impl TorrentManagerHandle { pub fn add_tracker(&self, url: Url) -> bool { let mgr = self.manager.clone(); if mgr.trackers.lock().insert(url.clone()) { spawn(format!("tracker monitor {}", url), async move { mgr.single_tracker_monitor(url).await }); true } else { false } } pub fn add_peer(&self, addr: SocketAddr) -> bool { self.manager.state.add_peer_if_not_seen(addr) } pub fn torrent_state(&self) -> &TorrentState { &self.manager.state } pub fn speed_estimator(&self) -> &Arc { &self.manager.speed_estimator } pub async fn cancel(&self) -> anyhow::Result<()> { todo!() } pub async fn wait_until_completed(&self) -> anyhow::Result<()> { loop { tokio::time::sleep(Duration::from_secs(60)).await; } } } struct TorrentManager { state: Arc, #[allow(dead_code)] speed_estimator: Arc, trackers: Mutex>, options: TorrentManagerOptions, } fn make_lengths>( torrent: &TorrentMetaV1Info, ) -> anyhow::Result { let total_length = torrent.iter_file_lengths()?.sum(); Lengths::new(total_length, torrent.piece_length, None) } impl TorrentManager { fn start>( info: TorrentMetaV1Info, info_hash: Id20, out: P, spawner: BlockingSpawner, options: Option, ) -> anyhow::Result { let options = options.unwrap_or_default(); let files = { let mut files = Vec::>>::with_capacity(info.iter_file_lengths()?.count()); for (path_bits, _) in info.iter_filenames_and_lengths()? { let mut full_path = out.as_ref().to_owned(); let relative_path = path_bits .to_pathbuf() .context("error converting file to path")?; full_path.push(relative_path); std::fs::create_dir_all(full_path.parent().unwrap())?; let file = if options.overwrite { OpenOptions::new() .create(true) .read(true) .write(true) .open(&full_path)? } else { // TODO: create_new does not seem to work with read(true), so calling this twice. OpenOptions::new() .create_new(true) .write(true) .open(&full_path) .with_context(|| format!("error creating {:?}", &full_path))?; OpenOptions::new().read(true).write(true).open(&full_path)? }; files.push(Arc::new(Mutex::new(file))) } files }; let peer_id = options.peer_id.unwrap_or_else(generate_peer_id); let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?; debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); let initial_check_results = spawner.spawn_block_in_place(|| { FileOps::::new(&info, &files, &lengths) .initial_check(options.only_files.as_deref()) })?; info!( "Initial check results: have {}, needed {}", SF::new(initial_check_results.have_bytes), SF::new(initial_check_results.needed_bytes) ); let chunk_tracker = ChunkTracker::new( initial_check_results.needed_pieces, initial_check_results.have_pieces, lengths, ); #[allow(clippy::needless_update)] let state_options = TorrentStateOptions { peer_connect_timeout: options.peer_connect_timeout, ..Default::default() }; let state = TorrentState::new( info, info_hash, peer_id, files, chunk_tracker, lengths, initial_check_results.have_bytes, initial_check_results.needed_bytes, spawner, Some(state_options), ); let estimator = Arc::new(SpeedEstimator::new(5)); let mgr = Arc::new(Self { state, speed_estimator: estimator.clone(), trackers: Mutex::new(HashSet::new()), options, }); spawn("speed estimator updater", { let state = mgr.state.clone(); async move { loop { let stats = state.stats_snapshot(); let fetched = state.stats_snapshot().fetched_bytes; let needed = state.initially_needed(); // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. let remaining = needed .wrapping_sub(fetched) .min(needed - stats.downloaded_and_checked_bytes); estimator.add_snapshot(fetched, remaining, Instant::now()); tokio::time::sleep(Duration::from_secs(1)).await; } } }); Ok(mgr.into_handle()) } fn into_handle(self: Arc) -> TorrentManagerHandle { TorrentManagerHandle { manager: self } } async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { let response: reqwest::Response = reqwest::get(tracker_url).await?; if !response.status().is_success() { anyhow::bail!("tracker responded with {:?}", response.status()); } let bytes = response.bytes().await?; if let Ok(error) = from_bytes::(&bytes) { anyhow::bail!( "tracker returned failure. Failure reason: {}", error.failure_reason ) }; let response = from_bytes::(&bytes)?; for peer in response.peers.iter_sockaddrs() { self.state.add_peer_if_not_seen(peer); } Ok(response.interval) } async fn single_tracker_monitor(&self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(TrackerRequestEvent::Started); loop { let request = TrackerRequest { info_hash: self.state.info_hash(), peer_id: self.state.peer_id(), port: 6778, uploaded: self.state.get_uploaded(), downloaded: self.state.get_downloaded(), left: self.state.get_left_to_download(), compact: true, no_peer_id: false, event, ip: None, numwant: None, key: None, trackerid: None, }; let request_query = request.as_querystring(); tracker_url.set_query(Some(&request_query)); match self.tracker_one_request(tracker_url.clone()).await { Ok(interval) => { event = None; let interval = self .options .force_tracker_interval .unwrap_or_else(|| Duration::from_secs(interval)); debug!( "sleeping for {:?} after calling tracker {}", interval, tracker_url.host().unwrap() ); tokio::time::sleep(interval).await; } Err(e) => { debug!("error calling the tracker {}: {:#}", tracker_url, e); tokio::time::sleep(Duration::from_secs(60)).await; } }; } } }