From f42007f4367b1802fcdd05e7146b53f5b0caacd7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 22:59:54 +0000 Subject: [PATCH] This one makes it better for sure --- crates/librqbit/src/session.rs | 397 +++++++++--------- .../src/tracing_subscriber_config_utils.rs | 1 + 2 files changed, 204 insertions(+), 194 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 2252332..1641a14 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -4,6 +4,7 @@ use std::{ io::{BufReader, BufWriter, Read}, net::SocketAddr, path::PathBuf, + pin::Pin, str::FromStr, sync::Arc, time::Duration, @@ -24,7 +25,7 @@ use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; -use futures::{stream::FuturesUnordered, TryFutureExt}; +use futures::{stream::FuturesUnordered, Future, FutureExt, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -377,8 +378,9 @@ pub(crate) struct CheckedIncomingConnection { impl Session { /// Create a new session. The passed in folder will be used as a default unless overriden per torrent. - pub async fn new(output_folder: PathBuf) -> anyhow::Result> { - Self::new_with_opts(output_folder, SessionOptions::default()).await + #[inline(never)] + pub fn new(output_folder: PathBuf) -> Pin>>>> { + Self::new_with_opts(output_folder, SessionOptions::default()) } pub fn default_persistence_filename() -> anyhow::Result { @@ -391,93 +393,97 @@ impl Session { } /// Create a new session with options. - pub async fn new_with_opts( + #[inline(never)] + pub fn new_with_opts( output_folder: PathBuf, mut opts: SessionOptions, - ) -> anyhow::Result> { - let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); - let token = CancellationToken::new(); + ) -> Pin>>>> { + async move { + let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let token = CancellationToken::new(); - let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { - let (l, p) = create_tcp_listener(port_range) - .await - .context("error listening on TCP")?; - info!("Listening on 0.0.0.0:{p} for incoming peer connections"); - (Some(l), Some(p)) - } else { - (None, None) - }; - - let dht = if opts.disable_dht { - None - } else { - let dht = if opts.disable_dht_persistence { - DhtBuilder::with_config(DhtConfig { - cancellation_token: Some(token.child_token()), - ..Default::default() - }) - .await - .context("error initializing DHT")? - } else { - let pdht_config = opts.dht_config.take().unwrap_or_default(); - PersistentDht::create(Some(pdht_config), Some(token.clone())) + let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { + let (l, p) = create_tcp_listener(port_range) .await - .context("error initializing persistent DHT")? + .context("error listening on TCP")?; + info!("Listening on 0.0.0.0:{p} for incoming peer connections"); + (Some(l), Some(p)) + } else { + (None, None) }; - Some(dht) - }; - let peer_opts = opts.peer_opts.unwrap_or_default(); - let persistence_filename = match opts.persistence_filename { - Some(filename) => filename, - None => Self::default_persistence_filename()?, - }; - let spawner = BlockingSpawner::default(); + let dht = if opts.disable_dht { + None + } else { + let dht = if opts.disable_dht_persistence { + DhtBuilder::with_config(DhtConfig { + cancellation_token: Some(token.child_token()), + ..Default::default() + }) + .await + .context("error initializing DHT")? + } else { + let pdht_config = opts.dht_config.take().unwrap_or_default(); + PersistentDht::create(Some(pdht_config), Some(token.clone())) + .await + .context("error initializing persistent DHT")? + }; - let session = Arc::new(Self { - persistence_filename, - peer_id, - dht, - peer_opts, - spawner, - output_folder, - db: RwLock::new(Default::default()), - _cancellation_token_drop_guard: token.clone().drop_guard(), - cancellation_token: token, - tcp_listen_port, - }); + Some(dht) + }; + let peer_opts = opts.peer_opts.unwrap_or_default(); + let persistence_filename = match opts.persistence_filename { + Some(filename) => filename, + None => Self::default_persistence_filename()?, + }; + let spawner = BlockingSpawner::default(); - if let Some(tcp_listener) = tcp_listener { - session.spawn( - error_span!("tcp_listen", port = tcp_listen_port), - session.clone().task_tcp_listener(tcp_listener), - ); - } + let session = Arc::new(Self { + persistence_filename, + peer_id, + dht, + peer_opts, + spawner, + output_folder, + db: RwLock::new(Default::default()), + _cancellation_token_drop_guard: token.clone().drop_guard(), + cancellation_token: token, + tcp_listen_port, + }); - if let Some(listen_port) = tcp_listen_port { - if opts.enable_upnp_port_forwarding { + if let Some(tcp_listener) = tcp_listener { session.spawn( - error_span!("upnp_forward", port = listen_port), - session.clone().task_upnp_port_forwarder(listen_port), + error_span!("tcp_listen", port = tcp_listen_port), + session.clone().task_tcp_listener(tcp_listener), ); } - } - if opts.persistence { - info!( - "will use {:?} for session persistence", - session.persistence_filename - ); - if let Some(parent) = session.persistence_filename.parent() { - std::fs::create_dir_all(parent).with_context(|| { - format!("couldn't create directory {:?} for session storage", parent) - })?; + if let Some(listen_port) = tcp_listen_port { + if opts.enable_upnp_port_forwarding { + session.spawn( + error_span!("upnp_forward", port = listen_port), + session.clone().task_upnp_port_forwarder(listen_port), + ); + } } - let persistence_task = session.clone().task_persistence(); - session.spawn(error_span!("session_persistence"), persistence_task); - } - Ok(session) + if opts.persistence { + info!( + "will use {:?} for session persistence", + session.persistence_filename + ); + if let Some(parent) = session.persistence_filename.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("couldn't create directory {:?} for session storage", parent) + })?; + } + let persistence_task = session.clone().task_persistence(); + session.spawn(error_span!("session_persistence"), persistence_task); + } + + Ok(session) + } + .boxed() } async fn task_persistence(self: Arc) -> anyhow::Result<()> { @@ -738,136 +744,139 @@ impl Session { } /// Add a torrent to the session. - pub async fn add_torrent( - &self, - add: AddTorrent<'_>, + pub fn add_torrent<'a>( + &'a self, + add: AddTorrent<'a>, opts: Option, - ) -> anyhow::Result { - // Magnet links are different in that we first need to discover the metadata. - let span = error_span!("add_torrent"); - let _ = span.enter(); + ) -> Pin> + Send + 'a>> { + async move { + // Magnet links are different in that we first need to discover the metadata. + let span = error_span!("add_torrent"); + let _ = span.enter(); - let opts = opts.unwrap_or_default(); + let opts = opts.unwrap_or_default(); - let announce_port = if opts.list_only { - None - } else { - self.tcp_listen_port - }; + let announce_port = if opts.list_only { + None + } else { + self.tcp_listen_port + }; - let paused = opts.list_only || opts.paused; + let paused = opts.list_only || opts.paused; - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link - // into a torrent file by connecting to peers that support extended handshakes. - // So we must discover at least one peer and connect to it to be able to proceed further. + // The main difference between magnet link and torrent file, is that we need to resolve the magnet link + // into a torrent file by connecting to peers that support extended handshakes. + // So we must discover at least one peer and connect to it to be able to proceed further. - let (info_hash, info, trackers, peer_rx, initial_peers) = match add { - AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { - let magnet = - Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; - let info_hash = magnet - .as_id20() - .context("magnet link didn't contain a BTv1 infohash")?; + let (info_hash, info, trackers, peer_rx, initial_peers) = match add { + AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { + let magnet = Magnet::parse(&magnet) + .context("provided path is not a valid magnet URL")?; + let info_hash = magnet + .as_id20() + .context("magnet link didn't contain a BTv1 infohash")?; - let peer_rx = self.make_peer_rx( - info_hash, - magnet.trackers.clone(), - announce_port, - opts.force_tracker_interval, - )?; - let peer_rx = match peer_rx { - Some(peer_rx) => peer_rx, - None => bail!("can't find peers: DHT disabled and no trackers in magnet"), - }; - - debug!(?info_hash, "querying DHT"); - let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( - self.peer_id, - info_hash, - opts.initial_peers.clone().unwrap_or_default(), - peer_rx, - Some(self.merge_peer_opts(opts.peer_opts)), - ) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - debug!(?info, "received result from DHT"); - ( - info_hash, - info, - magnet.trackers, - Some(peer_rx), - initial_peers, - ) - } - other => { - let torrent = match other { - AddTorrent::Url(url) - if url.starts_with("http://") || url.starts_with("https://") => - { - torrent_from_url(&url).await? - } - AddTorrent::Url(url) => { - bail!( - "unsupported URL {:?}. Supporting magnet:, http:, and https", - url - ) - } - AddTorrent::TorrentFileBytes(bytes) => { - torrent_from_bytes(&bytes).context("error decoding torrent")? - } - AddTorrent::TorrentInfo(t) => *t, - }; - - let trackers = torrent - .iter_announce() - .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => Some(url.to_owned()), - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - None - } - }) - .collect::>(); - - let peer_rx = if paused { - None - } else { - self.make_peer_rx( - torrent.info_hash, - trackers.clone(), + let peer_rx = self.make_peer_rx( + info_hash, + magnet.trackers.clone(), announce_port, opts.force_tracker_interval, - )? - }; + )?; + let peer_rx = match peer_rx { + Some(peer_rx) => peer_rx, + None => bail!("can't find peers: DHT disabled and no trackers in magnet"), + }; - ( - torrent.info_hash, - torrent.info, - trackers, - peer_rx, - opts.initial_peers - .clone() - .unwrap_or_default() - .into_iter() - .collect(), - ) - } - }; + debug!(?info_hash, "querying DHT"); + let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + opts.initial_peers.clone().unwrap_or_default(), + peer_rx, + Some(self.merge_peer_opts(opts.peer_opts)), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + debug!(?info, "received result from DHT"); + ( + info_hash, + info, + magnet.trackers, + Some(peer_rx), + initial_peers, + ) + } + other => { + let torrent = match other { + AddTorrent::Url(url) + if url.starts_with("http://") || url.starts_with("https://") => + { + torrent_from_url(&url).await? + } + AddTorrent::Url(url) => { + bail!( + "unsupported URL {:?}. Supporting magnet:, http:, and https", + url + ) + } + AddTorrent::TorrentFileBytes(bytes) => { + torrent_from_bytes(&bytes).context("error decoding torrent")? + } + AddTorrent::TorrentInfo(t) => *t, + }; - self.main_torrent_info( - info_hash, - info, - trackers, - peer_rx, - initial_peers.into_iter().collect(), - opts, - ) - .await + let trackers = torrent + .iter_announce() + .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => Some(url.to_owned()), + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + None + } + }) + .collect::>(); + + let peer_rx = if paused { + None + } else { + self.make_peer_rx( + torrent.info_hash, + trackers.clone(), + announce_port, + opts.force_tracker_interval, + )? + }; + + ( + torrent.info_hash, + torrent.info, + trackers, + peer_rx, + opts.initial_peers + .clone() + .unwrap_or_default() + .into_iter() + .collect(), + ) + } + }; + + self.main_torrent_info( + info_hash, + info, + trackers, + peer_rx, + initial_peers.into_iter().collect(), + opts, + ) + .await + } + .boxed() } #[allow(clippy::too_many_arguments)] diff --git a/crates/librqbit/src/tracing_subscriber_config_utils.rs b/crates/librqbit/src/tracing_subscriber_config_utils.rs index 7cac1a2..b719a30 100644 --- a/crates/librqbit/src/tracing_subscriber_config_utils.rs +++ b/crates/librqbit/src/tracing_subscriber_config_utils.rs @@ -60,6 +60,7 @@ pub struct InitLoggingResult { pub line_broadcast: LineBroadcast, } +#[inline(never)] pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result { let stderr_filter = EnvFilter::builder() .with_default_directive(