This commit is contained in:
Igor Katson 2024-02-27 08:25:10 +00:00
parent 3cecb1a4c3
commit 6fafdc16da
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -58,6 +58,11 @@ impl TorrentStatsProvider for () {
type Sender = tokio::sync::mpsc::Sender<SocketAddr>; type Sender = tokio::sync::mpsc::Sender<SocketAddr>;
enum SupportedTracker {
Udp(Url),
Http(Url),
}
impl TrackerComms { impl TrackerComms {
pub fn start( pub fn start(
info_hash: Id20, info_hash: Id20,
@ -70,9 +75,16 @@ impl TrackerComms {
let trackers = trackers let trackers = trackers
.into_iter() .into_iter()
.filter_map(|t| match Url::parse(&t) { .filter_map(|t| match Url::parse(&t) {
Ok(parsed) => Some(parsed), Ok(parsed) => match parsed.scheme() {
"http" | "https" => Some(SupportedTracker::Http(parsed)),
"udp" => Some(SupportedTracker::Udp(parsed)),
_ => {
debug!("unsuppoted tracker URL: {}", t);
None
}
},
Err(e) => { Err(e) => {
debug!("error parsing tracker URL: {}", e); debug!("error parsing tracker URL {}: {}", t, e);
None None
} }
}) })
@ -85,7 +97,6 @@ impl TrackerComms {
let s = async_stream::stream! { let s = async_stream::stream! {
use futures::StreamExt; use futures::StreamExt;
let mut rx_done = false;
let comms = Arc::new(Self { let comms = Arc::new(Self {
info_hash, info_hash,
peer_id, peer_id,
@ -96,19 +107,13 @@ impl TrackerComms {
}); });
let mut futures = FuturesUnordered::new(); let mut futures = FuturesUnordered::new();
for tracker in trackers { for tracker in trackers {
if let Ok(fut) = comms.add_tracker(tracker) { futures.push(comms.add_tracker(tracker))
futures.push(fut);
}
} }
if futures.is_empty() { while !(futures.is_empty()) {
return;
}
while !(futures.is_empty() && rx_done) {
tokio::select! { tokio::select! {
addr = rx.recv(), if !rx_done => { addr = rx.recv() => {
match addr { if let Some(addr) = addr {
Some(addr) => yield addr, yield addr;
None => rx_done = true
} }
} }
e = futures.next(), if !futures.is_empty() => { e = futures.next(), if !futures.is_empty() => {
@ -125,34 +130,30 @@ impl TrackerComms {
fn add_tracker( fn add_tracker(
&self, &self,
url: Url, url: SupportedTracker,
) -> anyhow::Result< ) -> Either<
Either< impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send, impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
>,
> { > {
let info_hash = self.info_hash; let info_hash = self.info_hash;
if url.scheme() == "http" || url.scheme() == "https" { match url {
let span = error_span!( SupportedTracker::Udp(url) => {
parent: None, let span = error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash);
"http_tracker", self.task_single_tracker_monitor_udp(url)
tracker = %url, .instrument(span)
info_hash = ?info_hash .right_future()
); }
Ok(self SupportedTracker::Http(url) => {
.task_single_tracker_monitor_http(url) let span = error_span!(
.instrument(span) parent: None,
.left_future()) "http_tracker",
} else if url.scheme() == "udp" { tracker = %url,
let span = info_hash = ?info_hash
error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash); );
Ok(self self.task_single_tracker_monitor_http(url)
.task_single_tracker_monitor_udp(url) .instrument(span)
.instrument(span) .left_future()
.right_future()) }
} else {
bail!("unsupported tracker url {}", url)
} }
} }