Add session parent spans possibility

This commit is contained in:
Igor Katson 2024-08-18 15:01:12 +01:00
parent 75c1127f37
commit 3f1ad390be
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 19 additions and 8 deletions

View file

@ -54,7 +54,7 @@ use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tokio_util::sync::{CancellationToken, DropGuard}; use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span};
use tracker_comms::TrackerComms; use tracker_comms::TrackerComms;
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
@ -114,6 +114,8 @@ pub struct Session {
concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>, concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
root_span: Option<Span>,
// This is stored for all tasks to stop when session is dropped. // This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard, _cancellation_token_drop_guard: DropGuard,
} }
@ -380,6 +382,9 @@ pub struct SessionOptions {
// how many concurrent torrent initializations can happen // how many concurrent torrent initializations can happen
pub concurrent_init_limit: Option<usize>, pub concurrent_init_limit: Option<usize>,
// the root span to use. If not set will be None.
pub root_span: Option<Span>,
} }
async fn create_tcp_listener( async fn create_tcp_listener(
@ -567,11 +572,12 @@ impl Session {
default_storage_factory: opts.default_storage_factory, default_storage_factory: opts.default_storage_factory,
reqwest_client, reqwest_client,
connector: stream_connector, connector: stream_connector,
root_span: opts.root_span,
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3))) concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
}); });
if let Some(mut disk_write_rx) = disk_write_rx { if let Some(mut disk_write_rx) = disk_write_rx {
session.spawn(error_span!("disk_writer"), async move { session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move {
while let Some(work) = disk_write_rx.recv().await { while let Some(work) = disk_write_rx.recv().await {
trace!(disk_write_rx_queue_len = disk_write_rx.len()); trace!(disk_write_rx_queue_len = disk_write_rx.len());
spawner.spawn_block_in_place(work); spawner.spawn_block_in_place(work);
@ -582,7 +588,7 @@ impl Session {
if let Some(tcp_listener) = tcp_listener { if let Some(tcp_listener) = tcp_listener {
session.spawn( session.spawn(
error_span!("tcp_listen", port = tcp_listen_port), error_span!(parent: session.rs(), "tcp_listen", port = tcp_listen_port),
session.clone().task_tcp_listener(tcp_listener), session.clone().task_tcp_listener(tcp_listener),
); );
} }
@ -590,7 +596,7 @@ impl Session {
if let Some(listen_port) = tcp_listen_port { if let Some(listen_port) = tcp_listen_port {
if opts.enable_upnp_port_forwarding { if opts.enable_upnp_port_forwarding {
session.spawn( session.spawn(
error_span!("upnp_forward", port = listen_port), error_span!(parent: session.rs(), "upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port), session.clone().task_upnp_port_forwarder(listen_port),
); );
} }
@ -613,7 +619,7 @@ impl Session {
st = ps.next(), if !added_all => { st = ps.next(), if !added_all => {
if let Some(st) = st { if let Some(st) = st {
let (id, st) = st?; let (id, st) = st?;
let span = error_span!("add_torrent", info_hash=?st.info_hash()); let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash());
let (add_torrent, mut opts) = st.into_add_torrent()?; let (add_torrent, mut opts) = st.into_add_torrent()?;
opts.preferred_id = Some(id); opts.preferred_id = Some(id);
let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span); let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span);
@ -698,7 +704,7 @@ impl Session {
debug!("error checking incoming connection: {e:#}"); debug!("error checking incoming connection: {e:#}");
e e
}) })
.instrument(error_span!("incoming", addr=%addr)) .instrument(error_span!(parent: self.rs(), "incoming", addr=%addr))
); );
} }
Err(e) => { Err(e) => {
@ -750,6 +756,10 @@ impl Session {
spawn_with_cancel(span, self.cancellation_token.clone(), fut); spawn_with_cancel(span, self.cancellation_token.clone(), fut);
} }
fn rs(&self) -> Option<tracing::Id> {
self.root_span.as_ref().and_then(|s| s.id())
}
/// Stop the session and all managed tasks. /// Stop the session and all managed tasks.
pub async fn stop(&self) { pub async fn stop(&self) {
let torrents = self let torrents = self
@ -926,7 +936,7 @@ impl Session {
self.main_torrent_info(add_res, opts).await self.main_torrent_info(add_res, opts).await
} }
.instrument(error_span!("add_torrent")) .instrument(error_span!(parent: self.rs(), "add_torrent"))
.boxed() .boxed()
} }
@ -1067,7 +1077,7 @@ impl Session {
}) { }) {
return Ok(AddTorrentResponse::AlreadyManaged(id, handle)); return Ok(AddTorrentResponse::AlreadyManaged(id, handle));
} }
let managed_torrent = builder.build(error_span!(parent: None, "torrent", id))?; let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?;
g.add_torrent(managed_torrent.clone(), id); g.add_torrent(managed_torrent.clone(), id);
managed_torrent managed_torrent
}; };

View file

@ -340,6 +340,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
}), }),
socks_proxy_url: socks_url, socks_proxy_url: socks_url,
concurrent_init_limit: Some(opts.concurrent_init_limit), concurrent_init_limit: Some(opts.concurrent_init_limit),
root_span: None,
}; };
let stats_printer = |session: Arc<Session>| async move { let stats_printer = |session: Arc<Session>| async move {