diff --git a/Cargo.lock b/Cargo.lock index 60c88b0..b7dec58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "librqbit" -version = "3.3.0" +version = "4.0.0-beta.0" dependencies = [ "anyhow", "axum", @@ -1066,7 +1066,7 @@ version = "2.2.1" [[package]] name = "librqbit-core" -version = "3.0.0" +version = "3.1.0" dependencies = [ "anyhow", "hex 0.4.3", @@ -1076,13 +1076,15 @@ dependencies = [ "librqbit-clone-to-owned", "parking_lot", "serde", + "tokio", + "tracing", "url", "uuid", ] [[package]] name = "librqbit-dht" -version = "3.1.0" +version = "3.2.0" dependencies = [ "anyhow", "directories", @@ -1105,7 +1107,7 @@ dependencies = [ [[package]] name = "librqbit-peer-protocol" -version = "3.0.0" +version = "3.1.0" dependencies = [ "anyhow", "bincode", @@ -1704,7 +1706,7 @@ dependencies = [ [[package]] name = "rqbit" -version = "3.3.0" +version = "4.0.0-beta.0" dependencies = [ "anyhow", "clap", diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index fa69e28..ec5c9d5 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-dht" -version = "3.1.0" +version = "3.2.0" edition = "2021" description = "DHT implementation, used in rqbit torrent client." license = "Apache-2.0" @@ -33,7 +33,7 @@ indexmap = "2" directories = "5" clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} -librqbit-core = {path="../librqbit_core", version = "3.0.0"} +librqbit-core = {path="../librqbit_core", version = "3.1.0"} [dev-dependencies] tracing-subscriber = "0.3" \ No newline at end of file diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index bcbc576..f2331e2 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -18,7 +18,7 @@ use bencode::ByteString; use futures::{stream::FuturesUnordered, Stream, StreamExt}; use indexmap::IndexSet; use leaky_bucket::RateLimiter; -use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; +use librqbit_core::{id20::Id20, peer_id::generate_peer_id, spawn_utils::spawn}; use parking_lot::RwLock; use rand::Rng; use serde::Serialize; @@ -27,7 +27,7 @@ use tokio::{ sync::mpsc::{channel, unbounded_channel, Sender, UnboundedReceiver, UnboundedSender}, }; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, debug_span, error_span, info, trace, warn, Instrument}; #[derive(Debug, Serialize)] pub struct DhtStats { @@ -513,7 +513,7 @@ impl DhtWorker { bootstrap_addrs: &[String], ) -> anyhow::Result<()> { let (out_tx, mut out_rx) = channel(1); - let framer = run_framer(&self.socket, in_rx, out_tx); + let framer = run_framer(&self.socket, in_rx, out_tx).instrument(debug_span!("dht_framer")); let bootstrap = async { let mut futs = FuturesUnordered::new(); @@ -521,34 +521,40 @@ impl DhtWorker { for addr in bootstrap_addrs.iter() { let this = &self; let in_tx = &in_tx; - futs.push(async move { - match tokio::net::lookup_host(addr).await { - Ok(addrs) => { - for addr in addrs { - let request = this - .state - .write() - .create_request(Request::FindNode(this.peer_id), addr); - in_tx.send((request, addr))?; + futs.push( + async move { + match tokio::net::lookup_host(addr).await { + Ok(addrs) => { + for addr in addrs { + let request = this + .state + .write() + .create_request(Request::FindNode(this.peer_id), addr); + in_tx.send((request, addr))?; + } + } + Err(e) => { + warn!("error looking up {}: {}", addr, e); + return Err(e.into()); } } - Err(e) => warn!("error looking up {}: {}", addr, e), + Ok::<_, anyhow::Error>(()) } - Ok::<_, anyhow::Error>(()) - }); + .instrument(error_span!("dht_bootstrap", addr = addr)), + ); } let mut successes = 0; while let Some(resp) = futs.next().await { - match resp { - Ok(_) => successes += 1, - Err(e) => warn!("error in one of the bootstrappers: {}", e), + if resp.is_ok() { + successes += 1 } } if successes == 0 { anyhow::bail!("bootstrapping did not succeed") } Ok(()) - }; + } + .instrument(debug_span!("dht_bootstrapper")); let mut bootstrap_done = false; let response_reader = { @@ -563,7 +569,8 @@ impl DhtWorker { "closed response reader, nowhere to send results to, DHT closed" )) } - }; + } + .instrument(debug_span!("dht_responese_reader")); tokio::pin!(framer); tokio::pin!(bootstrap); @@ -676,7 +683,7 @@ impl Dht { listen_addr, ))); - tokio::spawn({ + spawn(error_span!("dht"), { let state = state.clone(); async move { let worker = DhtWorker { @@ -684,8 +691,8 @@ impl Dht { peer_id, state, }; - let result = worker.start(in_tx, in_rx, &bootstrap_addrs).await; - warn!("DHT worker finished with {:?}", result); + worker.start(in_tx, in_rx, &bootstrap_addrs).await?; + Ok(()) } }); Ok(Dht { state }) diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index d22ad6b..a4f091e 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,5 +1,6 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... +use librqbit_core::spawn_utils::spawn; use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; use std::io::{BufReader, BufWriter}; @@ -8,8 +9,7 @@ use std::path::{Path, PathBuf}; use std::time::Duration; use anyhow::Context; -use tokio::spawn; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, error_span, info, trace, warn}; use crate::dht::{Dht, DhtConfig}; use crate::routing_table::RoutingTable; @@ -110,7 +110,7 @@ impl PersistentDht { }; let dht = Dht::with_config(dht_config).await?; - spawn({ + spawn(error_span!("dht_persistence"), { let dht = dht.clone(); let dump_interval = config .dump_interval diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 01beab9..1104017 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit" -version = "3.3.0" +version = "4.0.0-beta.0" authors = ["Igor Katson "] edition = "2021" description = "The main library used by rqbit torrent client. The binary is just a small wrapper on top of it." @@ -24,11 +24,11 @@ rust-tls = ["reqwest/rustls-tls"] [dependencies] bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} buffers = {path = "../buffers", package="librqbit-buffers", version = "2.2.1"} -librqbit-core = {path = "../librqbit_core", version = "3.0.0"} +librqbit-core = {path = "../librqbit_core", version = "3.1.0"} clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} -peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.0.0"} +peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.1.0"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} -dht = {path = "../dht", package="librqbit-dht", version="3.1.0"} +dht = {path = "../dht", package="librqbit-dht", version="3.2.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} axum = {version = "0.6"} diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index 1e404bd..3cb4aff 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -1,23 +1,9 @@ -use tracing::{debug, trace, warn, Instrument}; - pub fn spawn( _name: &str, span: tracing::Span, fut: impl std::future::Future> + Send + 'static, ) -> tokio::task::JoinHandle<()> { - let fut = async move { - trace!("started"); - match fut.await { - Ok(_) => { - debug!("finished"); - } - Err(e) => { - warn!("finished with error: {:#}", e) - } - } - } - .instrument(span.or_current()); - tokio::task::spawn(fut) + librqbit_core::spawn_utils::spawn(span, fut) } #[derive(Clone, Copy, Debug)] diff --git a/crates/librqbit_core/Cargo.toml b/crates/librqbit_core/Cargo.toml index 73ac147..4d1df54 100644 --- a/crates/librqbit_core/Cargo.toml +++ b/crates/librqbit_core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-core" -version = "3.0.0" +version = "3.1.0" edition = "2021" description = "Important utilities used throughout librqbit useful for working with torrents." license = "Apache-2.0" @@ -17,6 +17,8 @@ sha1-openssl = ["bencode/sha1-openssl"] sha1-rust = ["bencode/sha1-rust"] [dependencies] +tracing = "0.1.40" +tokio = "1" hex = "0.4" anyhow = "1" url = "2" diff --git a/crates/librqbit_core/src/lib.rs b/crates/librqbit_core/src/lib.rs index 1c4d034..16e42d3 100644 --- a/crates/librqbit_core/src/lib.rs +++ b/crates/librqbit_core/src/lib.rs @@ -3,5 +3,6 @@ pub mod id20; pub mod lengths; pub mod magnet; pub mod peer_id; +pub mod spawn_utils; pub mod speed_estimator; pub mod torrent_metainfo; diff --git a/crates/librqbit_core/src/spawn_utils.rs b/crates/librqbit_core/src/spawn_utils.rs new file mode 100644 index 0000000..81e9b00 --- /dev/null +++ b/crates/librqbit_core/src/spawn_utils.rs @@ -0,0 +1,20 @@ +use tracing::{debug, error, trace, Instrument}; + +pub fn spawn( + span: tracing::Span, + fut: impl std::future::Future> + Send + 'static, +) -> tokio::task::JoinHandle<()> { + let fut = async move { + trace!("started"); + match fut.await { + Ok(_) => { + debug!("finished"); + } + Err(e) => { + error!("finished with error: {:#}", e) + } + } + } + .instrument(span); + tokio::task::spawn(fut) +} diff --git a/crates/peer_binary_protocol/Cargo.toml b/crates/peer_binary_protocol/Cargo.toml index aa0df2e..6da24f1 100644 --- a/crates/peer_binary_protocol/Cargo.toml +++ b/crates/peer_binary_protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "librqbit-peer-protocol" -version = "3.0.0" +version = "3.1.0" edition = "2021" description = "Protocol for working with torrent peers. Used in rqbit torrent client." license = "Apache-2.0" @@ -23,6 +23,6 @@ byteorder = "1" buffers = {path="../buffers", package="librqbit-buffers", version = "2.2.1"} bencode = {path = "../bencode", default-features=false, package="librqbit-bencode", version="2.2.1"} clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} -librqbit-core = {path="../librqbit_core", version = "3.0.0"} +librqbit-core = {path="../librqbit_core", version = "3.1.0"} bitvec = "1" anyhow = "1" \ No newline at end of file diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index e2a1d97..f879940 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rqbit" -version = "3.3.0" +version = "4.0.0-beta.0" authors = ["Igor Katson "] edition = "2021" description = "A bittorrent command line client and server." @@ -23,8 +23,8 @@ default-tls = ["librqbit/default-tls"] rust-tls = ["librqbit/rust-tls"] [dependencies] -librqbit = {path="../librqbit", default-features=false, version = "3.3.0"} -dht = {path="../dht", package="librqbit-dht", version="3.1.0"} +librqbit = {path="../librqbit", default-features=false, version = "4.0.0-beta.0"} +dht = {path="../dht", package="librqbit-dht", version="3.2.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} console-subscriber = {version = "0.2", optional = true} anyhow = "1"