Merge pull request #195 from ikatson/root-span
Fix e2e test to break less
This commit is contained in:
commit
2ee8366db0
4 changed files with 27 additions and 11 deletions
|
|
@ -54,7 +54,7 @@ use serde::{Deserialize, Serialize};
|
||||||
use tokio::net::{TcpListener, TcpStream};
|
use tokio::net::{TcpListener, TcpStream};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||||
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
|
use tracing::{debug, error, error_span, info, trace, warn, Instrument, Span};
|
||||||
use tracker_comms::TrackerComms;
|
use tracker_comms::TrackerComms;
|
||||||
|
|
||||||
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
|
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
|
||||||
|
|
@ -114,6 +114,8 @@ pub struct Session {
|
||||||
|
|
||||||
concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
|
concurrent_initialize_semaphore: Arc<tokio::sync::Semaphore>,
|
||||||
|
|
||||||
|
root_span: Option<Span>,
|
||||||
|
|
||||||
// This is stored for all tasks to stop when session is dropped.
|
// This is stored for all tasks to stop when session is dropped.
|
||||||
_cancellation_token_drop_guard: DropGuard,
|
_cancellation_token_drop_guard: DropGuard,
|
||||||
}
|
}
|
||||||
|
|
@ -380,6 +382,9 @@ pub struct SessionOptions {
|
||||||
|
|
||||||
// how many concurrent torrent initializations can happen
|
// how many concurrent torrent initializations can happen
|
||||||
pub concurrent_init_limit: Option<usize>,
|
pub concurrent_init_limit: Option<usize>,
|
||||||
|
|
||||||
|
// the root span to use. If not set will be None.
|
||||||
|
pub root_span: Option<Span>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn create_tcp_listener(
|
async fn create_tcp_listener(
|
||||||
|
|
@ -567,11 +572,12 @@ impl Session {
|
||||||
default_storage_factory: opts.default_storage_factory,
|
default_storage_factory: opts.default_storage_factory,
|
||||||
reqwest_client,
|
reqwest_client,
|
||||||
connector: stream_connector,
|
connector: stream_connector,
|
||||||
|
root_span: opts.root_span,
|
||||||
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
|
concurrent_initialize_semaphore: Arc::new(tokio::sync::Semaphore::new(opts.concurrent_init_limit.unwrap_or(3)))
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Some(mut disk_write_rx) = disk_write_rx {
|
if let Some(mut disk_write_rx) = disk_write_rx {
|
||||||
session.spawn(error_span!("disk_writer"), async move {
|
session.spawn(error_span!(parent: session.rs(), "disk_writer"), async move {
|
||||||
while let Some(work) = disk_write_rx.recv().await {
|
while let Some(work) = disk_write_rx.recv().await {
|
||||||
trace!(disk_write_rx_queue_len = disk_write_rx.len());
|
trace!(disk_write_rx_queue_len = disk_write_rx.len());
|
||||||
spawner.spawn_block_in_place(work);
|
spawner.spawn_block_in_place(work);
|
||||||
|
|
@ -582,7 +588,7 @@ impl Session {
|
||||||
|
|
||||||
if let Some(tcp_listener) = tcp_listener {
|
if let Some(tcp_listener) = tcp_listener {
|
||||||
session.spawn(
|
session.spawn(
|
||||||
error_span!("tcp_listen", port = tcp_listen_port),
|
error_span!(parent: session.rs(), "tcp_listen", port = tcp_listen_port),
|
||||||
session.clone().task_tcp_listener(tcp_listener),
|
session.clone().task_tcp_listener(tcp_listener),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -590,7 +596,7 @@ impl Session {
|
||||||
if let Some(listen_port) = tcp_listen_port {
|
if let Some(listen_port) = tcp_listen_port {
|
||||||
if opts.enable_upnp_port_forwarding {
|
if opts.enable_upnp_port_forwarding {
|
||||||
session.spawn(
|
session.spawn(
|
||||||
error_span!("upnp_forward", port = listen_port),
|
error_span!(parent: session.rs(), "upnp_forward", port = listen_port),
|
||||||
session.clone().task_upnp_port_forwarder(listen_port),
|
session.clone().task_upnp_port_forwarder(listen_port),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
@ -613,7 +619,7 @@ impl Session {
|
||||||
st = ps.next(), if !added_all => {
|
st = ps.next(), if !added_all => {
|
||||||
if let Some(st) = st {
|
if let Some(st) = st {
|
||||||
let (id, st) = st?;
|
let (id, st) = st?;
|
||||||
let span = error_span!("add_torrent", info_hash=?st.info_hash());
|
let span = error_span!(parent: session.rs(), "add_torrent", info_hash=?st.info_hash());
|
||||||
let (add_torrent, mut opts) = st.into_add_torrent()?;
|
let (add_torrent, mut opts) = st.into_add_torrent()?;
|
||||||
opts.preferred_id = Some(id);
|
opts.preferred_id = Some(id);
|
||||||
let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span);
|
let fut = session.add_torrent(add_torrent, Some(opts)).instrument(span);
|
||||||
|
|
@ -698,7 +704,7 @@ impl Session {
|
||||||
debug!("error checking incoming connection: {e:#}");
|
debug!("error checking incoming connection: {e:#}");
|
||||||
e
|
e
|
||||||
})
|
})
|
||||||
.instrument(error_span!("incoming", addr=%addr))
|
.instrument(error_span!(parent: self.rs(), "incoming", addr=%addr))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
@ -750,6 +756,10 @@ impl Session {
|
||||||
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
|
spawn_with_cancel(span, self.cancellation_token.clone(), fut);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn rs(&self) -> Option<tracing::Id> {
|
||||||
|
self.root_span.as_ref().and_then(|s| s.id())
|
||||||
|
}
|
||||||
|
|
||||||
/// Stop the session and all managed tasks.
|
/// Stop the session and all managed tasks.
|
||||||
pub async fn stop(&self) {
|
pub async fn stop(&self) {
|
||||||
let torrents = self
|
let torrents = self
|
||||||
|
|
@ -926,7 +936,7 @@ impl Session {
|
||||||
|
|
||||||
self.main_torrent_info(add_res, opts).await
|
self.main_torrent_info(add_res, opts).await
|
||||||
}
|
}
|
||||||
.instrument(error_span!("add_torrent"))
|
.instrument(error_span!(parent: self.rs(), "add_torrent"))
|
||||||
.boxed()
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1067,7 +1077,7 @@ impl Session {
|
||||||
}) {
|
}) {
|
||||||
return Ok(AddTorrentResponse::AlreadyManaged(id, handle));
|
return Ok(AddTorrentResponse::AlreadyManaged(id, handle));
|
||||||
}
|
}
|
||||||
let managed_torrent = builder.build(error_span!(parent: None, "torrent", id))?;
|
let managed_torrent = builder.build(error_span!(parent: self.rs(), "torrent", id))?;
|
||||||
g.add_torrent(managed_torrent.clone(), id);
|
g.add_torrent(managed_torrent.clone(), id);
|
||||||
managed_torrent
|
managed_torrent
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -60,6 +60,9 @@ async fn test_e2e_download() {
|
||||||
max_random_sleep_ms: rand::thread_rng().gen_range(0u8..16),
|
max_random_sleep_ms: rand::thread_rng().gen_range(0u8..16),
|
||||||
}
|
}
|
||||||
.as_peer_id();
|
.as_peer_id();
|
||||||
|
let listen_range_start = 15100u16 + i as u16;
|
||||||
|
let listen_range_end = listen_range_start + 1;
|
||||||
|
let listen_range = listen_range_start..listen_range_end;
|
||||||
let session = crate::Session::new_with_opts(
|
let session = crate::Session::new_with_opts(
|
||||||
std::env::temp_dir().join("does_not_exist"),
|
std::env::temp_dir().join("does_not_exist"),
|
||||||
SessionOptions {
|
SessionOptions {
|
||||||
|
|
@ -69,10 +72,11 @@ async fn test_e2e_download() {
|
||||||
persistence: None,
|
persistence: None,
|
||||||
peer_id: Some(peer_id),
|
peer_id: Some(peer_id),
|
||||||
peer_opts: None,
|
peer_opts: None,
|
||||||
listen_port_range: Some(15100..17000),
|
listen_port_range: Some(listen_range),
|
||||||
enable_upnp_port_forwarding: false,
|
enable_upnp_port_forwarding: false,
|
||||||
default_storage_factory: None,
|
default_storage_factory: None,
|
||||||
defer_writes_up_to: None,
|
defer_writes_up_to: None,
|
||||||
|
root_span: Some(error_span!(parent: None, "server", id = i)),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
@ -121,7 +125,7 @@ async fn test_e2e_download() {
|
||||||
session.tcp_listen_port().unwrap(),
|
session.tcp_listen_port().unwrap(),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
.instrument(error_span!("server", server = i)),
|
.instrument(error_span!("server", id = i)),
|
||||||
);
|
);
|
||||||
futs.push(timeout(Duration::from_secs(30), rx));
|
futs.push(timeout(Duration::from_secs(30), rx));
|
||||||
}
|
}
|
||||||
|
|
@ -152,6 +156,7 @@ async fn test_e2e_download() {
|
||||||
persistence: None,
|
persistence: None,
|
||||||
listen_port_range: None,
|
listen_port_range: None,
|
||||||
enable_upnp_port_forwarding: false,
|
enable_upnp_port_forwarding: false,
|
||||||
|
root_span: Some(error_span!("client")),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,7 @@ where
|
||||||
ByteBuf: Eq + std::hash::Hash + std::borrow::Borrow<[u8]>,
|
ByteBuf: Eq + std::hash::Hash + std::borrow::Borrow<[u8]>,
|
||||||
{
|
{
|
||||||
fn get_msgid(&self, msg_type: &'a [u8]) -> Option<u8> {
|
fn get_msgid(&self, msg_type: &'a [u8]) -> Option<u8> {
|
||||||
self.m.get(msg_type).map(|v| *v)
|
self.m.get(msg_type).copied()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ut_metadata(&self) -> Option<u8> {
|
pub fn ut_metadata(&self) -> Option<u8> {
|
||||||
|
|
|
||||||
|
|
@ -340,6 +340,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
|
||||||
}),
|
}),
|
||||||
socks_proxy_url: socks_url,
|
socks_proxy_url: socks_url,
|
||||||
concurrent_init_limit: Some(opts.concurrent_init_limit),
|
concurrent_init_limit: Some(opts.concurrent_init_limit),
|
||||||
|
root_span: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let stats_printer = |session: Arc<Session>| async move {
|
let stats_printer = |session: Arc<Session>| async move {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue