More anyhow.context()

This commit is contained in:
Igor Katson 2024-08-18 17:20:59 +01:00
parent 76e5044d33
commit 675aecf44b
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 42 additions and 17 deletions

View file

@ -864,7 +864,14 @@ impl Session {
info, info,
trackers, trackers,
peer_rx: Some(rx), peer_rx: Some(rx),
initial_peers: seen.into_iter().collect(), initial_peers: {
let seen = seen.into_iter().collect_vec();
info!(count=seen.len(), "seen");
for peer in &seen {
debug!(?peer, "seen")
}
seen
},
} }
} }
ReadMetainfoResult::ChannelClosed { .. } => { ReadMetainfoResult::ChannelClosed { .. } => {

View file

@ -3,8 +3,7 @@ use std::{
time::Duration, time::Duration,
}; };
use anyhow::bail; use anyhow::{bail, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::magnet::Magnet; use librqbit_core::magnet::Magnet;
use rand::Rng; use rand::Rng;
use tokio::{ use tokio::{
@ -45,15 +44,14 @@ async fn test_e2e_download() {
let num_servers = 128; let num_servers = 128;
let torrent_file_bytes = torrent_file.as_bytes().unwrap(); let torrent_file_bytes = torrent_file.as_bytes().unwrap();
let mut futs = FuturesUnordered::new(); let mut futs = Vec::new();
// 2. Start N servers that are serving that torrent, and return their IP:port combos. // 2. Start N servers that are serving that torrent, and return their IP:port combos.
// Disable DHT on each. // Disable DHT on each.
for i in 0u8..num_servers { for i in 0u8..num_servers {
let torrent_file_bytes = torrent_file_bytes.clone(); let torrent_file_bytes = torrent_file_bytes.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let tempdir = tempdir.path().to_owned(); let tempdir = tempdir.path().to_owned();
spawn( let fut = spawn(
async move { async move {
let peer_id = TestPeerMetadata { let peer_id = TestPeerMetadata {
server_id: i, server_id: i,
@ -81,7 +79,7 @@ async fn test_e2e_download() {
}, },
) )
.await .await
.unwrap(); .context("error starting session")?;
info!("started session"); info!("started session");
@ -95,8 +93,8 @@ async fn test_e2e_download() {
}), }),
) )
.await .await
.unwrap(); .context("error adding torrent")?;
let h = handle.into_handle().unwrap(); let h = handle.into_handle().context("into_handle()")?;
let mut interval = interval(Duration::from_millis(100)); let mut interval = interval(Duration::from_millis(100));
info!("added torrent"); info!("added torrent");
@ -114,25 +112,38 @@ async fn test_e2e_download() {
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
_ => bail!("broken state"), _ => bail!("broken state"),
}) })
.unwrap(); .context("error checking for torrent liveness")?;
if is_live { if is_live {
break; break;
} }
} }
info!("torrent is live"); info!("torrent is live");
tx.send(SocketAddr::new( Ok::<_, anyhow::Error>(SocketAddr::new(
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
session.tcp_listen_port().unwrap(), session
.tcp_listen_port()
.context("expected session.tcp_listen_port() to be set")?,
)) ))
} }
.instrument(error_span!("server", id = i)), .instrument(error_span!("server", id = i)),
); );
futs.push(timeout(Duration::from_secs(30), rx)); futs.push(timeout(Duration::from_secs(30), fut));
} }
let mut peers = Vec::new(); let mut peers = Vec::new();
while let Some(addr) = futs.next().await { for (id, peer) in futures::future::join_all(futs)
peers.push(addr.unwrap().unwrap()); .await
.into_iter()
.enumerate()
{
let peer = peer
.with_context(|| format!("join error, server={id}"))
.unwrap()
.with_context(|| format!("timeout, server={id}"))
.unwrap()
.with_context(|| format!("server couldn't start, server={id}"))
.unwrap();
peers.push(peer);
} }
info!("started all servers, starting client"); info!("started all servers, starting client");

View file

@ -257,15 +257,22 @@ impl ManagedTorrent {
loop { loop {
match timeout(Duration::from_secs(5), peer_rx.next()).await { match timeout(Duration::from_secs(5), peer_rx.next()).await {
Ok(Some(peer)) => { Ok(Some(peer)) => {
debug!(?peer, "received peer from peer_rx");
let live = match live.upgrade() { let live = match live.upgrade() {
Some(live) => live, Some(live) => live,
None => return Ok(()), None => return Ok(()),
}; };
live.add_peer_if_not_seen(peer).context("torrent closed")?; live.add_peer_if_not_seen(peer).context("torrent closed")?;
} }
Ok(None) => return Ok(()), Ok(None) => {
debug!("peer_rx closed, closing peer adder");
return Ok(());
}
// If timeout, check if the torrent is live. // If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => return Ok(()), Err(_) if live.strong_count() == 0 => {
debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
return Ok(());
}
Err(_) => continue, Err(_) => continue,
} }
} }