diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index db149b7..86a29dd 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -1,7 +1,6 @@ use std::{ collections::HashSet, fs::{File, OpenOptions}, - net::SocketAddr, path::{Path, PathBuf}, sync::{ atomic::{AtomicU64, Ordering}, @@ -21,7 +20,6 @@ use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, lengths::Lengths, - peer_connection::{PeerConnection, WriterRequest}, spawn_utils::spawn, torrent_metainfo::TorrentMetaV1Owned, torrent_state::{AtomicStats, TorrentState, TorrentStateLocked}, @@ -32,6 +30,7 @@ pub struct TorrentManagerBuilder { overwrite: bool, output_folder: PathBuf, only_files: Option>, + force_tracker_interval: Option, } impl TorrentManagerBuilder { @@ -41,6 +40,7 @@ impl TorrentManagerBuilder { overwrite: false, output_folder: output_folder.as_ref().into(), only_files: None, + force_tracker_interval: None, } } @@ -54,12 +54,18 @@ impl TorrentManagerBuilder { self } + pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { + self.force_tracker_interval = Some(force_tracker_interval); + self + } + pub async fn start_manager(self) -> anyhow::Result { TorrentManager::start( self.torrent, self.output_folder, self.overwrite, self.only_files, + self.force_tracker_interval, ) } } @@ -82,7 +88,8 @@ impl TorrentManagerHandle { #[derive(Clone)] struct TorrentManager { - inner: Arc, + state: Arc, + force_tracker_interval: Option, } fn generate_peer_id() -> [u8; 20] { @@ -103,6 +110,7 @@ impl TorrentManager { out: P, overwrite: bool, only_files: Option>, + force_tracker_interval: Option, ) -> anyhow::Result { let files = { let mut files = @@ -160,7 +168,7 @@ impl TorrentManager { ); let mgr = Self { - inner: Arc::new(TorrentState { + state: Arc::new(TorrentState { info_hash: torrent.info_hash, torrent, peer_id, @@ -178,6 +186,7 @@ impl TorrentManager { needed: initial_check_results.needed_bytes, lengths, }), + force_tracker_interval, }; spawn("tracker monitor", mgr.clone().task_tracker_monitor()); @@ -187,18 +196,18 @@ impl TorrentManager { async fn stats_printer(self) -> anyhow::Result<()> { loop { - let live_peer_stats = self.inner.locked.read().peers.stats(); - let seen_peers_count = self.inner.locked.read().peers.seen().len(); - let have = self.inner.stats.have.load(Ordering::Relaxed); - let fetched = self.inner.stats.fetched_bytes.load(Ordering::Relaxed); - let needed = self.inner.needed; + let live_peer_stats = self.state.locked.read().peers.stats(); + let seen_peers_count = self.state.locked.read().peers.seen().len(); + let have = self.state.stats.have.load(Ordering::Relaxed); + let fetched = self.state.stats.fetched_bytes.load(Ordering::Relaxed); + let needed = self.state.needed; let downloaded = self - .inner + .state .stats .downloaded_and_checked .load(Ordering::Relaxed); let remaining = needed - downloaded; - let uploaded = self.inner.stats.uploaded.load(Ordering::Relaxed); + let uploaded = self.state.stats.uploaded.load(Ordering::Relaxed); let downloaded_pct = if downloaded == needed { 100f64 } else { @@ -229,7 +238,7 @@ impl TorrentManager { let url = Url::parse(url).context("error parsing tracker URL")?; Ok(url) }; - for tracker in self.inner.torrent.iter_announce() { + for tracker in self.state.torrent.iter_announce() { if seen_trackers.contains(&tracker) { continue; } @@ -258,7 +267,7 @@ impl TorrentManager { let response = crate::serde_bencode::from_bytes::(&bytes)?; for peer in response.peers.iter_sockaddrs() { - self.add_peer(peer); + self.state.add_peer(peer); } Ok(response.interval) } @@ -267,12 +276,12 @@ impl TorrentManager { let mut event = Some(TrackerRequestEvent::Started); loop { let request = TrackerRequest { - info_hash: self.inner.torrent.info_hash, - peer_id: self.inner.peer_id, + info_hash: self.state.torrent.info_hash, + peer_id: self.state.peer_id, port: 6778, - uploaded: self.inner.get_uploaded(), - downloaded: self.inner.get_downloaded(), - left: self.inner.get_left_to_download(), + uploaded: self.state.get_uploaded(), + downloaded: self.state.get_downloaded(), + left: self.state.get_left_to_download(), compact: true, no_peer_id: false, event, @@ -289,14 +298,15 @@ impl TorrentManager { match this.tracker_one_request(tracker_url.clone()).await { Ok(interval) => { event = None; - let interval = 30; - let duration = Duration::from_secs(interval); + let interval = self + .force_tracker_interval + .unwrap_or(Duration::from_secs(interval)); debug!( "sleeping for {:?} after calling tracker {}", - duration, + interval, tracker_url.host().unwrap() ); - tokio::time::sleep(duration).await; + tokio::time::sleep(interval).await; } Err(e) => { debug!("error calling the tracker {}: {:#}", tracker_url, e); @@ -305,27 +315,4 @@ impl TorrentManager { }; } } - - fn add_peer(&self, addr: SocketAddr) { - let (out_tx, out_rx) = tokio::sync::mpsc::channel::(1); - let handle = match self - .inner - .locked - .write() - .peers - .add_if_not_seen(addr, out_tx) - { - Some(handle) => handle, - None => return, - }; - - let peer_connection = PeerConnection::new(self.inner.clone()); - spawn(format!("manage_peer({})", handle), async move { - if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await { - debug!("error managing peer {}: {:#}", handle, e) - }; - peer_connection.into_state().drop_peer(handle); - Ok::<_, anyhow::Error>(()) - }); - } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 60f3db6..75ec3d4 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -9,16 +9,16 @@ use std::{ }; use futures::{stream::FuturesUnordered, StreamExt}; -use log::{trace, warn}; +use log::{debug, trace, warn}; use parking_lot::{Mutex, RwLock}; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::{channel, Sender}; use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_binary_protocol::{Handshake, Message}, - peer_connection::WriterRequest, + peer_connection::{PeerConnection, WriterRequest}, peer_state::{LivePeerState, PeerState}, spawn_utils::spawn, torrent_metainfo::TorrentMetaV1Owned, @@ -333,4 +333,21 @@ impl TorrentState { }, ); } + + pub fn add_peer(self: &Arc, addr: SocketAddr) { + let (out_tx, out_rx) = channel::(1); + let handle = match self.locked.write().peers.add_if_not_seen(addr, out_tx) { + Some(handle) => handle, + None => return, + }; + + let peer_connection = PeerConnection::new(self.clone()); + spawn(format!("manage_peer({})", handle), async move { + if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await { + debug!("error managing peer {}: {:#}", handle, e) + }; + peer_connection.into_state().drop_peer(handle); + Ok::<_, anyhow::Error>(()) + }); + } } diff --git a/src/main.rs b/src/main.rs index 85800a9..2efcdbb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io::Read}; +use std::{fs::File, io::Read, time::Duration}; use anyhow::Context; use clap::Clap; @@ -60,6 +60,8 @@ struct Opts { /// The filename of the .torrent file. output_folder: String, + /// If set, only the file whose filename matching this regex will + /// be downloaded #[clap(short = 'r', long = "filename-re")] only_files_matching_regex: Option, @@ -71,8 +73,15 @@ struct Opts { #[clap(short, long)] list: bool, + /// The loglevel #[clap(arg_enum, short = 'v')] log_level: Option, + + /// The interval in seconds to poll trackers. + /// Trackers send the refresh interval when we connect to them. Often this is + /// pretty big, e.g. 30 minutes. This can force a certain value. + #[clap(short = 'i', long = "tracker-refresh-interval")] + force_tracker_interval: Option, } fn compute_only_files( @@ -162,6 +171,10 @@ fn main() -> anyhow::Result<()> { builder.only_files(only_files); } + if let Some(interval) = opts.force_tracker_interval { + builder.force_tracker_interval(Duration::from_secs(interval)); + } + let manager_handle = builder.start_manager().await?; manager_handle.wait_until_completed().await?; Ok(())