diff --git a/Cargo.lock b/Cargo.lock index 16ebca5..89191e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -851,6 +851,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" dependencies = [ + "num-bigint", "num-complex", "num-integer", "num-iter", @@ -858,6 +859,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-bigint" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-complex" version = "0.2.4" @@ -896,6 +908,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" dependencies = [ "autocfg", + "num-bigint", "num-integer", "num-traits", ] @@ -995,6 +1008,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "parse_duration" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d" +dependencies = [ + "lazy_static", + "num", + "regex", +] + [[package]] name = "peer_binary_protocol" version = "0.1.0" @@ -1296,6 +1320,7 @@ dependencies = [ "futures", "librqbit", "log", + "parse_duration", "pretty_env_logger", "regex", "reqwest", diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index ce74841..056edf1 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -6,7 +6,7 @@ use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use log::debug; -use crate::peer_info_reader; +use crate::{peer_connection::PeerConnectionOptions, peer_info_reader}; use librqbit_core::id20::Id20; #[derive(Debug)] @@ -25,6 +25,7 @@ pub async fn read_metainfo_from_peer_receiver + peer_id: Id20, info_hash: Id20, mut addrs: A, + peer_connection_options: Option, ) -> ReadMetainfoResult { let mut seen = HashSet::::new(); let first_addr = match addrs.next().await { @@ -39,9 +40,14 @@ pub async fn read_metainfo_from_peer_receiver + let semaphore = &semaphore; async move { let token = semaphore.acquire().await?; - let ret = peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash) - .await - .with_context(|| format!("error reading metainfo from {}", addr)); + let ret = peer_info_reader::read_metainfo_from_peer( + addr, + peer_id, + info_hash, + peer_connection_options, + ) + .await + .with_context(|| format!("error reading metainfo from {}", addr)); drop(token); ret } @@ -97,7 +103,7 @@ mod tests { let dht = Dht::new().await.unwrap(); let peer_rx = dht.get_peers(info_hash).await; let peer_id = generate_peer_id(); - match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await { + match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx, None).await { ReadMetainfoResult::Found { info, .. } => dbg!(info), ReadMetainfoResult::ChannelClosed { .. } => todo!("should not have happened"), }; diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index d79556d..a783963 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -9,12 +9,12 @@ use warp::Filter; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; -struct Inner { +struct ApiInternal { startup_time: Instant, torrent_managers: RwLock>, } -impl Inner { +impl ApiInternal { fn new() -> Self { Self { startup_time: Instant::now(), @@ -76,7 +76,7 @@ struct StatsResponse { time_remaining: Option, } -impl Inner { +impl ApiInternal { fn mgr_handle(&self, idx: usize) -> Option { self.torrent_managers.read().get(idx).cloned() } @@ -141,7 +141,7 @@ impl Inner { #[derive(Clone)] pub struct HttpApi { - inner: Arc, + inner: Arc, } fn json_response(v: T) -> warp::reply::Response { @@ -170,7 +170,7 @@ fn json_or_404(idx: usize, v: Option) -> warp::reply::Response impl HttpApi { pub fn new() -> Self { Self { - inner: Arc::new(Inner::new()), + inner: Arc::new(ApiInternal::new()), } } pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { @@ -180,8 +180,6 @@ impl HttpApi { idx } - // TODO: this is all for debugging, not even JSON. - // After using this for a bit, not a big fan of warp. pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { let inner = self.inner; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 6f1fcfe..6914d16 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -31,11 +31,18 @@ pub enum WriterRequest { ReadChunkRequest(ChunkInfo), } +#[derive(Default, Copy, Clone)] +pub struct PeerConnectionOptions { + pub connect_timeout: Option, + pub keep_alive_interval: Option, +} + pub struct PeerConnection { handler: H, addr: SocketAddr, info_hash: Id20, peer_id: Id20, + options: PeerConnectionOptions, } // async fn read_one<'a, R: AsyncReadExt + Unpin>( @@ -94,12 +101,19 @@ macro_rules! read_one { } impl PeerConnection { - pub fn new(addr: SocketAddr, info_hash: Id20, peer_id: Id20, handler: H) -> Self { + pub fn new( + addr: SocketAddr, + info_hash: Id20, + peer_id: Id20, + handler: H, + options: Option, + ) -> Self { PeerConnection { handler, addr, info_hash, peer_id, + options: options.unwrap_or_default(), } } pub fn into_handler(self) -> H { @@ -112,7 +126,9 @@ impl PeerConnection { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; let mut conn = match timeout( - Duration::from_secs(2), + self.options + .connect_timeout + .unwrap_or_else(|| Duration::from_secs(2)), tokio::net::TcpStream::connect(self.addr), ) .await @@ -191,7 +207,10 @@ impl PeerConnection { let (mut read_half, mut write_half) = tokio::io::split(conn); let writer = async move { - let keep_alive_interval = Duration::from_secs(120); + let keep_alive_interval = self + .options + .keep_alive_interval + .unwrap_or_else(|| Duration::from_secs(120)); if self.handler.get_have_bytes() > 0 { if let Some(len) = self diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 71c9cc2..6bb2f9b 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -17,12 +17,15 @@ use peer_binary_protocol::{ use sha1w::{ISha1, Sha1}; use tokio::sync::mpsc::UnboundedSender; -use crate::peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}; +use crate::peer_connection::{ + PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, +}; pub async fn read_metainfo_from_peer( addr: SocketAddr, peer_id: Id20, info_hash: Id20, + peer_connection_options: Option, ) -> anyhow::Result> { let (result_tx, result_rx) = tokio::sync::oneshot::channel::>>(); @@ -34,7 +37,8 @@ pub async fn read_metainfo_from_peer( result_tx: Mutex::new(Some(result_tx)), locked: RwLock::new(None), }; - let connection = PeerConnection::new(addr, info_hash, peer_id, handler); + let connection = + PeerConnection::new(addr, info_hash, peer_id, handler, peer_connection_options); let result_reader = async move { result_rx.await? }; let connection_runner = async move { connection.manage_peer(writer_rx).await }; @@ -230,7 +234,7 @@ mod tests { let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap(); let peer_id = generate_peer_id(); let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); - dbg!(read_metainfo_from_peer(addr, peer_id, info_hash) + dbg!(read_metainfo_from_peer(addr, peer_id, info_hash, None) .await .unwrap()); } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 51fea6d..141f907 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -24,17 +24,24 @@ use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, spawn_utils::{spawn, BlockingSpawner}, - torrent_state::TorrentState, + torrent_state::{TorrentState, TorrentStateOptions}, tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, }; + +#[derive(Default)] +struct TorrentManagerOptions { + force_tracker_interval: Option, + peer_connect_timeout: Option, + only_files: Option>, + peer_id: Option, + overwrite: bool, +} + pub struct TorrentManagerBuilder { info: TorrentMetaV1Info, info_hash: Id20, - overwrite: bool, output_folder: PathBuf, - only_files: Option>, - peer_id: Option, - force_tracker_interval: Option, + options: TorrentManagerOptions, spawner: Option, } @@ -47,27 +54,24 @@ impl TorrentManagerBuilder { Self { info, info_hash, - overwrite: false, output_folder: output_folder.as_ref().into(), - only_files: None, - peer_id: None, - force_tracker_interval: None, spawner: None, + options: TorrentManagerOptions::default(), } } pub fn only_files(&mut self, only_files: Vec) -> &mut Self { - self.only_files = Some(only_files); + self.options.only_files = Some(only_files); self } pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { - self.overwrite = overwrite; + self.options.overwrite = overwrite; self } pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self { - self.force_tracker_interval = Some(force_tracker_interval); + self.options.force_tracker_interval = Some(force_tracker_interval); self } @@ -77,7 +81,12 @@ impl TorrentManagerBuilder { } pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { - self.peer_id = Some(peer_id); + self.options.peer_id = Some(peer_id); + self + } + + pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self { + self.options.peer_connect_timeout = Some(timeout); self } @@ -86,11 +95,8 @@ impl TorrentManagerBuilder { self.info, self.info_hash, self.output_folder, - self.overwrite, - self.only_files, - self.force_tracker_interval, - self.peer_id, self.spawner.unwrap_or_else(|| BlockingSpawner::new(true)), + Some(self.options), ) } } @@ -136,7 +142,7 @@ struct TorrentManager { #[allow(dead_code)] speed_estimator: Arc, trackers: Mutex>, - force_tracker_interval: Option, + options: TorrentManagerOptions, } fn make_lengths>( @@ -147,17 +153,14 @@ fn make_lengths>( } impl TorrentManager { - #[allow(clippy::too_many_arguments)] fn start>( info: TorrentMetaV1Info, info_hash: Id20, out: P, - overwrite: bool, - only_files: Option>, - force_tracker_interval: Option, - peer_id: Option, spawner: BlockingSpawner, + options: Option, ) -> anyhow::Result { + let options = options.unwrap_or_default(); let files = { let mut files = Vec::>>::with_capacity(info.iter_file_lengths().count()); @@ -173,7 +176,7 @@ impl TorrentManager { } std::fs::create_dir_all(full_path.parent().unwrap())?; - let file = if overwrite { + let file = if options.overwrite { OpenOptions::new() .create(true) .read(true) @@ -193,13 +196,13 @@ impl TorrentManager { files }; - let peer_id = peer_id.unwrap_or_else(generate_peer_id); + let peer_id = options.peer_id.unwrap_or_else(generate_peer_id); let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?; debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = - FileOps::::new(&info, &files, &lengths).initial_check(only_files.as_deref())?; + let initial_check_results = FileOps::::new(&info, &files, &lengths) + .initial_check(options.only_files.as_deref())?; info!( "Initial check results: have {}, needed {}", @@ -213,6 +216,11 @@ impl TorrentManager { lengths, ); + let state_options = TorrentStateOptions { + peer_connect_timeout: options.peer_connect_timeout, + ..Default::default() + }; + let state = TorrentState::new( info, info_hash, @@ -223,6 +231,7 @@ impl TorrentManager { initial_check_results.have_bytes, initial_check_results.needed_bytes, spawner, + Some(state_options), ); let estimator = Arc::new(SpeedEstimator::new(5)); @@ -231,7 +240,7 @@ impl TorrentManager { state, speed_estimator: estimator.clone(), trackers: Mutex::new(HashSet::new()), - force_tracker_interval, + options, }); spawn("stats printer", { @@ -333,6 +342,7 @@ impl TorrentManager { Ok(interval) => { event = None; let interval = self + .options .force_tracker_interval .unwrap_or_else(|| Duration::from_secs(interval)); debug!( diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 04d75f4..e092826 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -36,7 +36,9 @@ use tokio::{ use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker}, file_ops::FileOps, - peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}, + peer_connection::{ + PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, + }, peer_state::{InflightRequest, LivePeerState, PeerState}, spawn_utils::{spawn, BlockingSpawner}, type_aliases::{PeerHandle, BF}, @@ -215,6 +217,11 @@ impl StatsSnapshot { } } +#[derive(Default)] +pub struct TorrentStateOptions { + pub peer_connect_timeout: Option, +} + pub struct TorrentState { info: TorrentMetaV1Info, locked: Arc>, @@ -226,6 +233,8 @@ pub struct TorrentState { stats: AtomicStats, spawner: BlockingSpawner, + options: TorrentStateOptions, + peer_semaphore: Semaphore, peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver)>, } @@ -242,7 +251,9 @@ impl TorrentState { have_bytes: u64, needed_bytes: u64, spawner: BlockingSpawner, + options: Option, ) -> Arc { + let options = options.unwrap_or_default(); let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel(); let state = Arc::new(TorrentState { info_hash, @@ -260,6 +271,7 @@ impl TorrentState { needed: needed_bytes, lengths, spawner, + options, peer_semaphore: Semaphore::new(128), peer_queue_tx, @@ -285,8 +297,17 @@ impl TorrentState { state: state.clone(), spawner: state.spawner, }; - let peer_connection = - PeerConnection::new(addr, state.info_hash, state.peer_id, handler); + let options = PeerConnectionOptions { + connect_timeout: state.options.peer_connect_timeout, + ..Default::default() + }; + let peer_connection = PeerConnection::new( + addr, + state.info_hash, + state.peer_id, + handler, + Some(options), + ); spawn(format!("manage_peer({})", addr), async move { if let Err(e) = peer_connection.manage_peer(out_rx).await { debug!("error managing peer {}: {:#}", addr, e) diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index d83f485..ed6095c 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -18,6 +18,7 @@ pretty_env_logger = "0.4" reqwest = "0.11" regex = "1" futures = "0.3" +parse_duration = "2" [dev-dependencies] futures = {version = "0.3"} \ No newline at end of file diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 9a0e3cf..f4642eb 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,4 +1,4 @@ -use std::{fs::File, io::Read, net::SocketAddr, time::Duration}; +use std::{fs::File, io::Read, net::SocketAddr, str::FromStr, time::Duration}; use anyhow::Context; use clap::Clap; @@ -7,6 +7,7 @@ use futures::StreamExt; use librqbit::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, generate_peer_id, + peer_connection::PeerConnectionOptions, spawn_utils::{spawn, BlockingSpawner}, torrent_from_bytes, torrent_manager::TorrentManagerBuilder, @@ -53,6 +54,16 @@ enum LogLevel { Error, } +#[derive(Debug, Clone, Copy)] +struct ParsedDuration(Duration); +impl FromStr for ParsedDuration { + type Err = parse_duration::parse::Error; + + fn from_str(s: &str) -> Result { + parse_duration::parse(s).map(ParsedDuration) + } +} + #[derive(Clap)] #[clap(version = "1.0", author = "Igor Katson ")] struct Opts { @@ -85,7 +96,7 @@ struct Opts { #[clap(short = 'i', long = "tracker-refresh-interval")] force_tracker_interval: Option, - /// The listen address for (debugging) HTTP API + /// The listen address for HTTP API #[clap(long = "http-api-listen-addr", default_value = "127.0.0.1:3030")] http_api_listen_addr: SocketAddr, @@ -97,6 +108,10 @@ struct Opts { #[clap(long = "disable-dht")] disable_dht: bool, + + /// The connect timeout, e.g. 1s, 1.5s, 100ms etc. + #[clap(long = "peer-connect-timeout")] + peer_connect_timeout: Option, } fn compute_only_files>( @@ -178,6 +193,11 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> Some(Dht::new().await.context("error initializing DHT")?) }; + let peer_opts = PeerConnectionOptions { + connect_timeout: opts.peer_connect_timeout.map(|p| p.0), + ..Default::default() + }; + if opts.torrent_path.starts_with("magnet:") { let Magnet { info_hash, @@ -188,7 +208,9 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> .get_peers(info_hash) .await; let (info, dht_rx, initial_peers) = - match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx).await { + match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx, Some(peer_opts)) + .await + { ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), ReadMetainfoResult::ChannelClosed { .. } => { anyhow::bail!("DHT died, no way to discover torrent metainfo") @@ -293,6 +315,9 @@ async fn main_info( if let Some(interval) = opts.force_tracker_interval { builder.force_tracker_interval(Duration::from_secs(interval)); } + if let Some(t) = opts.peer_connect_timeout { + builder.peer_connect_timeout(t.0); + } let http_api = librqbit::http_api::HttpApi::new(); spawn("HTTP API", {