From 456a51d4dbff90e7aabd36c794790000a3152621 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 5 Dec 2024 23:37:13 +0000 Subject: [PATCH] Split up "add_torrent" method --- crates/librqbit/src/session.rs | 232 +++++++++++++++++---------------- 1 file changed, 117 insertions(+), 115 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 8999820..1a569f0 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -40,7 +40,7 @@ use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; use futures::{ future::BoxFuture, stream::{BoxStream, FuturesUnordered}, - FutureExt, Stream, TryFutureExt, + FutureExt, Stream, StreamExt, TryFutureExt, }; use itertools::Itertools; use librqbit_core::{ @@ -58,7 +58,6 @@ use tokio::{ net::{TcpListener, TcpStream}, sync::Notify, }; -use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span}; use tracker_comms::TrackerComms; @@ -474,12 +473,8 @@ pub(crate) struct CheckedIncomingConnection { struct InternalAddResult { info_hash: Id20, - info: TorrentMetaV1Info, - torrent_bytes: Bytes, - info_bytes: Bytes, + metadata: Option, trackers: Vec, - peer_rx: Option, - seen_peers: Vec, } impl Session { @@ -883,15 +878,7 @@ impl Session { opts: Option, ) -> BoxFuture<'a, anyhow::Result> { async move { - // Magnet links are different in that we first need to discover the metadata. let mut opts = opts.unwrap_or_default(); - - let paused = opts.list_only || opts.paused; - - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link - // into a torrent file by connecting to peers that support extended handshakes. - // So we must discover at least one peer and connect to it to be able to proceed further. - let add_res = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") || magnet.len() == 40 => { let magnet = Magnet::parse(&magnet) @@ -906,66 +893,10 @@ impl Session { } } - let peer_rx = self.make_peer_rx( + InternalAddResult { info_hash, - if opts.disable_trackers { - Default::default() - } else { - let mut trackers = magnet.trackers.clone(); - if let Some(custom_trackers) = opts.trackers.clone() { - trackers.extend(custom_trackers); - } - trackers - }, - !paused, - opts.force_tracker_interval, - opts.initial_peers.clone().unwrap_or_default() - )?.context("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided")?; - - debug!(?info_hash, "querying DHT"); - match read_metainfo_from_peer_receiver( - self.peer_id, - info_hash, - Default::default(), - peer_rx, - Some(self.merge_peer_opts(opts.peer_opts)), - self.connector.clone(), - ) - .await - { - ReadMetainfoResult::Found { - info, - info_bytes, - rx, - seen, - } => { - trace!(?info, "received result from DHT"); - let mut trackers = magnet.trackers.into_iter().unique().collect_vec(); - if let Some(custom_trackers) = opts.trackers.clone() { - trackers.extend(custom_trackers); - } - InternalAddResult { - info_hash, - torrent_bytes: torrent_file_from_info_bytes( - &info_bytes, - &trackers, - )?, - info_bytes: info_bytes.0, - info, - trackers, - peer_rx: Some(rx), - seen_peers: { - let seen = seen.into_iter().collect_vec(); - for peer in &seen { - trace!(?peer, "seen") - } - seen - }, - } - } - ReadMetainfoResult::ChannelClosed { .. } => { - bail!("input address stream exhausted, no way to discover torrent metainfo") - } + trackers: magnet.trackers, + metadata: None, } } other => { @@ -981,12 +912,13 @@ impl Session { url ) } - AddTorrent::TorrentFileBytes(bytes) => - torrent_from_bytes(bytes) - .context("error decoding torrent")? + AddTorrent::TorrentFileBytes(bytes) => { + torrent_from_bytes(bytes).context("error decoding torrent")? + } }; - let mut trackers = torrent.info + let mut trackers = torrent + .info .iter_announce() .unique() .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { @@ -1001,35 +933,14 @@ impl Session { trackers.extend(custom_trackers); } - let peer_rx = if paused { - None - } else { - self.make_peer_rx( - torrent.info.info_hash, - if opts.disable_trackers { - Default::default() - } else { - trackers.clone() - }, - !paused, - opts.force_tracker_interval, - opts.initial_peers.clone().unwrap_or_default() - )? - }; - InternalAddResult { info_hash: torrent.info.info_hash, - info: torrent.info.info, - torrent_bytes: torrent.torrent_bytes, - info_bytes: torrent.info_bytes, + metadata: Some(TorrentMetadata::new( + torrent.info.info, + torrent.torrent_bytes, + torrent.info_bytes, + )?), trackers, - peer_rx, - seen_peers: opts - .initial_peers - .clone() - .unwrap_or_default() - .into_iter() - .collect(), } } }; @@ -1073,19 +984,58 @@ impl Session { mut opts: AddTorrentOptions, ) -> anyhow::Result { let InternalAddResult { - info, info_hash, + metadata, trackers, - peer_rx, - seen_peers, - torrent_bytes, - info_bytes, } = add_res; - trace!("Torrent info: {:#?}", &info); + let peer_stream_permanent = !opts.paused && !opts.list_only; + let make_peer_rx = || { + self.make_peer_rx( + info_hash, + trackers.clone(), + peer_stream_permanent, + opts.force_tracker_interval, + opts.initial_peers.clone().unwrap_or_default(), + ) + .context("error creating peer stream") + }; + + let mut seen_peers = Vec::new(); + + let (metadata, peer_rx) = { + match metadata { + Some(metadata) => { + let mut peer_rx = None; + if peer_stream_permanent { + peer_rx = make_peer_rx()?; + } + (metadata, peer_rx) + } + None => { + let peer_rx = make_peer_rx()?.context( + "no known way to resolve peers (no DHT, no trackers, no initial_peers)", + )?; + let resolved_magnet = self + .resolve_magnet(info_hash, peer_rx, &trackers, opts.peer_opts) + .await?; + seen_peers = resolved_magnet.seen_peers.clone(); + let peer_rx = Some( + merge_streams( + resolved_magnet.peer_rx, + futures::stream::iter(resolved_magnet.seen_peers), + ) + .boxed(), + ); + (resolved_magnet.metadata, peer_rx) + } + } + }; + + trace!("Torrent metadata: {:#?}", &metadata.info); let only_files = compute_only_files( - &info, + &metadata.info, opts.only_files, opts.only_files_regex, opts.list_only, @@ -1093,7 +1043,7 @@ impl Session { let output_folder = match (opts.output_folder, opts.sub_folder) { (None, None) => self.output_folder.join( - self.get_default_subfolder_for_torrent(&info)? + self.get_default_subfolder_for_torrent(&metadata.info)? .unwrap_or_default(), ), (Some(o), None) => PathBuf::from(o), @@ -1112,11 +1062,11 @@ impl Session { if opts.list_only { return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse { info_hash, - info, + info: metadata.info, only_files, output_folder, seen_peers, - torrent_bytes, + torrent_bytes: metadata.torrent_bytes, })); } @@ -1143,7 +1093,7 @@ impl Session { let span = error_span!(parent: self.rs(), "torrent", id); let peer_opts = self.merge_peer_opts(opts.peer_opts); - let metadata = Arc::new(TorrentMetadata::new(info, torrent_bytes, info_bytes)?); + let metadata = Arc::new(metadata); let minfo = Arc::new(ManagedTorrentShared { id, span, @@ -1411,6 +1361,58 @@ impl Session { pub fn tcp_listen_port(&self) -> Option { self.tcp_listen_port } + + async fn resolve_magnet( + self: &Arc, + info_hash: Id20, + peer_rx: PeerStream, + trackers: &[String], + peer_opts: Option, + ) -> anyhow::Result { + match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + Default::default(), + peer_rx, + Some(self.merge_peer_opts(peer_opts)), + self.connector.clone(), + ) + .await + { + ReadMetainfoResult::Found { + info, + info_bytes, + rx, + seen, + } => { + trace!(?info, "received result from DHT"); + Ok(ResolveMagnetResult { + metadata: TorrentMetadata::new( + info, + torrent_file_from_info_bytes(&info_bytes, trackers)?, + info_bytes.0, + )?, + peer_rx: rx, + seen_peers: { + let seen = seen.into_iter().collect_vec(); + for peer in &seen { + trace!(?peer, "seen") + } + seen + }, + }) + } + ReadMetainfoResult::ChannelClosed { .. } => { + bail!("input address stream exhausted, no way to discover torrent metainfo") + } + } + } +} + +pub(crate) struct ResolveMagnetResult { + pub metadata: TorrentMetadata, + pub peer_rx: PeerStream, + pub seen_peers: Vec, } fn remove_files_and_dirs(infos: &FileInfos, files: &dyn TorrentStorage) {