From 1b79b66cc34e98e6505ff9d06690b9498de5f6e9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 27 Feb 2024 08:00:56 +0000 Subject: [PATCH] Reduce compile times even more --- crates/dht/src/dht.rs | 83 +++++++++-------- crates/dht/src/persistence.rs | 168 ++++++++++++++++++---------------- 2 files changed, 135 insertions(+), 116 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index d983f9c..06cc2ca 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1,6 +1,7 @@ use std::{ cmp::Reverse, net::SocketAddr, + pin::Pin, str::FromStr, sync::{ atomic::{AtomicU16, Ordering}, @@ -23,7 +24,7 @@ use anyhow::{bail, Context}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use bencode::ByteString; use dashmap::DashMap; -use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; +use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt, TryFutureExt}; use leaky_bucket::RateLimiter; use librqbit_core::{ @@ -232,6 +233,7 @@ impl Drop for RequestPeersStream { impl Stream for RequestPeersStream { type Item = SocketAddr; + #[inline(never)] fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -1144,49 +1146,56 @@ impl DhtState { &self.cancellation_token } - pub async fn with_config(mut config: DhtConfig) -> anyhow::Result> { - let socket = match config.listen_addr { - Some(addr) => UdpSocket::bind(addr) - .await - .with_context(|| format!("error binding socket, address {addr}")), - None => UdpSocket::bind("0.0.0.0:0") - .await - .context("error binding socket, address 0.0.0.0:0"), - }?; + #[inline(never)] + pub fn with_config( + mut config: DhtConfig, + ) -> Pin>> + Send>> { + async move { + let socket = match config.listen_addr { + Some(addr) => UdpSocket::bind(addr) + .await + .with_context(|| format!("error binding socket, address {addr}")), + None => UdpSocket::bind("0.0.0.0:0") + .await + .context("error binding socket, address 0.0.0.0:0"), + }?; - let listen_addr = socket - .local_addr() - .context("cannot determine UDP listen addr")?; - info!("DHT listening on {:?}", listen_addr); + let listen_addr = socket + .local_addr() + .context("cannot determine UDP listen addr")?; + info!("DHT listening on {:?}", listen_addr); - let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); - info!("starting up DHT with peer id {:?}", peer_id); - let bootstrap_addrs = config - .bootstrap_addrs - .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); + let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); + info!("starting up DHT with peer id {:?}", peer_id); + let bootstrap_addrs = config + .bootstrap_addrs + .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); - let token = config.cancellation_token.take().unwrap_or_default(); + let token = config.cancellation_token.take().unwrap_or_default(); - let (in_tx, in_rx) = unbounded_channel(); - let state = Arc::new(Self::new_internal( - peer_id, - in_tx, - config.routing_table, - listen_addr, - config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), - token, - )); + let (in_tx, in_rx) = unbounded_channel(); + let state = Arc::new(Self::new_internal( + peer_id, + in_tx, + config.routing_table, + listen_addr, + config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), + token, + )); - spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), { - let state = state.clone(); - async move { - let worker = DhtWorker { socket, dht: state }; - worker.start(in_rx, &bootstrap_addrs).await - } - }); - Ok(state) + spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), { + let state = state.clone(); + async move { + let worker = DhtWorker { socket, dht: state }; + worker.start(in_rx, &bootstrap_addrs).await + } + }); + Ok(state) + } + .boxed() } + #[inline(never)] pub fn get_peers( self: &Arc, info_hash: Id20, diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 0f15236..0c73b05 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,5 +1,6 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... +use futures::{Future, FutureExt}; use librqbit_core::directories::get_configuration_directory; use librqbit_core::spawn_utils::spawn_with_cancel; use serde::{Deserialize, Serialize}; @@ -7,6 +8,7 @@ use std::fs::OpenOptions; use std::io::{BufReader, BufWriter}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -75,94 +77,102 @@ impl PersistentDht { Ok(path) } - pub async fn create( + #[inline(never)] + pub fn create( config: Option, cancellation_token: Option, - ) -> anyhow::Result { - let mut config = config.unwrap_or_default(); - let config_filename = match config.config_filename.take() { - Some(config_filename) => config_filename, - None => Self::default_persistence_filename()?, - }; + ) -> Pin> + Send>> { + async move { + let mut config = config.unwrap_or_default(); + let config_filename = match config.config_filename.take() { + Some(config_filename) => config_filename, + None => Self::default_persistence_filename()?, + }; - info!( - filename=?config_filename, - "will store DHT routing table periodically", - ); + info!( + filename=?config_filename, + "will store DHT routing table periodically", + ); - if let Some(parent) = config_filename.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("error creating dir {:?}", &parent))?; - } - - let de = match OpenOptions::new().read(true).open(&config_filename) { - Ok(dht_json) => { - let reader = BufReader::new(dht_json); - match serde_json::from_reader::<_, DhtSerialize>(reader) { - Ok(r) => { - info!(filename=?config_filename, "loaded DHT routing table from"); - Some(r) - } - Err(e) => { - warn!( - filename=?config_filename, - "cannot deserialize routing table: {:#}", - e - ); - None - } - } + if let Some(parent) = config_filename.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("error creating dir {:?}", &parent))?; } - Err(e) => match e.kind() { - std::io::ErrorKind::NotFound => None, - _ => return Err(e).with_context(|| format!("error reading {config_filename:?}")), - }, - }; - let (listen_addr, routing_table, peer_store) = de - .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) - .unwrap_or((None, None, None)); - let peer_id = routing_table.as_ref().map(|r| r.id()); - let dht_config = DhtConfig { - peer_id, - routing_table, - listen_addr, - peer_store, - cancellation_token, - ..Default::default() - }; - let dht = DhtState::with_config(dht_config).await?; - spawn_with_cancel( - error_span!("dht_persistence"), - dht.cancellation_token().clone(), - { - let dht = dht.clone(); - let dump_interval = config - .dump_interval - .unwrap_or_else(|| Duration::from_secs(3)); - async move { - let tempfile_name = { - let file_name = format!("dht.json.tmp.{}", std::process::id()); - let mut tmp = config_filename.clone(); - tmp.set_file_name(file_name); - tmp - }; - - loop { - trace!("sleeping for {:?}", &dump_interval); - tokio::time::sleep(dump_interval).await; - - match dump_dht(&dht, &config_filename, &tempfile_name) { - Ok(_) => trace!(filename=?config_filename, "dumped DHT"), - Err(e) => { - error!(filename=?config_filename, "error dumping DHT: {:#}", e) - } + let de = match OpenOptions::new().read(true).open(&config_filename) { + Ok(dht_json) => { + let reader = BufReader::new(dht_json); + match serde_json::from_reader::<_, DhtSerialize>( + reader, + ) { + Ok(r) => { + info!(filename=?config_filename, "loaded DHT routing table from"); + Some(r) + } + Err(e) => { + warn!( + filename=?config_filename, + "cannot deserialize routing table: {:#}", + e + ); + None } } } - }, - ); + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => None, + _ => { + return Err(e).with_context(|| format!("error reading {config_filename:?}")) + } + }, + }; + let (listen_addr, routing_table, peer_store) = de + .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) + .unwrap_or((None, None, None)); + let peer_id = routing_table.as_ref().map(|r| r.id()); - Ok(dht) + let dht_config = DhtConfig { + peer_id, + routing_table, + listen_addr, + peer_store, + cancellation_token, + ..Default::default() + }; + let dht = DhtState::with_config(dht_config).await?; + spawn_with_cancel( + error_span!("dht_persistence"), + dht.cancellation_token().clone(), + { + let dht = dht.clone(); + let dump_interval = config + .dump_interval + .unwrap_or_else(|| Duration::from_secs(3)); + async move { + let tempfile_name = { + let file_name = format!("dht.json.tmp.{}", std::process::id()); + let mut tmp = config_filename.clone(); + tmp.set_file_name(file_name); + tmp + }; + + loop { + trace!("sleeping for {:?}", &dump_interval); + tokio::time::sleep(dump_interval).await; + + match dump_dht(&dht, &config_filename, &tempfile_name) { + Ok(_) => trace!(filename=?config_filename, "dumped DHT"), + Err(e) => { + error!(filename=?config_filename, "error dumping DHT: {:#}", e) + } + } + } + } + }, + ); + + Ok(dht) + } + .boxed() } }