From 1667efdaa77103943782aa7c74747f30b5331c14 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 9 Oct 2021 15:03:42 +0100 Subject: [PATCH 01/13] Start refactoring code to support running in background and adding multiple torrents --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/session.rs | 372 +++++++++++++++++++++++++++++++++ crates/rqbit/src/main.rs | 317 +++------------------------- 5 files changed, 405 insertions(+), 287 deletions(-) create mode 100644 crates/librqbit/src/session.rs diff --git a/Cargo.lock b/Cargo.lock index 6e68c6f..e94e0c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,7 @@ dependencies = [ "peer_binary_protocol", "pretty_env_logger", "rand 0.8.4", + "regex", "reqwest", "serde", "serde_json", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index b9aa94e..1bb1c58 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -27,6 +27,7 @@ serde = {version = "1", features=["derive"]} serde_json = "1" anyhow = "1" +regex = "1" reqwest = "0.11" urlencoding = "1" byteorder = "1" diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 5def0b3..c0b8ca0 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -6,6 +6,7 @@ pub mod peer_connection; pub mod peer_handler; pub mod peer_info_reader; pub mod peer_state; +pub mod session; pub mod spawn_utils; pub mod torrent_manager; pub mod torrent_state; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs new file mode 100644 index 0000000..7fc3dff --- /dev/null +++ b/crates/librqbit/src/session.rs @@ -0,0 +1,372 @@ +use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration}; + +use anyhow::Context; +use buffers::ByteString; +use dht::{Dht, Id20, PersistentDht, PersistentDhtConfig}; +use librqbit_core::{ + magnet::Magnet, + peer_id::generate_peer_id, + torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, +}; +use log::{info, warn}; +use reqwest::Url; +use tokio_stream::StreamExt; + +use size_format::SizeFormatterBinary as SF; + +use crate::{ + dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + http_api::HttpApi, + peer_connection::PeerConnectionOptions, + spawn_utils::{spawn, BlockingSpawner}, + torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, +}; + +pub struct Session { + peer_id: Id20, + dht: Option, + http_api: Option, + peer_opts: PeerConnectionOptions, + spawner: BlockingSpawner, + output_folder: PathBuf, +} + +async fn torrent_from_url(url: &str) -> anyhow::Result { + let response = reqwest::get(url) + .await + .with_context(|| format!("error downloading torrent metadata from {}", url))?; + if !response.status().is_success() { + anyhow::bail!("GET {} returned {}", url, response.status()) + } + let b = response + .bytes() + .await + .with_context(|| format!("error reading repsonse body from {}", url))?; + torrent_from_bytes(&b).context("error decoding torrent") +} + +fn torrent_from_file(filename: &str) -> anyhow::Result { + let mut buf = Vec::new(); + if filename == "-" { + std::io::stdin() + .read_to_end(&mut buf) + .context("error reading stdin")?; + } else { + File::open(filename) + .with_context(|| format!("error opening {}", filename))? + .read_to_end(&mut buf) + .with_context(|| format!("error reading {}", filename))?; + } + torrent_from_bytes(&buf).context("error decoding torrent") +} + +fn compute_only_files>( + torrent: &TorrentMetaV1Info, + filename_re: &str, +) -> anyhow::Result> { + let filename_re = regex::Regex::new(filename_re).context("filename regex is incorrect")?; + let mut only_files = Vec::new(); + for (idx, (filename, _)) in torrent.iter_filenames_and_lengths()?.enumerate() { + let full_path = filename + .to_pathbuf() + .with_context(|| format!("filename of file {} is not valid utf8", idx))?; + if filename_re.is_match(full_path.to_str().unwrap()) { + only_files.push(idx); + } + } + if only_files.is_empty() { + anyhow::bail!("none of the filenames match the given regex") + } + Ok(only_files) +} + +#[derive(Default)] +pub struct AddTorrentOptions { + pub only_files_regex: Option, + pub overwrite: bool, + pub list_only: bool, + pub output_folder: Option, + pub peer_opts: Option, + pub force_tracker_interval: Option, +} + +#[derive(Default)] +pub struct SessionOptions { + pub disable_dht: bool, + pub disable_dht_persistence: bool, + pub dht_config: Option, + pub disable_http_api: bool, + pub http_api_listen_addr: Option, + pub peer_id: Option, + pub peer_opts: Option, +} + +impl Session { + pub async fn new(output_folder: PathBuf, spawner: BlockingSpawner) -> anyhow::Result { + Self::new_with_opts(output_folder, spawner, SessionOptions::default()).await + } + pub async fn new_with_opts( + output_folder: PathBuf, + spawner: BlockingSpawner, + opts: SessionOptions, + ) -> anyhow::Result { + let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let dht = if opts.disable_dht { + None + } else { + let dht = if opts.disable_dht_persistence { + Dht::new().await + } else { + PersistentDht::create(opts.dht_config).await + } + .context("error initializing DHT")?; + Some(dht) + }; + let peer_opts = opts.peer_opts.unwrap_or_default(); + + let http_api = if opts.disable_http_api { + None + } else { + let http_api_listen_addr = opts + .http_api_listen_addr + .unwrap_or_else(|| "127.0.0.1:3001".parse().unwrap()); + let http_api = HttpApi::new(dht.clone()); + spawn("HTTP API", { + let http_api = http_api.clone(); + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + Some(http_api) + }; + + Ok(Self { + peer_id, + dht, + http_api, + peer_opts, + spawner, + output_folder, + }) + } + pub async fn add_torrent( + &self, + url: String, + opts: Option, + ) -> anyhow::Result> { + // Magnet links are different in that we first need to discover the metadata. + let opts = opts.unwrap_or_default(); + if url.starts_with("magnet:") { + let Magnet { + info_hash, + trackers, + } = Magnet::parse(&url).context("provided path is not a valid magnet URL")?; + + let dht_rx = self + .dht + .as_ref() + .context("magnet links without DHT are not supported")? + .get_peers(info_hash) + .await?; + + let trackers = trackers + .into_iter() + .filter_map(|url| match reqwest::Url::parse(&url) { + Ok(url) => Some(url), + Err(e) => { + warn!("error parsing tracker {} as url: {}", url, e); + None + } + }) + .collect(); + + let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + dht_rx, + Some(self.peer_opts), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + self.main_torrent_info( + info_hash, + info, + Some(dht_rx), + initial_peers.into_iter().collect(), + trackers, + opts, + ) + .await + } else { + let torrent = if url.starts_with("http://") || url.starts_with("https://") { + torrent_from_url(&url).await? + } else { + torrent_from_file(&url)? + }; + let dht_rx = match self.dht.as_ref() { + Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), + None => None, + }; + let trackers = torrent + .iter_announce() + .filter_map(|tracker| { + let url = match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => url, + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; + } + }; + match Url::parse(url) { + Ok(url) => Some(url), + Err(e) => { + warn!("cannot parse tracker URL {}: {}", url, e); + None + } + } + }) + .collect::>(); + self.main_torrent_info( + torrent.info_hash, + torrent.info, + dht_rx, + Vec::new(), + trackers, + opts, + ) + .await + } + } + + #[allow(clippy::too_many_arguments)] + async fn main_torrent_info( + &self, + info_hash: Id20, + info: TorrentMetaV1Info, + dht_peer_rx: Option + Unpin + Send + Sync + 'static>, + initial_peers: Vec, + trackers: Vec, + opts: AddTorrentOptions, + ) -> anyhow::Result> { + info!("Torrent info: {:#?}", &info); + let only_files = if let Some(filename_re) = opts.only_files_regex { + let only_files = compute_only_files(&info, &filename_re)?; + for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { + if !only_files.contains(&idx) { + continue; + } + info!("Will download {:?}", filename); + } + Some(only_files) + } else { + None + }; + + if opts.list_only { + for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { + let included = match &only_files { + Some(files) => files.contains(&idx), + None => true, + }; + info!( + "File {}, size {}{}", + filename.to_string()?, + SF::new(len), + if included { "" } else { "will skip" } + ) + } + info!("--list was passed, nothing to do, exiting."); + return Ok(None); + } + + let output_folder = opts + .output_folder + .map(PathBuf::from) + .unwrap_or_else(|| self.output_folder.clone()); + + let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder); + builder + .overwrite(opts.overwrite) + .spawner(self.spawner) + .peer_id(self.peer_id); + if let Some(only_files) = only_files { + builder.only_files(only_files); + } + if let Some(interval) = opts.force_tracker_interval { + builder.force_tracker_interval(interval); + } + + if let Some(t) = opts.peer_opts.unwrap_or(self.peer_opts).connect_timeout { + builder.peer_connect_timeout(t); + } + + // let http_api = HttpApi::new(self.dht.clone()); + // spawn("HTTP API", { + // let http_api = http_api.clone(); + // async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + // }); + + let handle = builder.start_manager()?; + if let Some(http_api) = self.http_api.as_ref() { + http_api.add_mgr(handle.clone()); + } + + for url in trackers { + handle.add_tracker(url); + } + for peer in initial_peers { + handle.add_peer(peer); + } + + spawn("Stats printer", { + let handle = handle.clone(); + async move { + loop { + let peer_stats = handle.torrent_state().peer_stats_snapshot(); + let stats = handle.torrent_state().stats_snapshot(); + let speed = handle.speed_estimator(); + let total = stats.total_bytes; + let progress = stats.total_bytes - stats.remaining_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (progress as f64 / total as f64) * 100f64 + }; + info!( + "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + downloaded_pct, + SF::new(progress), + speed.download_mbps(), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(total), + SF::new(stats.uploaded_bytes), + peer_stats.live, + peer_stats.connecting, + peer_stats.queued, + peer_stats.seen, + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + + if let Some(mut dht_peer_rx) = dht_peer_rx { + spawn("DHT peer adder", { + let handle = handle.clone(); + async move { + while let Some(peer) = dht_peer_rx.next().await { + handle.add_peer(peer); + } + warn!("dht was closed"); + Ok(()) + } + }); + } + + Ok(Some(handle)) + } +} diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4028a76..7153126 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,50 +1,12 @@ -use std::{fs::File, io::Read, net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; use anyhow::Context; use clap::Clap; -use dht::{Dht, Id20, PersistentDht}; -use futures::StreamExt; use librqbit::{ - dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - generate_peer_id, peer_connection::PeerConnectionOptions, - spawn_utils::{spawn, BlockingSpawner}, - torrent_from_bytes, - torrent_manager::TorrentManagerBuilder, - ByteString, Magnet, TorrentMetaV1Info, TorrentMetaV1Owned, + session::{AddTorrentOptions, Session, SessionOptions}, + spawn_utils::BlockingSpawner, }; -use log::{info, warn}; -use reqwest::Url; -use size_format::SizeFormatterBinary as SF; - -async fn torrent_from_url(url: &str) -> anyhow::Result { - let response = reqwest::get(url) - .await - .with_context(|| format!("error downloading torrent metadata from {}", url))?; - if !response.status().is_success() { - anyhow::bail!("GET {} returned {}", url, response.status()) - } - let b = response - .bytes() - .await - .with_context(|| format!("error reading repsonse body from {}", url))?; - torrent_from_bytes(&b).context("error decoding torrent") -} - -fn torrent_from_file(filename: &str) -> anyhow::Result { - let mut buf = Vec::new(); - if filename == "-" { - std::io::stdin() - .read_to_end(&mut buf) - .context("error reading stdin")?; - } else { - File::open(filename) - .with_context(|| format!("error opening {}", filename))? - .read_to_end(&mut buf) - .with_context(|| format!("error reading {}", filename))?; - } - torrent_from_bytes(&buf).context("error decoding torrent") -} #[derive(Debug, Clap)] enum LogLevel { @@ -121,26 +83,6 @@ struct Opts { peer_connect_timeout: Option, } -fn compute_only_files>( - torrent: &TorrentMetaV1Info, - filename_re: &str, -) -> anyhow::Result> { - let filename_re = regex::Regex::new(filename_re).context("filename regex is incorrect")?; - let mut only_files = Vec::new(); - for (idx, (filename, _)) in torrent.iter_filenames_and_lengths()?.enumerate() { - let full_path = filename - .to_pathbuf() - .with_context(|| format!("filename of file {} is not valid utf8", idx))?; - if filename_re.is_match(full_path.to_str().unwrap()) { - only_files.push(idx); - } - } - if only_files.is_empty() { - anyhow::bail!("none of the filenames match the given regex") - } - Ok(only_files) -} - fn init_logging(opts: &Opts) { if std::env::var_os("RUST_LOG").is_none() { match opts.log_level.as_ref() { @@ -193,237 +135,38 @@ fn main() -> anyhow::Result<()> { } async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { - let peer_id = generate_peer_id(); - let dht = if opts.disable_dht { - None - } else { - let dht = if opts.disable_dht_persistence { - Dht::new().await - } else { - PersistentDht::create(None).await - } - .context("error initializing DHT")?; - Some(dht) + let sopts = SessionOptions { + disable_dht: opts.disable_dht, + disable_dht_persistence: opts.disable_dht_persistence, + dht_config: None, + disable_http_api: false, + http_api_listen_addr: Some(opts.http_api_listen_addr), + peer_id: None, + peer_opts: Some(PeerConnectionOptions { + connect_timeout: opts.peer_connect_timeout.map(|d| d.0), + ..Default::default() + }), }; + let session = Session::new_with_opts(opts.output_folder.into(), spawner, sopts) + .await + .context("error initializing rqbit session")?; - let peer_opts = PeerConnectionOptions { - connect_timeout: opts.peer_connect_timeout.map(|p| p.0), + let torrent_opts = AddTorrentOptions { + only_files_regex: opts.only_files_matching_regex, + overwrite: opts.overwrite, + list_only: opts.list, + force_tracker_interval: opts.force_tracker_interval.map(|d| d.0), ..Default::default() }; - - // Magnet links are different in that we first need to discover the metadata. - if opts.torrent_path.starts_with("magnet:") { - let Magnet { - info_hash, - trackers, - } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; - - let dht_rx = dht - .as_ref() - .ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))? - .get_peers(info_hash) - .await?; - - let trackers = trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); - - let (info, dht_rx, initial_peers) = - match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx, Some(peer_opts)) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - main_torrent_info( - opts, - info_hash, - info, - peer_id, - dht, - Some(dht_rx), - initial_peers.into_iter().collect(), - trackers, - spawner, - ) - .await - } else { - let torrent = if opts.torrent_path.starts_with("http://") - || opts.torrent_path.starts_with("https://") - { - torrent_from_url(&opts.torrent_path).await? - } else { - torrent_from_file(&opts.torrent_path)? - }; - let dht_rx = match dht.as_ref() { - Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), - None => None, - }; - let trackers = torrent - .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { - Ok(url) => Some(url), - Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); - None - } - } - }) - .collect::>(); - main_torrent_info( - opts, - torrent.info_hash, - torrent.info, - peer_id, - dht, - dht_rx, - Vec::new(), - trackers, - spawner, - ) + if let Some(handle) = session + .add_torrent(opts.torrent_path, Some(torrent_opts)) .await + .context("error adding torrent to session")? + { + handle + .wait_until_completed() + .await + .context("error waiting for torrent completion")?; } -} - -#[allow(clippy::too_many_arguments)] -async fn main_torrent_info( - opts: Opts, - info_hash: Id20, - info: TorrentMetaV1Info, - peer_id: Id20, - dht: Option, - dht_peer_rx: Option + Unpin + Send + Sync + 'static>, - initial_peers: Vec, - trackers: Vec, - spawner: BlockingSpawner, -) -> anyhow::Result<()> { - info!("Torrent info: {:#?}", &info); - let only_files = if let Some(filename_re) = opts.only_files_matching_regex { - let only_files = compute_only_files(&info, &filename_re)?; - for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { - if !only_files.contains(&idx) { - continue; - } - info!("Will download {:?}", filename); - } - Some(only_files) - } else { - None - }; - - if opts.list { - for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { - let included = match &only_files { - Some(files) => files.contains(&idx), - None => true, - }; - info!( - "File {}, size {}{}", - filename.to_string()?, - SF::new(len), - if included { "" } else { "will skip" } - ) - } - info!("--list was passed, nothing to do, exiting."); - return Ok(()); - } - - let http_api_listen_addr = opts.http_api_listen_addr; - - let mut builder = TorrentManagerBuilder::new(info, info_hash, opts.output_folder); - builder - .overwrite(opts.overwrite) - .spawner(spawner) - .peer_id(peer_id); - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(interval.0); - } - if let Some(t) = opts.peer_connect_timeout { - builder.peer_connect_timeout(t.0); - } - - let http_api = librqbit::http_api::HttpApi::new(dht.clone()); - spawn("HTTP API", { - let http_api = http_api.clone(); - async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - }); - - let handle = builder.start_manager()?; - http_api.add_mgr(handle.clone()); - - for url in trackers { - handle.add_tracker(url); - } - for peer in initial_peers { - handle.add_peer(peer); - } - - spawn("Stats printer", { - let handle = handle.clone(); - async move { - loop { - let peer_stats = handle.torrent_state().peer_stats_snapshot(); - let stats = handle.torrent_state().stats_snapshot(); - let speed = handle.speed_estimator(); - let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (progress as f64 / total as f64) * 100f64 - }; - info!( - "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", - downloaded_pct, - SF::new(progress), - speed.download_mbps(), - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(total), - SF::new(stats.uploaded_bytes), - peer_stats.live, - peer_stats.connecting, - peer_stats.queued, - peer_stats.seen, - ); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }); - - if let Some(mut dht_peer_rx) = dht_peer_rx { - spawn("DHT peer adder", { - let handle = handle.clone(); - async move { - while let Some(peer) = dht_peer_rx.next().await { - handle.add_peer(peer); - } - warn!("dht was closed"); - Ok(()) - } - }); - } - - handle.wait_until_completed().await?; Ok(()) } From 6e9e79a02e8f50ed0ba5483879bb1b9bc5004875 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 9 Oct 2021 16:49:55 +0100 Subject: [PATCH 02/13] Move stats printing to main --- crates/librqbit/src/http_api.rs | 3 +- crates/librqbit/src/session.rs | 33 -------------------- crates/rqbit/src/main.rs | 53 ++++++++++++++++++++++++++++----- 3 files changed, 48 insertions(+), 41 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 39cf113..b753552 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -209,7 +209,8 @@ impl HttpApi { "GET /torrents": "List torrents (default torrent is 0)", "GET /torrents/{index}": "Torrent details", "GET /torrents/{index}/haves": "The bitfield of have pieces", - "GET /torrents/{index}/stats": "Torrent stats" + "GET /torrents/{index}/stats": "Torrent stats", + "POST /torrents/": "Add a torrent here. magnet: or http:// or a local file." } }); move || json_response(&api_list) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 7fc3dff..b1ffc8c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -321,39 +321,6 @@ impl Session { handle.add_peer(peer); } - spawn("Stats printer", { - let handle = handle.clone(); - async move { - loop { - let peer_stats = handle.torrent_state().peer_stats_snapshot(); - let stats = handle.torrent_state().stats_snapshot(); - let speed = handle.speed_estimator(); - let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (progress as f64 / total as f64) * 100f64 - }; - info!( - "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", - downloaded_pct, - SF::new(progress), - speed.download_mbps(), - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(total), - SF::new(stats.uploaded_bytes), - peer_stats.live, - peer_stats.connecting, - peer_stats.queued, - peer_stats.seen, - ); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }); - if let Some(mut dht_peer_rx) = dht_peer_rx { spawn("DHT peer adder", { let handle = handle.clone(); diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 7153126..df48ff4 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -5,8 +5,10 @@ use clap::Clap; use librqbit::{ peer_connection::PeerConnectionOptions, session::{AddTorrentOptions, Session, SessionOptions}, - spawn_utils::BlockingSpawner, + spawn_utils::{spawn, BlockingSpawner}, }; +use log::info; +use size_format::SizeFormatterBinary as SF; #[derive(Debug, Clap)] enum LogLevel { @@ -158,15 +160,52 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> force_tracker_interval: opts.force_tracker_interval.map(|d| d.0), ..Default::default() }; - if let Some(handle) = session + + let handle = match session .add_torrent(opts.torrent_path, Some(torrent_opts)) .await .context("error adding torrent to session")? { - handle - .wait_until_completed() - .await - .context("error waiting for torrent completion")?; - } + Some(handle) => handle, + None => return Ok(()), + }; + + spawn("Stats printer", { + let handle = handle.clone(); + async move { + loop { + let peer_stats = handle.torrent_state().peer_stats_snapshot(); + let stats = handle.torrent_state().stats_snapshot(); + let speed = handle.speed_estimator(); + let total = stats.total_bytes; + let progress = stats.total_bytes - stats.remaining_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (progress as f64 / total as f64) * 100f64 + }; + info!( + "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + downloaded_pct, + SF::new(progress), + speed.download_mbps(), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(total), + SF::new(stats.uploaded_bytes), + peer_stats.live, + peer_stats.connecting, + peer_stats.queued, + peer_stats.seen, + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + + handle + .wait_until_completed() + .await + .context("error waiting for torrent completion")?; Ok(()) } From 64900e1fd42012e02946695bd91751d2cf5240b0 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 09:57:21 +0100 Subject: [PATCH 03/13] Add API to add a torrent --- crates/librqbit/src/http_api.rs | 80 +++++++++++++++++++++++++++------ crates/librqbit/src/session.rs | 30 ++----------- crates/rqbit/src/main.rs | 22 ++++++--- 3 files changed, 86 insertions(+), 46 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index b753552..6172330 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,13 +1,15 @@ -use std::net::SocketAddr; -use std::sync::Arc; - +use anyhow::Context; use dht::{Dht, DhtStats}; use parking_lot::RwLock; use serde::Serialize; +use std::net::SocketAddr; +use std::sync::Arc; use std::time::{Duration, Instant}; +use warp::hyper::body::Bytes; use warp::hyper::Body; use warp::Filter; +use crate::session::Session; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; @@ -15,16 +17,25 @@ struct ApiInternal { dht: Option, startup_time: Instant, torrent_managers: RwLock>, + session: Arc, } impl ApiInternal { - fn new(dht: Option) -> Self { + fn new(session: Arc) -> Self { Self { - dht, + dht: session.get_dht(), startup_time: Instant::now(), torrent_managers: RwLock::new(Vec::new()), + session, } } + + fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { + let mut g = self.torrent_managers.write(); + let idx = g.len(); + g.push(handle); + idx + } } #[derive(Serialize)] @@ -116,6 +127,16 @@ impl ApiInternal { Some(TorrentDetailsResponse { info_hash, files }) } + async fn api_add_torrent(&self, url: String) -> anyhow::Result { + let handle = self + .session + .add_torrent(url, None) + .await + .context("error adding torrent")? + .context("expected session.add_torrent() to return a handle")?; + Ok(self.add_mgr(handle)) + } + fn api_dht_stats(&self) -> Option { self.dht.as_ref().map(|d| d.stats()) } @@ -185,16 +206,13 @@ fn json_or_404(idx: usize, v: Option) -> warp::reply::Response } impl HttpApi { - pub fn new(dht: Option) -> Self { + pub fn new(session: Arc) -> Self { Self { - inner: Arc::new(ApiInternal::new(dht)), + inner: Arc::new(ApiInternal::new(session)), } } pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { - let mut g = self.inner.torrent_managers.write(); - let idx = g.len(); - g.push(handle); - idx + self.inner.add_mgr(handle) } pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { @@ -235,11 +253,46 @@ impl HttpApi { } }); - let torrent_list = warp::path!("torrents").map({ + let torrent_list = warp::get().and(warp::path("torrents")).map({ let inner = inner.clone(); move || json_response(inner.api_torrent_list()) }); + let torrent_add = warp::post() + .and(warp::path("torrents")) + .and(warp::body::bytes()) + .and_then({ + let inner = inner.clone(); + use warp::http::Response; + fn make_response(status: u16, body: String) -> Response { + Response::builder().status(status).body(body).unwrap() + } + move |body: Bytes| { + let inner = inner.clone(); + async move { + let url = match String::from_utf8(body.to_vec()) { + Ok(str) => str, + Err(_) => { + return Ok::<_, warp::Rejection>(make_response( + 400, + "invalid utf-8".into(), + )) + } + }; + let idx = inner + .api_add_torrent(url) + .await + .context("error calling HttpApi::api_add_torrent"); + match idx { + Ok(idx) => { + return Ok(make_response(200, format!("{}", idx))); + } + Err(e) => return Ok(make_response(400, format!("{}", e))), + } + } + } + }); + let torrent_details = warp::path!("torrents" / usize).map({ let inner = inner.clone(); move |idx| json_or_404(idx, inner.api_torrent_details(idx)) @@ -264,7 +317,8 @@ impl HttpApi { .or(dht_routing_table) .or(torrent_details) .or(torrent_dump_haves) - .or(torrent_dump_stats); + .or(torrent_dump_stats) + .or(torrent_add); warp::serve(router).run(addr).await; Ok(()) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index b1ffc8c..4f90843 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -25,7 +25,6 @@ use crate::{ pub struct Session { peer_id: Id20, dht: Option, - http_api: Option, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, output_folder: PathBuf, @@ -95,8 +94,6 @@ pub struct SessionOptions { pub disable_dht: bool, pub disable_dht_persistence: bool, pub dht_config: Option, - pub disable_http_api: bool, - pub http_api_listen_addr: Option, pub peer_id: Option, pub peer_opts: Option, } @@ -124,29 +121,17 @@ impl Session { }; let peer_opts = opts.peer_opts.unwrap_or_default(); - let http_api = if opts.disable_http_api { - None - } else { - let http_api_listen_addr = opts - .http_api_listen_addr - .unwrap_or_else(|| "127.0.0.1:3001".parse().unwrap()); - let http_api = HttpApi::new(dht.clone()); - spawn("HTTP API", { - let http_api = http_api.clone(); - async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - }); - Some(http_api) - }; - Ok(Self { peer_id, dht, - http_api, peer_opts, spawner, output_folder, }) } + pub fn get_dht(&self) -> Option { + self.dht.clone() + } pub async fn add_torrent( &self, url: String, @@ -303,16 +288,7 @@ impl Session { builder.peer_connect_timeout(t); } - // let http_api = HttpApi::new(self.dht.clone()); - // spawn("HTTP API", { - // let http_api = http_api.clone(); - // async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - // }); - let handle = builder.start_manager()?; - if let Some(http_api) = self.http_api.as_ref() { - http_api.add_mgr(handle.clone()); - } for url in trackers { handle.add_tracker(url); diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index df48ff4..a18165d 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,8 +1,9 @@ -use std::{net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use anyhow::Context; use clap::Clap; use librqbit::{ + http_api::HttpApi, peer_connection::PeerConnectionOptions, session::{AddTorrentOptions, Session, SessionOptions}, spawn_utils::{spawn, BlockingSpawner}, @@ -141,17 +142,26 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> disable_dht: opts.disable_dht, disable_dht_persistence: opts.disable_dht_persistence, dht_config: None, - disable_http_api: false, - http_api_listen_addr: Some(opts.http_api_listen_addr), peer_id: None, peer_opts: Some(PeerConnectionOptions { connect_timeout: opts.peer_connect_timeout.map(|d| d.0), ..Default::default() }), }; - let session = Session::new_with_opts(opts.output_folder.into(), spawner, sopts) - .await - .context("error initializing rqbit session")?; + + let session = Arc::new( + Session::new_with_opts(opts.output_folder.into(), spawner, sopts) + .await + .context("error initializing rqbit session")?, + ); + + { + let http_api = HttpApi::new(session.clone()); + spawn("HTTP API", { + let http_api_listen_addr = opts.http_api_listen_addr; + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + }; let torrent_opts = AddTorrentOptions { only_files_regex: opts.only_files_matching_regex, From ba875294198be100448b72829a6b14019559424f Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 10:45:58 +0100 Subject: [PATCH 04/13] Fix a bug in DHT that caused panics sometimes --- crates/dht/src/dht.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index ef56727..ae30b02 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -584,7 +584,7 @@ impl Stream for PeerStream { .unwrap() .get_index(pos) .unwrap(); - if pos < end { + if pos + 1 < end { self.initial_peers_pos = Some((pos + 1, end)); } self.absolute_stream_pos += 1; From 2ac76683e620e3352492e8a11c78e1c7035308cd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 10:46:09 +0100 Subject: [PATCH 05/13] Now actually downloads 2 files at a time --- crates/librqbit/src/http_api.rs | 6 +++--- crates/rqbit/src/main.rs | 17 +++++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 6172330..96c3e20 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -287,7 +287,7 @@ impl HttpApi { Ok(idx) => { return Ok(make_response(200, format!("{}", idx))); } - Err(e) => return Ok(make_response(400, format!("{}", e))), + Err(e) => return Ok(make_response(400, format!("{:#}", e))), } } } @@ -312,13 +312,13 @@ impl HttpApi { }); let router = api_list - .or(torrent_list) .or(dht_stats) .or(dht_routing_table) .or(torrent_details) .or(torrent_dump_haves) .or(torrent_dump_stats) - .or(torrent_add); + .or(torrent_add) + .or(torrent_list); warp::serve(router).run(addr).await; Ok(()) diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index a18165d..6abfbe2 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -155,14 +155,6 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> .context("error initializing rqbit session")?, ); - { - let http_api = HttpApi::new(session.clone()); - spawn("HTTP API", { - let http_api_listen_addr = opts.http_api_listen_addr; - async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - }); - }; - let torrent_opts = AddTorrentOptions { only_files_regex: opts.only_files_matching_regex, overwrite: opts.overwrite, @@ -180,6 +172,15 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> None => return Ok(()), }; + { + let http_api = HttpApi::new(session.clone()); + http_api.add_mgr(handle.clone()); + spawn("HTTP API", { + let http_api_listen_addr = opts.http_api_listen_addr; + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + }; + spawn("Stats printer", { let handle = handle.clone(); async move { From 99208800f4625f9142608239b92ef2f69cf541ff Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 10:58:09 +0100 Subject: [PATCH 06/13] Initial check to use block_in_place --- crates/librqbit/src/torrent_manager.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index c825459..743a454 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -198,8 +198,10 @@ impl TorrentManager { debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = FileOps::::new(&info, &files, &lengths) - .initial_check(options.only_files.as_deref())?; + let initial_check_results = spawner.spawn_block_in_place(|| { + FileOps::::new(&info, &files, &lengths) + .initial_check(options.only_files.as_deref()) + })?; info!( "Initial check results: have {}, needed {}", From b1df9af3f6cc74b381f4b07a798ddff3af365d0d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 11:00:04 +0100 Subject: [PATCH 07/13] Cargo check / fmt fixes --- crates/librqbit/src/session.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 4f90843..f34a14f 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -16,7 +16,6 @@ use size_format::SizeFormatterBinary as SF; use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - http_api::HttpApi, peer_connection::PeerConnectionOptions, spawn_utils::{spawn, BlockingSpawner}, torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, From 9b73a8ff2fec5e12d691399fa21eb80da3180b0e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 11:39:25 +0100 Subject: [PATCH 08/13] Add "overwrite=true" option to torrent add API --- crates/librqbit/src/http_api.rs | 26 +++++++--- crates/librqbit/src/session.rs | 87 ++++++++++++++++++++++++++++++++- 2 files changed, 105 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 96c3e20..22e3ba4 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,7 +1,7 @@ use anyhow::Context; use dht::{Dht, DhtStats}; use parking_lot::RwLock; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -9,7 +9,7 @@ use warp::hyper::body::Bytes; use warp::hyper::Body; use warp::Filter; -use crate::session::Session; +use crate::session::{AddTorrentOptions, Session}; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; @@ -127,10 +127,14 @@ impl ApiInternal { Some(TorrentDetailsResponse { info_hash, files }) } - async fn api_add_torrent(&self, url: String) -> anyhow::Result { + async fn api_add_torrent( + &self, + url: String, + opts: Option, + ) -> anyhow::Result { let handle = self .session - .add_torrent(url, None) + .add_torrent(url, opts) .await .context("error adding torrent")? .context("expected session.add_torrent() to return a handle")?; @@ -258,16 +262,22 @@ impl HttpApi { move || json_response(inner.api_torrent_list()) }); + #[derive(Deserialize)] + struct TorrentAddQueryParams { + overwrite: Option, + } + let torrent_add = warp::post() .and(warp::path("torrents")) .and(warp::body::bytes()) + .and(warp::query()) .and_then({ let inner = inner.clone(); use warp::http::Response; fn make_response(status: u16, body: String) -> Response { Response::builder().status(status).body(body).unwrap() } - move |body: Bytes| { + move |body: Bytes, params: TorrentAddQueryParams| { let inner = inner.clone(); async move { let url = match String::from_utf8(body.to_vec()) { @@ -279,8 +289,12 @@ impl HttpApi { )) } }; + let opts = AddTorrentOptions { + overwrite: params.overwrite.unwrap_or(false), + ..Default::default() + }; let idx = inner - .api_add_torrent(url) + .api_add_torrent(url, Some(opts)) .await .context("error calling HttpApi::api_add_torrent"); match idx { diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index f34a14f..001917e 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -9,6 +9,7 @@ use librqbit_core::{ torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, }; use log::{info, warn}; +use parking_lot::RwLock; use reqwest::Url; use tokio_stream::StreamExt; @@ -21,11 +22,45 @@ use crate::{ torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, }; +pub enum ManagedTorrentState { + Initializing, + Running(TorrentManagerHandle), +} + +pub struct ManagedTorrent { + info_hash: Id20, + output_folder: PathBuf, + state: ManagedTorrentState, +} + +impl PartialEq for ManagedTorrent { + fn eq(&self, other: &Self) -> bool { + self.info_hash == other.info_hash && self.output_folder == other.output_folder + } +} + +#[derive(Default)] +pub struct SessionLocked { + torrents: Vec, +} + +impl SessionLocked { + fn add_torrent(&mut self, torrent: ManagedTorrent) -> Option { + if self.torrents.contains(&torrent) { + return None; + } + let idx = self.torrents.len(); + self.torrents.push(torrent); + Some(idx) + } +} + pub struct Session { peer_id: Id20, dht: Option, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, + locked: RwLock, output_folder: PathBuf, } @@ -126,6 +161,7 @@ impl Session { peer_opts, spawner, output_folder, + locked: RwLock::new(SessionLocked::default()), }) } pub fn get_dht(&self) -> Option { @@ -271,7 +307,21 @@ impl Session { .map(PathBuf::from) .unwrap_or_else(|| self.output_folder.clone()); - let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder); + let managed_torrent = ManagedTorrent { + info_hash, + output_folder: output_folder.clone(), + state: ManagedTorrentState::Initializing, + }; + + if self.locked.write().add_torrent(managed_torrent).is_none() { + anyhow::bail!( + "torrent with info_hash {:?} that is downloaded to {:?} is already managed", + info_hash, + &output_folder + ); + }; + + let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder.clone()); builder .overwrite(opts.overwrite) .spawner(self.spawner) @@ -287,7 +337,40 @@ impl Session { builder.peer_connect_timeout(t); } - let handle = builder.start_manager()?; + let handle = match builder + .start_manager() + .context("error starting torrent manager") + { + Ok(handle) => { + let mut g = self.locked.write(); + let m = g + .torrents + .iter_mut() + .find(|t| t.info_hash == info_hash && t.output_folder == output_folder) + .unwrap(); + m.state = ManagedTorrentState::Running(handle.clone()); + handle + } + Err(error) => { + let mut g = self.locked.write(); + let idx = g + .torrents + .iter() + .position(|t| t.info_hash == info_hash && t.output_folder == output_folder) + .unwrap(); + g.torrents.remove(idx); + return Err(error); + } + }; + { + let mut g = self.locked.write(); + let m = g + .torrents + .iter_mut() + .find(|t| t.info_hash == info_hash && t.output_folder == output_folder) + .unwrap(); + m.state = ManagedTorrentState::Running(handle.clone()); + } for url in trackers { handle.add_tracker(url); From 218aa4d9ee1572e7f14e538427539bdd2034a5d4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 11:43:44 +0100 Subject: [PATCH 09/13] Safety note --- crates/librqbit/src/http_api.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 22e3ba4..8104af3 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -232,6 +232,8 @@ impl HttpApi { "GET /torrents/{index}": "Torrent details", "GET /torrents/{index}/haves": "The bitfield of have pieces", "GET /torrents/{index}/stats": "Torrent stats", + // This is kind of not secure as it just reads any local file that it has access to, + // or any URL, but whatever, ok for our purposes / thread model. "POST /torrents/": "Add a torrent here. magnet: or http:// or a local file." } }); From d0757d41c5bebb0c24e7cdfecf259d686130bc51 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 10 Oct 2021 11:52:21 +0100 Subject: [PATCH 10/13] Print stats for each managed torrent now --- crates/librqbit/src/session.rs | 14 ++++++-- crates/rqbit/src/main.rs | 64 ++++++++++++++++++++-------------- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 001917e..53d63de 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -22,15 +22,17 @@ use crate::{ torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, }; +#[derive(Clone)] pub enum ManagedTorrentState { Initializing, Running(TorrentManagerHandle), } +#[derive(Clone)] pub struct ManagedTorrent { - info_hash: Id20, - output_folder: PathBuf, - state: ManagedTorrentState, + pub info_hash: Id20, + pub output_folder: PathBuf, + pub state: ManagedTorrentState, } impl PartialEq for ManagedTorrent { @@ -167,6 +169,12 @@ impl Session { pub fn get_dht(&self) -> Option { self.dht.clone() } + pub fn with_torrents(&self, callback: F) + where + F: Fn(&[ManagedTorrent]), + { + callback(&self.locked.read().torrents) + } pub async fn add_torrent( &self, url: String, diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 6abfbe2..5f3b1b0 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -5,7 +5,7 @@ use clap::Clap; use librqbit::{ http_api::HttpApi, peer_connection::PeerConnectionOptions, - session::{AddTorrentOptions, Session, SessionOptions}, + session::{AddTorrentOptions, ManagedTorrentState, Session, SessionOptions}, spawn_utils::{spawn, BlockingSpawner}, }; use log::info; @@ -182,33 +182,45 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> }; spawn("Stats printer", { - let handle = handle.clone(); + let session = session.clone(); async move { loop { - let peer_stats = handle.torrent_state().peer_stats_snapshot(); - let stats = handle.torrent_state().stats_snapshot(); - let speed = handle.speed_estimator(); - let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (progress as f64 / total as f64) * 100f64 - }; - info!( - "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", - downloaded_pct, - SF::new(progress), - speed.download_mbps(), - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(total), - SF::new(stats.uploaded_bytes), - peer_stats.live, - peer_stats.connecting, - peer_stats.queued, - peer_stats.seen, - ); + session.with_torrents(|torrents| { + for (idx, torrent) in torrents.iter().enumerate() { + match &torrent.state { + ManagedTorrentState::Initializing => { + info!("[{}] initializing", idx); + }, + ManagedTorrentState::Running(handle) => { + let peer_stats = handle.torrent_state().peer_stats_snapshot(); + let stats = handle.torrent_state().stats_snapshot(); + let speed = handle.speed_estimator(); + let total = stats.total_bytes; + let progress = stats.total_bytes - stats.remaining_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (progress as f64 / total as f64) * 100f64 + }; + info!( + "[{}]: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + idx, + downloaded_pct, + SF::new(progress), + speed.download_mbps(), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(total), + SF::new(stats.uploaded_bytes), + peer_stats.live, + peer_stats.connecting, + peer_stats.queued, + peer_stats.seen, + ); + }, + } + } + }); tokio::time::sleep(Duration::from_secs(1)).await; } } From 3b8c4e053ff45669bf451fd036944bdac932ffbe Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 16 Oct 2021 00:13:58 +0100 Subject: [PATCH 11/13] Speed computation now better fits torrent with large pieces --- crates/librqbit/src/torrent_manager.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 743a454..683c5e1 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -247,10 +247,14 @@ impl TorrentManager { let state = mgr.state.clone(); async move { loop { - let downloaded = state.stats_snapshot().downloaded_and_checked_bytes; + let stats = state.stats_snapshot(); + let fetched = state.stats_snapshot().fetched_bytes; let needed = state.initially_needed(); - let remaining = needed - downloaded; - estimator.add_snapshot(downloaded, remaining, Instant::now()); + // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. + let remaining = needed + .wrapping_sub(fetched) + .min(needed - stats.downloaded_and_checked_bytes); + estimator.add_snapshot(fetched, remaining, Instant::now()); tokio::time::sleep(Duration::from_secs(1)).await; } } From 6bd518676d4de13643e5472f130fdc923a593f0e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 16 Oct 2021 12:51:17 +0100 Subject: [PATCH 12/13] Truncate files at the start to required length --- crates/librqbit/src/torrent_manager.rs | 27 +++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 683c5e1..7e35c1d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -14,7 +14,7 @@ use librqbit_core::{ id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; -use log::{debug, info}; +use log::{debug, info, warn}; use parking_lot::Mutex; use reqwest::Url; use sha1w::Sha1; @@ -152,6 +152,10 @@ fn make_lengths>( Lengths::new(total_length, torrent.piece_length, None) } +fn ensure_file_length(file: &mut File, length: u64) -> anyhow::Result<()> { + Ok(file.set_len(length)?) +} + impl TorrentManager { fn start>( info: TorrentMetaV1Info, @@ -209,6 +213,27 @@ impl TorrentManager { SF::new(initial_check_results.needed_bytes) ); + spawner.spawn_block_in_place(|| { + for (file, (name, length)) in + files.iter().zip(info.iter_filenames_and_lengths().unwrap()) + { + let now = Instant::now(); + if let Err(err) = ensure_file_length(&mut file.lock(), length) { + warn!( + "Error setting length for file {:?} to {}: {:#?}", + name, length, err + ); + } else { + info!( + "Set length for file {:?} to {} in {:?}", + name, + SF::new(length), + now.elapsed() + ); + } + } + }); + let chunk_tracker = ChunkTracker::new( initial_check_results.needed_pieces, initial_check_results.have_pieces, From 5de63af78309266c03ee06744b64f1b38751bd2a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 16 Oct 2021 13:18:25 +0100 Subject: [PATCH 13/13] Prioritize last piece. Dont ask me why :) --- crates/librqbit/src/chunk_tracker.rs | 12 ++++++++++++ crates/librqbit/src/torrent_state.rs | 4 ++-- crates/librqbit_core/src/lengths.rs | 3 +++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 1385a4d..7dbdebe 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -72,6 +72,18 @@ impl ChunkTracker { self.needed_pieces.set(index.get() as usize, false) } + pub fn iter_needed_pieces(&self) -> impl Iterator + '_ { + // Try the last piece first. Often important information is stored in the last piece. + // Then sequentially one by one. This is not super sophisticated but sometimes helps. + let last_piece_id = self.lengths.last_piece_id().get() as usize; + self.needed_pieces + .get(last_piece_id..) + .unwrap() + .iter_ones() + .map(move |n| n + last_piece_id) + .chain(self.needed_pieces.get(..last_piece_id).unwrap().iter_ones()) + } + // None if wrong chunk // true if did something // false if didn't do anything diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index a240eff..d6340be 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -349,7 +349,7 @@ impl TorrentState { fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { let g = self.locked.read(); let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.get_needed_pieces().iter_ones() { + for n in g.chunks.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { // in theory it should be safe without validation, but whatever. return self.lengths.validate_piece_index(n as u32); @@ -375,7 +375,7 @@ impl TorrentState { let n = { let mut n_opt = None; let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.get_needed_pieces().iter_ones() { + for n in g.chunks.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { n_opt = Some(n); break; diff --git a/crates/librqbit_core/src/lengths.rs b/crates/librqbit_core/src/lengths.rs index 3d5ade4..e3d7206 100644 --- a/crates/librqbit_core/src/lengths.rs +++ b/crates/librqbit_core/src/lengths.rs @@ -118,6 +118,9 @@ impl Lengths { pub const fn total_chunks(&self) -> u32 { ceil_div_u64(self.total_length, self.chunk_length as u64) as u32 } + pub const fn last_piece_id(&self) -> ValidPieceIndex { + ValidPieceIndex(self.last_piece_id) + } pub const fn total_pieces(&self) -> u32 { self.last_piece_id + 1 }