Compile times are even worse now
This commit is contained in:
parent
18f22cf323
commit
15c078619c
11 changed files with 72 additions and 22 deletions
281
crates/tracker_comms/src/tracker_comms.rs
Normal file
281
crates/tracker_comms/src/tracker_comms.rs
Normal file
|
|
@ -0,0 +1,281 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use anyhow::Context;
|
||||
use futures::future::Either;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::FutureExt;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use tracing::debug;
|
||||
use tracing::error_span;
|
||||
use tracing::trace;
|
||||
use tracing::Instrument;
|
||||
use url::Url;
|
||||
|
||||
use crate::tracker_comms_http;
|
||||
use crate::tracker_comms_udp;
|
||||
use librqbit_core::hash_id::Id20;
|
||||
|
||||
pub struct TrackerComms {
|
||||
info_hash: Id20,
|
||||
peer_id: Id20,
|
||||
stats: Box<dyn TorrentStatsProvider>,
|
||||
force_tracker_interval: Option<Duration>,
|
||||
tx: Sender,
|
||||
tcp_listen_port: Option<u16>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct TrackerCommsStats {
|
||||
pub uploaded_bytes: u64,
|
||||
pub downloaded_bytes: u64,
|
||||
pub total_bytes: u64,
|
||||
}
|
||||
|
||||
impl TrackerCommsStats {
|
||||
pub fn get_left_to_download_bytes(&self) -> u64 {
|
||||
let total = self.total_bytes;
|
||||
let down = self.downloaded_bytes;
|
||||
if total >= down {
|
||||
return total - down;
|
||||
}
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TorrentStatsProvider: Send + Sync {
|
||||
fn get(&self) -> TrackerCommsStats;
|
||||
}
|
||||
|
||||
impl TorrentStatsProvider for () {
|
||||
fn get(&self) -> TrackerCommsStats {
|
||||
Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
type Sender = tokio::sync::mpsc::Sender<SocketAddr>;
|
||||
|
||||
impl TrackerComms {
|
||||
pub fn start(
|
||||
info_hash: Id20,
|
||||
peer_id: Id20,
|
||||
trackers: Vec<String>,
|
||||
stats: Box<dyn TorrentStatsProvider>,
|
||||
force_interval: Option<Duration>,
|
||||
tcp_listen_port: Option<u16>,
|
||||
) -> Option<impl Stream<Item = SocketAddr> + Unpin + Send + 'static> {
|
||||
let trackers = trackers
|
||||
.into_iter()
|
||||
.filter_map(|t| match Url::parse(&t) {
|
||||
Ok(parsed) => Some(parsed),
|
||||
Err(e) => {
|
||||
debug!("error parsing tracker URL: {}", e);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
if trackers.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<SocketAddr>(16);
|
||||
|
||||
let s = async_stream::stream! {
|
||||
use futures::StreamExt;
|
||||
let mut rx_done = false;
|
||||
let comms = Arc::new(Self {
|
||||
info_hash,
|
||||
peer_id,
|
||||
stats,
|
||||
force_tracker_interval: force_interval,
|
||||
tx,
|
||||
tcp_listen_port,
|
||||
});
|
||||
let mut futures = FuturesUnordered::new();
|
||||
for tracker in trackers {
|
||||
if let Ok(fut) = comms.add_tracker(tracker) {
|
||||
futures.push(fut);
|
||||
}
|
||||
}
|
||||
if futures.is_empty() {
|
||||
return;
|
||||
}
|
||||
while !(futures.is_empty() && rx_done) {
|
||||
tokio::select! {
|
||||
addr = rx.recv(), if !rx_done => {
|
||||
match addr {
|
||||
Some(addr) => yield addr,
|
||||
None => rx_done = true
|
||||
}
|
||||
}
|
||||
e = futures.next(), if !futures.is_empty() => {
|
||||
if let Some(Err(e)) = e {
|
||||
debug!("error: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Some(s.boxed())
|
||||
}
|
||||
|
||||
fn add_tracker(
|
||||
&self,
|
||||
url: Url,
|
||||
) -> anyhow::Result<
|
||||
Either<
|
||||
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
|
||||
impl std::future::Future<Output = anyhow::Result<()>> + '_ + Send,
|
||||
>,
|
||||
> {
|
||||
let info_hash = self.info_hash;
|
||||
if url.scheme() == "http" || url.scheme() == "https" {
|
||||
let span = error_span!(
|
||||
parent: None,
|
||||
"http_tracker",
|
||||
tracker = %url,
|
||||
info_hash = ?info_hash
|
||||
);
|
||||
Ok(self
|
||||
.task_single_tracker_monitor_http(url)
|
||||
.instrument(span)
|
||||
.left_future())
|
||||
} else if url.scheme() == "udp" {
|
||||
let span =
|
||||
error_span!(parent: None, "udp_tracker", tracker = %url, info_hash = ?info_hash);
|
||||
Ok(self
|
||||
.task_single_tracker_monitor_udp(url)
|
||||
.instrument(span)
|
||||
.right_future())
|
||||
} else {
|
||||
bail!("unsupported tracker url {}", url)
|
||||
}
|
||||
}
|
||||
|
||||
async fn task_single_tracker_monitor_http(&self, mut tracker_url: Url) -> anyhow::Result<()> {
|
||||
let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started);
|
||||
loop {
|
||||
let stats = self.stats.get();
|
||||
let request = tracker_comms_http::TrackerRequest {
|
||||
info_hash: self.info_hash,
|
||||
peer_id: self.peer_id,
|
||||
port: 6778,
|
||||
uploaded: stats.uploaded_bytes,
|
||||
downloaded: stats.downloaded_bytes,
|
||||
left: stats.get_left_to_download_bytes(),
|
||||
compact: true,
|
||||
no_peer_id: false,
|
||||
event,
|
||||
ip: None,
|
||||
numwant: None,
|
||||
key: None,
|
||||
trackerid: None,
|
||||
};
|
||||
|
||||
let request_query = request.as_querystring();
|
||||
tracker_url.set_query(Some(&request_query));
|
||||
|
||||
match self.tracker_one_request_http(tracker_url.clone()).await {
|
||||
Ok(interval) => {
|
||||
event = None;
|
||||
let interval = self
|
||||
.force_tracker_interval
|
||||
.unwrap_or_else(|| Duration::from_secs(interval));
|
||||
debug!(
|
||||
"sleeping for {:?} after calling tracker {}",
|
||||
interval,
|
||||
tracker_url.host().unwrap()
|
||||
);
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("error calling the tracker {}: {:#}", tracker_url, e);
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn tracker_one_request_http(&self, tracker_url: Url) -> anyhow::Result<u64> {
|
||||
let response: reqwest::Response = reqwest::get(tracker_url).await?;
|
||||
if !response.status().is_success() {
|
||||
anyhow::bail!("tracker responded with {:?}", response.status());
|
||||
}
|
||||
let bytes = response.bytes().await?;
|
||||
if let Ok(error) = bencode::from_bytes::<tracker_comms_http::TrackerError>(&bytes) {
|
||||
anyhow::bail!(
|
||||
"tracker returned failure. Failure reason: {}",
|
||||
error.failure_reason
|
||||
)
|
||||
};
|
||||
let response = bencode::from_bytes::<tracker_comms_http::TrackerResponse>(&bytes)?;
|
||||
|
||||
for peer in response.peers.iter_sockaddrs() {
|
||||
self.tx.send(peer).await?;
|
||||
}
|
||||
Ok(response.interval)
|
||||
}
|
||||
|
||||
async fn task_single_tracker_monitor_udp(&self, url: Url) -> anyhow::Result<()> {
|
||||
use tracker_comms_udp::*;
|
||||
|
||||
if url.scheme() != "udp" {
|
||||
bail!("expected UDP scheme in {}", url);
|
||||
}
|
||||
let hp: (&str, u16) = (
|
||||
url.host_str().context("missing host")?,
|
||||
url.port().context("missing port")?,
|
||||
);
|
||||
let mut requester = UdpTrackerRequester::new(hp)
|
||||
.await
|
||||
.context("error creating UDP tracker requester")?;
|
||||
|
||||
let mut sleep_interval: Option<Duration> = None;
|
||||
loop {
|
||||
if let Some(i) = sleep_interval {
|
||||
trace!(interval=?sleep_interval, "sleeping");
|
||||
tokio::time::sleep(i).await;
|
||||
}
|
||||
|
||||
let stats = self.stats.get();
|
||||
let request = AnnounceFields {
|
||||
info_hash: self.info_hash,
|
||||
peer_id: self.peer_id,
|
||||
downloaded: stats.downloaded_bytes,
|
||||
left: stats.get_left_to_download_bytes(),
|
||||
uploaded: stats.uploaded_bytes,
|
||||
event: EVENT_NONE,
|
||||
key: 0, // whatever that is?
|
||||
port: self.tcp_listen_port.unwrap_or(0),
|
||||
};
|
||||
|
||||
match requester.announce(request).await {
|
||||
Ok(response) => {
|
||||
trace!(len = response.addrs.len(), "received announce response");
|
||||
for addr in response.addrs {
|
||||
self.tx
|
||||
.send(SocketAddr::V4(addr))
|
||||
.await
|
||||
.context("rx closed")?;
|
||||
}
|
||||
let new_interval = response.interval.max(5);
|
||||
let new_interval = Duration::from_secs(new_interval as u64);
|
||||
sleep_interval = Some(self.force_tracker_interval.unwrap_or(new_interval));
|
||||
}
|
||||
Err(e) => {
|
||||
debug!(url = ?url, "error reading announce response: {e:#}");
|
||||
if sleep_interval.is_none() {
|
||||
sleep_interval = Some(
|
||||
self.force_tracker_interval
|
||||
.unwrap_or(Duration::from_secs(60)),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue