From 1667efdaa77103943782aa7c74747f30b5331c14 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 9 Oct 2021 15:03:42 +0100 Subject: [PATCH] Start refactoring code to support running in background and adding multiple torrents --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/session.rs | 372 +++++++++++++++++++++++++++++++++ crates/rqbit/src/main.rs | 317 +++------------------------- 5 files changed, 405 insertions(+), 287 deletions(-) create mode 100644 crates/librqbit/src/session.rs diff --git a/Cargo.lock b/Cargo.lock index 6e68c6f..e94e0c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,7 @@ dependencies = [ "peer_binary_protocol", "pretty_env_logger", "rand 0.8.4", + "regex", "reqwest", "serde", "serde_json", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index b9aa94e..1bb1c58 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -27,6 +27,7 @@ serde = {version = "1", features=["derive"]} serde_json = "1" anyhow = "1" +regex = "1" reqwest = "0.11" urlencoding = "1" byteorder = "1" diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 5def0b3..c0b8ca0 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -6,6 +6,7 @@ pub mod peer_connection; pub mod peer_handler; pub mod peer_info_reader; pub mod peer_state; +pub mod session; pub mod spawn_utils; pub mod torrent_manager; pub mod torrent_state; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs new file mode 100644 index 0000000..7fc3dff --- /dev/null +++ b/crates/librqbit/src/session.rs @@ -0,0 +1,372 @@ +use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration}; + +use anyhow::Context; +use buffers::ByteString; +use dht::{Dht, Id20, PersistentDht, PersistentDhtConfig}; +use librqbit_core::{ + magnet::Magnet, + peer_id::generate_peer_id, + torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, +}; +use log::{info, warn}; +use reqwest::Url; +use tokio_stream::StreamExt; + +use size_format::SizeFormatterBinary as SF; + +use crate::{ + dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + http_api::HttpApi, + peer_connection::PeerConnectionOptions, + spawn_utils::{spawn, BlockingSpawner}, + torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, +}; + +pub struct Session { + peer_id: Id20, + dht: Option, + http_api: Option, + peer_opts: PeerConnectionOptions, + spawner: BlockingSpawner, + output_folder: PathBuf, +} + +async fn torrent_from_url(url: &str) -> anyhow::Result { + let response = reqwest::get(url) + .await + .with_context(|| format!("error downloading torrent metadata from {}", url))?; + if !response.status().is_success() { + anyhow::bail!("GET {} returned {}", url, response.status()) + } + let b = response + .bytes() + .await + .with_context(|| format!("error reading repsonse body from {}", url))?; + torrent_from_bytes(&b).context("error decoding torrent") +} + +fn torrent_from_file(filename: &str) -> anyhow::Result { + let mut buf = Vec::new(); + if filename == "-" { + std::io::stdin() + .read_to_end(&mut buf) + .context("error reading stdin")?; + } else { + File::open(filename) + .with_context(|| format!("error opening {}", filename))? + .read_to_end(&mut buf) + .with_context(|| format!("error reading {}", filename))?; + } + torrent_from_bytes(&buf).context("error decoding torrent") +} + +fn compute_only_files>( + torrent: &TorrentMetaV1Info, + filename_re: &str, +) -> anyhow::Result> { + let filename_re = regex::Regex::new(filename_re).context("filename regex is incorrect")?; + let mut only_files = Vec::new(); + for (idx, (filename, _)) in torrent.iter_filenames_and_lengths()?.enumerate() { + let full_path = filename + .to_pathbuf() + .with_context(|| format!("filename of file {} is not valid utf8", idx))?; + if filename_re.is_match(full_path.to_str().unwrap()) { + only_files.push(idx); + } + } + if only_files.is_empty() { + anyhow::bail!("none of the filenames match the given regex") + } + Ok(only_files) +} + +#[derive(Default)] +pub struct AddTorrentOptions { + pub only_files_regex: Option, + pub overwrite: bool, + pub list_only: bool, + pub output_folder: Option, + pub peer_opts: Option, + pub force_tracker_interval: Option, +} + +#[derive(Default)] +pub struct SessionOptions { + pub disable_dht: bool, + pub disable_dht_persistence: bool, + pub dht_config: Option, + pub disable_http_api: bool, + pub http_api_listen_addr: Option, + pub peer_id: Option, + pub peer_opts: Option, +} + +impl Session { + pub async fn new(output_folder: PathBuf, spawner: BlockingSpawner) -> anyhow::Result { + Self::new_with_opts(output_folder, spawner, SessionOptions::default()).await + } + pub async fn new_with_opts( + output_folder: PathBuf, + spawner: BlockingSpawner, + opts: SessionOptions, + ) -> anyhow::Result { + let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let dht = if opts.disable_dht { + None + } else { + let dht = if opts.disable_dht_persistence { + Dht::new().await + } else { + PersistentDht::create(opts.dht_config).await + } + .context("error initializing DHT")?; + Some(dht) + }; + let peer_opts = opts.peer_opts.unwrap_or_default(); + + let http_api = if opts.disable_http_api { + None + } else { + let http_api_listen_addr = opts + .http_api_listen_addr + .unwrap_or_else(|| "127.0.0.1:3001".parse().unwrap()); + let http_api = HttpApi::new(dht.clone()); + spawn("HTTP API", { + let http_api = http_api.clone(); + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + Some(http_api) + }; + + Ok(Self { + peer_id, + dht, + http_api, + peer_opts, + spawner, + output_folder, + }) + } + pub async fn add_torrent( + &self, + url: String, + opts: Option, + ) -> anyhow::Result> { + // Magnet links are different in that we first need to discover the metadata. + let opts = opts.unwrap_or_default(); + if url.starts_with("magnet:") { + let Magnet { + info_hash, + trackers, + } = Magnet::parse(&url).context("provided path is not a valid magnet URL")?; + + let dht_rx = self + .dht + .as_ref() + .context("magnet links without DHT are not supported")? + .get_peers(info_hash) + .await?; + + let trackers = trackers + .into_iter() + .filter_map(|url| match reqwest::Url::parse(&url) { + Ok(url) => Some(url), + Err(e) => { + warn!("error parsing tracker {} as url: {}", url, e); + None + } + }) + .collect(); + + let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + dht_rx, + Some(self.peer_opts), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + self.main_torrent_info( + info_hash, + info, + Some(dht_rx), + initial_peers.into_iter().collect(), + trackers, + opts, + ) + .await + } else { + let torrent = if url.starts_with("http://") || url.starts_with("https://") { + torrent_from_url(&url).await? + } else { + torrent_from_file(&url)? + }; + let dht_rx = match self.dht.as_ref() { + Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), + None => None, + }; + let trackers = torrent + .iter_announce() + .filter_map(|tracker| { + let url = match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => url, + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; + } + }; + match Url::parse(url) { + Ok(url) => Some(url), + Err(e) => { + warn!("cannot parse tracker URL {}: {}", url, e); + None + } + } + }) + .collect::>(); + self.main_torrent_info( + torrent.info_hash, + torrent.info, + dht_rx, + Vec::new(), + trackers, + opts, + ) + .await + } + } + + #[allow(clippy::too_many_arguments)] + async fn main_torrent_info( + &self, + info_hash: Id20, + info: TorrentMetaV1Info, + dht_peer_rx: Option + Unpin + Send + Sync + 'static>, + initial_peers: Vec, + trackers: Vec, + opts: AddTorrentOptions, + ) -> anyhow::Result> { + info!("Torrent info: {:#?}", &info); + let only_files = if let Some(filename_re) = opts.only_files_regex { + let only_files = compute_only_files(&info, &filename_re)?; + for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { + if !only_files.contains(&idx) { + continue; + } + info!("Will download {:?}", filename); + } + Some(only_files) + } else { + None + }; + + if opts.list_only { + for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { + let included = match &only_files { + Some(files) => files.contains(&idx), + None => true, + }; + info!( + "File {}, size {}{}", + filename.to_string()?, + SF::new(len), + if included { "" } else { "will skip" } + ) + } + info!("--list was passed, nothing to do, exiting."); + return Ok(None); + } + + let output_folder = opts + .output_folder + .map(PathBuf::from) + .unwrap_or_else(|| self.output_folder.clone()); + + let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder); + builder + .overwrite(opts.overwrite) + .spawner(self.spawner) + .peer_id(self.peer_id); + if let Some(only_files) = only_files { + builder.only_files(only_files); + } + if let Some(interval) = opts.force_tracker_interval { + builder.force_tracker_interval(interval); + } + + if let Some(t) = opts.peer_opts.unwrap_or(self.peer_opts).connect_timeout { + builder.peer_connect_timeout(t); + } + + // let http_api = HttpApi::new(self.dht.clone()); + // spawn("HTTP API", { + // let http_api = http_api.clone(); + // async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + // }); + + let handle = builder.start_manager()?; + if let Some(http_api) = self.http_api.as_ref() { + http_api.add_mgr(handle.clone()); + } + + for url in trackers { + handle.add_tracker(url); + } + for peer in initial_peers { + handle.add_peer(peer); + } + + spawn("Stats printer", { + let handle = handle.clone(); + async move { + loop { + let peer_stats = handle.torrent_state().peer_stats_snapshot(); + let stats = handle.torrent_state().stats_snapshot(); + let speed = handle.speed_estimator(); + let total = stats.total_bytes; + let progress = stats.total_bytes - stats.remaining_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (progress as f64 / total as f64) * 100f64 + }; + info!( + "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + downloaded_pct, + SF::new(progress), + speed.download_mbps(), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(total), + SF::new(stats.uploaded_bytes), + peer_stats.live, + peer_stats.connecting, + peer_stats.queued, + peer_stats.seen, + ); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + }); + + if let Some(mut dht_peer_rx) = dht_peer_rx { + spawn("DHT peer adder", { + let handle = handle.clone(); + async move { + while let Some(peer) = dht_peer_rx.next().await { + handle.add_peer(peer); + } + warn!("dht was closed"); + Ok(()) + } + }); + } + + Ok(Some(handle)) + } +} diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4028a76..7153126 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,50 +1,12 @@ -use std::{fs::File, io::Read, net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr, time::Duration}; use anyhow::Context; use clap::Clap; -use dht::{Dht, Id20, PersistentDht}; -use futures::StreamExt; use librqbit::{ - dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - generate_peer_id, peer_connection::PeerConnectionOptions, - spawn_utils::{spawn, BlockingSpawner}, - torrent_from_bytes, - torrent_manager::TorrentManagerBuilder, - ByteString, Magnet, TorrentMetaV1Info, TorrentMetaV1Owned, + session::{AddTorrentOptions, Session, SessionOptions}, + spawn_utils::BlockingSpawner, }; -use log::{info, warn}; -use reqwest::Url; -use size_format::SizeFormatterBinary as SF; - -async fn torrent_from_url(url: &str) -> anyhow::Result { - let response = reqwest::get(url) - .await - .with_context(|| format!("error downloading torrent metadata from {}", url))?; - if !response.status().is_success() { - anyhow::bail!("GET {} returned {}", url, response.status()) - } - let b = response - .bytes() - .await - .with_context(|| format!("error reading repsonse body from {}", url))?; - torrent_from_bytes(&b).context("error decoding torrent") -} - -fn torrent_from_file(filename: &str) -> anyhow::Result { - let mut buf = Vec::new(); - if filename == "-" { - std::io::stdin() - .read_to_end(&mut buf) - .context("error reading stdin")?; - } else { - File::open(filename) - .with_context(|| format!("error opening {}", filename))? - .read_to_end(&mut buf) - .with_context(|| format!("error reading {}", filename))?; - } - torrent_from_bytes(&buf).context("error decoding torrent") -} #[derive(Debug, Clap)] enum LogLevel { @@ -121,26 +83,6 @@ struct Opts { peer_connect_timeout: Option, } -fn compute_only_files>( - torrent: &TorrentMetaV1Info, - filename_re: &str, -) -> anyhow::Result> { - let filename_re = regex::Regex::new(filename_re).context("filename regex is incorrect")?; - let mut only_files = Vec::new(); - for (idx, (filename, _)) in torrent.iter_filenames_and_lengths()?.enumerate() { - let full_path = filename - .to_pathbuf() - .with_context(|| format!("filename of file {} is not valid utf8", idx))?; - if filename_re.is_match(full_path.to_str().unwrap()) { - only_files.push(idx); - } - } - if only_files.is_empty() { - anyhow::bail!("none of the filenames match the given regex") - } - Ok(only_files) -} - fn init_logging(opts: &Opts) { if std::env::var_os("RUST_LOG").is_none() { match opts.log_level.as_ref() { @@ -193,237 +135,38 @@ fn main() -> anyhow::Result<()> { } async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { - let peer_id = generate_peer_id(); - let dht = if opts.disable_dht { - None - } else { - let dht = if opts.disable_dht_persistence { - Dht::new().await - } else { - PersistentDht::create(None).await - } - .context("error initializing DHT")?; - Some(dht) + let sopts = SessionOptions { + disable_dht: opts.disable_dht, + disable_dht_persistence: opts.disable_dht_persistence, + dht_config: None, + disable_http_api: false, + http_api_listen_addr: Some(opts.http_api_listen_addr), + peer_id: None, + peer_opts: Some(PeerConnectionOptions { + connect_timeout: opts.peer_connect_timeout.map(|d| d.0), + ..Default::default() + }), }; + let session = Session::new_with_opts(opts.output_folder.into(), spawner, sopts) + .await + .context("error initializing rqbit session")?; - let peer_opts = PeerConnectionOptions { - connect_timeout: opts.peer_connect_timeout.map(|p| p.0), + let torrent_opts = AddTorrentOptions { + only_files_regex: opts.only_files_matching_regex, + overwrite: opts.overwrite, + list_only: opts.list, + force_tracker_interval: opts.force_tracker_interval.map(|d| d.0), ..Default::default() }; - - // Magnet links are different in that we first need to discover the metadata. - if opts.torrent_path.starts_with("magnet:") { - let Magnet { - info_hash, - trackers, - } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; - - let dht_rx = dht - .as_ref() - .ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))? - .get_peers(info_hash) - .await?; - - let trackers = trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); - - let (info, dht_rx, initial_peers) = - match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx, Some(peer_opts)) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - main_torrent_info( - opts, - info_hash, - info, - peer_id, - dht, - Some(dht_rx), - initial_peers.into_iter().collect(), - trackers, - spawner, - ) - .await - } else { - let torrent = if opts.torrent_path.starts_with("http://") - || opts.torrent_path.starts_with("https://") - { - torrent_from_url(&opts.torrent_path).await? - } else { - torrent_from_file(&opts.torrent_path)? - }; - let dht_rx = match dht.as_ref() { - Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), - None => None, - }; - let trackers = torrent - .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { - Ok(url) => Some(url), - Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); - None - } - } - }) - .collect::>(); - main_torrent_info( - opts, - torrent.info_hash, - torrent.info, - peer_id, - dht, - dht_rx, - Vec::new(), - trackers, - spawner, - ) + if let Some(handle) = session + .add_torrent(opts.torrent_path, Some(torrent_opts)) .await + .context("error adding torrent to session")? + { + handle + .wait_until_completed() + .await + .context("error waiting for torrent completion")?; } -} - -#[allow(clippy::too_many_arguments)] -async fn main_torrent_info( - opts: Opts, - info_hash: Id20, - info: TorrentMetaV1Info, - peer_id: Id20, - dht: Option, - dht_peer_rx: Option + Unpin + Send + Sync + 'static>, - initial_peers: Vec, - trackers: Vec, - spawner: BlockingSpawner, -) -> anyhow::Result<()> { - info!("Torrent info: {:#?}", &info); - let only_files = if let Some(filename_re) = opts.only_files_matching_regex { - let only_files = compute_only_files(&info, &filename_re)?; - for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { - if !only_files.contains(&idx) { - continue; - } - info!("Will download {:?}", filename); - } - Some(only_files) - } else { - None - }; - - if opts.list { - for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { - let included = match &only_files { - Some(files) => files.contains(&idx), - None => true, - }; - info!( - "File {}, size {}{}", - filename.to_string()?, - SF::new(len), - if included { "" } else { "will skip" } - ) - } - info!("--list was passed, nothing to do, exiting."); - return Ok(()); - } - - let http_api_listen_addr = opts.http_api_listen_addr; - - let mut builder = TorrentManagerBuilder::new(info, info_hash, opts.output_folder); - builder - .overwrite(opts.overwrite) - .spawner(spawner) - .peer_id(peer_id); - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(interval.0); - } - if let Some(t) = opts.peer_connect_timeout { - builder.peer_connect_timeout(t.0); - } - - let http_api = librqbit::http_api::HttpApi::new(dht.clone()); - spawn("HTTP API", { - let http_api = http_api.clone(); - async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - }); - - let handle = builder.start_manager()?; - http_api.add_mgr(handle.clone()); - - for url in trackers { - handle.add_tracker(url); - } - for peer in initial_peers { - handle.add_peer(peer); - } - - spawn("Stats printer", { - let handle = handle.clone(); - async move { - loop { - let peer_stats = handle.torrent_state().peer_stats_snapshot(); - let stats = handle.torrent_state().stats_snapshot(); - let speed = handle.speed_estimator(); - let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (progress as f64 / total as f64) * 100f64 - }; - info!( - "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", - downloaded_pct, - SF::new(progress), - speed.download_mbps(), - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(total), - SF::new(stats.uploaded_bytes), - peer_stats.live, - peer_stats.connecting, - peer_stats.queued, - peer_stats.seen, - ); - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - }); - - if let Some(mut dht_peer_rx) = dht_peer_rx { - spawn("DHT peer adder", { - let handle = handle.clone(); - async move { - while let Some(peer) = dht_peer_rx.next().await { - handle.add_peer(peer); - } - warn!("dht was closed"); - Ok(()) - } - }); - } - - handle.wait_until_completed().await?; Ok(()) }