From 18f22cf323f5cf9daa68b4a4571a42575e87a403 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 20:45:21 +0000 Subject: [PATCH 01/10] Simplify cancellation as peer_rx doesnt need a token no longer --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/session.rs | 45 +++------ crates/librqbit/src/tracker_comms.rs | 144 ++++++++++++++++----------- crates/librqbit/src/type_aliases.rs | 2 +- 5 files changed, 103 insertions(+), 90 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 571545d..08ff622 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1257,6 +1257,7 @@ name = "librqbit" version = "5.5.0" dependencies = [ "anyhow", + "async-stream", "axum 0.7.4", "backoff", "base64", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 9b54625..b840cf8 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -68,6 +68,7 @@ serde_with = "3.4.0" tokio-util = "0.7.10" bytes = "1.5.0" rlimit = "0.10.1" +async-stream = "0.3.5" [dev-dependencies] futures = {version = "0.3"} diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 95d65ea..529be59 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -756,15 +756,13 @@ impl Session { self.tcp_listen_port }; - let cancellation_token = self.cancellation_token.child_token(); - let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); let paused = opts.list_only || opts.paused; // The main difference between magnet link and torrent file, is that we need to resolve the magnet link // into a torrent file by connecting to peers that support extended handshakes. // So we must discover at least one peer and connect to it to be able to proceed further. - let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { + let (info_hash, info, trackers, peer_rx, initial_peers) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; @@ -772,11 +770,9 @@ impl Session { .as_id20() .context("magnet link didn't contain a BTv1 infohash")?; - let peer_token = cancellation_token.child_token(); let peer_rx = self.make_peer_rx( info_hash, magnet.trackers.clone(), - peer_token.clone(), announce_port, opts.force_tracker_interval, )?; @@ -800,9 +796,6 @@ impl Session { anyhow::bail!("DHT died, no way to discover torrent metainfo") } }; - if paused { - peer_token.cancel(); - } debug!(?info, "received result from DHT"); ( info_hash, @@ -810,7 +803,6 @@ impl Session { magnet.trackers, Some(peer_rx), initial_peers, - cancellation_token, ) } other => { @@ -849,7 +841,6 @@ impl Session { self.make_peer_rx( torrent.info_hash, trackers.clone(), - cancellation_token.clone(), announce_port, opts.force_tracker_interval, )? @@ -865,13 +856,10 @@ impl Session { .unwrap_or_default() .into_iter() .collect(), - cancellation_token, ) } }; - cancellation_token_drop_guard.disarm(); - self.main_torrent_info( info_hash, info, @@ -879,7 +867,6 @@ impl Session { peer_rx, initial_peers.into_iter().collect(), opts, - cancellation_token, ) .await } @@ -893,12 +880,9 @@ impl Session { peer_rx: Option, initial_peers: Vec, opts: AddTorrentOptions, - cancellation_token: CancellationToken, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); - let drop_guard = cancellation_token.clone().drop_guard(); - let get_only_files = |only_files: Option>, only_files_regex: Option, list_only: bool| { match (only_files, only_files_regex) { @@ -1016,20 +1000,16 @@ impl Session { let span = managed_torrent.info.span.clone(); let _ = span.enter(); - // Just in case, cancel all tasks started for this torrent so far. - // This is defensive, and not proven necessary. - let token = if opts.paused { - cancellation_token.cancel(); - self.cancellation_token.child_token() - } else { - cancellation_token - }; managed_torrent - .start(initial_peers, peer_rx, opts.paused, token) + .start( + initial_peers, + peer_rx, + opts.paused, + self.cancellation_token.child_token(), + ) .context("error starting torrent")?; } - drop_guard.disarm(); Ok(AddTorrentResponse::Added(id, managed_torrent)) } @@ -1080,7 +1060,6 @@ impl Session { &self, info_hash: Id20, trackers: Vec, - cancel: CancellationToken, announce_port: Option, force_tracker_interval: Option, ) -> anyhow::Result> { @@ -1097,7 +1076,6 @@ impl Session { // TODO: report actual bytes, not zeroes. Box::new(()), force_tracker_interval, - cancel, announce_port, ); @@ -1111,15 +1089,18 @@ impl Session { } pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let token = self.cancellation_token.child_token(); let peer_rx = self.make_peer_rx( handle.info_hash(), handle.info().trackers.clone().into_iter().collect(), - token.clone(), self.tcp_listen_port, handle.info().options.force_tracker_interval, )?; - handle.start(Default::default(), peer_rx, false, token)?; + handle.start( + Default::default(), + peer_rx, + false, + self.cancellation_token.child_token(), + )?; Ok(()) } } diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index d1a7813..dae75ef 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -4,13 +4,15 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; +use futures::future::Either; +use futures::stream::FuturesUnordered; +use futures::FutureExt; use futures::Stream; -use librqbit_core::spawn_utils::spawn_with_cancel; -use tokio_util::sync::CancellationToken; +use futures::StreamExt; use tracing::debug; use tracing::error_span; -use tracing::info; use tracing::trace; +use tracing::Instrument; use url::Url; use crate::tracker_comms_http; @@ -22,7 +24,6 @@ pub struct TrackerComms { peer_id: Id20, stats: Box, force_tracker_interval: Option, - cancellation_token: CancellationToken, tx: Sender, tcp_listen_port: Option, } @@ -64,69 +65,98 @@ impl TrackerComms { trackers: Vec, stats: Box, force_interval: Option, - cancellation_token: CancellationToken, tcp_listen_port: Option, - ) -> Option + Send + Sync + Unpin + 'static> { - let (tx, rx) = tokio::sync::mpsc::channel::(16); - let comms = Arc::new(Self { - info_hash, - peer_id, - stats, - force_tracker_interval: force_interval, - cancellation_token, - tx, - tcp_listen_port, - }); - let mut added = false; - for tracker in trackers { - if let Err(e) = comms.clone().add_tracker(&tracker) { - info!(tracker = tracker, "error adding tracker: {:#}", e) - } else { - added = true; - } - } - if !added { + ) -> Option + Unpin + Send + 'static> { + let trackers = trackers + .into_iter() + .filter_map(|t| match Url::parse(&t) { + Ok(parsed) => Some(parsed), + Err(e) => { + debug!("error parsing tracker URL: {}", e); + None + } + }) + .collect::>(); + if trackers.is_empty() { return None; } - Some(tokio_stream::wrappers::ReceiverStream::new(rx)) + + let (tx, mut rx) = tokio::sync::mpsc::channel::(16); + + let s = async_stream::stream! { + use futures::StreamExt; + let mut rx_done = false; + let comms = Arc::new(Self { + info_hash, + peer_id, + stats, + force_tracker_interval: force_interval, + tx, + tcp_listen_port, + }); + let mut futures = FuturesUnordered::new(); + for tracker in trackers { + if let Ok(fut) = comms.add_tracker(tracker) { + futures.push(fut); + } + } + if futures.is_empty() { + return; + } + while !(futures.is_empty() && rx_done) { + tokio::select! { + addr = rx.recv(), if !rx_done => { + match addr { + Some(addr) => yield addr, + None => rx_done = true + } + } + e = futures.next(), if !futures.is_empty() => { + if let Some(Err(e)) = e { + debug!("error: {e}"); + } + } + } + } + }; + + Some(s.boxed()) } - fn add_tracker(self: Arc, tracker: &str) -> anyhow::Result<()> { - if tracker.starts_with("http://") || tracker.starts_with("https://") { - spawn_with_cancel( - error_span!( - parent: None, - "http_tracker", - tracker = tracker, - info_hash = ?self.info_hash - ), - self.cancellation_token.clone(), - { - let comms = self; - let url = Url::parse(tracker).context("can't parse URL")?; - async move { comms.task_single_tracker_monitor_http(url).await } - }, - ); - } else if tracker.starts_with("udp://") { - spawn_with_cancel( - error_span!(parent: None, "udp_tracker", tracker = tracker, info_hash = ?self.info_hash), - self.cancellation_token.clone(), - { - let comms = self; - let url = Url::parse(tracker).context("can't parse URL")?; - async move { comms.task_single_tracker_monitor_udp(url).await } - }, + fn add_tracker( + &self, + url: Url, + ) -> anyhow::Result< + Either< + impl std::future::Future> + '_ + Send, + impl std::future::Future> + '_ + Send, + >, + > { + let info_hash = self.info_hash; + if url.scheme() == "http" || url.scheme() == "https" { + let span = error_span!( + parent: None, + "http_tracker", + tracker = %url, + info_hash = ?info_hash ); + Ok(self + .task_single_tracker_monitor_http(url) + .instrument(span) + .left_future()) + } else if url.scheme() == "udp" { + let span = + error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); + Ok(self + .task_single_tracker_monitor_udp(url) + .instrument(span) + .right_future()) } else { - bail!("unsupported tracker url {}", tracker) + bail!("unsupported tracker url {}", url) } - Ok(()) } - async fn task_single_tracker_monitor_http( - self: Arc, - mut tracker_url: Url, - ) -> anyhow::Result<()> { + async fn task_single_tracker_monitor_http(&self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started); loop { let stats = self.stats.get(); diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 2b6efa9..d68f5bc 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -5,4 +5,4 @@ use futures::Stream; pub type BF = bitvec::vec::BitVec; pub type PeerHandle = SocketAddr; -pub type PeerStream = Box + Unpin + Send + Sync + 'static>; +pub type PeerStream = Box + Unpin + Send + 'static>; From 15c078619c1787a819994554581ee9168e804266 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 22:41:36 +0000 Subject: [PATCH 02/10] Compile times are even worse now --- Cargo.lock | 21 ++++++++++++++ Cargo.toml | 4 +-- crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/lib.rs | 3 -- crates/librqbit/src/session.rs | 23 +++++++-------- crates/tracker_comms/Cargo.toml | 27 ++++++++++++++++++ .../test/udp-tracker-announce-response.bin | Bin 0 -> 1220 bytes crates/tracker_comms/src/lib.rs | 5 ++++ .../src/tracker_comms.rs | 0 .../src/tracker_comms_http.rs | 0 .../src/tracker_comms_udp.rs | 10 +++---- 11 files changed, 72 insertions(+), 22 deletions(-) create mode 100644 crates/tracker_comms/Cargo.toml create mode 100644 crates/tracker_comms/resources/test/udp-tracker-announce-response.bin create mode 100644 crates/tracker_comms/src/lib.rs rename crates/{librqbit => tracker_comms}/src/tracker_comms.rs (100%) rename crates/{librqbit => tracker_comms}/src/tracker_comms_http.rs (100%) rename crates/{librqbit => tracker_comms}/src/tracker_comms_udp.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index 08ff622..f967a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1278,6 +1278,7 @@ dependencies = [ "librqbit-dht", "librqbit-peer-protocol", "librqbit-sha1-wrapper", + "librqbit-tracker-comms", "librqbit-upnp", "openssl", "parking_lot", @@ -1397,6 +1398,26 @@ dependencies = [ "sha1", ] +[[package]] +name = "librqbit-tracker-comms" +version = "1.0.0" +dependencies = [ + "anyhow", + "async-stream", + "byteorder", + "futures", + "librqbit-bencode", + "librqbit-buffers", + "librqbit-core", + "rand", + "reqwest", + "serde", + "tokio", + "tracing", + "url", + "urlencoding", +] + [[package]] name = "librqbit-upnp" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a3a89a4..22f0267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ "crates/peer_binary_protocol", "crates/dht", "crates/upnp" -] +, "crates/tracker_comms"] [profile.dev] panic = "abort" @@ -22,4 +22,4 @@ debug = true [profile.release-github] inherits = "release" -debug = false \ No newline at end of file +debug = false diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index b840cf8..e05ceb5 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -23,6 +23,7 @@ rust-tls = ["reqwest/rustls-tls"] [dependencies] bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} +tracker_comms = {path = "../tracker_comms", default-features=false, package="librqbit-tracker-comms", version="1.0.0"} buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"} librqbit-core = {path = "../librqbit_core", version = "3.5.0"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 817c086..d16c427 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -36,9 +36,6 @@ mod session; mod spawn_utils; mod torrent_state; pub mod tracing_subscriber_config_utils; -pub mod tracker_comms; -pub mod tracker_comms_http; -pub mod tracker_comms_udp; mod type_aliases; pub use api::Api; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 529be59..2252332 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -9,6 +9,16 @@ use std::{ time::Duration, }; +use crate::{ + dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + peer_connection::PeerConnectionOptions, + read_buf::ReadBuf, + spawn_utils::BlockingSpawner, + torrent_state::{ + ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, + }, + type_aliases::PeerStream, +}; use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; @@ -32,18 +42,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; - -use crate::{ - dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - peer_connection::PeerConnectionOptions, - read_buf::ReadBuf, - spawn_utils::BlockingSpawner, - torrent_state::{ - ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, - }, - tracker_comms::TrackerComms, - type_aliases::PeerStream, -}; +use tracker_comms::TrackerComms; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; diff --git a/crates/tracker_comms/Cargo.toml b/crates/tracker_comms/Cargo.toml new file mode 100644 index 0000000..98329ec --- /dev/null +++ b/crates/tracker_comms/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "librqbit-tracker-comms" +version = "1.0.0" +edition = "2018" +description = "Common interface around various sha1 implementations used in rqbit torrent client." +license = "Apache-2.0" +documentation = "https://docs.rs/librqbit-tracker-comms" +repository = "https://github.com/ikatson/rqbit" +readme = "README.md" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = "1" +anyhow = "1" +futures = "0.3" +async-stream = "0.3.5" +buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"} +librqbit-core = {path = "../librqbit_core", version = "3.5.0"} +byteorder = "1.5" +serde = {version = "1", features=["derive"]} +urlencoding = "2" +rand = "0.8" +tracing = "0.1.40" +reqwest = {version="0.11.22", default-features=false, features = ["json"]} +bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} +url = "2" \ No newline at end of file diff --git a/crates/tracker_comms/resources/test/udp-tracker-announce-response.bin b/crates/tracker_comms/resources/test/udp-tracker-announce-response.bin new file mode 100644 index 0000000000000000000000000000000000000000..4b1bc3aa41c1c1041424a3e05d07afc867a90614 GIT binary patch literal 1220 zcmV;#1Uvfx0005yl-(2n00wjb0099201XW;06%FO;aa^eBT~JI)Uu;p4zg9SG!TmLd0%M0?s}SDDZaHjGh6NClg6@HZ7+t7PE(7s zImp#dp>-^V1*FIB4nUbD<4I1H*;A?*x;cd9UF^5|P_mz-SK64$b-86BNA9bSV41RCL7|N2cT zDOn_>T6i)~i{CGYc0P>sJ29&+@-r6)v?YueV6R>U14pb2Z5>!KQht(K` zDoOvmq6|_nh`a!)Ro=X^%={`wTC!QJ3QH)mLW3rMNU+pj`B9g!z?qz8d2Rekgz@ZO zns#P`j9(hlva)1dt7g77@?3n)h|^?Q7~{w?W>D>17goMct8tlXmQ8HcNvH)bioW9- z;U(5cCK}-aKV_$@4AtJ^cXX4&yOp;_6uiF`GWg8MS6#r2bJT6MxlTE?eAJSgN8|b(*-&=2UGxpR1};qInr~C{R$u7XGR}*}fQJqb>k62v`s$ zaK6?tXeBw3#VA$<|G$ywh6Rh_yoTP?+~q-IxYQuP$f`TM^oJwfDZC-+X#Z5KZ>;Ue z)ouLNSVQMu-NkhZL0?WV75ar=JHuYY5?>`pS}Y)68cM<7EngY1-GwJ!8RPm@2woII zH7fpF)`^;jc3Js=%Dc)~ljwt>`&Enq9i2s0bzMGL6I3VkZ(B%Di9K8O1l*2*165+*L zp}91T=Z5*Rn-O{7$*^>#+mf2Fb7Al4RIp}5)F)eQ{E(Q=VQr(>-;pR^8PE8FuU{D? zT-Bpq6Wq|J99%v-F-Y=UC62OH`d9fq(!wuPHU)Thl23j}PN*d=@(VISqAsm8rdKpB ziV-483oa@+lgQO2PFajCTLt7%>l)z&rH_z48(Y7EWxt46@p1RA?Zwn;u`&vaoRYDMU)KsYl)b literal 0 HcmV?d00001 diff --git a/crates/tracker_comms/src/lib.rs b/crates/tracker_comms/src/lib.rs new file mode 100644 index 0000000..74cc980 --- /dev/null +++ b/crates/tracker_comms/src/lib.rs @@ -0,0 +1,5 @@ +mod tracker_comms; +mod tracker_comms_http; +mod tracker_comms_udp; + +pub use tracker_comms::*; diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs similarity index 100% rename from crates/librqbit/src/tracker_comms.rs rename to crates/tracker_comms/src/tracker_comms.rs diff --git a/crates/librqbit/src/tracker_comms_http.rs b/crates/tracker_comms/src/tracker_comms_http.rs similarity index 100% rename from crates/librqbit/src/tracker_comms_http.rs rename to crates/tracker_comms/src/tracker_comms_http.rs diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/tracker_comms/src/tracker_comms_udp.rs similarity index 97% rename from crates/librqbit/src/tracker_comms_udp.rs rename to crates/tracker_comms/src/tracker_comms_udp.rs index 1e72ae8..35af733 100644 --- a/crates/librqbit/src/tracker_comms_udp.rs +++ b/crates/tracker_comms/src/tracker_comms_udp.rs @@ -12,9 +12,9 @@ const ACTION_ANNOUNCE: u32 = 1; // const ACTION_ERROR: u32 = 3; pub const EVENT_NONE: u32 = 0; -pub const EVENT_COMPLETED: u32 = 1; -pub const EVENT_STARTED: u32 = 2; -pub const EVENT_STOPPED: u32 = 3; +// pub const EVENT_COMPLETED: u32 = 1; +// pub const EVENT_STARTED: u32 = 2; +// pub const EVENT_STOPPED: u32 = 3; pub type ConnectionId = u64; const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980; @@ -293,7 +293,7 @@ mod tests { Response::Connect(connection_id) => { dbg!(connection_id) } - other => panic!("unexpected response {other:?}"), + other => panic!("unexpected response {:?}", other), }; let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap(); @@ -335,7 +335,7 @@ mod tests { Response::Announce(r) => { dbg!(r); } - other => panic!("unexpected response {other:?}"), + other => panic!("unexpected response {:?}", other), } } } From 2a371537fed8e33e767908f14c5db35de8b5c7d7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 22:49:16 +0000 Subject: [PATCH 03/10] Nothing... --- crates/tracker_comms/src/tracker_comms.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/tracker_comms/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs index dae75ef..cf675ba 100644 --- a/crates/tracker_comms/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -1,4 +1,5 @@ use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -66,7 +67,7 @@ impl TrackerComms { stats: Box, force_interval: Option, tcp_listen_port: Option, - ) -> Option + Unpin + Send + 'static> { + ) -> Option + Send + 'static>>> { let trackers = trackers .into_iter() .filter_map(|t| match Url::parse(&t) { From 15d17355b56cd9e0c0addb84ba4f28540c9d083b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 22:52:53 +0000 Subject: [PATCH 04/10] Async http server. Even slower --- crates/librqbit/src/http_api.rs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 09f95a9..a2f1abe 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -3,11 +3,12 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; -use futures::TryStreamExt; +use futures::{Future, FutureExt, TryStreamExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; +use std::pin::Pin; use std::str::FromStr; use std::time::Duration; use tracing::{debug, info}; @@ -44,7 +45,10 @@ impl HttpApi { /// Run the HTTP server forever on the given address. /// If read_only is passed, no state-modifying methods will be exposed. - pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { + pub fn make_http_api_and_run( + self, + addr: SocketAddr, + ) -> Pin> + Send>> { let state = self.inner; async fn api_root() -> impl IntoResponse { @@ -288,11 +292,15 @@ impl HttpApi { info!(%addr, "starting HTTP server"); use tokio::net::TcpListener; - let listener = TcpListener::bind(&addr) - .await - .with_context(|| format!("error binding to {addr}"))?; - axum::serve(listener, app).await?; - Ok(()) + + async move { + let listener = TcpListener::bind(&addr) + .await + .with_context(|| format!("error binding to {addr}"))?; + axum::serve(listener, app).await?; + Ok(()) + } + .boxed() } } From f42007f4367b1802fcdd05e7146b53f5b0caacd7 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 22:59:54 +0000 Subject: [PATCH 05/10] This one makes it better for sure --- crates/librqbit/src/session.rs | 397 +++++++++--------- .../src/tracing_subscriber_config_utils.rs | 1 + 2 files changed, 204 insertions(+), 194 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 2252332..1641a14 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -4,6 +4,7 @@ use std::{ io::{BufReader, BufWriter, Read}, net::SocketAddr, path::PathBuf, + pin::Pin, str::FromStr, sync::Arc, time::Duration, @@ -24,7 +25,7 @@ use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; -use futures::{stream::FuturesUnordered, TryFutureExt}; +use futures::{stream::FuturesUnordered, Future, FutureExt, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -377,8 +378,9 @@ pub(crate) struct CheckedIncomingConnection { impl Session { /// Create a new session. The passed in folder will be used as a default unless overriden per torrent. - pub async fn new(output_folder: PathBuf) -> anyhow::Result> { - Self::new_with_opts(output_folder, SessionOptions::default()).await + #[inline(never)] + pub fn new(output_folder: PathBuf) -> Pin>>>> { + Self::new_with_opts(output_folder, SessionOptions::default()) } pub fn default_persistence_filename() -> anyhow::Result { @@ -391,93 +393,97 @@ impl Session { } /// Create a new session with options. - pub async fn new_with_opts( + #[inline(never)] + pub fn new_with_opts( output_folder: PathBuf, mut opts: SessionOptions, - ) -> anyhow::Result> { - let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); - let token = CancellationToken::new(); + ) -> Pin>>>> { + async move { + let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let token = CancellationToken::new(); - let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { - let (l, p) = create_tcp_listener(port_range) - .await - .context("error listening on TCP")?; - info!("Listening on 0.0.0.0:{p} for incoming peer connections"); - (Some(l), Some(p)) - } else { - (None, None) - }; - - let dht = if opts.disable_dht { - None - } else { - let dht = if opts.disable_dht_persistence { - DhtBuilder::with_config(DhtConfig { - cancellation_token: Some(token.child_token()), - ..Default::default() - }) - .await - .context("error initializing DHT")? - } else { - let pdht_config = opts.dht_config.take().unwrap_or_default(); - PersistentDht::create(Some(pdht_config), Some(token.clone())) + let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { + let (l, p) = create_tcp_listener(port_range) .await - .context("error initializing persistent DHT")? + .context("error listening on TCP")?; + info!("Listening on 0.0.0.0:{p} for incoming peer connections"); + (Some(l), Some(p)) + } else { + (None, None) }; - Some(dht) - }; - let peer_opts = opts.peer_opts.unwrap_or_default(); - let persistence_filename = match opts.persistence_filename { - Some(filename) => filename, - None => Self::default_persistence_filename()?, - }; - let spawner = BlockingSpawner::default(); + let dht = if opts.disable_dht { + None + } else { + let dht = if opts.disable_dht_persistence { + DhtBuilder::with_config(DhtConfig { + cancellation_token: Some(token.child_token()), + ..Default::default() + }) + .await + .context("error initializing DHT")? + } else { + let pdht_config = opts.dht_config.take().unwrap_or_default(); + PersistentDht::create(Some(pdht_config), Some(token.clone())) + .await + .context("error initializing persistent DHT")? + }; - let session = Arc::new(Self { - persistence_filename, - peer_id, - dht, - peer_opts, - spawner, - output_folder, - db: RwLock::new(Default::default()), - _cancellation_token_drop_guard: token.clone().drop_guard(), - cancellation_token: token, - tcp_listen_port, - }); + Some(dht) + }; + let peer_opts = opts.peer_opts.unwrap_or_default(); + let persistence_filename = match opts.persistence_filename { + Some(filename) => filename, + None => Self::default_persistence_filename()?, + }; + let spawner = BlockingSpawner::default(); - if let Some(tcp_listener) = tcp_listener { - session.spawn( - error_span!("tcp_listen", port = tcp_listen_port), - session.clone().task_tcp_listener(tcp_listener), - ); - } + let session = Arc::new(Self { + persistence_filename, + peer_id, + dht, + peer_opts, + spawner, + output_folder, + db: RwLock::new(Default::default()), + _cancellation_token_drop_guard: token.clone().drop_guard(), + cancellation_token: token, + tcp_listen_port, + }); - if let Some(listen_port) = tcp_listen_port { - if opts.enable_upnp_port_forwarding { + if let Some(tcp_listener) = tcp_listener { session.spawn( - error_span!("upnp_forward", port = listen_port), - session.clone().task_upnp_port_forwarder(listen_port), + error_span!("tcp_listen", port = tcp_listen_port), + session.clone().task_tcp_listener(tcp_listener), ); } - } - if opts.persistence { - info!( - "will use {:?} for session persistence", - session.persistence_filename - ); - if let Some(parent) = session.persistence_filename.parent() { - std::fs::create_dir_all(parent).with_context(|| { - format!("couldn't create directory {:?} for session storage", parent) - })?; + if let Some(listen_port) = tcp_listen_port { + if opts.enable_upnp_port_forwarding { + session.spawn( + error_span!("upnp_forward", port = listen_port), + session.clone().task_upnp_port_forwarder(listen_port), + ); + } } - let persistence_task = session.clone().task_persistence(); - session.spawn(error_span!("session_persistence"), persistence_task); - } - Ok(session) + if opts.persistence { + info!( + "will use {:?} for session persistence", + session.persistence_filename + ); + if let Some(parent) = session.persistence_filename.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("couldn't create directory {:?} for session storage", parent) + })?; + } + let persistence_task = session.clone().task_persistence(); + session.spawn(error_span!("session_persistence"), persistence_task); + } + + Ok(session) + } + .boxed() } async fn task_persistence(self: Arc) -> anyhow::Result<()> { @@ -738,136 +744,139 @@ impl Session { } /// Add a torrent to the session. - pub async fn add_torrent( - &self, - add: AddTorrent<'_>, + pub fn add_torrent<'a>( + &'a self, + add: AddTorrent<'a>, opts: Option, - ) -> anyhow::Result { - // Magnet links are different in that we first need to discover the metadata. - let span = error_span!("add_torrent"); - let _ = span.enter(); + ) -> Pin> + Send + 'a>> { + async move { + // Magnet links are different in that we first need to discover the metadata. + let span = error_span!("add_torrent"); + let _ = span.enter(); - let opts = opts.unwrap_or_default(); + let opts = opts.unwrap_or_default(); - let announce_port = if opts.list_only { - None - } else { - self.tcp_listen_port - }; + let announce_port = if opts.list_only { + None + } else { + self.tcp_listen_port + }; - let paused = opts.list_only || opts.paused; + let paused = opts.list_only || opts.paused; - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link - // into a torrent file by connecting to peers that support extended handshakes. - // So we must discover at least one peer and connect to it to be able to proceed further. + // The main difference between magnet link and torrent file, is that we need to resolve the magnet link + // into a torrent file by connecting to peers that support extended handshakes. + // So we must discover at least one peer and connect to it to be able to proceed further. - let (info_hash, info, trackers, peer_rx, initial_peers) = match add { - AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { - let magnet = - Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; - let info_hash = magnet - .as_id20() - .context("magnet link didn't contain a BTv1 infohash")?; + let (info_hash, info, trackers, peer_rx, initial_peers) = match add { + AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { + let magnet = Magnet::parse(&magnet) + .context("provided path is not a valid magnet URL")?; + let info_hash = magnet + .as_id20() + .context("magnet link didn't contain a BTv1 infohash")?; - let peer_rx = self.make_peer_rx( - info_hash, - magnet.trackers.clone(), - announce_port, - opts.force_tracker_interval, - )?; - let peer_rx = match peer_rx { - Some(peer_rx) => peer_rx, - None => bail!("can't find peers: DHT disabled and no trackers in magnet"), - }; - - debug!(?info_hash, "querying DHT"); - let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( - self.peer_id, - info_hash, - opts.initial_peers.clone().unwrap_or_default(), - peer_rx, - Some(self.merge_peer_opts(opts.peer_opts)), - ) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - debug!(?info, "received result from DHT"); - ( - info_hash, - info, - magnet.trackers, - Some(peer_rx), - initial_peers, - ) - } - other => { - let torrent = match other { - AddTorrent::Url(url) - if url.starts_with("http://") || url.starts_with("https://") => - { - torrent_from_url(&url).await? - } - AddTorrent::Url(url) => { - bail!( - "unsupported URL {:?}. Supporting magnet:, http:, and https", - url - ) - } - AddTorrent::TorrentFileBytes(bytes) => { - torrent_from_bytes(&bytes).context("error decoding torrent")? - } - AddTorrent::TorrentInfo(t) => *t, - }; - - let trackers = torrent - .iter_announce() - .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => Some(url.to_owned()), - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - None - } - }) - .collect::>(); - - let peer_rx = if paused { - None - } else { - self.make_peer_rx( - torrent.info_hash, - trackers.clone(), + let peer_rx = self.make_peer_rx( + info_hash, + magnet.trackers.clone(), announce_port, opts.force_tracker_interval, - )? - }; + )?; + let peer_rx = match peer_rx { + Some(peer_rx) => peer_rx, + None => bail!("can't find peers: DHT disabled and no trackers in magnet"), + }; - ( - torrent.info_hash, - torrent.info, - trackers, - peer_rx, - opts.initial_peers - .clone() - .unwrap_or_default() - .into_iter() - .collect(), - ) - } - }; + debug!(?info_hash, "querying DHT"); + let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + opts.initial_peers.clone().unwrap_or_default(), + peer_rx, + Some(self.merge_peer_opts(opts.peer_opts)), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + debug!(?info, "received result from DHT"); + ( + info_hash, + info, + magnet.trackers, + Some(peer_rx), + initial_peers, + ) + } + other => { + let torrent = match other { + AddTorrent::Url(url) + if url.starts_with("http://") || url.starts_with("https://") => + { + torrent_from_url(&url).await? + } + AddTorrent::Url(url) => { + bail!( + "unsupported URL {:?}. Supporting magnet:, http:, and https", + url + ) + } + AddTorrent::TorrentFileBytes(bytes) => { + torrent_from_bytes(&bytes).context("error decoding torrent")? + } + AddTorrent::TorrentInfo(t) => *t, + }; - self.main_torrent_info( - info_hash, - info, - trackers, - peer_rx, - initial_peers.into_iter().collect(), - opts, - ) - .await + let trackers = torrent + .iter_announce() + .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => Some(url.to_owned()), + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + None + } + }) + .collect::>(); + + let peer_rx = if paused { + None + } else { + self.make_peer_rx( + torrent.info_hash, + trackers.clone(), + announce_port, + opts.force_tracker_interval, + )? + }; + + ( + torrent.info_hash, + torrent.info, + trackers, + peer_rx, + opts.initial_peers + .clone() + .unwrap_or_default() + .into_iter() + .collect(), + ) + } + }; + + self.main_torrent_info( + info_hash, + info, + trackers, + peer_rx, + initial_peers.into_iter().collect(), + opts, + ) + .await + } + .boxed() } #[allow(clippy::too_many_arguments)] diff --git a/crates/librqbit/src/tracing_subscriber_config_utils.rs b/crates/librqbit/src/tracing_subscriber_config_utils.rs index 7cac1a2..b719a30 100644 --- a/crates/librqbit/src/tracing_subscriber_config_utils.rs +++ b/crates/librqbit/src/tracing_subscriber_config_utils.rs @@ -60,6 +60,7 @@ pub struct InitLoggingResult { pub line_broadcast: LineBroadcast, } +#[inline(never)] pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result { let stderr_filter = EnvFilter::builder() .with_default_directive( From 34f3ec6c2903298d0e17a65c7a26e38792692478 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 23:08:47 +0000 Subject: [PATCH 06/10] Inline(never) more things for rqbit(bin) to compile faster --- crates/librqbit/src/http_api.rs | 1 + crates/librqbit/src/http_api_client.rs | 73 +++++++++++--------- crates/librqbit/src/session.rs | 3 + crates/librqbit/src/torrent_state/mod.rs | 41 ++++++----- crates/librqbit_core/src/torrent_metainfo.rs | 1 + 5 files changed, 72 insertions(+), 47 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index a2f1abe..806ba4c 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -45,6 +45,7 @@ impl HttpApi { /// Run the HTTP server forever on the given address. /// If read_only is passed, no state-modifying methods will be exposed. + #[inline(never)] pub fn make_http_api_and_run( self, addr: SocketAddr, diff --git a/crates/librqbit/src/http_api_client.rs b/crates/librqbit/src/http_api_client.rs index 2b6bf77..a1bfb47 100644 --- a/crates/librqbit/src/http_api_client.rs +++ b/crates/librqbit/src/http_api_client.rs @@ -1,4 +1,7 @@ +use std::pin::Pin; + use anyhow::Context; +use futures::{Future, FutureExt}; use serde::Deserialize; use crate::{ @@ -59,6 +62,7 @@ async fn json_response( } impl HttpApiClient { + #[inline(never)] pub fn new(url: &str) -> anyhow::Result { Ok(Self { base_url: reqwest::Url::parse(url)?, @@ -70,40 +74,47 @@ impl HttpApiClient { &self.base_url } - pub async fn validate_rqbit_server(&self) -> anyhow::Result<()> { - let response = self.client.get(self.base_url.clone()).send().await?; - let root: ApiRoot = json_response(response).await?; - if root.server == "rqbit" { - return Ok(()); + #[inline(never)] + pub fn validate_rqbit_server(&self) -> Pin> + '_>> { + async move { + let response = self.client.get(self.base_url.clone()).send().await?; + let root: ApiRoot = json_response(response).await?; + if root.server == "rqbit" { + return Ok(()); + } + anyhow::bail!("not an rqbit server at {}", &self.base_url) } - anyhow::bail!("not an rqbit server at {}", &self.base_url) + .boxed() } - pub async fn add_torrent( - &self, - torrent: AddTorrent<'_>, + pub fn add_torrent<'a>( + &'a self, + torrent: AddTorrent<'a>, opts: Option, - ) -> anyhow::Result { - let opts = opts.unwrap_or_default(); - let params = TorrentAddQueryParams { - overwrite: Some(opts.overwrite), - only_files_regex: opts.only_files_regex, - only_files: None, - output_folder: opts.output_folder, - sub_folder: opts.sub_folder, - list_only: Some(opts.list_only), - ..Default::default() - }; - let qs = serde_urlencoded::to_string(¶ms).unwrap(); - let url = format!("{}torrents?{}", &self.base_url, qs); - let response = check_response( - self.client - .post(&url) - .body(torrent.into_bytes()) - .send() - .await?, - ) - .await?; - json_response(response).await + ) -> Pin> + 'a>> { + async move { + let opts = opts.unwrap_or_default(); + let params = TorrentAddQueryParams { + overwrite: Some(opts.overwrite), + only_files_regex: opts.only_files_regex, + only_files: None, + output_folder: opts.output_folder, + sub_folder: opts.sub_folder, + list_only: Some(opts.list_only), + ..Default::default() + }; + let qs = serde_urlencoded::to_string(¶ms).unwrap(); + let url = format!("{}torrents?{}", &self.base_url, qs); + let response = check_response( + self.client + .post(&url) + .body(torrent.into_bytes()) + .send() + .await?, + ) + .await?; + json_response(response).await + } + .boxed() } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 1641a14..9ee814d 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -298,6 +298,7 @@ pub enum AddTorrent<'a> { impl<'a> AddTorrent<'a> { // Don't call this from HTTP API. + #[inline(never)] pub fn from_cli_argument(path: &'a str) -> anyhow::Result { if SUPPORTED_SCHEMES.iter().any(|s| path.starts_with(s)) { return Ok(Self::Url(Cow::Borrowed(path))); @@ -314,6 +315,7 @@ impl<'a> AddTorrent<'a> { } // Don't call this from HTTP API. + #[inline(never)] pub fn from_local_filename(filename: &str) -> anyhow::Result { let file = read_local_file_including_stdin(filename) .with_context(|| format!("error reading local file {filename:?}"))?; @@ -744,6 +746,7 @@ impl Session { } /// Add a torrent to the session. + #[inline(never)] pub fn add_torrent<'a>( &'a self, add: AddTorrent<'a>, diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 90bfd8e..1e795de 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -8,6 +8,7 @@ use std::collections::HashSet; use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; +use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -15,6 +16,8 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; +use futures::Future; +use futures::FutureExt; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; use librqbit_core::peer_id::generate_peer_id; @@ -395,23 +398,29 @@ impl ManagedTorrent { }) } - pub async fn wait_until_completed(&self) -> anyhow::Result<()> { - // TODO: rewrite, this polling is horrible - let live = loop { - let live = self.with_state(|s| match s { - ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => Ok(None), - ManagedTorrentState::Live(l) => Ok(Some(l.clone())), - ManagedTorrentState::Error(e) => bail!("{:?}", e), - ManagedTorrentState::None => bail!("bug: torrent state is None"), - })?; - if let Some(live) = live { - break live; - } - tokio::time::sleep(Duration::from_secs(1)).await; - }; + #[inline(never)] + pub fn wait_until_completed(&self) -> Pin> + '_>> { + async move { + // TODO: rewrite, this polling is horrible + let live = loop { + let live = self.with_state(|s| match s { + ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => { + Ok(None) + } + ManagedTorrentState::Live(l) => Ok(Some(l.clone())), + ManagedTorrentState::Error(e) => bail!("{:?}", e), + ManagedTorrentState::None => bail!("bug: torrent state is None"), + })?; + if let Some(live) = live { + break live; + } + tokio::time::sleep(Duration::from_secs(1)).await; + }; - live.wait_until_completed().await; - Ok(()) + live.wait_until_completed().await; + Ok(()) + } + .boxed() } } diff --git a/crates/librqbit_core/src/torrent_metainfo.rs b/crates/librqbit_core/src/torrent_metainfo.rs index c89230e..e60fcee 100644 --- a/crates/librqbit_core/src/torrent_metainfo.rs +++ b/crates/librqbit_core/src/torrent_metainfo.rs @@ -159,6 +159,7 @@ impl> TorrentMetaV1Info { Some(expected_hash == hash) } + #[inline(never)] pub fn iter_filenames_and_lengths( &self, ) -> anyhow::Result, u64)>> { From 1b79b66cc34e98e6505ff9d06690b9498de5f6e9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 27 Feb 2024 08:00:56 +0000 Subject: [PATCH 07/10] Reduce compile times even more --- crates/dht/src/dht.rs | 83 +++++++++-------- crates/dht/src/persistence.rs | 168 ++++++++++++++++++---------------- 2 files changed, 135 insertions(+), 116 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index d983f9c..06cc2ca 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1,6 +1,7 @@ use std::{ cmp::Reverse, net::SocketAddr, + pin::Pin, str::FromStr, sync::{ atomic::{AtomicU16, Ordering}, @@ -23,7 +24,7 @@ use anyhow::{bail, Context}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use bencode::ByteString; use dashmap::DashMap; -use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; +use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt, TryFutureExt}; use leaky_bucket::RateLimiter; use librqbit_core::{ @@ -232,6 +233,7 @@ impl Drop for RequestPeersStream { impl Stream for RequestPeersStream { type Item = SocketAddr; + #[inline(never)] fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -1144,49 +1146,56 @@ impl DhtState { &self.cancellation_token } - pub async fn with_config(mut config: DhtConfig) -> anyhow::Result> { - let socket = match config.listen_addr { - Some(addr) => UdpSocket::bind(addr) - .await - .with_context(|| format!("error binding socket, address {addr}")), - None => UdpSocket::bind("0.0.0.0:0") - .await - .context("error binding socket, address 0.0.0.0:0"), - }?; + #[inline(never)] + pub fn with_config( + mut config: DhtConfig, + ) -> Pin>> + Send>> { + async move { + let socket = match config.listen_addr { + Some(addr) => UdpSocket::bind(addr) + .await + .with_context(|| format!("error binding socket, address {addr}")), + None => UdpSocket::bind("0.0.0.0:0") + .await + .context("error binding socket, address 0.0.0.0:0"), + }?; - let listen_addr = socket - .local_addr() - .context("cannot determine UDP listen addr")?; - info!("DHT listening on {:?}", listen_addr); + let listen_addr = socket + .local_addr() + .context("cannot determine UDP listen addr")?; + info!("DHT listening on {:?}", listen_addr); - let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); - info!("starting up DHT with peer id {:?}", peer_id); - let bootstrap_addrs = config - .bootstrap_addrs - .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); + let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); + info!("starting up DHT with peer id {:?}", peer_id); + let bootstrap_addrs = config + .bootstrap_addrs + .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); - let token = config.cancellation_token.take().unwrap_or_default(); + let token = config.cancellation_token.take().unwrap_or_default(); - let (in_tx, in_rx) = unbounded_channel(); - let state = Arc::new(Self::new_internal( - peer_id, - in_tx, - config.routing_table, - listen_addr, - config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), - token, - )); + let (in_tx, in_rx) = unbounded_channel(); + let state = Arc::new(Self::new_internal( + peer_id, + in_tx, + config.routing_table, + listen_addr, + config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), + token, + )); - spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), { - let state = state.clone(); - async move { - let worker = DhtWorker { socket, dht: state }; - worker.start(in_rx, &bootstrap_addrs).await - } - }); - Ok(state) + spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), { + let state = state.clone(); + async move { + let worker = DhtWorker { socket, dht: state }; + worker.start(in_rx, &bootstrap_addrs).await + } + }); + Ok(state) + } + .boxed() } + #[inline(never)] pub fn get_peers( self: &Arc, info_hash: Id20, diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 0f15236..0c73b05 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,5 +1,6 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... +use futures::{Future, FutureExt}; use librqbit_core::directories::get_configuration_directory; use librqbit_core::spawn_utils::spawn_with_cancel; use serde::{Deserialize, Serialize}; @@ -7,6 +8,7 @@ use std::fs::OpenOptions; use std::io::{BufReader, BufWriter}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::pin::Pin; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -75,94 +77,102 @@ impl PersistentDht { Ok(path) } - pub async fn create( + #[inline(never)] + pub fn create( config: Option, cancellation_token: Option, - ) -> anyhow::Result { - let mut config = config.unwrap_or_default(); - let config_filename = match config.config_filename.take() { - Some(config_filename) => config_filename, - None => Self::default_persistence_filename()?, - }; + ) -> Pin> + Send>> { + async move { + let mut config = config.unwrap_or_default(); + let config_filename = match config.config_filename.take() { + Some(config_filename) => config_filename, + None => Self::default_persistence_filename()?, + }; - info!( - filename=?config_filename, - "will store DHT routing table periodically", - ); + info!( + filename=?config_filename, + "will store DHT routing table periodically", + ); - if let Some(parent) = config_filename.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("error creating dir {:?}", &parent))?; - } - - let de = match OpenOptions::new().read(true).open(&config_filename) { - Ok(dht_json) => { - let reader = BufReader::new(dht_json); - match serde_json::from_reader::<_, DhtSerialize>(reader) { - Ok(r) => { - info!(filename=?config_filename, "loaded DHT routing table from"); - Some(r) - } - Err(e) => { - warn!( - filename=?config_filename, - "cannot deserialize routing table: {:#}", - e - ); - None - } - } + if let Some(parent) = config_filename.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("error creating dir {:?}", &parent))?; } - Err(e) => match e.kind() { - std::io::ErrorKind::NotFound => None, - _ => return Err(e).with_context(|| format!("error reading {config_filename:?}")), - }, - }; - let (listen_addr, routing_table, peer_store) = de - .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) - .unwrap_or((None, None, None)); - let peer_id = routing_table.as_ref().map(|r| r.id()); - let dht_config = DhtConfig { - peer_id, - routing_table, - listen_addr, - peer_store, - cancellation_token, - ..Default::default() - }; - let dht = DhtState::with_config(dht_config).await?; - spawn_with_cancel( - error_span!("dht_persistence"), - dht.cancellation_token().clone(), - { - let dht = dht.clone(); - let dump_interval = config - .dump_interval - .unwrap_or_else(|| Duration::from_secs(3)); - async move { - let tempfile_name = { - let file_name = format!("dht.json.tmp.{}", std::process::id()); - let mut tmp = config_filename.clone(); - tmp.set_file_name(file_name); - tmp - }; - - loop { - trace!("sleeping for {:?}", &dump_interval); - tokio::time::sleep(dump_interval).await; - - match dump_dht(&dht, &config_filename, &tempfile_name) { - Ok(_) => trace!(filename=?config_filename, "dumped DHT"), - Err(e) => { - error!(filename=?config_filename, "error dumping DHT: {:#}", e) - } + let de = match OpenOptions::new().read(true).open(&config_filename) { + Ok(dht_json) => { + let reader = BufReader::new(dht_json); + match serde_json::from_reader::<_, DhtSerialize>( + reader, + ) { + Ok(r) => { + info!(filename=?config_filename, "loaded DHT routing table from"); + Some(r) + } + Err(e) => { + warn!( + filename=?config_filename, + "cannot deserialize routing table: {:#}", + e + ); + None } } } - }, - ); + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => None, + _ => { + return Err(e).with_context(|| format!("error reading {config_filename:?}")) + } + }, + }; + let (listen_addr, routing_table, peer_store) = de + .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) + .unwrap_or((None, None, None)); + let peer_id = routing_table.as_ref().map(|r| r.id()); - Ok(dht) + let dht_config = DhtConfig { + peer_id, + routing_table, + listen_addr, + peer_store, + cancellation_token, + ..Default::default() + }; + let dht = DhtState::with_config(dht_config).await?; + spawn_with_cancel( + error_span!("dht_persistence"), + dht.cancellation_token().clone(), + { + let dht = dht.clone(); + let dump_interval = config + .dump_interval + .unwrap_or_else(|| Duration::from_secs(3)); + async move { + let tempfile_name = { + let file_name = format!("dht.json.tmp.{}", std::process::id()); + let mut tmp = config_filename.clone(); + tmp.set_file_name(file_name); + tmp + }; + + loop { + trace!("sleeping for {:?}", &dump_interval); + tokio::time::sleep(dump_interval).await; + + match dump_dht(&dht, &config_filename, &tempfile_name) { + Ok(_) => trace!(filename=?config_filename, "dumped DHT"), + Err(e) => { + error!(filename=?config_filename, "error dumping DHT: {:#}", e) + } + } + } + } + }, + ); + + Ok(dht) + } + .boxed() } } From a001bb8c97b9542ba6783430bc897ab42ebf8e58 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 27 Feb 2024 08:14:39 +0000 Subject: [PATCH 08/10] Shorten Pin Pin>> + Send>> { + pub fn with_config(mut config: DhtConfig) -> BoxFuture<'static, anyhow::Result>> { async move { let socket = match config.listen_addr { Some(addr) => UdpSocket::bind(addr) diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 0c73b05..05bff6f 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,6 +1,7 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... -use futures::{Future, FutureExt}; +use futures::future::BoxFuture; +use futures::FutureExt; use librqbit_core::directories::get_configuration_directory; use librqbit_core::spawn_utils::spawn_with_cancel; use serde::{Deserialize, Serialize}; @@ -8,7 +9,6 @@ use std::fs::OpenOptions; use std::io::{BufReader, BufWriter}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::pin::Pin; use std::time::Duration; use tokio_util::sync::CancellationToken; @@ -81,7 +81,7 @@ impl PersistentDht { pub fn create( config: Option, cancellation_token: Option, - ) -> Pin> + Send>> { + ) -> BoxFuture<'static, anyhow::Result> { async move { let mut config = config.unwrap_or_default(); let config_filename = match config.config_filename.take() { diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 806ba4c..d710b22 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -3,12 +3,12 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; -use futures::{Future, FutureExt, TryStreamExt}; +use futures::future::BoxFuture; +use futures::{FutureExt, TryStreamExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; -use std::pin::Pin; use std::str::FromStr; use std::time::Duration; use tracing::{debug, info}; @@ -46,10 +46,7 @@ impl HttpApi { /// Run the HTTP server forever on the given address. /// If read_only is passed, no state-modifying methods will be exposed. #[inline(never)] - pub fn make_http_api_and_run( - self, - addr: SocketAddr, - ) -> Pin> + Send>> { + pub fn make_http_api_and_run(self, addr: SocketAddr) -> BoxFuture<'static, anyhow::Result<()>> { let state = self.inner; async fn api_root() -> impl IntoResponse { diff --git a/crates/librqbit/src/http_api_client.rs b/crates/librqbit/src/http_api_client.rs index a1bfb47..4414afa 100644 --- a/crates/librqbit/src/http_api_client.rs +++ b/crates/librqbit/src/http_api_client.rs @@ -1,7 +1,5 @@ -use std::pin::Pin; - use anyhow::Context; -use futures::{Future, FutureExt}; +use futures::{future::BoxFuture, FutureExt}; use serde::Deserialize; use crate::{ @@ -75,7 +73,7 @@ impl HttpApiClient { } #[inline(never)] - pub fn validate_rqbit_server(&self) -> Pin> + '_>> { + pub fn validate_rqbit_server(&self) -> BoxFuture<'_, anyhow::Result<()>> { async move { let response = self.client.get(self.base_url.clone()).send().await?; let root: ApiRoot = json_response(response).await?; @@ -91,7 +89,7 @@ impl HttpApiClient { &'a self, torrent: AddTorrent<'a>, opts: Option, - ) -> Pin> + 'a>> { + ) -> BoxFuture<'a, anyhow::Result> { async move { let opts = opts.unwrap_or_default(); let params = TorrentAddQueryParams { diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 9ee814d..f781d49 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -4,7 +4,6 @@ use std::{ io::{BufReader, BufWriter, Read}, net::SocketAddr, path::PathBuf, - pin::Pin, str::FromStr, sync::Arc, time::Duration, @@ -25,7 +24,7 @@ use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; -use futures::{stream::FuturesUnordered, Future, FutureExt, TryFutureExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -381,7 +380,7 @@ pub(crate) struct CheckedIncomingConnection { impl Session { /// Create a new session. The passed in folder will be used as a default unless overriden per torrent. #[inline(never)] - pub fn new(output_folder: PathBuf) -> Pin>>>> { + pub fn new(output_folder: PathBuf) -> BoxFuture<'static, anyhow::Result>> { Self::new_with_opts(output_folder, SessionOptions::default()) } @@ -399,7 +398,7 @@ impl Session { pub fn new_with_opts( output_folder: PathBuf, mut opts: SessionOptions, - ) -> Pin>>>> { + ) -> BoxFuture<'static, anyhow::Result>> { async move { let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); let token = CancellationToken::new(); @@ -751,7 +750,7 @@ impl Session { &'a self, add: AddTorrent<'a>, opts: Option, - ) -> Pin> + Send + 'a>> { + ) -> BoxFuture<'a, anyhow::Result> { async move { // Magnet links are different in that we first need to discover the metadata. let span = error_span!("add_torrent"); diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 1e795de..a3fe61f 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -8,7 +8,6 @@ use std::collections::HashSet; use std::net::SocketAddr; use std::path::Path; use std::path::PathBuf; -use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -16,7 +15,7 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; -use futures::Future; +use futures::future::BoxFuture; use futures::FutureExt; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; @@ -399,7 +398,7 @@ impl ManagedTorrent { } #[inline(never)] - pub fn wait_until_completed(&self) -> Pin> + '_>> { + pub fn wait_until_completed(&self) -> BoxFuture<'_, anyhow::Result<()>> { async move { // TODO: rewrite, this polling is horrible let live = loop { From 3cecb1a4c3379006846547bc0b09ad0d8294ca1b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 27 Feb 2024 08:17:15 +0000 Subject: [PATCH 09/10] Remove pin, force_interval: Option, tcp_listen_port: Option, - ) -> Option + Send + 'static>>> { + ) -> Option> { let trackers = trackers .into_iter() .filter_map(|t| match Url::parse(&t) { From 6fafdc16da7c12302352554250f2b9dd3b3ed2d4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 27 Feb 2024 08:25:10 +0000 Subject: [PATCH 10/10] Nothing --- crates/tracker_comms/src/tracker_comms.rs | 81 ++++++++++++----------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/crates/tracker_comms/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs index d8b984a..3816131 100644 --- a/crates/tracker_comms/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -58,6 +58,11 @@ impl TorrentStatsProvider for () { type Sender = tokio::sync::mpsc::Sender; +enum SupportedTracker { + Udp(Url), + Http(Url), +} + impl TrackerComms { pub fn start( info_hash: Id20, @@ -70,9 +75,16 @@ impl TrackerComms { let trackers = trackers .into_iter() .filter_map(|t| match Url::parse(&t) { - Ok(parsed) => Some(parsed), + Ok(parsed) => match parsed.scheme() { + "http" | "https" => Some(SupportedTracker::Http(parsed)), + "udp" => Some(SupportedTracker::Udp(parsed)), + _ => { + debug!("unsuppoted tracker URL: {}", t); + None + } + }, Err(e) => { - debug!("error parsing tracker URL: {}", e); + debug!("error parsing tracker URL {}: {}", t, e); None } }) @@ -85,7 +97,6 @@ impl TrackerComms { let s = async_stream::stream! { use futures::StreamExt; - let mut rx_done = false; let comms = Arc::new(Self { info_hash, peer_id, @@ -96,19 +107,13 @@ impl TrackerComms { }); let mut futures = FuturesUnordered::new(); for tracker in trackers { - if let Ok(fut) = comms.add_tracker(tracker) { - futures.push(fut); - } + futures.push(comms.add_tracker(tracker)) } - if futures.is_empty() { - return; - } - while !(futures.is_empty() && rx_done) { + while !(futures.is_empty()) { tokio::select! { - addr = rx.recv(), if !rx_done => { - match addr { - Some(addr) => yield addr, - None => rx_done = true + addr = rx.recv() => { + if let Some(addr) = addr { + yield addr; } } e = futures.next(), if !futures.is_empty() => { @@ -125,34 +130,30 @@ impl TrackerComms { fn add_tracker( &self, - url: Url, - ) -> anyhow::Result< - Either< - impl std::future::Future> + '_ + Send, - impl std::future::Future> + '_ + Send, - >, + url: SupportedTracker, + ) -> Either< + impl std::future::Future> + '_ + Send, + impl std::future::Future> + '_ + Send, > { let info_hash = self.info_hash; - if url.scheme() == "http" || url.scheme() == "https" { - let span = error_span!( - parent: None, - "http_tracker", - tracker = %url, - info_hash = ?info_hash - ); - Ok(self - .task_single_tracker_monitor_http(url) - .instrument(span) - .left_future()) - } else if url.scheme() == "udp" { - let span = - error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); - Ok(self - .task_single_tracker_monitor_udp(url) - .instrument(span) - .right_future()) - } else { - bail!("unsupported tracker url {}", url) + match url { + SupportedTracker::Udp(url) => { + let span = error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); + self.task_single_tracker_monitor_udp(url) + .instrument(span) + .right_future() + } + SupportedTracker::Http(url) => { + let span = error_span!( + parent: None, + "http_tracker", + tracker = %url, + info_hash = ?info_hash + ); + self.task_single_tracker_monitor_http(url) + .instrument(span) + .left_future() + } } }