diff --git a/Cargo.lock b/Cargo.lock index 6e68c6f..e94e0c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,7 @@ dependencies = [ "peer_binary_protocol", "pretty_env_logger", "rand 0.8.4", + "regex", "reqwest", "serde", "serde_json", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index b9aa94e..1bb1c58 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -27,6 +27,7 @@ serde = {version = "1", features=["derive"]} serde_json = "1" anyhow = "1" +regex = "1" reqwest = "0.11" urlencoding = "1" byteorder = "1" diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 1385a4d..7dbdebe 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -72,6 +72,18 @@ impl ChunkTracker { self.needed_pieces.set(index.get() as usize, false) } + pub fn iter_needed_pieces(&self) -> impl Iterator + '_ { + // Try the last piece first. Often important information is stored in the last piece. + // Then sequentially one by one. This is not super sophisticated but sometimes helps. + let last_piece_id = self.lengths.last_piece_id().get() as usize; + self.needed_pieces + .get(last_piece_id..) + .unwrap() + .iter_ones() + .map(move |n| n + last_piece_id) + .chain(self.needed_pieces.get(..last_piece_id).unwrap().iter_ones()) + } + // None if wrong chunk // true if did something // false if didn't do anything diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 39cf113..8104af3 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -1,13 +1,15 @@ -use std::net::SocketAddr; -use std::sync::Arc; - +use anyhow::Context; use dht::{Dht, DhtStats}; use parking_lot::RwLock; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use std::net::SocketAddr; +use std::sync::Arc; use std::time::{Duration, Instant}; +use warp::hyper::body::Bytes; use warp::hyper::Body; use warp::Filter; +use crate::session::{AddTorrentOptions, Session}; use crate::torrent_manager::TorrentManagerHandle; use crate::torrent_state::StatsSnapshot; @@ -15,16 +17,25 @@ struct ApiInternal { dht: Option, startup_time: Instant, torrent_managers: RwLock>, + session: Arc, } impl ApiInternal { - fn new(dht: Option) -> Self { + fn new(session: Arc) -> Self { Self { - dht, + dht: session.get_dht(), startup_time: Instant::now(), torrent_managers: RwLock::new(Vec::new()), + session, } } + + fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { + let mut g = self.torrent_managers.write(); + let idx = g.len(); + g.push(handle); + idx + } } #[derive(Serialize)] @@ -116,6 +127,20 @@ impl ApiInternal { Some(TorrentDetailsResponse { info_hash, files }) } + async fn api_add_torrent( + &self, + url: String, + opts: Option, + ) -> anyhow::Result { + let handle = self + .session + .add_torrent(url, opts) + .await + .context("error adding torrent")? + .context("expected session.add_torrent() to return a handle")?; + Ok(self.add_mgr(handle)) + } + fn api_dht_stats(&self) -> Option { self.dht.as_ref().map(|d| d.stats()) } @@ -185,16 +210,13 @@ fn json_or_404(idx: usize, v: Option) -> warp::reply::Response } impl HttpApi { - pub fn new(dht: Option) -> Self { + pub fn new(session: Arc) -> Self { Self { - inner: Arc::new(ApiInternal::new(dht)), + inner: Arc::new(ApiInternal::new(session)), } } pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize { - let mut g = self.inner.torrent_managers.write(); - let idx = g.len(); - g.push(handle); - idx + self.inner.add_mgr(handle) } pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { @@ -209,7 +231,10 @@ impl HttpApi { "GET /torrents": "List torrents (default torrent is 0)", "GET /torrents/{index}": "Torrent details", "GET /torrents/{index}/haves": "The bitfield of have pieces", - "GET /torrents/{index}/stats": "Torrent stats" + "GET /torrents/{index}/stats": "Torrent stats", + // This is kind of not secure as it just reads any local file that it has access to, + // or any URL, but whatever, ok for our purposes / thread model. + "POST /torrents/": "Add a torrent here. magnet: or http:// or a local file." } }); move || json_response(&api_list) @@ -234,11 +259,56 @@ impl HttpApi { } }); - let torrent_list = warp::path!("torrents").map({ + let torrent_list = warp::get().and(warp::path("torrents")).map({ let inner = inner.clone(); move || json_response(inner.api_torrent_list()) }); + #[derive(Deserialize)] + struct TorrentAddQueryParams { + overwrite: Option, + } + + let torrent_add = warp::post() + .and(warp::path("torrents")) + .and(warp::body::bytes()) + .and(warp::query()) + .and_then({ + let inner = inner.clone(); + use warp::http::Response; + fn make_response(status: u16, body: String) -> Response { + Response::builder().status(status).body(body).unwrap() + } + move |body: Bytes, params: TorrentAddQueryParams| { + let inner = inner.clone(); + async move { + let url = match String::from_utf8(body.to_vec()) { + Ok(str) => str, + Err(_) => { + return Ok::<_, warp::Rejection>(make_response( + 400, + "invalid utf-8".into(), + )) + } + }; + let opts = AddTorrentOptions { + overwrite: params.overwrite.unwrap_or(false), + ..Default::default() + }; + let idx = inner + .api_add_torrent(url, Some(opts)) + .await + .context("error calling HttpApi::api_add_torrent"); + match idx { + Ok(idx) => { + return Ok(make_response(200, format!("{}", idx))); + } + Err(e) => return Ok(make_response(400, format!("{:#}", e))), + } + } + } + }); + let torrent_details = warp::path!("torrents" / usize).map({ let inner = inner.clone(); move |idx| json_or_404(idx, inner.api_torrent_details(idx)) @@ -258,12 +328,13 @@ impl HttpApi { }); let router = api_list - .or(torrent_list) .or(dht_stats) .or(dht_routing_table) .or(torrent_details) .or(torrent_dump_haves) - .or(torrent_dump_stats); + .or(torrent_dump_stats) + .or(torrent_add) + .or(torrent_list); warp::serve(router).run(addr).await; Ok(()) diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 5def0b3..c0b8ca0 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -6,6 +6,7 @@ pub mod peer_connection; pub mod peer_handler; pub mod peer_info_reader; pub mod peer_state; +pub mod session; pub mod spawn_utils; pub mod torrent_manager; pub mod torrent_state; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs new file mode 100644 index 0000000..53d63de --- /dev/null +++ b/crates/librqbit/src/session.rs @@ -0,0 +1,405 @@ +use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf, time::Duration}; + +use anyhow::Context; +use buffers::ByteString; +use dht::{Dht, Id20, PersistentDht, PersistentDhtConfig}; +use librqbit_core::{ + magnet::Magnet, + peer_id::generate_peer_id, + torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, +}; +use log::{info, warn}; +use parking_lot::RwLock; +use reqwest::Url; +use tokio_stream::StreamExt; + +use size_format::SizeFormatterBinary as SF; + +use crate::{ + dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + peer_connection::PeerConnectionOptions, + spawn_utils::{spawn, BlockingSpawner}, + torrent_manager::{TorrentManagerBuilder, TorrentManagerHandle}, +}; + +#[derive(Clone)] +pub enum ManagedTorrentState { + Initializing, + Running(TorrentManagerHandle), +} + +#[derive(Clone)] +pub struct ManagedTorrent { + pub info_hash: Id20, + pub output_folder: PathBuf, + pub state: ManagedTorrentState, +} + +impl PartialEq for ManagedTorrent { + fn eq(&self, other: &Self) -> bool { + self.info_hash == other.info_hash && self.output_folder == other.output_folder + } +} + +#[derive(Default)] +pub struct SessionLocked { + torrents: Vec, +} + +impl SessionLocked { + fn add_torrent(&mut self, torrent: ManagedTorrent) -> Option { + if self.torrents.contains(&torrent) { + return None; + } + let idx = self.torrents.len(); + self.torrents.push(torrent); + Some(idx) + } +} + +pub struct Session { + peer_id: Id20, + dht: Option, + peer_opts: PeerConnectionOptions, + spawner: BlockingSpawner, + locked: RwLock, + output_folder: PathBuf, +} + +async fn torrent_from_url(url: &str) -> anyhow::Result { + let response = reqwest::get(url) + .await + .with_context(|| format!("error downloading torrent metadata from {}", url))?; + if !response.status().is_success() { + anyhow::bail!("GET {} returned {}", url, response.status()) + } + let b = response + .bytes() + .await + .with_context(|| format!("error reading repsonse body from {}", url))?; + torrent_from_bytes(&b).context("error decoding torrent") +} + +fn torrent_from_file(filename: &str) -> anyhow::Result { + let mut buf = Vec::new(); + if filename == "-" { + std::io::stdin() + .read_to_end(&mut buf) + .context("error reading stdin")?; + } else { + File::open(filename) + .with_context(|| format!("error opening {}", filename))? + .read_to_end(&mut buf) + .with_context(|| format!("error reading {}", filename))?; + } + torrent_from_bytes(&buf).context("error decoding torrent") +} + +fn compute_only_files>( + torrent: &TorrentMetaV1Info, + filename_re: &str, +) -> anyhow::Result> { + let filename_re = regex::Regex::new(filename_re).context("filename regex is incorrect")?; + let mut only_files = Vec::new(); + for (idx, (filename, _)) in torrent.iter_filenames_and_lengths()?.enumerate() { + let full_path = filename + .to_pathbuf() + .with_context(|| format!("filename of file {} is not valid utf8", idx))?; + if filename_re.is_match(full_path.to_str().unwrap()) { + only_files.push(idx); + } + } + if only_files.is_empty() { + anyhow::bail!("none of the filenames match the given regex") + } + Ok(only_files) +} + +#[derive(Default)] +pub struct AddTorrentOptions { + pub only_files_regex: Option, + pub overwrite: bool, + pub list_only: bool, + pub output_folder: Option, + pub peer_opts: Option, + pub force_tracker_interval: Option, +} + +#[derive(Default)] +pub struct SessionOptions { + pub disable_dht: bool, + pub disable_dht_persistence: bool, + pub dht_config: Option, + pub peer_id: Option, + pub peer_opts: Option, +} + +impl Session { + pub async fn new(output_folder: PathBuf, spawner: BlockingSpawner) -> anyhow::Result { + Self::new_with_opts(output_folder, spawner, SessionOptions::default()).await + } + pub async fn new_with_opts( + output_folder: PathBuf, + spawner: BlockingSpawner, + opts: SessionOptions, + ) -> anyhow::Result { + let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let dht = if opts.disable_dht { + None + } else { + let dht = if opts.disable_dht_persistence { + Dht::new().await + } else { + PersistentDht::create(opts.dht_config).await + } + .context("error initializing DHT")?; + Some(dht) + }; + let peer_opts = opts.peer_opts.unwrap_or_default(); + + Ok(Self { + peer_id, + dht, + peer_opts, + spawner, + output_folder, + locked: RwLock::new(SessionLocked::default()), + }) + } + pub fn get_dht(&self) -> Option { + self.dht.clone() + } + pub fn with_torrents(&self, callback: F) + where + F: Fn(&[ManagedTorrent]), + { + callback(&self.locked.read().torrents) + } + pub async fn add_torrent( + &self, + url: String, + opts: Option, + ) -> anyhow::Result> { + // Magnet links are different in that we first need to discover the metadata. + let opts = opts.unwrap_or_default(); + if url.starts_with("magnet:") { + let Magnet { + info_hash, + trackers, + } = Magnet::parse(&url).context("provided path is not a valid magnet URL")?; + + let dht_rx = self + .dht + .as_ref() + .context("magnet links without DHT are not supported")? + .get_peers(info_hash) + .await?; + + let trackers = trackers + .into_iter() + .filter_map(|url| match reqwest::Url::parse(&url) { + Ok(url) => Some(url), + Err(e) => { + warn!("error parsing tracker {} as url: {}", url, e); + None + } + }) + .collect(); + + let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + dht_rx, + Some(self.peer_opts), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + self.main_torrent_info( + info_hash, + info, + Some(dht_rx), + initial_peers.into_iter().collect(), + trackers, + opts, + ) + .await + } else { + let torrent = if url.starts_with("http://") || url.starts_with("https://") { + torrent_from_url(&url).await? + } else { + torrent_from_file(&url)? + }; + let dht_rx = match self.dht.as_ref() { + Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), + None => None, + }; + let trackers = torrent + .iter_announce() + .filter_map(|tracker| { + let url = match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => url, + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; + } + }; + match Url::parse(url) { + Ok(url) => Some(url), + Err(e) => { + warn!("cannot parse tracker URL {}: {}", url, e); + None + } + } + }) + .collect::>(); + self.main_torrent_info( + torrent.info_hash, + torrent.info, + dht_rx, + Vec::new(), + trackers, + opts, + ) + .await + } + } + + #[allow(clippy::too_many_arguments)] + async fn main_torrent_info( + &self, + info_hash: Id20, + info: TorrentMetaV1Info, + dht_peer_rx: Option + Unpin + Send + Sync + 'static>, + initial_peers: Vec, + trackers: Vec, + opts: AddTorrentOptions, + ) -> anyhow::Result> { + info!("Torrent info: {:#?}", &info); + let only_files = if let Some(filename_re) = opts.only_files_regex { + let only_files = compute_only_files(&info, &filename_re)?; + for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { + if !only_files.contains(&idx) { + continue; + } + info!("Will download {:?}", filename); + } + Some(only_files) + } else { + None + }; + + if opts.list_only { + for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { + let included = match &only_files { + Some(files) => files.contains(&idx), + None => true, + }; + info!( + "File {}, size {}{}", + filename.to_string()?, + SF::new(len), + if included { "" } else { "will skip" } + ) + } + info!("--list was passed, nothing to do, exiting."); + return Ok(None); + } + + let output_folder = opts + .output_folder + .map(PathBuf::from) + .unwrap_or_else(|| self.output_folder.clone()); + + let managed_torrent = ManagedTorrent { + info_hash, + output_folder: output_folder.clone(), + state: ManagedTorrentState::Initializing, + }; + + if self.locked.write().add_torrent(managed_torrent).is_none() { + anyhow::bail!( + "torrent with info_hash {:?} that is downloaded to {:?} is already managed", + info_hash, + &output_folder + ); + }; + + let mut builder = TorrentManagerBuilder::new(info, info_hash, output_folder.clone()); + builder + .overwrite(opts.overwrite) + .spawner(self.spawner) + .peer_id(self.peer_id); + if let Some(only_files) = only_files { + builder.only_files(only_files); + } + if let Some(interval) = opts.force_tracker_interval { + builder.force_tracker_interval(interval); + } + + if let Some(t) = opts.peer_opts.unwrap_or(self.peer_opts).connect_timeout { + builder.peer_connect_timeout(t); + } + + let handle = match builder + .start_manager() + .context("error starting torrent manager") + { + Ok(handle) => { + let mut g = self.locked.write(); + let m = g + .torrents + .iter_mut() + .find(|t| t.info_hash == info_hash && t.output_folder == output_folder) + .unwrap(); + m.state = ManagedTorrentState::Running(handle.clone()); + handle + } + Err(error) => { + let mut g = self.locked.write(); + let idx = g + .torrents + .iter() + .position(|t| t.info_hash == info_hash && t.output_folder == output_folder) + .unwrap(); + g.torrents.remove(idx); + return Err(error); + } + }; + { + let mut g = self.locked.write(); + let m = g + .torrents + .iter_mut() + .find(|t| t.info_hash == info_hash && t.output_folder == output_folder) + .unwrap(); + m.state = ManagedTorrentState::Running(handle.clone()); + } + + for url in trackers { + handle.add_tracker(url); + } + for peer in initial_peers { + handle.add_peer(peer); + } + + if let Some(mut dht_peer_rx) = dht_peer_rx { + spawn("DHT peer adder", { + let handle = handle.clone(); + async move { + while let Some(peer) = dht_peer_rx.next().await { + handle.add_peer(peer); + } + warn!("dht was closed"); + Ok(()) + } + }); + } + + Ok(Some(handle)) + } +} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 20f42e0..7e35c1d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -14,7 +14,7 @@ use librqbit_core::{ id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; -use log::{debug, info}; +use log::{debug, info, warn}; use parking_lot::Mutex; use reqwest::Url; use sha1w::Sha1; @@ -152,6 +152,10 @@ fn make_lengths>( Lengths::new(total_length, torrent.piece_length, None) } +fn ensure_file_length(file: &mut File, length: u64) -> anyhow::Result<()> { + Ok(file.set_len(length)?) +} + impl TorrentManager { fn start>( info: TorrentMetaV1Info, @@ -198,8 +202,10 @@ impl TorrentManager { debug!("computed lengths: {:?}", &lengths); info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = FileOps::::new(&info, &files, &lengths) - .initial_check(options.only_files.as_deref())?; + let initial_check_results = spawner.spawn_block_in_place(|| { + FileOps::::new(&info, &files, &lengths) + .initial_check(options.only_files.as_deref()) + })?; info!( "Initial check results: have {}, needed {}", @@ -207,6 +213,27 @@ impl TorrentManager { SF::new(initial_check_results.needed_bytes) ); + spawner.spawn_block_in_place(|| { + for (file, (name, length)) in + files.iter().zip(info.iter_filenames_and_lengths().unwrap()) + { + let now = Instant::now(); + if let Err(err) = ensure_file_length(&mut file.lock(), length) { + warn!( + "Error setting length for file {:?} to {}: {:#?}", + name, length, err + ); + } else { + info!( + "Set length for file {:?} to {} in {:?}", + name, + SF::new(length), + now.elapsed() + ); + } + } + }); + let chunk_tracker = ChunkTracker::new( initial_check_results.needed_pieces, initial_check_results.have_pieces, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index a240eff..d6340be 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -349,7 +349,7 @@ impl TorrentState { fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { let g = self.locked.read(); let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.get_needed_pieces().iter_ones() { + for n in g.chunks.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { // in theory it should be safe without validation, but whatever. return self.lengths.validate_piece_index(n as u32); @@ -375,7 +375,7 @@ impl TorrentState { let n = { let mut n_opt = None; let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.get_needed_pieces().iter_ones() { + for n in g.chunks.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { n_opt = Some(n); break; diff --git a/crates/librqbit_core/src/lengths.rs b/crates/librqbit_core/src/lengths.rs index 3d5ade4..e3d7206 100644 --- a/crates/librqbit_core/src/lengths.rs +++ b/crates/librqbit_core/src/lengths.rs @@ -118,6 +118,9 @@ impl Lengths { pub const fn total_chunks(&self) -> u32 { ceil_div_u64(self.total_length, self.chunk_length as u64) as u32 } + pub const fn last_piece_id(&self) -> ValidPieceIndex { + ValidPieceIndex(self.last_piece_id) + } pub const fn total_pieces(&self) -> u32 { self.last_piece_id + 1 } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 4028a76..5f3b1b0 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -1,51 +1,16 @@ -use std::{fs::File, io::Read, net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr, sync::Arc, time::Duration}; use anyhow::Context; use clap::Clap; -use dht::{Dht, Id20, PersistentDht}; -use futures::StreamExt; use librqbit::{ - dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - generate_peer_id, + http_api::HttpApi, peer_connection::PeerConnectionOptions, + session::{AddTorrentOptions, ManagedTorrentState, Session, SessionOptions}, spawn_utils::{spawn, BlockingSpawner}, - torrent_from_bytes, - torrent_manager::TorrentManagerBuilder, - ByteString, Magnet, TorrentMetaV1Info, TorrentMetaV1Owned, }; -use log::{info, warn}; -use reqwest::Url; +use log::info; use size_format::SizeFormatterBinary as SF; -async fn torrent_from_url(url: &str) -> anyhow::Result { - let response = reqwest::get(url) - .await - .with_context(|| format!("error downloading torrent metadata from {}", url))?; - if !response.status().is_success() { - anyhow::bail!("GET {} returned {}", url, response.status()) - } - let b = response - .bytes() - .await - .with_context(|| format!("error reading repsonse body from {}", url))?; - torrent_from_bytes(&b).context("error decoding torrent") -} - -fn torrent_from_file(filename: &str) -> anyhow::Result { - let mut buf = Vec::new(); - if filename == "-" { - std::io::stdin() - .read_to_end(&mut buf) - .context("error reading stdin")?; - } else { - File::open(filename) - .with_context(|| format!("error opening {}", filename))? - .read_to_end(&mut buf) - .with_context(|| format!("error reading {}", filename))?; - } - torrent_from_bytes(&buf).context("error decoding torrent") -} - #[derive(Debug, Clap)] enum LogLevel { Trace, @@ -121,26 +86,6 @@ struct Opts { peer_connect_timeout: Option, } -fn compute_only_files>( - torrent: &TorrentMetaV1Info, - filename_re: &str, -) -> anyhow::Result> { - let filename_re = regex::Regex::new(filename_re).context("filename regex is incorrect")?; - let mut only_files = Vec::new(); - for (idx, (filename, _)) in torrent.iter_filenames_and_lengths()?.enumerate() { - let full_path = filename - .to_pathbuf() - .with_context(|| format!("filename of file {} is not valid utf8", idx))?; - if filename_re.is_match(full_path.to_str().unwrap()) { - only_files.push(idx); - } - } - if only_files.is_empty() { - anyhow::bail!("none of the filenames match the given regex") - } - Ok(only_files) -} - fn init_logging(opts: &Opts) { if std::env::var_os("RUST_LOG").is_none() { match opts.log_level.as_ref() { @@ -193,237 +138,97 @@ fn main() -> anyhow::Result<()> { } async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { - let peer_id = generate_peer_id(); - let dht = if opts.disable_dht { - None - } else { - let dht = if opts.disable_dht_persistence { - Dht::new().await - } else { - PersistentDht::create(None).await - } - .context("error initializing DHT")?; - Some(dht) + let sopts = SessionOptions { + disable_dht: opts.disable_dht, + disable_dht_persistence: opts.disable_dht_persistence, + dht_config: None, + peer_id: None, + peer_opts: Some(PeerConnectionOptions { + connect_timeout: opts.peer_connect_timeout.map(|d| d.0), + ..Default::default() + }), }; - let peer_opts = PeerConnectionOptions { - connect_timeout: opts.peer_connect_timeout.map(|p| p.0), + let session = Arc::new( + Session::new_with_opts(opts.output_folder.into(), spawner, sopts) + .await + .context("error initializing rqbit session")?, + ); + + let torrent_opts = AddTorrentOptions { + only_files_regex: opts.only_files_matching_regex, + overwrite: opts.overwrite, + list_only: opts.list, + force_tracker_interval: opts.force_tracker_interval.map(|d| d.0), ..Default::default() }; - // Magnet links are different in that we first need to discover the metadata. - if opts.torrent_path.starts_with("magnet:") { - let Magnet { - info_hash, - trackers, - } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; - - let dht_rx = dht - .as_ref() - .ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))? - .get_peers(info_hash) - .await?; - - let trackers = trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); - - let (info, dht_rx, initial_peers) = - match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx, Some(peer_opts)) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - main_torrent_info( - opts, - info_hash, - info, - peer_id, - dht, - Some(dht_rx), - initial_peers.into_iter().collect(), - trackers, - spawner, - ) + let handle = match session + .add_torrent(opts.torrent_path, Some(torrent_opts)) .await - } else { - let torrent = if opts.torrent_path.starts_with("http://") - || opts.torrent_path.starts_with("https://") - { - torrent_from_url(&opts.torrent_path).await? - } else { - torrent_from_file(&opts.torrent_path)? - }; - let dht_rx = match dht.as_ref() { - Some(dht) => Some(dht.get_peers(torrent.info_hash).await?), - None => None, - }; - let trackers = torrent - .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { - Ok(url) => Some(url), - Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); - None - } - } - }) - .collect::>(); - main_torrent_info( - opts, - torrent.info_hash, - torrent.info, - peer_id, - dht, - dht_rx, - Vec::new(), - trackers, - spawner, - ) - .await - } -} - -#[allow(clippy::too_many_arguments)] -async fn main_torrent_info( - opts: Opts, - info_hash: Id20, - info: TorrentMetaV1Info, - peer_id: Id20, - dht: Option, - dht_peer_rx: Option + Unpin + Send + Sync + 'static>, - initial_peers: Vec, - trackers: Vec, - spawner: BlockingSpawner, -) -> anyhow::Result<()> { - info!("Torrent info: {:#?}", &info); - let only_files = if let Some(filename_re) = opts.only_files_matching_regex { - let only_files = compute_only_files(&info, &filename_re)?; - for (idx, (filename, _)) in info.iter_filenames_and_lengths()?.enumerate() { - if !only_files.contains(&idx) { - continue; - } - info!("Will download {:?}", filename); - } - Some(only_files) - } else { - None + .context("error adding torrent to session")? + { + Some(handle) => handle, + None => return Ok(()), }; - if opts.list { - for (idx, (filename, len)) in info.iter_filenames_and_lengths()?.enumerate() { - let included = match &only_files { - Some(files) => files.contains(&idx), - None => true, - }; - info!( - "File {}, size {}{}", - filename.to_string()?, - SF::new(len), - if included { "" } else { "will skip" } - ) - } - info!("--list was passed, nothing to do, exiting."); - return Ok(()); - } - - let http_api_listen_addr = opts.http_api_listen_addr; - - let mut builder = TorrentManagerBuilder::new(info, info_hash, opts.output_folder); - builder - .overwrite(opts.overwrite) - .spawner(spawner) - .peer_id(peer_id); - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(interval.0); - } - if let Some(t) = opts.peer_connect_timeout { - builder.peer_connect_timeout(t.0); - } - - let http_api = librqbit::http_api::HttpApi::new(dht.clone()); - spawn("HTTP API", { - let http_api = http_api.clone(); - async move { http_api.make_http_api_and_run(http_api_listen_addr).await } - }); - - let handle = builder.start_manager()?; - http_api.add_mgr(handle.clone()); - - for url in trackers { - handle.add_tracker(url); - } - for peer in initial_peers { - handle.add_peer(peer); - } + { + let http_api = HttpApi::new(session.clone()); + http_api.add_mgr(handle.clone()); + spawn("HTTP API", { + let http_api_listen_addr = opts.http_api_listen_addr; + async move { http_api.make_http_api_and_run(http_api_listen_addr).await } + }); + }; spawn("Stats printer", { - let handle = handle.clone(); + let session = session.clone(); async move { loop { - let peer_stats = handle.torrent_state().peer_stats_snapshot(); - let stats = handle.torrent_state().stats_snapshot(); - let speed = handle.speed_estimator(); - let total = stats.total_bytes; - let progress = stats.total_bytes - stats.remaining_bytes; - let downloaded_pct = if stats.remaining_bytes == 0 { - 100f64 - } else { - (progress as f64 / total as f64) * 100f64 - }; - info!( - "Stats: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", - downloaded_pct, - SF::new(progress), - speed.download_mbps(), - SF::new(stats.fetched_bytes), - SF::new(stats.remaining_bytes), - SF::new(total), - SF::new(stats.uploaded_bytes), - peer_stats.live, - peer_stats.connecting, - peer_stats.queued, - peer_stats.seen, - ); + session.with_torrents(|torrents| { + for (idx, torrent) in torrents.iter().enumerate() { + match &torrent.state { + ManagedTorrentState::Initializing => { + info!("[{}] initializing", idx); + }, + ManagedTorrentState::Running(handle) => { + let peer_stats = handle.torrent_state().peer_stats_snapshot(); + let stats = handle.torrent_state().stats_snapshot(); + let speed = handle.speed_estimator(); + let total = stats.total_bytes; + let progress = stats.total_bytes - stats.remaining_bytes; + let downloaded_pct = if stats.remaining_bytes == 0 { + 100f64 + } else { + (progress as f64 / total as f64) * 100f64 + }; + info!( + "[{}]: {:.2}% ({:.2}), down speed {:.2} Mbps, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}", + idx, + downloaded_pct, + SF::new(progress), + speed.download_mbps(), + SF::new(stats.fetched_bytes), + SF::new(stats.remaining_bytes), + SF::new(total), + SF::new(stats.uploaded_bytes), + peer_stats.live, + peer_stats.connecting, + peer_stats.queued, + peer_stats.seen, + ); + }, + } + } + }); tokio::time::sleep(Duration::from_secs(1)).await; } } }); - if let Some(mut dht_peer_rx) = dht_peer_rx { - spawn("DHT peer adder", { - let handle = handle.clone(); - async move { - while let Some(peer) = dht_peer_rx.next().await { - handle.add_peer(peer); - } - warn!("dht was closed"); - Ok(()) - } - }); - } - - handle.wait_until_completed().await?; + handle + .wait_until_completed() + .await + .context("error waiting for torrent completion")?; Ok(()) }