diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 6a423f2..5345714 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -33,6 +33,7 @@ mod dht_utils; mod file_ops; pub mod http_api; pub mod http_api_client; +mod merge_streams; mod opened_file; mod peer_connection; mod peer_info_reader; diff --git a/crates/librqbit/src/merge_streams.rs b/crates/librqbit/src/merge_streams.rs new file mode 100644 index 0000000..608de9e --- /dev/null +++ b/crates/librqbit/src/merge_streams.rs @@ -0,0 +1,61 @@ +use std::{ + sync::atomic::{AtomicU64, Ordering}, + task::Poll, +}; + +use futures::Stream; + +struct MergedStreams { + poll_count: AtomicU64, + s1: S1, + s2: S2, +} + +pub fn merge_streams< + I, + S1: Stream + 'static + Unpin, + S2: Stream + 'static + Unpin, +>( + s1: S1, + s2: S2, +) -> impl Stream + Unpin + 'static { + MergedStreams { + poll_count: AtomicU64::new(0), + s1, + s2, + } +} + +fn poll_two + Unpin, S2: Stream + Unpin>( + s1: &mut S1, + s2: &mut S2, + cx: &mut std::task::Context<'_>, +) -> Poll> { + use futures::StreamExt; + let s1p = s1.poll_next_unpin(cx); + match s1p { + Poll::Ready(r) => Poll::Ready(r), + Poll::Pending => s2.poll_next_unpin(cx), + } +} + +impl Stream for MergedStreams +where + S1: Stream + Unpin, + S2: Stream + Unpin, +{ + type Item = I; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + let s1_first = this.poll_count.fetch_add(1, Ordering::Relaxed) % 2 == 0; + if s1_first { + poll_two(&mut this.s1, &mut this.s2, cx) + } else { + poll_two(&mut this.s2, &mut this.s1, cx) + } + } +} diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 8c491c7..38c9bfc 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -11,6 +11,7 @@ use std::{ use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, + merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, spawn_utils::BlockingSpawner, @@ -254,13 +255,13 @@ fn compute_only_files( } fn merge_two_optional_streams( - s1: Option + Send + 'static>, - s2: Option + Send + 'static>, + s1: Option + Unpin + Send + 'static>, + s2: Option + Unpin + Send + 'static>, ) -> Option> { match (s1, s2) { (Some(s1), None) => Some(Box::pin(s1)), (None, Some(s2)) => Some(Box::pin(s2)), - (Some(s1), Some(s2)) => Some(Box::pin(s1.chain(s2))), + (Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))), (None, None) => None, } } diff --git a/crates/tracker_comms/src/tracker_comms.rs b/crates/tracker_comms/src/tracker_comms.rs index 5ab2554..13f6283 100644 --- a/crates/tracker_comms/src/tracker_comms.rs +++ b/crates/tracker_comms/src/tracker_comms.rs @@ -77,6 +77,15 @@ enum SupportedTracker { Http(Url), } +impl std::fmt::Debug for SupportedTracker { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SupportedTracker::Udp(u) => std::fmt::Display::fmt(u, f), + SupportedTracker::Http(u) => std::fmt::Display::fmt(u, f), + } + } +} + impl TrackerComms { pub fn start( info_hash: Id20, @@ -104,9 +113,12 @@ impl TrackerComms { }) .collect::>(); if trackers.is_empty() { + debug!(?info_hash, "trackers list is empty"); return None; } + tracing::trace!(?trackers); + let (tx, mut rx) = tokio::sync::mpsc::channel::(16); let s = async_stream::stream! { @@ -173,6 +185,7 @@ impl TrackerComms { async fn task_single_tracker_monitor_http(&self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started); + trace!(url=?tracker_url, "starting monitor"); loop { let stats = self.stats.get(); let request = tracker_comms_http::TrackerRequest { @@ -216,6 +229,7 @@ impl TrackerComms { } async fn tracker_one_request_http(&self, tracker_url: Url) -> anyhow::Result { + debug!(url = ?tracker_url, "calling tracker over http"); let response: reqwest::Response = reqwest::get(tracker_url).await?; if !response.status().is_success() { anyhow::bail!("tracker responded with {:?}", response.status());