Start calling trackers before going live

This commit is contained in:
Igor Katson 2024-02-17 11:14:40 +00:00
parent 8733538d83
commit 95769cca6a
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 99 additions and 52 deletions

View file

@ -16,7 +16,7 @@ use clone_to_owned::CloneToOwned;
use dht::{ use dht::{
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream,
}; };
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; use futures::{stream::FuturesUnordered, Stream, TryFutureExt};
use librqbit_core::{ use librqbit_core::{
directories::get_configuration_directory, directories::get_configuration_directory,
magnet::Magnet, magnet::Magnet,
@ -28,10 +28,10 @@ use librqbit_core::{
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
use peer_binary_protocol::Handshake; use peer_binary_protocol::Handshake;
use reqwest::Url;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as; use serde_with::serde_as;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use tracing::{debug, error, error_span, info, trace, warn, Instrument};
@ -43,6 +43,8 @@ use crate::{
torrent_state::{ torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
}, },
tracker_comms::{TorrentStatsForTrackerDummy, TrackerComms},
type_aliases::PeerStream,
}; };
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
@ -366,6 +368,18 @@ async fn create_tcp_listener(
bail!("no free TCP ports in range {port_range:?}"); bail!("no free TCP ports in range {port_range:?}");
} }
fn merge_peer_rx(
dht_rx: Option<RequestPeersStream>,
peer_rx: Option<impl Stream<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
) -> Option<PeerStream> {
match (dht_rx, peer_rx) {
(Some(dht_rx), None) => Some(Box::new(dht_rx)),
(None, Some(peer_rx)) => Some(Box::new(peer_rx)),
(None, None) => None,
(Some(dht_rx), Some(peer_rx)) => Some(Box::new(dht_rx.merge(peer_rx))),
}
}
pub(crate) struct CheckedIncomingConnection { pub(crate) struct CheckedIncomingConnection {
pub addr: SocketAddr, pub addr: SocketAddr,
pub stream: tokio::net::TcpStream, pub stream: tokio::net::TcpStream,
@ -548,7 +562,10 @@ impl Session {
)); ));
} }
bail!("didn't find a matching torrent for {:?}", Id20::new(h.info_hash)) bail!(
"didn't find a matching torrent for {:?}",
Id20::new(h.info_hash)
)
} }
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> { async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
@ -751,34 +768,41 @@ impl Session {
self.tcp_listen_port self.tcp_listen_port
}; };
let (info_hash, info, dht_rx, trackers, initial_peers) = match add { let (info_hash, info, peer_rx, initial_peers) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; let magnet =
let info_hash = magnet.as_id20().context("magnet link didn't contain a BTv1 infohash")?; Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?;
let info_hash = magnet
.as_id20()
.context("magnet link didn't contain a BTv1 infohash")?;
let dht_rx = self let dht_peer_rx = self
.dht .dht
.as_ref() .as_ref()
.context("magnet links without DHT are not supported")? .map(|d| d.get_peers(info_hash, announce_port))
.get_peers(info_hash, announce_port)?; .transpose()?;
let trackers = magnet.trackers let tracker_peer_rx = TrackerComms::start(
.into_iter() info_hash,
.filter_map(|url| match reqwest::Url::parse(&url) { self.peer_id,
Ok(url) => Some(url), magnet.trackers,
Err(e) => { Box::new(TorrentStatsForTrackerDummy {}),
warn!("error parsing tracker {} as url: {}", url, e); opts.force_tracker_interval,
None self.cancellation_token().clone(),
} self.tcp_listen_port,
}) );
.collect();
let peer_rx = match merge_peer_rx(dht_peer_rx, tracker_peer_rx) {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
};
debug!(?info_hash, "querying DHT"); debug!(?info_hash, "querying DHT");
let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id, self.peer_id,
info_hash, info_hash,
opts.initial_peers.clone().unwrap_or_default(), opts.initial_peers.clone().unwrap_or_default(),
dht_rx, peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)), Some(self.merge_peer_opts(opts.peer_opts)),
) )
.await .await
@ -795,9 +819,8 @@ impl Session {
if opts.paused || opts.list_only { if opts.paused || opts.list_only {
None None
} else { } else {
Some(dht_rx) Some(peer_rx)
}, },
trackers,
initial_peers, initial_peers,
) )
} }
@ -829,28 +852,31 @@ impl Session {
}; };
let trackers = torrent let trackers = torrent
.iter_announce() .iter_announce()
.filter_map(|tracker| { .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
let url = match std::str::from_utf8(tracker.as_ref()) { Ok(url) => Some(url.to_owned()),
Ok(url) => url, Err(_) => {
Err(_) => { warn!("cannot parse tracker url as utf-8, ignoring");
warn!("cannot parse tracker url as utf-8, ignoring"); return None;
return None;
}
};
match Url::parse(url) {
Ok(url) => Some(url),
Err(e) => {
warn!("cannot parse tracker URL {}: {}", url, e);
None
}
} }
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let tracker_peer_rx = TrackerComms::start(
torrent.info_hash,
self.peer_id,
trackers,
Box::new(TorrentStatsForTrackerDummy {}),
opts.force_tracker_interval,
self.cancellation_token().clone(),
self.tcp_listen_port,
);
let peer_rx = merge_peer_rx(dht_rx, tracker_peer_rx);
( (
torrent.info_hash, torrent.info_hash,
torrent.info, torrent.info,
dht_rx, peer_rx,
trackers,
opts.initial_peers opts.initial_peers
.clone() .clone()
.unwrap_or_default() .unwrap_or_default()
@ -863,9 +889,8 @@ impl Session {
self.main_torrent_info( self.main_torrent_info(
info_hash, info_hash,
info, info,
dht_rx, peer_rx,
initial_peers.into_iter().collect(), initial_peers.into_iter().collect(),
trackers,
opts, opts,
) )
.await .await
@ -876,9 +901,8 @@ impl Session {
&self, &self,
info_hash: Id20, info_hash: Id20,
info: TorrentMetaV1Info<ByteString>, info: TorrentMetaV1Info<ByteString>,
dht_peer_rx: Option<RequestPeersStream>, peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
trackers: Vec<reqwest::Url>,
opts: AddTorrentOptions, opts: AddTorrentOptions,
) -> anyhow::Result<AddTorrentResponse> { ) -> anyhow::Result<AddTorrentResponse> {
debug!("Torrent info: {:#?}", &info); debug!("Torrent info: {:#?}", &info);
@ -966,10 +990,6 @@ impl Session {
.cancellation_token(self.cancellation_token.child_token()) .cancellation_token(self.cancellation_token.child_token())
.peer_id(self.peer_id); .peer_id(self.peer_id);
if opts.disable_trackers {
builder.trackers(trackers);
}
if let Some(only_files) = only_files { if let Some(only_files) = only_files {
builder.only_files(only_files); builder.only_files(only_files);
} }
@ -1004,7 +1024,7 @@ impl Session {
let span = managed_torrent.info.span.clone(); let span = managed_torrent.info.span.clone();
let _ = span.enter(); let _ = span.enter();
managed_torrent managed_torrent
.start(initial_peers, dht_peer_rx, opts.paused) .start(initial_peers, peer_rx, opts.paused)
.context("error starting torrent")?; .context("error starting torrent")?;
} }
@ -1059,6 +1079,8 @@ impl Session {
.as_ref() .as_ref()
.map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port))
.transpose()?; .transpose()?;
todo!();
let peer_rx = None;
handle.start(Default::default(), peer_rx, false)?; handle.start(Default::default(), peer_rx, false)?;
Ok(()) Ok(())
} }

View file

@ -37,6 +37,7 @@ use url::Url;
use crate::chunk_tracker::ChunkTracker; use crate::chunk_tracker::ChunkTracker;
use crate::spawn_utils::BlockingSpawner; use crate::spawn_utils::BlockingSpawner;
use crate::torrent_state::stats::LiveStats; use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::PeerStream;
use initializing::TorrentStateInitializing; use initializing::TorrentStateInitializing;
@ -173,7 +174,7 @@ impl ManagedTorrent {
pub(crate) fn start( pub(crate) fn start(
self: &Arc<Self>, self: &Arc<Self>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_rx: Option<RequestPeersStream>, peer_rx: Option<PeerStream>,
start_paused: bool, start_paused: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut g = self.locked.write(); let mut g = self.locked.write();
@ -204,7 +205,7 @@ impl ManagedTorrent {
fn spawn_peer_adder( fn spawn_peer_adder(
live: &Arc<TorrentStateLive>, live: &Arc<TorrentStateLive>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_rx: Option<RequestPeersStream>, peer_rx: Option<PeerStream>,
) { ) {
live.spawn( live.spawn(
error_span!(parent: live.meta().span.clone(), "external_peer_adder"), error_span!(parent: live.meta().span.clone(), "external_peer_adder"),

View file

@ -41,6 +41,21 @@ pub trait TorrentStatsForTracker: Send + Sync {
} }
} }
pub struct TorrentStatsForTrackerDummy {}
impl TorrentStatsForTracker for TorrentStatsForTrackerDummy {
fn get_uploaded_bytes(&self) -> u64 {
0
}
fn get_downloaded_bytes(&self) -> u64 {
0
}
fn get_total_bytes(&self) -> u64 {
0
}
}
type Sender = tokio::sync::mpsc::Sender<SocketAddr>; type Sender = tokio::sync::mpsc::Sender<SocketAddr>;
impl TrackerComms { impl TrackerComms {
@ -52,7 +67,7 @@ impl TrackerComms {
force_interval: Option<Duration>, force_interval: Option<Duration>,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
tcp_listen_port: Option<u16>, tcp_listen_port: Option<u16>,
) -> anyhow::Result<impl Stream<Item = SocketAddr> + Send + Sync + Unpin + 'static> { ) -> Option<impl Stream<Item = SocketAddr> + Send + Sync + Unpin + 'static> {
let (tx, rx) = tokio::sync::mpsc::channel::<SocketAddr>(16); let (tx, rx) = tokio::sync::mpsc::channel::<SocketAddr>(16);
let comms = Arc::new(Self { let comms = Arc::new(Self {
info_hash, info_hash,
@ -63,12 +78,18 @@ impl TrackerComms {
tx, tx,
tcp_listen_port, tcp_listen_port,
}); });
let mut added = false;
for tracker in trackers { for tracker in trackers {
if let Err(e) = comms.clone().add_tracker(&tracker) { if let Err(e) = comms.clone().add_tracker(&tracker) {
info!(tracker = tracker, "error adding tracker: {:#}", e) info!(tracker = tracker, "error adding tracker: {:#}", e)
} else {
added = true;
} }
} }
Ok(tokio_stream::wrappers::ReceiverStream::new(rx)) if !added {
return None;
}
Some(tokio_stream::wrappers::ReceiverStream::new(rx))
} }
fn add_tracker(self: Arc<Self>, tracker: &str) -> anyhow::Result<()> { fn add_tracker(self: Arc<Self>, tracker: &str) -> anyhow::Result<()> {

View file

@ -1,5 +1,8 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::Stream;
pub type BF = bitvec::vec::BitVec<u8, bitvec::order::Msb0>; pub type BF = bitvec::vec::BitVec<u8, bitvec::order::Msb0>;
pub type PeerHandle = SocketAddr; pub type PeerHandle = SocketAddr;
pub type PeerStream = Box<dyn Stream<Item = SocketAddr> + Unpin + Send + Sync + 'static>;