From 3f1ad390be403fb219e1ddaca377cdf7871a53ec Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Aug 2024 15:01:12 +0100 Subject: [PATCH 1/3] Add session parent spans possibility --- crates/librqbit/src/session.rs | 26 ++++++++++++++++++-------- crates/rqbit/src/main.rs | 1 + 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 767b818..ff26c44 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -54,7 +54,7 @@ use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; -use tracing::{debug, error, error_span, info, trace, warn, Instrument}; +use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span}; use tracker_comms::TrackerComms; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; @@ -114,6 +114,8 @@ pub struct Session { concurrent_initialize_semaphore: Arc, + root_span: Option, + // This is stored for all tasks to stop when session is dropped. _cancellation_token_drop_guard: DropGuard, } @@ -380,6 +382,9 @@ pub struct SessionOptions { // how many concurrent torrent initializations can happen pub concurrent_init_limit: Option, + + // the root span to use. If not set will be None. + pub root_span: Option, } async fn create_tcp_listener( @@ -567,11 +572,12 @@ impl Session { default_storage_factory: opts.default_storage_factory, reqwest_client, connector: stream_connector, + root_span: opts.root_span, concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) }); if let Some(mut disk_write_rx) = disk_write_rx { - session.spawn(error_span!("disk_writer"), async move { + session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move { while let Some(work) = disk_write_rx.recv().await { trace!(disk_write_rx_queue_len = disk_write_rx.len()); spawner.spawn_block_in_place(work); @@ -582,7 +588,7 @@ impl Session { if let Some(tcp_listener) = tcp_listener { session.spawn( - error_span!("tcp_listen", port = tcp_listen_port), + error_span!(parent: session.rs(), "tcp_listen", port = tcp_listen_port), session.clone().task_tcp_listener(tcp_listener), ); } @@ -590,7 +596,7 @@ impl Session { if let Some(listen_port) = tcp_listen_port { if opts.enable_upnp_port_forwarding { session.spawn( - error_span!("upnp_forward", port = listen_port), + error_span!(parent: session.rs(), "upnp_forward", port = listen_port), session.clone().task_upnp_port_forwarder(listen_port), ); } @@ -613,7 +619,7 @@ impl Session { st = ps.next(), if !added_all => { if let Some(st) = st { let (id, st) = st?; - let span = error_span!("add_torrent", info_hash=?st.info_hash()); + let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash()); let (add_torrent, mut opts) = st.into_add_torrent()?; opts.preferred_id = Some(id); let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span); @@ -698,7 +704,7 @@ impl Session { debug!("error checking incoming connection: {e:#}"); e }) - .instrument(error_span!("incoming", addr=%addr)) + .instrument(error_span!(parent: self.rs(), "incoming", addr=%addr)) ); } Err(e) => { @@ -750,6 +756,10 @@ impl Session { spawn_with_cancel(span, self.cancellation_token.clone(), fut); } + fn rs(&self) -> Option { + self.root_span.as_ref().and_then(|s| s.id()) + } + /// Stop the session and all managed tasks. pub async fn stop(&self) { let torrents = self @@ -926,7 +936,7 @@ impl Session { self.main_torrent_info(add_res, opts).await } - .instrument(error_span!("add_torrent")) + .instrument(error_span!(parent: self.rs(), "add_torrent")) .boxed() } @@ -1067,7 +1077,7 @@ impl Session { }) { return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); } - let managed_torrent = builder.build(error_span!(parent: None, "torrent", id))?; + let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?; g.add_torrent(managed_torrent.clone(), id); managed_torrent }; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 870a056..02fcb90 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -340,6 +340,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { }), socks_proxy_url: socks_url, concurrent_init_limit: Some(opts.concurrent_init_limit), + root_span: None, }; let stats_printer = |session: Arc| async move { From 18624d5bd807d8aab01067b4b221a1d2c39fd047 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Aug 2024 16:20:26 +0100 Subject: [PATCH 2/3] More spans --- crates/librqbit/src/tests/e2e.rs | 4 +++- crates/peer_binary_protocol/src/extended/handshake.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 3370ecd..abb598f 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -73,6 +73,7 @@ async fn test_e2e_download() { enable_upnp_port_forwarding: false, default_storage_factory: None, defer_writes_up_to: None, + root_span: Some(error_span!(parent: None, "server", id = i)), ..Default::default() }, ) @@ -121,7 +122,7 @@ async fn test_e2e_download() { session.tcp_listen_port().unwrap(), )) } - .instrument(error_span!("server", server = i)), + .instrument(error_span!("server", id = i)), ); futs.push(timeout(Duration::from_secs(30), rx)); } @@ -152,6 +153,7 @@ async fn test_e2e_download() { persistence: None, listen_port_range: None, enable_upnp_port_forwarding: false, + root_span: Some(error_span!("client")), ..Default::default() }, ) diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index e3cd460..2a438bc 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -52,7 +52,7 @@ where ByteBuf: Eq + std::hash::Hash + std::borrow::Borrow<[u8]>, { fn get_msgid(&self, msg_type: &'a [u8]) -> Option { - self.m.get(msg_type).map(|v| *v) + self.m.get(msg_type).copied() } pub fn ut_metadata(&self) -> Option { From 76e5044d33b43f5ceccb2cf10db63fbd60e4c81b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Aug 2024 16:37:59 +0100 Subject: [PATCH 3/3] fix test ports --- crates/librqbit/src/tests/e2e.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index abb598f..4bc8e6d 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -60,6 +60,9 @@ async fn test_e2e_download() { max_random_sleep_ms: rand::thread_rng().gen_range(0u8..16), } .as_peer_id(); + let listen_range_start = 15100u16 + i as u16; + let listen_range_end = listen_range_start + 1; + let listen_range = listen_range_start..listen_range_end; let session = crate::Session::new_with_opts( std::env::temp_dir().join("does_not_exist"), SessionOptions { @@ -69,7 +72,7 @@ async fn test_e2e_download() { persistence: None, peer_id: Some(peer_id), peer_opts: None, - listen_port_range: Some(15100..17000), + listen_port_range: Some(listen_range), enable_upnp_port_forwarding: false, default_storage_factory: None, defer_writes_up_to: None,