diff --git a/Cargo.lock b/Cargo.lock index 571545d..f967a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1257,6 +1257,7 @@ name = "librqbit" version = "5.5.0" dependencies = [ "anyhow", + "async-stream", "axum 0.7.4", "backoff", "base64", @@ -1277,6 +1278,7 @@ dependencies = [ "librqbit-dht", "librqbit-peer-protocol", "librqbit-sha1-wrapper", + "librqbit-tracker-comms", "librqbit-upnp", "openssl", "parking_lot", @@ -1396,6 +1398,26 @@ dependencies = [ "sha1", ] +[[package]] +name = "librqbit-tracker-comms" +version = "1.0.0" +dependencies = [ + "anyhow", + "async-stream", + "byteorder", + "futures", + "librqbit-bencode", + "librqbit-buffers", + "librqbit-core", + "rand", + "reqwest", + "serde", + "tokio", + "tracing", + "url", + "urlencoding", +] + [[package]] name = "librqbit-upnp" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a3a89a4..70f255f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,8 @@ members = [ "crates/librqbit_core", "crates/peer_binary_protocol", "crates/dht", - "crates/upnp" + "crates/upnp", + "crates/tracker_comms", ] [profile.dev] @@ -22,4 +23,4 @@ debug = true [profile.release-github] inherits = "release" -debug = false \ No newline at end of file +debug = false diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index d983f9c..0198038 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -23,7 +23,9 @@ use anyhow::{bail, Context}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; use bencode::ByteString; use dashmap::DashMap; -use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt}; +use futures::{ + future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt, +}; use leaky_bucket::RateLimiter; use librqbit_core::{ @@ -232,6 +234,7 @@ impl Drop for RequestPeersStream { impl Stream for RequestPeersStream { type Item = SocketAddr; + #[inline(never)] fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -1144,49 +1147,54 @@ impl DhtState { &self.cancellation_token } - pub async fn with_config(mut config: DhtConfig) -> anyhow::Result> { - let socket = match config.listen_addr { - Some(addr) => UdpSocket::bind(addr) - .await - .with_context(|| format!("error binding socket, address {addr}")), - None => UdpSocket::bind("0.0.0.0:0") - .await - .context("error binding socket, address 0.0.0.0:0"), - }?; + #[inline(never)] + pub fn with_config(mut config: DhtConfig) -> BoxFuture<'static, anyhow::Result>> { + async move { + let socket = match config.listen_addr { + Some(addr) => UdpSocket::bind(addr) + .await + .with_context(|| format!("error binding socket, address {addr}")), + None => UdpSocket::bind("0.0.0.0:0") + .await + .context("error binding socket, address 0.0.0.0:0"), + }?; - let listen_addr = socket - .local_addr() - .context("cannot determine UDP listen addr")?; - info!("DHT listening on {:?}", listen_addr); + let listen_addr = socket + .local_addr() + .context("cannot determine UDP listen addr")?; + info!("DHT listening on {:?}", listen_addr); - let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); - info!("starting up DHT with peer id {:?}", peer_id); - let bootstrap_addrs = config - .bootstrap_addrs - .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); + let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); + info!("starting up DHT with peer id {:?}", peer_id); + let bootstrap_addrs = config + .bootstrap_addrs + .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); - let token = config.cancellation_token.take().unwrap_or_default(); + let token = config.cancellation_token.take().unwrap_or_default(); - let (in_tx, in_rx) = unbounded_channel(); - let state = Arc::new(Self::new_internal( - peer_id, - in_tx, - config.routing_table, - listen_addr, - config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), - token, - )); + let (in_tx, in_rx) = unbounded_channel(); + let state = Arc::new(Self::new_internal( + peer_id, + in_tx, + config.routing_table, + listen_addr, + config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), + token, + )); - spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), { - let state = state.clone(); - async move { - let worker = DhtWorker { socket, dht: state }; - worker.start(in_rx, &bootstrap_addrs).await - } - }); - Ok(state) + spawn_with_cancel(error_span!("dht"), state.cancellation_token.clone(), { + let state = state.clone(); + async move { + let worker = DhtWorker { socket, dht: state }; + worker.start(in_rx, &bootstrap_addrs).await + } + }); + Ok(state) + } + .boxed() } + #[inline(never)] pub fn get_peers( self: &Arc, info_hash: Id20, diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 0f15236..05bff6f 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,5 +1,7 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... +use futures::future::BoxFuture; +use futures::FutureExt; use librqbit_core::directories::get_configuration_directory; use librqbit_core::spawn_utils::spawn_with_cancel; use serde::{Deserialize, Serialize}; @@ -75,94 +77,102 @@ impl PersistentDht { Ok(path) } - pub async fn create( + #[inline(never)] + pub fn create( config: Option, cancellation_token: Option, - ) -> anyhow::Result { - let mut config = config.unwrap_or_default(); - let config_filename = match config.config_filename.take() { - Some(config_filename) => config_filename, - None => Self::default_persistence_filename()?, - }; + ) -> BoxFuture<'static, anyhow::Result> { + async move { + let mut config = config.unwrap_or_default(); + let config_filename = match config.config_filename.take() { + Some(config_filename) => config_filename, + None => Self::default_persistence_filename()?, + }; - info!( - filename=?config_filename, - "will store DHT routing table periodically", - ); + info!( + filename=?config_filename, + "will store DHT routing table periodically", + ); - if let Some(parent) = config_filename.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("error creating dir {:?}", &parent))?; - } - - let de = match OpenOptions::new().read(true).open(&config_filename) { - Ok(dht_json) => { - let reader = BufReader::new(dht_json); - match serde_json::from_reader::<_, DhtSerialize>(reader) { - Ok(r) => { - info!(filename=?config_filename, "loaded DHT routing table from"); - Some(r) - } - Err(e) => { - warn!( - filename=?config_filename, - "cannot deserialize routing table: {:#}", - e - ); - None - } - } + if let Some(parent) = config_filename.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("error creating dir {:?}", &parent))?; } - Err(e) => match e.kind() { - std::io::ErrorKind::NotFound => None, - _ => return Err(e).with_context(|| format!("error reading {config_filename:?}")), - }, - }; - let (listen_addr, routing_table, peer_store) = de - .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) - .unwrap_or((None, None, None)); - let peer_id = routing_table.as_ref().map(|r| r.id()); - let dht_config = DhtConfig { - peer_id, - routing_table, - listen_addr, - peer_store, - cancellation_token, - ..Default::default() - }; - let dht = DhtState::with_config(dht_config).await?; - spawn_with_cancel( - error_span!("dht_persistence"), - dht.cancellation_token().clone(), - { - let dht = dht.clone(); - let dump_interval = config - .dump_interval - .unwrap_or_else(|| Duration::from_secs(3)); - async move { - let tempfile_name = { - let file_name = format!("dht.json.tmp.{}", std::process::id()); - let mut tmp = config_filename.clone(); - tmp.set_file_name(file_name); - tmp - }; - - loop { - trace!("sleeping for {:?}", &dump_interval); - tokio::time::sleep(dump_interval).await; - - match dump_dht(&dht, &config_filename, &tempfile_name) { - Ok(_) => trace!(filename=?config_filename, "dumped DHT"), - Err(e) => { - error!(filename=?config_filename, "error dumping DHT: {:#}", e) - } + let de = match OpenOptions::new().read(true).open(&config_filename) { + Ok(dht_json) => { + let reader = BufReader::new(dht_json); + match serde_json::from_reader::<_, DhtSerialize>( + reader, + ) { + Ok(r) => { + info!(filename=?config_filename, "loaded DHT routing table from"); + Some(r) + } + Err(e) => { + warn!( + filename=?config_filename, + "cannot deserialize routing table: {:#}", + e + ); + None } } } - }, - ); + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => None, + _ => { + return Err(e).with_context(|| format!("error reading {config_filename:?}")) + } + }, + }; + let (listen_addr, routing_table, peer_store) = de + .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) + .unwrap_or((None, None, None)); + let peer_id = routing_table.as_ref().map(|r| r.id()); - Ok(dht) + let dht_config = DhtConfig { + peer_id, + routing_table, + listen_addr, + peer_store, + cancellation_token, + ..Default::default() + }; + let dht = DhtState::with_config(dht_config).await?; + spawn_with_cancel( + error_span!("dht_persistence"), + dht.cancellation_token().clone(), + { + let dht = dht.clone(); + let dump_interval = config + .dump_interval + .unwrap_or_else(|| Duration::from_secs(3)); + async move { + let tempfile_name = { + let file_name = format!("dht.json.tmp.{}", std::process::id()); + let mut tmp = config_filename.clone(); + tmp.set_file_name(file_name); + tmp + }; + + loop { + trace!("sleeping for {:?}", &dump_interval); + tokio::time::sleep(dump_interval).await; + + match dump_dht(&dht, &config_filename, &tempfile_name) { + Ok(_) => trace!(filename=?config_filename, "dumped DHT"), + Err(e) => { + error!(filename=?config_filename, "error dumping DHT: {:#}", e) + } + } + } + } + }, + ); + + Ok(dht) + } + .boxed() } } diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 9b54625..e05ceb5 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -23,6 +23,7 @@ rust-tls = ["reqwest/rustls-tls"] [dependencies] bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} +tracker_comms = {path = "../tracker_comms", default-features=false, package="librqbit-tracker-comms", version="1.0.0"} buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"} librqbit-core = {path = "../librqbit_core", version = "3.5.0"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} @@ -68,6 +69,7 @@ serde_with = "3.4.0" tokio-util = "0.7.10" bytes = "1.5.0" rlimit = "0.10.1" +async-stream = "0.3.5" [dev-dependencies] futures = {version = "0.3"} diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 09f95a9..d710b22 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -3,7 +3,8 @@ use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; use axum::routing::{get, post}; -use futures::TryStreamExt; +use futures::future::BoxFuture; +use futures::{FutureExt, TryStreamExt}; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -44,7 +45,8 @@ impl HttpApi { /// Run the HTTP server forever on the given address. /// If read_only is passed, no state-modifying methods will be exposed. - pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> { + #[inline(never)] + pub fn make_http_api_and_run(self, addr: SocketAddr) -> BoxFuture<'static, anyhow::Result<()>> { let state = self.inner; async fn api_root() -> impl IntoResponse { @@ -288,11 +290,15 @@ impl HttpApi { info!(%addr, "starting HTTP server"); use tokio::net::TcpListener; - let listener = TcpListener::bind(&addr) - .await - .with_context(|| format!("error binding to {addr}"))?; - axum::serve(listener, app).await?; - Ok(()) + + async move { + let listener = TcpListener::bind(&addr) + .await + .with_context(|| format!("error binding to {addr}"))?; + axum::serve(listener, app).await?; + Ok(()) + } + .boxed() } } diff --git a/crates/librqbit/src/http_api_client.rs b/crates/librqbit/src/http_api_client.rs index 2b6bf77..4414afa 100644 --- a/crates/librqbit/src/http_api_client.rs +++ b/crates/librqbit/src/http_api_client.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use futures::{future::BoxFuture, FutureExt}; use serde::Deserialize; use crate::{ @@ -59,6 +60,7 @@ async fn json_response( } impl HttpApiClient { + #[inline(never)] pub fn new(url: &str) -> anyhow::Result { Ok(Self { base_url: reqwest::Url::parse(url)?, @@ -70,40 +72,47 @@ impl HttpApiClient { &self.base_url } - pub async fn validate_rqbit_server(&self) -> anyhow::Result<()> { - let response = self.client.get(self.base_url.clone()).send().await?; - let root: ApiRoot = json_response(response).await?; - if root.server == "rqbit" { - return Ok(()); + #[inline(never)] + pub fn validate_rqbit_server(&self) -> BoxFuture<'_, anyhow::Result<()>> { + async move { + let response = self.client.get(self.base_url.clone()).send().await?; + let root: ApiRoot = json_response(response).await?; + if root.server == "rqbit" { + return Ok(()); + } + anyhow::bail!("not an rqbit server at {}", &self.base_url) } - anyhow::bail!("not an rqbit server at {}", &self.base_url) + .boxed() } - pub async fn add_torrent( - &self, - torrent: AddTorrent<'_>, + pub fn add_torrent<'a>( + &'a self, + torrent: AddTorrent<'a>, opts: Option, - ) -> anyhow::Result { - let opts = opts.unwrap_or_default(); - let params = TorrentAddQueryParams { - overwrite: Some(opts.overwrite), - only_files_regex: opts.only_files_regex, - only_files: None, - output_folder: opts.output_folder, - sub_folder: opts.sub_folder, - list_only: Some(opts.list_only), - ..Default::default() - }; - let qs = serde_urlencoded::to_string(¶ms).unwrap(); - let url = format!("{}torrents?{}", &self.base_url, qs); - let response = check_response( - self.client - .post(&url) - .body(torrent.into_bytes()) - .send() - .await?, - ) - .await?; - json_response(response).await + ) -> BoxFuture<'a, anyhow::Result> { + async move { + let opts = opts.unwrap_or_default(); + let params = TorrentAddQueryParams { + overwrite: Some(opts.overwrite), + only_files_regex: opts.only_files_regex, + only_files: None, + output_folder: opts.output_folder, + sub_folder: opts.sub_folder, + list_only: Some(opts.list_only), + ..Default::default() + }; + let qs = serde_urlencoded::to_string(¶ms).unwrap(); + let url = format!("{}torrents?{}", &self.base_url, qs); + let response = check_response( + self.client + .post(&url) + .body(torrent.into_bytes()) + .send() + .await?, + ) + .await?; + json_response(response).await + } + .boxed() } } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 817c086..d16c427 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -36,9 +36,6 @@ mod session; mod spawn_utils; mod torrent_state; pub mod tracing_subscriber_config_utils; -pub mod tracker_comms; -pub mod tracker_comms_http; -pub mod tracker_comms_udp; mod type_aliases; pub use api::Api; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 95d65ea..f781d49 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -9,12 +9,22 @@ use std::{ time::Duration, }; +use crate::{ + dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + peer_connection::PeerConnectionOptions, + read_buf::ReadBuf, + spawn_utils::BlockingSpawner, + torrent_state::{ + ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, + }, + type_aliases::PeerStream, +}; use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; -use futures::{stream::FuturesUnordered, TryFutureExt}; +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -32,18 +42,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; - -use crate::{ - dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, - peer_connection::PeerConnectionOptions, - read_buf::ReadBuf, - spawn_utils::BlockingSpawner, - torrent_state::{ - ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, - }, - tracker_comms::TrackerComms, - type_aliases::PeerStream, -}; +use tracker_comms::TrackerComms; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; @@ -298,6 +297,7 @@ pub enum AddTorrent<'a> { impl<'a> AddTorrent<'a> { // Don't call this from HTTP API. + #[inline(never)] pub fn from_cli_argument(path: &'a str) -> anyhow::Result { if SUPPORTED_SCHEMES.iter().any(|s| path.starts_with(s)) { return Ok(Self::Url(Cow::Borrowed(path))); @@ -314,6 +314,7 @@ impl<'a> AddTorrent<'a> { } // Don't call this from HTTP API. + #[inline(never)] pub fn from_local_filename(filename: &str) -> anyhow::Result { let file = read_local_file_including_stdin(filename) .with_context(|| format!("error reading local file {filename:?}"))?; @@ -378,8 +379,9 @@ pub(crate) struct CheckedIncomingConnection { impl Session { /// Create a new session. The passed in folder will be used as a default unless overriden per torrent. - pub async fn new(output_folder: PathBuf) -> anyhow::Result> { - Self::new_with_opts(output_folder, SessionOptions::default()).await + #[inline(never)] + pub fn new(output_folder: PathBuf) -> BoxFuture<'static, anyhow::Result>> { + Self::new_with_opts(output_folder, SessionOptions::default()) } pub fn default_persistence_filename() -> anyhow::Result { @@ -392,93 +394,97 @@ impl Session { } /// Create a new session with options. - pub async fn new_with_opts( + #[inline(never)] + pub fn new_with_opts( output_folder: PathBuf, mut opts: SessionOptions, - ) -> anyhow::Result> { - let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); - let token = CancellationToken::new(); + ) -> BoxFuture<'static, anyhow::Result>> { + async move { + let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + let token = CancellationToken::new(); - let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { - let (l, p) = create_tcp_listener(port_range) - .await - .context("error listening on TCP")?; - info!("Listening on 0.0.0.0:{p} for incoming peer connections"); - (Some(l), Some(p)) - } else { - (None, None) - }; - - let dht = if opts.disable_dht { - None - } else { - let dht = if opts.disable_dht_persistence { - DhtBuilder::with_config(DhtConfig { - cancellation_token: Some(token.child_token()), - ..Default::default() - }) - .await - .context("error initializing DHT")? - } else { - let pdht_config = opts.dht_config.take().unwrap_or_default(); - PersistentDht::create(Some(pdht_config), Some(token.clone())) + let (tcp_listener, tcp_listen_port) = if let Some(port_range) = opts.listen_port_range { + let (l, p) = create_tcp_listener(port_range) .await - .context("error initializing persistent DHT")? + .context("error listening on TCP")?; + info!("Listening on 0.0.0.0:{p} for incoming peer connections"); + (Some(l), Some(p)) + } else { + (None, None) }; - Some(dht) - }; - let peer_opts = opts.peer_opts.unwrap_or_default(); - let persistence_filename = match opts.persistence_filename { - Some(filename) => filename, - None => Self::default_persistence_filename()?, - }; - let spawner = BlockingSpawner::default(); + let dht = if opts.disable_dht { + None + } else { + let dht = if opts.disable_dht_persistence { + DhtBuilder::with_config(DhtConfig { + cancellation_token: Some(token.child_token()), + ..Default::default() + }) + .await + .context("error initializing DHT")? + } else { + let pdht_config = opts.dht_config.take().unwrap_or_default(); + PersistentDht::create(Some(pdht_config), Some(token.clone())) + .await + .context("error initializing persistent DHT")? + }; - let session = Arc::new(Self { - persistence_filename, - peer_id, - dht, - peer_opts, - spawner, - output_folder, - db: RwLock::new(Default::default()), - _cancellation_token_drop_guard: token.clone().drop_guard(), - cancellation_token: token, - tcp_listen_port, - }); + Some(dht) + }; + let peer_opts = opts.peer_opts.unwrap_or_default(); + let persistence_filename = match opts.persistence_filename { + Some(filename) => filename, + None => Self::default_persistence_filename()?, + }; + let spawner = BlockingSpawner::default(); - if let Some(tcp_listener) = tcp_listener { - session.spawn( - error_span!("tcp_listen", port = tcp_listen_port), - session.clone().task_tcp_listener(tcp_listener), - ); - } + let session = Arc::new(Self { + persistence_filename, + peer_id, + dht, + peer_opts, + spawner, + output_folder, + db: RwLock::new(Default::default()), + _cancellation_token_drop_guard: token.clone().drop_guard(), + cancellation_token: token, + tcp_listen_port, + }); - if let Some(listen_port) = tcp_listen_port { - if opts.enable_upnp_port_forwarding { + if let Some(tcp_listener) = tcp_listener { session.spawn( - error_span!("upnp_forward", port = listen_port), - session.clone().task_upnp_port_forwarder(listen_port), + error_span!("tcp_listen", port = tcp_listen_port), + session.clone().task_tcp_listener(tcp_listener), ); } - } - if opts.persistence { - info!( - "will use {:?} for session persistence", - session.persistence_filename - ); - if let Some(parent) = session.persistence_filename.parent() { - std::fs::create_dir_all(parent).with_context(|| { - format!("couldn't create directory {:?} for session storage", parent) - })?; + if let Some(listen_port) = tcp_listen_port { + if opts.enable_upnp_port_forwarding { + session.spawn( + error_span!("upnp_forward", port = listen_port), + session.clone().task_upnp_port_forwarder(listen_port), + ); + } } - let persistence_task = session.clone().task_persistence(); - session.spawn(error_span!("session_persistence"), persistence_task); - } - Ok(session) + if opts.persistence { + info!( + "will use {:?} for session persistence", + session.persistence_filename + ); + if let Some(parent) = session.persistence_filename.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("couldn't create directory {:?} for session storage", parent) + })?; + } + let persistence_task = session.clone().task_persistence(); + session.spawn(error_span!("session_persistence"), persistence_task); + } + + Ok(session) + } + .boxed() } async fn task_persistence(self: Arc) -> anyhow::Result<()> { @@ -739,149 +745,140 @@ impl Session { } /// Add a torrent to the session. - pub async fn add_torrent( - &self, - add: AddTorrent<'_>, + #[inline(never)] + pub fn add_torrent<'a>( + &'a self, + add: AddTorrent<'a>, opts: Option, - ) -> anyhow::Result { - // Magnet links are different in that we first need to discover the metadata. - let span = error_span!("add_torrent"); - let _ = span.enter(); + ) -> BoxFuture<'a, anyhow::Result> { + async move { + // Magnet links are different in that we first need to discover the metadata. + let span = error_span!("add_torrent"); + let _ = span.enter(); - let opts = opts.unwrap_or_default(); + let opts = opts.unwrap_or_default(); - let announce_port = if opts.list_only { - None - } else { - self.tcp_listen_port - }; + let announce_port = if opts.list_only { + None + } else { + self.tcp_listen_port + }; - let cancellation_token = self.cancellation_token.child_token(); - let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); - let paused = opts.list_only || opts.paused; + let paused = opts.list_only || opts.paused; - // The main difference between magnet link and torrent file, is that we need to resolve the magnet link - // into a torrent file by connecting to peers that support extended handshakes. - // So we must discover at least one peer and connect to it to be able to proceed further. + // The main difference between magnet link and torrent file, is that we need to resolve the magnet link + // into a torrent file by connecting to peers that support extended handshakes. + // So we must discover at least one peer and connect to it to be able to proceed further. - let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { - AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { - let magnet = - Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; - let info_hash = magnet - .as_id20() - .context("magnet link didn't contain a BTv1 infohash")?; + let (info_hash, info, trackers, peer_rx, initial_peers) = match add { + AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { + let magnet = Magnet::parse(&magnet) + .context("provided path is not a valid magnet URL")?; + let info_hash = magnet + .as_id20() + .context("magnet link didn't contain a BTv1 infohash")?; - let peer_token = cancellation_token.child_token(); - let peer_rx = self.make_peer_rx( - info_hash, - magnet.trackers.clone(), - peer_token.clone(), - announce_port, - opts.force_tracker_interval, - )?; - let peer_rx = match peer_rx { - Some(peer_rx) => peer_rx, - None => bail!("can't find peers: DHT disabled and no trackers in magnet"), - }; - - debug!(?info_hash, "querying DHT"); - let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( - self.peer_id, - info_hash, - opts.initial_peers.clone().unwrap_or_default(), - peer_rx, - Some(self.merge_peer_opts(opts.peer_opts)), - ) - .await - { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), - ReadMetainfoResult::ChannelClosed { .. } => { - anyhow::bail!("DHT died, no way to discover torrent metainfo") - } - }; - if paused { - peer_token.cancel(); - } - debug!(?info, "received result from DHT"); - ( - info_hash, - info, - magnet.trackers, - Some(peer_rx), - initial_peers, - cancellation_token, - ) - } - other => { - let torrent = match other { - AddTorrent::Url(url) - if url.starts_with("http://") || url.starts_with("https://") => - { - torrent_from_url(&url).await? - } - AddTorrent::Url(url) => { - bail!( - "unsupported URL {:?}. Supporting magnet:, http:, and https", - url - ) - } - AddTorrent::TorrentFileBytes(bytes) => { - torrent_from_bytes(&bytes).context("error decoding torrent")? - } - AddTorrent::TorrentInfo(t) => *t, - }; - - let trackers = torrent - .iter_announce() - .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => Some(url.to_owned()), - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - None - } - }) - .collect::>(); - - let peer_rx = if paused { - None - } else { - self.make_peer_rx( - torrent.info_hash, - trackers.clone(), - cancellation_token.clone(), + let peer_rx = self.make_peer_rx( + info_hash, + magnet.trackers.clone(), announce_port, opts.force_tracker_interval, - )? - }; + )?; + let peer_rx = match peer_rx { + Some(peer_rx) => peer_rx, + None => bail!("can't find peers: DHT disabled and no trackers in magnet"), + }; - ( - torrent.info_hash, - torrent.info, - trackers, - peer_rx, - opts.initial_peers - .clone() - .unwrap_or_default() - .into_iter() - .collect(), - cancellation_token, - ) - } - }; + debug!(?info_hash, "querying DHT"); + let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( + self.peer_id, + info_hash, + opts.initial_peers.clone().unwrap_or_default(), + peer_rx, + Some(self.merge_peer_opts(opts.peer_opts)), + ) + .await + { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + debug!(?info, "received result from DHT"); + ( + info_hash, + info, + magnet.trackers, + Some(peer_rx), + initial_peers, + ) + } + other => { + let torrent = match other { + AddTorrent::Url(url) + if url.starts_with("http://") || url.starts_with("https://") => + { + torrent_from_url(&url).await? + } + AddTorrent::Url(url) => { + bail!( + "unsupported URL {:?}. Supporting magnet:, http:, and https", + url + ) + } + AddTorrent::TorrentFileBytes(bytes) => { + torrent_from_bytes(&bytes).context("error decoding torrent")? + } + AddTorrent::TorrentInfo(t) => *t, + }; - cancellation_token_drop_guard.disarm(); + let trackers = torrent + .iter_announce() + .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => Some(url.to_owned()), + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + None + } + }) + .collect::>(); - self.main_torrent_info( - info_hash, - info, - trackers, - peer_rx, - initial_peers.into_iter().collect(), - opts, - cancellation_token, - ) - .await + let peer_rx = if paused { + None + } else { + self.make_peer_rx( + torrent.info_hash, + trackers.clone(), + announce_port, + opts.force_tracker_interval, + )? + }; + + ( + torrent.info_hash, + torrent.info, + trackers, + peer_rx, + opts.initial_peers + .clone() + .unwrap_or_default() + .into_iter() + .collect(), + ) + } + }; + + self.main_torrent_info( + info_hash, + info, + trackers, + peer_rx, + initial_peers.into_iter().collect(), + opts, + ) + .await + } + .boxed() } #[allow(clippy::too_many_arguments)] @@ -893,12 +890,9 @@ impl Session { peer_rx: Option, initial_peers: Vec, opts: AddTorrentOptions, - cancellation_token: CancellationToken, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); - let drop_guard = cancellation_token.clone().drop_guard(); - let get_only_files = |only_files: Option>, only_files_regex: Option, list_only: bool| { match (only_files, only_files_regex) { @@ -1016,20 +1010,16 @@ impl Session { let span = managed_torrent.info.span.clone(); let _ = span.enter(); - // Just in case, cancel all tasks started for this torrent so far. - // This is defensive, and not proven necessary. - let token = if opts.paused { - cancellation_token.cancel(); - self.cancellation_token.child_token() - } else { - cancellation_token - }; managed_torrent - .start(initial_peers, peer_rx, opts.paused, token) + .start( + initial_peers, + peer_rx, + opts.paused, + self.cancellation_token.child_token(), + ) .context("error starting torrent")?; } - drop_guard.disarm(); Ok(AddTorrentResponse::Added(id, managed_torrent)) } @@ -1080,7 +1070,6 @@ impl Session { &self, info_hash: Id20, trackers: Vec, - cancel: CancellationToken, announce_port: Option, force_tracker_interval: Option, ) -> anyhow::Result> { @@ -1097,7 +1086,6 @@ impl Session { // TODO: report actual bytes, not zeroes. Box::new(()), force_tracker_interval, - cancel, announce_port, ); @@ -1111,15 +1099,18 @@ impl Session { } pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let token = self.cancellation_token.child_token(); let peer_rx = self.make_peer_rx( handle.info_hash(), handle.info().trackers.clone().into_iter().collect(), - token.clone(), self.tcp_listen_port, handle.info().options.force_tracker_interval, )?; - handle.start(Default::default(), peer_rx, false, token)?; + handle.start( + Default::default(), + peer_rx, + false, + self.cancellation_token.child_token(), + )?; Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 90bfd8e..a3fe61f 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -15,6 +15,8 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; +use futures::future::BoxFuture; +use futures::FutureExt; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; use librqbit_core::peer_id::generate_peer_id; @@ -395,23 +397,29 @@ impl ManagedTorrent { }) } - pub async fn wait_until_completed(&self) -> anyhow::Result<()> { - // TODO: rewrite, this polling is horrible - let live = loop { - let live = self.with_state(|s| match s { - ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => Ok(None), - ManagedTorrentState::Live(l) => Ok(Some(l.clone())), - ManagedTorrentState::Error(e) => bail!("{:?}", e), - ManagedTorrentState::None => bail!("bug: torrent state is None"), - })?; - if let Some(live) = live { - break live; - } - tokio::time::sleep(Duration::from_secs(1)).await; - }; + #[inline(never)] + pub fn wait_until_completed(&self) -> BoxFuture<'_, anyhow::Result<()>> { + async move { + // TODO: rewrite, this polling is horrible + let live = loop { + let live = self.with_state(|s| match s { + ManagedTorrentState::Initializing(_) | ManagedTorrentState::Paused(_) => { + Ok(None) + } + ManagedTorrentState::Live(l) => Ok(Some(l.clone())), + ManagedTorrentState::Error(e) => bail!("{:?}", e), + ManagedTorrentState::None => bail!("bug: torrent state is None"), + })?; + if let Some(live) = live { + break live; + } + tokio::time::sleep(Duration::from_secs(1)).await; + }; - live.wait_until_completed().await; - Ok(()) + live.wait_until_completed().await; + Ok(()) + } + .boxed() } } diff --git a/crates/librqbit/src/tracing_subscriber_config_utils.rs b/crates/librqbit/src/tracing_subscriber_config_utils.rs index 7cac1a2..b719a30 100644 --- a/crates/librqbit/src/tracing_subscriber_config_utils.rs +++ b/crates/librqbit/src/tracing_subscriber_config_utils.rs @@ -60,6 +60,7 @@ pub struct InitLoggingResult { pub line_broadcast: LineBroadcast, } +#[inline(never)] pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result { let stderr_filter = EnvFilter::builder() .with_default_directive( diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 2b6efa9..d68f5bc 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -5,4 +5,4 @@ use futures::Stream; pub type BF = bitvec::vec::BitVec; pub type PeerHandle = SocketAddr; -pub type PeerStream = Box + Unpin + Send + Sync + 'static>; +pub type PeerStream = Box + Unpin + Send + 'static>; diff --git a/crates/librqbit_core/src/torrent_metainfo.rs b/crates/librqbit_core/src/torrent_metainfo.rs index c89230e..e60fcee 100644 --- a/crates/librqbit_core/src/torrent_metainfo.rs +++ b/crates/librqbit_core/src/torrent_metainfo.rs @@ -159,6 +159,7 @@ impl> TorrentMetaV1Info { Some(expected_hash == hash) } + #[inline(never)] pub fn iter_filenames_and_lengths( &self, ) -> anyhow::Result, u64)>> { diff --git a/crates/tracker_comms/Cargo.toml b/crates/tracker_comms/Cargo.toml new file mode 100644 index 0000000..98329ec --- /dev/null +++ b/crates/tracker_comms/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "librqbit-tracker-comms" +version = "1.0.0" +edition = "2018" +description = "Common interface around various sha1 implementations used in rqbit torrent client." +license = "Apache-2.0" +documentation = "https://docs.rs/librqbit-tracker-comms" +repository = "https://github.com/ikatson/rqbit" +readme = "README.md" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = "1" +anyhow = "1" +futures = "0.3" +async-stream = "0.3.5" +buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"} +librqbit-core = {path = "../librqbit_core", version = "3.5.0"} +byteorder = "1.5" +serde = {version = "1", features=["derive"]} +urlencoding = "2" +rand = "0.8" +tracing = "0.1.40" +reqwest = {version="0.11.22", default-features=false, features = ["json"]} +bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} +url = "2" \ No newline at end of file diff --git a/crates/tracker_comms/resources/test/udp-tracker-announce-response.bin b/crates/tracker_comms/resources/test/udp-tracker-announce-response.bin new file mode 100644 index 0000000..4b1bc3a Binary files /dev/null and b/crates/tracker_comms/resources/test/udp-tracker-announce-response.bin differ diff --git a/crates/tracker_comms/src/lib.rs b/crates/tracker_comms/src/lib.rs new file mode 100644 index 0000000..74cc980 --- /dev/null +++ b/crates/tracker_comms/src/lib.rs @@ -0,0 +1,5 @@ +mod tracker_comms; +mod tracker_comms_http; +mod tracker_comms_udp; + +pub use tracker_comms::*; diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs similarity index 67% rename from crates/librqbit/src/tracker_comms.rs rename to crates/tracker_comms/src/tracker_comms.rs index d1a7813..3816131 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -4,13 +4,15 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; -use futures::Stream; -use librqbit_core::spawn_utils::spawn_with_cancel; -use tokio_util::sync::CancellationToken; +use futures::future::Either; +use futures::stream::BoxStream; +use futures::stream::FuturesUnordered; +use futures::FutureExt; +use futures::StreamExt; use tracing::debug; use tracing::error_span; -use tracing::info; use tracing::trace; +use tracing::Instrument; use url::Url; use crate::tracker_comms_http; @@ -22,7 +24,6 @@ pub struct TrackerComms { peer_id: Id20, stats: Box, force_tracker_interval: Option, - cancellation_token: CancellationToken, tx: Sender, tcp_listen_port: Option, } @@ -57,6 +58,11 @@ impl TorrentStatsProvider for () { type Sender = tokio::sync::mpsc::Sender; +enum SupportedTracker { + Udp(Url), + Http(Url), +} + impl TrackerComms { pub fn start( info_hash: Id20, @@ -64,69 +70,94 @@ impl TrackerComms { trackers: Vec, stats: Box, force_interval: Option, - cancellation_token: CancellationToken, tcp_listen_port: Option, - ) -> Option + Send + Sync + Unpin + 'static> { - let (tx, rx) = tokio::sync::mpsc::channel::(16); - let comms = Arc::new(Self { - info_hash, - peer_id, - stats, - force_tracker_interval: force_interval, - cancellation_token, - tx, - tcp_listen_port, - }); - let mut added = false; - for tracker in trackers { - if let Err(e) = comms.clone().add_tracker(&tracker) { - info!(tracker = tracker, "error adding tracker: {:#}", e) - } else { - added = true; - } - } - if !added { + ) -> Option> { + let trackers = trackers + .into_iter() + .filter_map(|t| match Url::parse(&t) { + Ok(parsed) => match parsed.scheme() { + "http" | "https" => Some(SupportedTracker::Http(parsed)), + "udp" => Some(SupportedTracker::Udp(parsed)), + _ => { + debug!("unsuppoted tracker URL: {}", t); + None + } + }, + Err(e) => { + debug!("error parsing tracker URL {}: {}", t, e); + None + } + }) + .collect::>(); + if trackers.is_empty() { return None; } - Some(tokio_stream::wrappers::ReceiverStream::new(rx)) + + let (tx, mut rx) = tokio::sync::mpsc::channel::(16); + + let s = async_stream::stream! { + use futures::StreamExt; + let comms = Arc::new(Self { + info_hash, + peer_id, + stats, + force_tracker_interval: force_interval, + tx, + tcp_listen_port, + }); + let mut futures = FuturesUnordered::new(); + for tracker in trackers { + futures.push(comms.add_tracker(tracker)) + } + while !(futures.is_empty()) { + tokio::select! { + addr = rx.recv() => { + if let Some(addr) = addr { + yield addr; + } + } + e = futures.next(), if !futures.is_empty() => { + if let Some(Err(e)) = e { + debug!("error: {e}"); + } + } + } + } + }; + + Some(s.boxed()) } - fn add_tracker(self: Arc, tracker: &str) -> anyhow::Result<()> { - if tracker.starts_with("http://") || tracker.starts_with("https://") { - spawn_with_cancel( - error_span!( + fn add_tracker( + &self, + url: SupportedTracker, + ) -> Either< + impl std::future::Future> + '_ + Send, + impl std::future::Future> + '_ + Send, + > { + let info_hash = self.info_hash; + match url { + SupportedTracker::Udp(url) => { + let span = error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); + self.task_single_tracker_monitor_udp(url) + .instrument(span) + .right_future() + } + SupportedTracker::Http(url) => { + let span = error_span!( parent: None, "http_tracker", - tracker = tracker, - info_hash = ?self.info_hash - ), - self.cancellation_token.clone(), - { - let comms = self; - let url = Url::parse(tracker).context("can't parse URL")?; - async move { comms.task_single_tracker_monitor_http(url).await } - }, - ); - } else if tracker.starts_with("udp://") { - spawn_with_cancel( - error_span!(parent: None, "udp_tracker", tracker = tracker, info_hash = ?self.info_hash), - self.cancellation_token.clone(), - { - let comms = self; - let url = Url::parse(tracker).context("can't parse URL")?; - async move { comms.task_single_tracker_monitor_udp(url).await } - }, - ); - } else { - bail!("unsupported tracker url {}", tracker) + tracker = %url, + info_hash = ?info_hash + ); + self.task_single_tracker_monitor_http(url) + .instrument(span) + .left_future() + } } - Ok(()) } - async fn task_single_tracker_monitor_http( - self: Arc, - mut tracker_url: Url, - ) -> anyhow::Result<()> { + async fn task_single_tracker_monitor_http(&self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started); loop { let stats = self.stats.get(); diff --git a/crates/librqbit/src/tracker_comms_http.rs b/crates/tracker_comms/src/tracker_comms_http.rs similarity index 100% rename from crates/librqbit/src/tracker_comms_http.rs rename to crates/tracker_comms/src/tracker_comms_http.rs diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/tracker_comms/src/tracker_comms_udp.rs similarity index 97% rename from crates/librqbit/src/tracker_comms_udp.rs rename to crates/tracker_comms/src/tracker_comms_udp.rs index 1e72ae8..35af733 100644 --- a/crates/librqbit/src/tracker_comms_udp.rs +++ b/crates/tracker_comms/src/tracker_comms_udp.rs @@ -12,9 +12,9 @@ const ACTION_ANNOUNCE: u32 = 1; // const ACTION_ERROR: u32 = 3; pub const EVENT_NONE: u32 = 0; -pub const EVENT_COMPLETED: u32 = 1; -pub const EVENT_STARTED: u32 = 2; -pub const EVENT_STOPPED: u32 = 3; +// pub const EVENT_COMPLETED: u32 = 1; +// pub const EVENT_STARTED: u32 = 2; +// pub const EVENT_STOPPED: u32 = 3; pub type ConnectionId = u64; const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980; @@ -293,7 +293,7 @@ mod tests { Response::Connect(connection_id) => { dbg!(connection_id) } - other => panic!("unexpected response {other:?}"), + other => panic!("unexpected response {:?}", other), }; let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap(); @@ -335,7 +335,7 @@ mod tests { Response::Announce(r) => { dbg!(r); } - other => panic!("unexpected response {other:?}"), + other => panic!("unexpected response {:?}", other), } } }