From 95769cca6a42cabe2e16f25763ffe0ce30f81ff5 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 17 Feb 2024 11:14:40 +0000 Subject: [PATCH] Start calling trackers before going live --- crates/librqbit/src/session.rs | 118 ++++++++++++++--------- crates/librqbit/src/torrent_state/mod.rs | 5 +- crates/librqbit/src/tracker_comms.rs | 25 ++++- crates/librqbit/src/type_aliases.rs | 3 + 4 files changed, 99 insertions(+), 52 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 998befb..1bfd586 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -16,7 +16,7 @@ use clone_to_owned::CloneToOwned; use dht::{ Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, }; -use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; +use futures::{stream::FuturesUnordered, Stream, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -28,10 +28,10 @@ use librqbit_core::{ }; use parking_lot::RwLock; use peer_binary_protocol::Handshake; -use reqwest::Url; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; use tokio::net::{TcpListener, TcpStream}; +use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; @@ -43,6 +43,8 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, + tracker_comms::{TorrentStatsForTrackerDummy, TrackerComms}, + type_aliases::PeerStream, }; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; @@ -366,6 +368,18 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } +fn merge_peer_rx( + dht_rx: Option, + peer_rx: Option + Unpin + Send + Sync + 'static>, +) -> Option { + match (dht_rx, peer_rx) { + (Some(dht_rx), None) => Some(Box::new(dht_rx)), + (None, Some(peer_rx)) => Some(Box::new(peer_rx)), + (None, None) => None, + (Some(dht_rx), Some(peer_rx)) => Some(Box::new(dht_rx.merge(peer_rx))), + } +} + pub(crate) struct CheckedIncomingConnection { pub addr: SocketAddr, pub stream: tokio::net::TcpStream, @@ -548,7 +562,10 @@ impl Session { )); } - bail!("didn't find a matching torrent for {:?}", Id20::new(h.info_hash)) + bail!( + "didn't find a matching torrent for {:?}", + Id20::new(h.info_hash) + ) } async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { @@ -751,34 +768,41 @@ impl Session { self.tcp_listen_port }; - let (info_hash, info, dht_rx, trackers, initial_peers) = match add { + let (info_hash, info, peer_rx, initial_peers) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { - let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; - let info_hash = magnet.as_id20().context("magnet link didn't contain a BTv1 infohash")?; + let magnet = + Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; + let info_hash = magnet + .as_id20() + .context("magnet link didn't contain a BTv1 infohash")?; - let dht_rx = self + let dht_peer_rx = self .dht .as_ref() - .context("magnet links without DHT are not supported")? - .get_peers(info_hash, announce_port)?; + .map(|d| d.get_peers(info_hash, announce_port)) + .transpose()?; - let trackers = magnet.trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); + let tracker_peer_rx = TrackerComms::start( + info_hash, + self.peer_id, + magnet.trackers, + Box::new(TorrentStatsForTrackerDummy {}), + opts.force_tracker_interval, + self.cancellation_token().clone(), + self.tcp_listen_port, + ); + + let peer_rx = match merge_peer_rx(dht_peer_rx, tracker_peer_rx) { + Some(peer_rx) => peer_rx, + None => bail!("can't find peers: DHT disabled and no trackers in magnet"), + }; debug!(?info_hash, "querying DHT"); - let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( self.peer_id, info_hash, opts.initial_peers.clone().unwrap_or_default(), - dht_rx, + peer_rx, Some(self.merge_peer_opts(opts.peer_opts)), ) .await @@ -795,9 +819,8 @@ impl Session { if opts.paused || opts.list_only { None } else { - Some(dht_rx) + Some(peer_rx) }, - trackers, initial_peers, ) } @@ -829,28 +852,31 @@ impl Session { }; let trackers = torrent .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { - Ok(url) => Some(url), - Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); - None - } + .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => Some(url.to_owned()), + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; } }) .collect::>(); + + let tracker_peer_rx = TrackerComms::start( + torrent.info_hash, + self.peer_id, + trackers, + Box::new(TorrentStatsForTrackerDummy {}), + opts.force_tracker_interval, + self.cancellation_token().clone(), + self.tcp_listen_port, + ); + + let peer_rx = merge_peer_rx(dht_rx, tracker_peer_rx); + ( torrent.info_hash, torrent.info, - dht_rx, - trackers, + peer_rx, opts.initial_peers .clone() .unwrap_or_default() @@ -863,9 +889,8 @@ impl Session { self.main_torrent_info( info_hash, info, - dht_rx, + peer_rx, initial_peers.into_iter().collect(), - trackers, opts, ) .await @@ -876,9 +901,8 @@ impl Session { &self, info_hash: Id20, info: TorrentMetaV1Info, - dht_peer_rx: Option, + peer_rx: Option, initial_peers: Vec, - trackers: Vec, opts: AddTorrentOptions, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); @@ -966,10 +990,6 @@ impl Session { .cancellation_token(self.cancellation_token.child_token()) .peer_id(self.peer_id); - if opts.disable_trackers { - builder.trackers(trackers); - } - if let Some(only_files) = only_files { builder.only_files(only_files); } @@ -1004,7 +1024,7 @@ impl Session { let span = managed_torrent.info.span.clone(); let _ = span.enter(); managed_torrent - .start(initial_peers, dht_peer_rx, opts.paused) + .start(initial_peers, peer_rx, opts.paused) .context("error starting torrent")?; } @@ -1059,6 +1079,8 @@ impl Session { .as_ref() .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) .transpose()?; + todo!(); + let peer_rx = None; handle.start(Default::default(), peer_rx, false)?; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 7d29b79..da80053 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -37,6 +37,7 @@ use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::BlockingSpawner; use crate::torrent_state::stats::LiveStats; +use crate::type_aliases::PeerStream; use initializing::TorrentStateInitializing; @@ -173,7 +174,7 @@ impl ManagedTorrent { pub(crate) fn start( self: &Arc, initial_peers: Vec, - peer_rx: Option, + peer_rx: Option, start_paused: bool, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -204,7 +205,7 @@ impl ManagedTorrent { fn spawn_peer_adder( live: &Arc, initial_peers: Vec, - peer_rx: Option, + peer_rx: Option, ) { live.spawn( error_span!(parent: live.meta().span.clone(), "external_peer_adder"), diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index 8482552..61fe467 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -41,6 +41,21 @@ pub trait TorrentStatsForTracker: Send + Sync { } } +pub struct TorrentStatsForTrackerDummy {} +impl TorrentStatsForTracker for TorrentStatsForTrackerDummy { + fn get_uploaded_bytes(&self) -> u64 { + 0 + } + + fn get_downloaded_bytes(&self) -> u64 { + 0 + } + + fn get_total_bytes(&self) -> u64 { + 0 + } +} + type Sender = tokio::sync::mpsc::Sender; impl TrackerComms { @@ -52,7 +67,7 @@ impl TrackerComms { force_interval: Option, cancellation_token: CancellationToken, tcp_listen_port: Option, - ) -> anyhow::Result + Send + Sync + Unpin + 'static> { + ) -> Option + Send + Sync + Unpin + 'static> { let (tx, rx) = tokio::sync::mpsc::channel::(16); let comms = Arc::new(Self { info_hash, @@ -63,12 +78,18 @@ impl TrackerComms { tx, tcp_listen_port, }); + let mut added = false; for tracker in trackers { if let Err(e) = comms.clone().add_tracker(&tracker) { info!(tracker = tracker, "error adding tracker: {:#}", e) + } else { + added = true; } } - Ok(tokio_stream::wrappers::ReceiverStream::new(rx)) + if !added { + return None; + } + Some(tokio_stream::wrappers::ReceiverStream::new(rx)) } fn add_tracker(self: Arc, tracker: &str) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 29fa9df..2b6efa9 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -1,5 +1,8 @@ use std::net::SocketAddr; +use futures::Stream; + pub type BF = bitvec::vec::BitVec; pub type PeerHandle = SocketAddr; +pub type PeerStream = Box + Unpin + Send + Sync + 'static>;