Merge pull request #196 from ikatson/fix-e2e-test

Fix a bug in merge_two_streams
This commit is contained in:
Igor Katson 2024-08-18 17:44:28 +01:00 committed by GitHub
commit a9df6332cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 83 additions and 28 deletions

View file

@ -7,8 +7,8 @@ use futures::Stream;
struct MergedStreams<S1, S2> {
poll_count: AtomicU64,
s1: S1,
s2: S2,
s1: futures::stream::Fuse<S1>,
s2: futures::stream::Fuse<S2>,
}
pub fn merge_streams<
@ -19,10 +19,11 @@ pub fn merge_streams<
s1: S1,
s2: S2,
) -> impl Stream<Item = I> + Unpin + 'static {
use futures::stream::StreamExt;
MergedStreams {
poll_count: AtomicU64::new(0),
s1,
s2,
s1: s1.fuse(),
s2: s2.fuse(),
}
}
@ -34,8 +35,16 @@ fn poll_two<I, S1: Stream<Item = I> + Unpin, S2: Stream<Item = I> + Unpin>(
use futures::StreamExt;
let s1p = s1.poll_next_unpin(cx);
match s1p {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => s2.poll_next_unpin(cx),
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) => match s2.poll_next_unpin(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
Poll::Pending => match s2.poll_next_unpin(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(v)),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
},
}
}

View file

@ -199,10 +199,22 @@ fn merge_two_optional_streams<T>(
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => Some(Box::pin(s1)),
(None, Some(s2)) => Some(Box::pin(s2)),
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
(None, None) => None,
(Some(s1), None) => {
debug!("merge_two_optional_streams: using first");
Some(Box::pin(s1))
}
(None, Some(s2)) => {
debug!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
debug!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
debug!("merge_two_optional_streams: using none");
None
}
}
}
@ -864,7 +876,14 @@ impl Session {
info,
trackers,
peer_rx: Some(rx),
initial_peers: seen.into_iter().collect(),
initial_peers: {
let seen = seen.into_iter().collect_vec();
info!(count=seen.len(), "seen");
for peer in &seen {
debug!(?peer, "seen")
}
seen
},
}
}
ReadMetainfoResult::ChannelClosed { .. } => {
@ -1089,6 +1108,10 @@ impl Session {
// Merge "initial_peers" and "peer_rx" into one stream.
let peer_rx = merge_two_optional_streams(
if !initial_peers.is_empty() {
debug!(
count = initial_peers.len(),
"merging initial peers into peer_rx"
);
Some(futures::stream::iter(initial_peers.into_iter()))
} else {
None

View file

@ -3,8 +3,7 @@ use std::{
time::Duration,
};
use anyhow::bail;
use futures::{stream::FuturesUnordered, StreamExt};
use anyhow::{bail, Context};
use librqbit_core::magnet::Magnet;
use rand::Rng;
use tokio::{
@ -45,15 +44,14 @@ async fn test_e2e_download() {
let num_servers = 128;
let torrent_file_bytes = torrent_file.as_bytes().unwrap();
let mut futs = FuturesUnordered::new();
let mut futs = Vec::new();
// 2. Start N servers that are serving that torrent, and return their IP:port combos.
// Disable DHT on each.
for i in 0u8..num_servers {
let torrent_file_bytes = torrent_file_bytes.clone();
let (tx, rx) = tokio::sync::oneshot::channel();
let tempdir = tempdir.path().to_owned();
spawn(
let fut = spawn(
async move {
let peer_id = TestPeerMetadata {
server_id: i,
@ -81,7 +79,7 @@ async fn test_e2e_download() {
},
)
.await
.unwrap();
.context("error starting session")?;
info!("started session");
@ -95,8 +93,8 @@ async fn test_e2e_download() {
}),
)
.await
.unwrap();
let h = handle.into_handle().unwrap();
.context("error adding torrent")?;
let h = handle.into_handle().context("into_handle()")?;
let mut interval = interval(Duration::from_millis(100));
info!("added torrent");
@ -114,25 +112,38 @@ async fn test_e2e_download() {
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
_ => bail!("broken state"),
})
.unwrap();
.context("error checking for torrent liveness")?;
if is_live {
break;
}
}
info!("torrent is live");
tx.send(SocketAddr::new(
Ok::<_, anyhow::Error>(SocketAddr::new(
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
session.tcp_listen_port().unwrap(),
session
.tcp_listen_port()
.context("expected session.tcp_listen_port() to be set")?,
))
}
.instrument(error_span!("server", id = i)),
);
futs.push(timeout(Duration::from_secs(30), rx));
futs.push(timeout(Duration::from_secs(30), fut));
}
let mut peers = Vec::new();
while let Some(addr) = futs.next().await {
peers.push(addr.unwrap().unwrap());
for (id, peer) in futures::future::join_all(futs)
.await
.into_iter()
.enumerate()
{
let peer = peer
.with_context(|| format!("join error, server={id}"))
.unwrap()
.with_context(|| format!("timeout, server={id}"))
.unwrap()
.with_context(|| format!("server couldn't start, server={id}"))
.unwrap();
peers.push(peer);
}
info!("started all servers, starting client");

View file

@ -257,15 +257,22 @@ impl ManagedTorrent {
loop {
match timeout(Duration::from_secs(5), peer_rx.next()).await {
Ok(Some(peer)) => {
debug!(?peer, "received peer from peer_rx");
let live = match live.upgrade() {
Some(live) => live,
None => return Ok(()),
};
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
Ok(None) => return Ok(()),
Ok(None) => {
debug!("peer_rx closed, closing peer adder");
return Ok(());
}
// If timeout, check if the torrent is live.
Err(_) if live.strong_count() == 0 => return Ok(()),
Err(_) if live.strong_count() == 0 => {
debug!("timed out waiting for peers, torrent isn't live, closing peer adder");
return Ok(());
}
Err(_) => continue,
}
}
@ -313,6 +320,8 @@ impl ManagedTorrent {
let live =
TorrentStateLive::new(paused, tx, live_cancellation_token)?;
g.state = ManagedTorrentState::Live(live.clone());
drop(g);
t.state_change_notify.notify_waiters();
spawn_fatal_errors_receiver(&t, rx, token);
@ -336,6 +345,8 @@ impl ManagedTorrent {
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?;
g.state = ManagedTorrentState::Live(live.clone());
drop(g);
spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, peer_rx);
Ok(())
@ -347,9 +358,10 @@ impl ManagedTorrent {
self.storage_factory.create_and_init(self.info())?,
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
self.state_change_notify.notify_waiters();
drop(g);
self.state_change_notify.notify_waiters();
// Recurse.
self.start(
peer_rx,