Merge pull request #85 from ikatson/udp-trackers

UDP trackers and better operation without DHT
This commit is contained in:
Igor Katson 2024-02-26 10:01:20 +00:00 committed by GitHub
commit 3f8e3e3a3f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 974 additions and 401 deletions

View file

@ -17,8 +17,7 @@ webui-build: webui-deps
@PHONY: devserver
devserver:
echo -n '' > /tmp/rqbit-log
cargo run --release -- \
echo -n '' > /tmp/rqbit-log && cargo run --release -- \
--log-file /tmp/rqbit-log \
--log-file-rust-log=debug,librqbit=trace \
server start /tmp/scratch/

View file

@ -36,7 +36,9 @@ mod session;
mod spawn_utils;
mod torrent_state;
pub mod tracing_subscriber_config_utils;
mod tracker_comms;
pub mod tracker_comms;
pub mod tracker_comms_http;
pub mod tracker_comms_udp;
mod type_aliases;
pub use api::Api;

View file

@ -13,10 +13,8 @@ use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBuf, ByteBufT, ByteString};
use clone_to_owned::CloneToOwned;
use dht::{
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream,
};
use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
use futures::{stream::FuturesUnordered, TryFutureExt};
use librqbit_core::{
directories::get_configuration_directory,
magnet::Magnet,
@ -28,11 +26,11 @@ use librqbit_core::{
};
use parking_lot::RwLock;
use peer_binary_protocol::Handshake;
use reqwest::Url;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tokio_stream::StreamExt;
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};
use crate::{
@ -43,6 +41,8 @@ use crate::{
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
tracker_comms::TrackerComms,
type_aliases::PeerStream,
};
pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
@ -172,6 +172,9 @@ pub struct Session {
tcp_listen_port: Option<u16>,
cancellation_token: CancellationToken,
// This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard,
}
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
@ -440,6 +443,7 @@ impl Session {
spawner,
output_folder,
db: RwLock::new(Default::default()),
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
});
@ -505,8 +509,6 @@ impl Session {
addr: SocketAddr,
mut stream: TcpStream,
) -> anyhow::Result<(Arc<TorrentStateLive>, CheckedIncomingConnection)> {
// TODO: move buffer handling to peer_connection
let rwtimeout = self
.peer_opts
.read_write_timeout
@ -548,7 +550,10 @@ impl Session {
));
}
bail!("didn't find a matching torrent for {:?}", Id20::new(h.info_hash))
bail!(
"didn't find a matching torrent for {:?}",
Id20::new(h.info_hash)
)
}
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
@ -659,7 +664,7 @@ impl Session {
.collect();
let info = TorrentMetaV1Owned {
announce: trackers
.get(0)
.first()
.cloned()
.unwrap_or_else(|| ByteString(b"http://retracker.local/announce".to_vec())),
announce_list: vec![trackers],
@ -751,34 +756,41 @@ impl Session {
self.tcp_listen_port
};
let (info_hash, info, dht_rx, trackers, initial_peers) = match add {
let cancellation_token = self.cancellation_token.child_token();
let cancellation_token_drop_guard = cancellation_token.clone().drop_guard();
let paused = opts.list_only || opts.paused;
// The main difference between magnet link and torrent file, is that we need to resolve the magnet link
// into a torrent file by connecting to peers that support extended handshakes.
// So we must discover at least one peer and connect to it to be able to proceed further.
let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?;
let info_hash = magnet.as_id20().context("magnet link didn't contain a BTv1 infohash")?;
let magnet =
Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?;
let info_hash = magnet
.as_id20()
.context("magnet link didn't contain a BTv1 infohash")?;
let dht_rx = self
.dht
.as_ref()
.context("magnet links without DHT are not supported")?
.get_peers(info_hash, announce_port)?;
let trackers = magnet.trackers
.into_iter()
.filter_map(|url| match reqwest::Url::parse(&url) {
Ok(url) => Some(url),
Err(e) => {
warn!("error parsing tracker {} as url: {}", url, e);
None
}
})
.collect();
let peer_token = cancellation_token.child_token();
let peer_rx = self.make_peer_rx(
info_hash,
magnet.trackers.clone(),
peer_token.clone(),
announce_port,
opts.force_tracker_interval,
)?;
let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"),
};
debug!(?info_hash, "querying DHT");
let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(
let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
dht_rx,
peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)),
)
.await
@ -788,17 +800,17 @@ impl Session {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
}
};
if paused {
peer_token.cancel();
}
debug!(?info, "received result from DHT");
(
info_hash,
info,
if opts.paused || opts.list_only {
None
} else {
Some(dht_rx)
},
trackers,
magnet.trackers,
Some(peer_rx),
initial_peers,
cancellation_token,
)
}
other => {
@ -820,53 +832,54 @@ impl Session {
AddTorrent::TorrentInfo(t) => *t,
};
let dht_rx = match self.dht.as_ref() {
Some(dht) if !opts.paused && !opts.list_only => {
debug!(info_hash=?torrent.info_hash, "reading peers from DHT");
Some(dht.get_peers(torrent.info_hash, announce_port)?)
}
_ => None,
};
let trackers = torrent
.iter_announce()
.filter_map(|tracker| {
let url = match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => url,
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
return None;
}
};
match Url::parse(url) {
Ok(url) => Some(url),
Err(e) => {
warn!("cannot parse tracker URL {}: {}", url, e);
None
}
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
Ok(url) => Some(url.to_owned()),
Err(_) => {
warn!("cannot parse tracker url as utf-8, ignoring");
None
}
})
.collect::<Vec<_>>();
let peer_rx = if paused {
None
} else {
self.make_peer_rx(
torrent.info_hash,
trackers.clone(),
cancellation_token.clone(),
announce_port,
opts.force_tracker_interval,
)?
};
(
torrent.info_hash,
torrent.info,
dht_rx,
trackers,
peer_rx,
opts.initial_peers
.clone()
.unwrap_or_default()
.into_iter()
.collect(),
cancellation_token,
)
}
};
cancellation_token_drop_guard.disarm();
self.main_torrent_info(
info_hash,
info,
dht_rx,
initial_peers.into_iter().collect(),
trackers,
peer_rx,
initial_peers.into_iter().collect(),
opts,
cancellation_token,
)
.await
}
@ -876,13 +889,16 @@ impl Session {
&self,
info_hash: Id20,
info: TorrentMetaV1Info<ByteString>,
dht_peer_rx: Option<RequestPeersStream>,
trackers: Vec<String>,
peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>,
trackers: Vec<reqwest::Url>,
opts: AddTorrentOptions,
cancellation_token: CancellationToken,
) -> anyhow::Result<AddTorrentResponse> {
debug!("Torrent info: {:#?}", &info);
let drop_guard = cancellation_token.clone().drop_guard();
let get_only_files =
|only_files: Option<Vec<usize>>, only_files_regex: Option<String>, list_only: bool| {
match (only_files, only_files_regex) {
@ -963,13 +979,9 @@ impl Session {
builder
.overwrite(opts.overwrite)
.spawner(self.spawner)
.cancellation_token(self.cancellation_token.child_token())
.trackers(trackers)
.peer_id(self.peer_id);
if opts.disable_trackers {
builder.trackers(trackers);
}
if let Some(only_files) = only_files {
builder.only_files(only_files);
}
@ -1003,11 +1015,21 @@ impl Session {
{
let span = managed_torrent.info.span.clone();
let _ = span.enter();
// Just in case, cancel all tasks started for this torrent so far.
// This is defensive, and not proven necessary.
let token = if opts.paused {
cancellation_token.cancel();
self.cancellation_token.child_token()
} else {
cancellation_token
};
managed_torrent
.start(initial_peers, dht_peer_rx, opts.paused)
.start(initial_peers, peer_rx, opts.paused, token)
.context("error starting torrent")?;
}
drop_guard.disarm();
Ok(AddTorrentResponse::Added(id, managed_torrent))
}
@ -1053,13 +1075,51 @@ impl Session {
}
}
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let peer_rx = self
// Get a peer stream from both DHT and trackers.
fn make_peer_rx(
&self,
info_hash: Id20,
trackers: Vec<String>,
cancel: CancellationToken,
announce_port: Option<u16>,
force_tracker_interval: Option<Duration>,
) -> anyhow::Result<Option<PeerStream>> {
let announce_port = announce_port.or(self.tcp_listen_port);
let dht_rx = self
.dht
.as_ref()
.map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port))
.map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?;
handle.start(Default::default(), peer_rx, false)?;
let peer_rx = TrackerComms::start(
info_hash,
self.peer_id,
trackers,
// TODO: report actual bytes, not zeroes.
Box::new(()),
force_tracker_interval,
cancel,
announce_port,
);
// Merge DHT rx and tracker comms peer rx.
match (dht_rx, peer_rx) {
(Some(dht_rx), None) => Ok(Some(Box::new(dht_rx))),
(None, Some(peer_rx)) => Ok(Some(Box::new(peer_rx))),
(None, None) => Ok(None),
(Some(dht_rx), Some(peer_rx)) => Ok(Some(Box::new(dht_rx.merge(peer_rx)))),
}
}
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let token = self.cancellation_token.child_token();
let peer_rx = self.make_peer_rx(
handle.info_hash(),
handle.info().trackers.clone().into_iter().collect(),
token.clone(),
self.tcp_listen_port,
handle.info().options.force_tracker_interval,
)?;
handle.start(Default::default(), peer_rx, false, token)?;
Ok(())
}
}

View file

@ -57,7 +57,6 @@ use std::{
use anyhow::{bail, Context};
use backoff::backoff::Backoff;
use bencode::from_bytes;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt};
@ -83,7 +82,6 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, error_span, info, trace, warn};
use url::Url;
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
@ -93,7 +91,6 @@ use crate::{
},
session::CheckedIncomingConnection,
torrent_state::{peer::Peer, utils::atomic_inc},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
type_aliases::{PeerHandle, BF},
};
@ -237,13 +234,6 @@ impl TorrentStateLive {
cancellation_token,
});
for tracker in state.meta.trackers.iter() {
state.spawn(
error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()),
state.clone().task_single_tracker_monitor(tracker.clone()),
);
}
state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
{
@ -297,74 +287,6 @@ impl TorrentStateLive {
&self.up_speed_estimator
}
async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result<u64> {
let response: reqwest::Response = reqwest::get(tracker_url).await?;
if !response.status().is_success() {
anyhow::bail!("tracker responded with {:?}", response.status());
}
let bytes = response.bytes().await?;
if let Ok(error) = from_bytes::<TrackerError>(&bytes) {
anyhow::bail!(
"tracker returned failure. Failure reason: {}",
error.failure_reason
)
};
let response = from_bytes::<TrackerResponse>(&bytes)?;
for peer in response.peers.iter_sockaddrs() {
self.add_peer_if_not_seen(peer)?;
}
Ok(response.interval)
}
async fn task_single_tracker_monitor(
self: Arc<Self>,
mut tracker_url: Url,
) -> anyhow::Result<()> {
let mut event = Some(TrackerRequestEvent::Started);
loop {
let request = TrackerRequest {
info_hash: self.info_hash(),
peer_id: self.peer_id(),
port: 6778,
uploaded: self.get_uploaded_bytes(),
downloaded: self.get_downloaded_bytes(),
left: self.get_left_to_download_bytes(),
compact: true,
no_peer_id: false,
event,
ip: None,
numwant: None,
key: None,
trackerid: None,
};
let request_query = request.as_querystring();
tracker_url.set_query(Some(&request_query));
match self.tracker_one_request(tracker_url.clone()).await {
Ok(interval) => {
event = None;
let interval = self
.meta
.options
.force_tracker_interval
.unwrap_or_else(|| Duration::from_secs(interval));
debug!(
"sleeping for {:?} after calling tracker {}",
interval,
tracker_url.host().unwrap()
);
tokio::time::sleep(interval).await;
}
Err(e) => {
debug!("error calling the tracker {}: {:#}", tracker_url, e);
tokio::time::sleep(Duration::from_secs(60)).await;
}
};
}
}
pub(crate) fn add_incoming_peer(
self: &Arc<Self>,
checked_peer: CheckedIncomingConnection,

View file

@ -15,7 +15,6 @@ use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use buffers::ByteString;
use dht::RequestPeersStream;
use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id;
@ -32,11 +31,11 @@ use tracing::debug;
use tracing::error_span;
use tracing::trace;
use tracing::warn;
use url::Url;
use crate::chunk_tracker::ChunkTracker;
use crate::spawn_utils::BlockingSpawner;
use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::PeerStream;
use initializing::TorrentStateInitializing;
@ -83,7 +82,7 @@ pub struct ManagedTorrentInfo {
pub info_hash: Id20,
pub out_dir: PathBuf,
pub(crate) spawner: BlockingSpawner,
pub trackers: HashSet<Url>,
pub trackers: HashSet<String>,
pub peer_id: Id20,
pub lengths: Lengths,
pub span: tracing::Span,
@ -92,7 +91,6 @@ pub struct ManagedTorrentInfo {
pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>,
pub cancellation_token: CancellationToken,
pub(crate) only_files: Option<Vec<usize>>,
locked: RwLock<ManagedTorrentLocked>,
}
@ -173,15 +171,17 @@ impl ManagedTorrent {
pub(crate) fn start(
self: &Arc<Self>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<RequestPeersStream>,
peer_rx: Option<PeerStream>,
start_paused: bool,
live_cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
let mut g = self.locked.write();
let spawn_fatal_errors_receiver =
|state: &Arc<Self>, rx: tokio::sync::oneshot::Receiver<anyhow::Error>| {
|state: &Arc<Self>,
rx: tokio::sync::oneshot::Receiver<anyhow::Error>,
token: CancellationToken| {
let span = state.info.span.clone();
let token = state.cancellation_token.clone();
let state = Arc::downgrade(state);
spawn_with_cancel(
error_span!(parent: span, "fatal_errors_receiver"),
@ -204,7 +204,7 @@ impl ManagedTorrent {
fn spawn_peer_adder(
live: &Arc<TorrentStateLive>,
initial_peers: Vec<SocketAddr>,
peer_rx: Option<RequestPeersStream>,
peer_rx: Option<PeerStream>,
) {
live.spawn(
error_span!(parent: live.meta().span.clone(), "external_peer_adder"),
@ -257,7 +257,7 @@ impl ManagedTorrent {
drop(g);
let t = self.clone();
let span = self.info().span.clone();
let token = self.cancellation_token.clone();
let token = live_cancellation_token.clone();
spawn_with_cancel(
error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(),
@ -277,10 +277,11 @@ impl ManagedTorrent {
}
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, token.child_token());
let live =
TorrentStateLive::new(paused, tx, live_cancellation_token);
g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(&t, rx);
spawn_fatal_errors_receiver(&t, rx, token);
spawn_peer_adder(&live, initial_peers, peer_rx);
Ok(())
@ -298,13 +299,9 @@ impl ManagedTorrent {
ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(
paused,
tx,
self.cancellation_token.child_token().clone(),
);
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone());
g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(self, rx);
spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, initial_peers, peer_rx);
Ok(())
}
@ -317,7 +314,12 @@ impl ManagedTorrent {
drop(g);
// Recurse.
self.start(initial_peers, peer_rx, start_paused)
self.start(
initial_peers,
peer_rx,
start_paused,
live_cancellation_token,
)
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),
}
@ -421,11 +423,10 @@ pub struct ManagedTorrentBuilder {
peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>,
only_files: Option<Vec<usize>>,
trackers: Vec<Url>,
trackers: Vec<String>,
peer_id: Option<Id20>,
overwrite: bool,
spawner: Option<BlockingSpawner>,
cancellation_token: Option<CancellationToken>,
}
impl ManagedTorrentBuilder {
@ -446,21 +447,15 @@ impl ManagedTorrentBuilder {
trackers: Default::default(),
peer_id: None,
overwrite: false,
cancellation_token: None,
}
}
pub fn cancellation_token(&mut self, token: CancellationToken) -> &mut Self {
self.cancellation_token = Some(token);
self
}
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
self.only_files = Some(only_files);
self
}
pub fn trackers(&mut self, trackers: Vec<Url>) -> &mut Self {
pub fn trackers(&mut self, trackers: Vec<String>) -> &mut Self {
self.trackers = trackers;
self
}
@ -495,7 +490,7 @@ impl ManagedTorrentBuilder {
self
}
pub(crate) fn build(mut self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let info = Arc::new(ManagedTorrentInfo {
span,
@ -522,7 +517,6 @@ impl ManagedTorrentBuilder {
locked: RwLock::new(ManagedTorrentLocked {
state: ManagedTorrentState::Initializing(initializing),
}),
cancellation_token: self.cancellation_token.take().unwrap_or_default(),
info,
}))
}

View file

@ -96,7 +96,6 @@ pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result<InitLoggingResul
std::fs::OpenOptions::new()
.create(true)
.append(true)
.write(true)
.open(&log_file)
.with_context(|| format!("error opening log file {:?}", log_file))?,
));

View file

@ -1,233 +1,251 @@
use buffers::ByteBuf;
use byteorder::ByteOrder;
use serde::{Deserialize, Deserializer};
use std::{
fmt::Write,
marker::PhantomData,
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
str::FromStr,
};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use futures::Stream;
use librqbit_core::spawn_utils::spawn_with_cancel;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error_span;
use tracing::info;
use tracing::trace;
use url::Url;
use crate::tracker_comms_http;
use crate::tracker_comms_udp;
use librqbit_core::hash_id::Id20;
#[derive(Clone, Copy)]
pub enum TrackerRequestEvent {
Started,
#[allow(dead_code)]
Stopped,
#[allow(dead_code)]
Completed,
pub struct TrackerComms {
info_hash: Id20,
peer_id: Id20,
stats: Box<dyn TorrentStatsProvider>,
force_tracker_interval: Option<Duration>,
cancellation_token: CancellationToken,
tx: Sender,
tcp_listen_port: Option<u16>,
}
pub struct TrackerRequest {
pub info_hash: Id20,
pub peer_id: Id20,
pub event: Option<TrackerRequestEvent>,
pub port: u16,
pub uploaded: u64,
pub downloaded: u64,
pub left: u64,
pub compact: bool,
pub no_peer_id: bool,
pub ip: Option<std::net::IpAddr>,
pub numwant: Option<usize>,
pub key: Option<String>,
pub trackerid: Option<String>,
#[derive(Default)]
pub struct TrackerCommsStats {
pub uploaded_bytes: u64,
pub downloaded_bytes: u64,
pub total_bytes: u64,
}
#[derive(Deserialize, Debug)]
pub struct TrackerError<'a> {
#[serde(rename = "failure reason", borrow)]
pub failure_reason: ByteBuf<'a>,
}
#[derive(Deserialize, Debug)]
pub struct DictPeer<'a> {
#[serde(deserialize_with = "deserialize_ip_string")]
ip: IpAddr,
#[serde(borrow)]
#[allow(dead_code)]
peer_id: Option<ByteBuf<'a>>,
port: u16,
}
impl<'a> DictPeer<'a> {
fn as_sockaddr(&self) -> SocketAddr {
SocketAddr::new(self.ip, self.port)
impl TrackerCommsStats {
pub fn get_left_to_download_bytes(&self) -> u64 {
let total = self.total_bytes;
let down = self.downloaded_bytes;
if total >= down {
return total - down;
}
0
}
}
#[derive(Debug)]
pub struct Peers {
addrs: Vec<SocketAddr>,
pub trait TorrentStatsProvider: Send + Sync {
fn get(&self) -> TrackerCommsStats;
}
impl Peers {
pub fn iter_sockaddrs(&self) -> impl Iterator<Item = std::net::SocketAddr> + '_ {
self.addrs.iter().copied()
impl TorrentStatsProvider for () {
fn get(&self) -> TrackerCommsStats {
Default::default()
}
}
impl<'de> serde::de::Deserialize<'de> for Peers {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct Visitor<'de> {
phantom: std::marker::PhantomData<&'de ()>,
}
impl<'de> serde::de::Visitor<'de> for Visitor<'de> {
type Value = Peers;
type Sender = tokio::sync::mpsc::Sender<SocketAddr>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a list of peers in dict or binary format")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let mut peers = Vec::new();
while let Some(peer) = seq.next_element::<DictPeer>()? {
peers.push(peer.as_sockaddr())
}
Ok(Peers { addrs: peers })
}
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Peers {
addrs: parse_compact_peers(v)
.into_iter()
.map(|v| v.into())
.collect(),
})
}
}
deserializer.deserialize_any(Visitor {
phantom: PhantomData,
})
}
}
fn deserialize_ip_string<'de, D>(de: D) -> Result<IpAddr, D::Error>
where
D: Deserializer<'de>,
{
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = IpAddr;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("expecting an IPv4 address")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {e}")))
}
}
de.deserialize_str(Visitor {})
}
fn parse_compact_peers(b: &[u8]) -> Vec<SocketAddrV4> {
let mut ips = Vec::new();
for chunk in b.chunks_exact(6) {
let ip_chunk = &chunk[..4];
let port_chunk = &chunk[4..6];
let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]);
let port = byteorder::BigEndian::read_u16(port_chunk);
ips.push(SocketAddrV4::new(ipaddr, port));
}
ips
}
#[derive(Deserialize, Debug)]
pub struct TrackerResponse<'a> {
#[serde(rename = "warning message", borrow)]
pub warning_message: Option<ByteBuf<'a>>,
pub complete: u64,
pub interval: u64,
#[serde(rename = "min interval")]
pub min_interval: Option<u64>,
pub tracker_id: Option<ByteBuf<'a>>,
pub incomplete: u64,
pub peers: Peers,
}
impl TrackerRequest {
pub fn as_querystring(&self) -> String {
use urlencoding as u;
let mut s = String::new();
s.push_str("info_hash=");
s.push_str(u::encode_binary(&self.info_hash.0).as_ref());
s.push_str("&peer_id=");
s.push_str(u::encode_binary(&self.peer_id.0).as_ref());
if let Some(event) = self.event {
write!(
s,
"&event={}",
match event {
TrackerRequestEvent::Started => "started",
TrackerRequestEvent::Stopped => "stopped",
TrackerRequestEvent::Completed => "completed",
}
)
.unwrap();
}
write!(s, "&port={}", self.port).unwrap();
write!(s, "&uploaded={}", self.uploaded).unwrap();
write!(s, "&downloaded={}", self.downloaded).unwrap();
write!(s, "&left={}", self.left).unwrap();
write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap();
write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap();
if let Some(ip) = &self.ip {
write!(s, "&ip={ip}").unwrap();
}
if let Some(numwant) = &self.numwant {
write!(s, "&numwant={numwant}").unwrap();
}
if let Some(key) = &self.key {
write!(s, "&key={key}").unwrap();
}
if let Some(trackerid) = &self.trackerid {
write!(s, "&trackerid={trackerid}").unwrap();
}
s
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize() {
let info_hash = Id20::new([
1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
]);
let peer_id = Id20::new([
1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
]);
let request = TrackerRequest {
impl TrackerComms {
pub fn start(
info_hash: Id20,
peer_id: Id20,
trackers: Vec<String>,
stats: Box<dyn TorrentStatsProvider>,
force_interval: Option<Duration>,
cancellation_token: CancellationToken,
tcp_listen_port: Option<u16>,
) -> Option<impl Stream<Item = SocketAddr> + Send + Sync + Unpin + 'static> {
let (tx, rx) = tokio::sync::mpsc::channel::<SocketAddr>(16);
let comms = Arc::new(Self {
info_hash,
peer_id,
port: 6881,
uploaded: 0,
downloaded: 0,
left: 1024 * 1024,
compact: true,
no_peer_id: false,
event: Some(TrackerRequestEvent::Started),
ip: Some("127.0.0.1".parse().unwrap()),
numwant: None,
key: None,
trackerid: None,
stats,
force_tracker_interval: force_interval,
cancellation_token,
tx,
tcp_listen_port,
});
let mut added = false;
for tracker in trackers {
if let Err(e) = comms.clone().add_tracker(&tracker) {
info!(tracker = tracker, "error adding tracker: {:#}", e)
} else {
added = true;
}
}
if !added {
return None;
}
Some(tokio_stream::wrappers::ReceiverStream::new(rx))
}
fn add_tracker(self: Arc<Self>, tracker: &str) -> anyhow::Result<()> {
if tracker.starts_with("http://") || tracker.starts_with("https://") {
spawn_with_cancel(
error_span!(
parent: None,
"http_tracker",
tracker = tracker,
info_hash = ?self.info_hash
),
self.cancellation_token.clone(),
{
let comms = self;
let url = Url::parse(tracker).context("can't parse URL")?;
async move { comms.task_single_tracker_monitor_http(url).await }
},
);
} else if tracker.starts_with("udp://") {
spawn_with_cancel(
error_span!(parent: None, "udp_tracker", tracker = tracker, info_hash = ?self.info_hash),
self.cancellation_token.clone(),
{
let comms = self;
let url = Url::parse(tracker).context("can't parse URL")?;
async move { comms.task_single_tracker_monitor_udp(url).await }
},
);
} else {
bail!("unsupported tracker url {}", tracker)
}
Ok(())
}
async fn task_single_tracker_monitor_http(
self: Arc<Self>,
mut tracker_url: Url,
) -> anyhow::Result<()> {
let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started);
loop {
let stats = self.stats.get();
let request = tracker_comms_http::TrackerRequest {
info_hash: self.info_hash,
peer_id: self.peer_id,
port: 6778,
uploaded: stats.uploaded_bytes,
downloaded: stats.downloaded_bytes,
left: stats.get_left_to_download_bytes(),
compact: true,
no_peer_id: false,
event,
ip: None,
numwant: None,
key: None,
trackerid: None,
};
let request_query = request.as_querystring();
tracker_url.set_query(Some(&request_query));
match self.tracker_one_request_http(tracker_url.clone()).await {
Ok(interval) => {
event = None;
let interval = self
.force_tracker_interval
.unwrap_or_else(|| Duration::from_secs(interval));
debug!(
"sleeping for {:?} after calling tracker {}",
interval,
tracker_url.host().unwrap()
);
tokio::time::sleep(interval).await;
}
Err(e) => {
debug!("error calling the tracker {}: {:#}", tracker_url, e);
tokio::time::sleep(Duration::from_secs(60)).await;
}
};
}
}
async fn tracker_one_request_http(&self, tracker_url: Url) -> anyhow::Result<u64> {
let response: reqwest::Response = reqwest::get(tracker_url).await?;
if !response.status().is_success() {
anyhow::bail!("tracker responded with {:?}", response.status());
}
let bytes = response.bytes().await?;
if let Ok(error) = bencode::from_bytes::<tracker_comms_http::TrackerError>(&bytes) {
anyhow::bail!(
"tracker returned failure. Failure reason: {}",
error.failure_reason
)
};
dbg!(request.as_querystring());
let response = bencode::from_bytes::<tracker_comms_http::TrackerResponse>(&bytes)?;
for peer in response.peers.iter_sockaddrs() {
self.tx.send(peer).await?;
}
Ok(response.interval)
}
async fn task_single_tracker_monitor_udp(&self, url: Url) -> anyhow::Result<()> {
use tracker_comms_udp::*;
if url.scheme() != "udp" {
bail!("expected UDP scheme in {}", url);
}
let hp: (&str, u16) = (
url.host_str().context("missing host")?,
url.port().context("missing port")?,
);
let mut requester = UdpTrackerRequester::new(hp)
.await
.context("error creating UDP tracker requester")?;
let mut sleep_interval: Option<Duration> = None;
loop {
if let Some(i) = sleep_interval {
trace!(interval=?sleep_interval, "sleeping");
tokio::time::sleep(i).await;
}
let stats = self.stats.get();
let request = AnnounceFields {
info_hash: self.info_hash,
peer_id: self.peer_id,
downloaded: stats.downloaded_bytes,
left: stats.get_left_to_download_bytes(),
uploaded: stats.uploaded_bytes,
event: EVENT_NONE,
key: 0, // whatever that is?
port: self.tcp_listen_port.unwrap_or(0),
};
match requester.announce(request).await {
Ok(response) => {
trace!(len = response.addrs.len(), "received announce response");
for addr in response.addrs {
self.tx
.send(SocketAddr::V4(addr))
.await
.context("rx closed")?;
}
let new_interval = response.interval.max(5);
let new_interval = Duration::from_secs(new_interval as u64);
sleep_interval = Some(self.force_tracker_interval.unwrap_or(new_interval));
}
Err(e) => {
debug!(url = ?url, "error reading announce response: {e:#}");
if sleep_interval.is_none() {
sleep_interval = Some(
self.force_tracker_interval
.unwrap_or(Duration::from_secs(60)),
);
}
}
}
}
}
}

View file

@ -0,0 +1,233 @@
use buffers::ByteBuf;
use byteorder::ByteOrder;
use serde::{Deserialize, Deserializer};
use std::{
fmt::Write,
marker::PhantomData,
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
str::FromStr,
};
use librqbit_core::hash_id::Id20;
#[derive(Clone, Copy)]
pub enum TrackerRequestEvent {
Started,
#[allow(dead_code)]
Stopped,
#[allow(dead_code)]
Completed,
}
pub struct TrackerRequest {
pub info_hash: Id20,
pub peer_id: Id20,
pub event: Option<TrackerRequestEvent>,
pub port: u16,
pub uploaded: u64,
pub downloaded: u64,
pub left: u64,
pub compact: bool,
pub no_peer_id: bool,
pub ip: Option<std::net::IpAddr>,
pub numwant: Option<usize>,
pub key: Option<String>,
pub trackerid: Option<String>,
}
#[derive(Deserialize, Debug)]
pub struct TrackerError<'a> {
#[serde(rename = "failure reason", borrow)]
pub failure_reason: ByteBuf<'a>,
}
#[derive(Deserialize, Debug)]
pub struct DictPeer<'a> {
#[serde(deserialize_with = "deserialize_ip_string")]
ip: IpAddr,
#[serde(borrow)]
#[allow(dead_code)]
peer_id: Option<ByteBuf<'a>>,
port: u16,
}
impl<'a> DictPeer<'a> {
fn as_sockaddr(&self) -> SocketAddr {
SocketAddr::new(self.ip, self.port)
}
}
#[derive(Debug)]
pub struct Peers {
addrs: Vec<SocketAddr>,
}
impl Peers {
pub fn iter_sockaddrs(&self) -> impl Iterator<Item = std::net::SocketAddr> + '_ {
self.addrs.iter().copied()
}
}
impl<'de> serde::de::Deserialize<'de> for Peers {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct Visitor<'de> {
phantom: std::marker::PhantomData<&'de ()>,
}
impl<'de> serde::de::Visitor<'de> for Visitor<'de> {
type Value = Peers;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a list of peers in dict or binary format")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let mut peers = Vec::new();
while let Some(peer) = seq.next_element::<DictPeer>()? {
peers.push(peer.as_sockaddr())
}
Ok(Peers { addrs: peers })
}
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Peers {
addrs: parse_compact_peers(v)
.into_iter()
.map(|v| v.into())
.collect(),
})
}
}
deserializer.deserialize_any(Visitor {
phantom: PhantomData,
})
}
}
fn deserialize_ip_string<'de, D>(de: D) -> Result<IpAddr, D::Error>
where
D: Deserializer<'de>,
{
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = IpAddr;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("expecting an IPv4 address")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {e}")))
}
}
de.deserialize_str(Visitor {})
}
fn parse_compact_peers(b: &[u8]) -> Vec<SocketAddrV4> {
let mut ips = Vec::new();
for chunk in b.chunks_exact(6) {
let ip_chunk = &chunk[..4];
let port_chunk = &chunk[4..6];
let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]);
let port = byteorder::BigEndian::read_u16(port_chunk);
ips.push(SocketAddrV4::new(ipaddr, port));
}
ips
}
#[derive(Deserialize, Debug)]
pub struct TrackerResponse<'a> {
#[serde(rename = "warning message", borrow)]
pub warning_message: Option<ByteBuf<'a>>,
pub complete: u64,
pub interval: u64,
#[serde(rename = "min interval")]
pub min_interval: Option<u64>,
pub tracker_id: Option<ByteBuf<'a>>,
pub incomplete: u64,
pub peers: Peers,
}
impl TrackerRequest {
pub fn as_querystring(&self) -> String {
use urlencoding as u;
let mut s = String::new();
s.push_str("info_hash=");
s.push_str(u::encode_binary(&self.info_hash.0).as_ref());
s.push_str("&peer_id=");
s.push_str(u::encode_binary(&self.peer_id.0).as_ref());
if let Some(event) = self.event {
write!(
s,
"&event={}",
match event {
TrackerRequestEvent::Started => "started",
TrackerRequestEvent::Stopped => "stopped",
TrackerRequestEvent::Completed => "completed",
}
)
.unwrap();
}
write!(s, "&port={}", self.port).unwrap();
write!(s, "&uploaded={}", self.uploaded).unwrap();
write!(s, "&downloaded={}", self.downloaded).unwrap();
write!(s, "&left={}", self.left).unwrap();
write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap();
write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap();
if let Some(ip) = &self.ip {
write!(s, "&ip={ip}").unwrap();
}
if let Some(numwant) = &self.numwant {
write!(s, "&numwant={numwant}").unwrap();
}
if let Some(key) = &self.key {
write!(s, "&key={key}").unwrap();
}
if let Some(trackerid) = &self.trackerid {
write!(s, "&trackerid={trackerid}").unwrap();
}
s
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_serialize() {
let info_hash = Id20::new([
1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
]);
let peer_id = Id20::new([
1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
]);
let request = TrackerRequest {
info_hash,
peer_id,
port: 6881,
uploaded: 0,
downloaded: 0,
left: 1024 * 1024,
compact: true,
no_peer_id: false,
event: Some(TrackerRequestEvent::Started),
ip: Some("127.0.0.1".parse().unwrap()),
numwant: None,
key: None,
trackerid: None,
};
dbg!(request.as_querystring());
}
}

View file

@ -0,0 +1,341 @@
use std::net::{Ipv4Addr, SocketAddrV4};
use anyhow::{bail, Context};
use librqbit_core::hash_id::Id20;
use rand::Rng;
use tokio::net::ToSocketAddrs;
use tracing::trace;
const ACTION_CONNECT: u32 = 0;
const ACTION_ANNOUNCE: u32 = 1;
// const ACTION_SCRAPE: u32 = 2;
// const ACTION_ERROR: u32 = 3;
pub const EVENT_NONE: u32 = 0;
pub const EVENT_COMPLETED: u32 = 1;
pub const EVENT_STARTED: u32 = 2;
pub const EVENT_STOPPED: u32 = 3;
pub type ConnectionId = u64;
const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980;
pub type TransactionId = u32;
pub fn new_transaction_id() -> TransactionId {
rand::thread_rng().gen()
}
#[derive(Debug)]
pub struct AnnounceFields {
pub info_hash: Id20,
pub peer_id: Id20,
pub downloaded: u64,
pub left: u64,
pub uploaded: u64,
pub event: u32,
pub key: u32,
pub port: u16,
}
#[derive(Debug)]
pub enum Request {
Connect,
Announce(ConnectionId, AnnounceFields),
}
impl Request {
pub fn serialize(&self, transaction_id: TransactionId, buf: &mut Vec<u8>) -> usize {
let cur_len = buf.len();
match self {
Request::Connect => {
buf.extend_from_slice(&CONNECTION_ID_MAGIC.to_be_bytes());
buf.extend_from_slice(&ACTION_CONNECT.to_be_bytes());
buf.extend_from_slice(&transaction_id.to_be_bytes());
}
Request::Announce(connection_id, fields) => {
buf.extend_from_slice(&connection_id.to_be_bytes());
buf.extend_from_slice(&ACTION_ANNOUNCE.to_be_bytes());
buf.extend_from_slice(&transaction_id.to_be_bytes());
buf.extend_from_slice(&fields.info_hash.0);
buf.extend_from_slice(&fields.peer_id.0);
buf.extend_from_slice(&fields.downloaded.to_be_bytes());
buf.extend_from_slice(&fields.left.to_be_bytes());
buf.extend_from_slice(&fields.uploaded.to_be_bytes());
buf.extend_from_slice(&fields.event.to_be_bytes());
buf.extend_from_slice(&0u32.to_be_bytes()); // ip address 0
buf.extend_from_slice(&fields.key.to_be_bytes());
buf.extend_from_slice(&(-1i32).to_be_bytes()); // num want -1
buf.extend_from_slice(&fields.port.to_be_bytes());
}
}
buf.len() - cur_len
}
}
#[derive(Debug)]
pub struct AnnounceResponse {
pub interval: u32,
pub leechers: u32,
pub seeders: u32,
pub addrs: Vec<SocketAddrV4>,
}
#[derive(Debug)]
pub enum Response {
Connect(ConnectionId),
Announce(AnnounceResponse),
}
fn split_slice(s: &[u8], first_len: usize) -> Option<(&[u8], &[u8])> {
if s.len() < first_len {
return None;
}
Some(s.split_at(first_len))
}
fn s_to_arr<const T: usize>(buf: &[u8]) -> [u8; T] {
let mut arr = [0u8; T];
arr.copy_from_slice(buf);
arr
}
trait ParseNum: Sized {
fn parse_num(buf: &[u8]) -> anyhow::Result<(Self, &[u8])>;
}
macro_rules! parse_impl {
($ty:tt, $size:expr) => {
impl ParseNum for $ty {
fn parse_num(buf: &[u8]) -> anyhow::Result<($ty, &[u8])> {
let (bytes, rest) =
split_slice(buf, $size).with_context(|| format!("expected {} bytes", $size))?;
let num = $ty::from_be_bytes(s_to_arr(bytes));
Ok((num, rest))
}
}
};
}
parse_impl!(u32, 4);
parse_impl!(u64, 8);
parse_impl!(u16, 2);
parse_impl!(i32, 4);
parse_impl!(i64, 8);
parse_impl!(i16, 2);
impl Response {
pub fn parse(buf: &[u8]) -> anyhow::Result<(TransactionId, Self)> {
let (action, buf) = u32::parse_num(buf).context("can't parse action")?;
let (tid, mut buf) = u32::parse_num(buf).context("can't parse transaction id")?;
let response = match action {
ACTION_CONNECT => {
let (connection_id, b) =
u64::parse_num(buf).context("can't parse connection id")?;
buf = b;
Response::Connect(connection_id)
}
ACTION_ANNOUNCE => {
let (interval, b) = u32::parse_num(buf).context("can't parse interval")?;
let (leechers, b) = u32::parse_num(b).context("can't parse leechers")?;
let (seeders, mut b) = u32::parse_num(b).context("can't parse seeders")?;
let mut addrs = Vec::new();
while !b.is_empty() {
let (ip, b2) = u32::parse_num(b)?;
let ip = Ipv4Addr::from(ip);
b = b2;
let (port, b2) = u16::parse_num(b)?;
b = b2;
addrs.push(SocketAddrV4::new(ip, port));
}
buf = b;
Response::Announce(AnnounceResponse {
interval,
leechers,
seeders,
addrs,
})
}
_ => bail!("unsupported action {action}"),
};
if !buf.is_empty() {
bail!(
"parsed {response:?} so far, but got {} remaining bytes",
buf.len()
);
}
Ok((tid, response))
}
}
pub struct UdpTrackerRequester {
sock: tokio::net::UdpSocket,
connection_id: ConnectionId,
read_buf: Vec<u8>,
write_buf: Vec<u8>,
}
impl UdpTrackerRequester {
// Addr is "host:port"
pub async fn new(addr: impl ToSocketAddrs) -> anyhow::Result<Self> {
let sock = tokio::net::UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding UDP socket")?;
sock.connect(addr)
.await
.context("error connecting UDP socket")?;
let tid = new_transaction_id();
let mut write_buf = Vec::new();
let mut read_buf = vec![0u8; 4096];
trace!("sending connect request");
Request::Connect.serialize(tid, &mut write_buf);
sock.send(&write_buf)
.await
.context("error sending to socket")?;
let size = sock
.recv(&mut read_buf)
.await
.context("error receiving from socket")?;
let (rtid, response) =
Response::parse(&read_buf[..size]).context("error parsing response")?;
if tid != rtid {
bail!("expected transaction id {} == {}", tid, rtid);
}
trace!(response=?response, "received");
let connection_id = match response {
Response::Connect(connection_id) => connection_id,
other => bail!("unexpected response {other:?}"),
};
trace!(connection_id);
Ok(Self {
sock,
connection_id,
read_buf,
write_buf,
})
}
pub async fn announce(&mut self, fields: AnnounceFields) -> anyhow::Result<AnnounceResponse> {
let request = Request::Announce(self.connection_id, fields);
let response = self.request(request).await?;
match response {
Response::Announce(r) => Ok(r),
other => bail!("unexpected response {other:?}, expected announce"),
}
}
pub async fn request(&mut self, request: Request) -> anyhow::Result<Response> {
let tid = new_transaction_id();
self.write_buf.clear();
let size = request.serialize(tid, &mut self.write_buf);
trace!(request=?request, tid, "sending");
self.sock
.send(&self.write_buf[..size])
.await
.context("error sending")?;
let size = self.sock.recv(&mut self.read_buf).await.unwrap();
let (rtid, response) = Response::parse(&self.read_buf[..size]).unwrap();
trace!("received response");
if tid != rtid {
bail!("unexpected transaction id");
}
Ok(response)
}
}
#[cfg(test)]
mod tests {
use std::{io::Write, str::FromStr};
use librqbit_core::{hash_id::Id20, peer_id::generate_peer_id};
use crate::tracker_comms_udp::{
new_transaction_id, AnnounceFields, Request, Response, EVENT_NONE,
};
#[test]
fn test_parse_announce() {
let b = include_bytes!("../resources/test/udp-tracker-announce-response.bin");
let (tid, response) = Response::parse(b).unwrap();
dbg!(tid, response);
}
#[ignore]
#[tokio::test]
async fn test_announce() {
let sock = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap();
sock.connect("opentor.net:6969").await.unwrap();
let tid = new_transaction_id();
let mut write_buf = Vec::new();
let mut read_buf = vec![0u8; 4096];
Request::Connect.serialize(tid, &mut write_buf);
sock.send(&write_buf).await.unwrap();
let size = sock.recv(&mut read_buf).await.unwrap();
let (rtid, response) = Response::parse(&read_buf[..size]).unwrap();
assert_eq!(tid, rtid);
let connection_id = match response {
Response::Connect(connection_id) => {
dbg!(connection_id)
}
other => panic!("unexpected response {other:?}"),
};
let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap();
let tid = new_transaction_id();
let request = Request::Announce(
connection_id,
AnnounceFields {
info_hash: hash,
peer_id: generate_peer_id(),
downloaded: 0,
left: 0,
uploaded: 0,
event: EVENT_NONE,
key: 0, // whatever that is?
port: 24563,
},
);
write_buf.clear();
let size = request.serialize(tid, &mut write_buf);
sock.send(&write_buf[..size]).await.unwrap();
let size = sock.recv(&mut read_buf).await.unwrap();
{
let mut f = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open("/tmp/proto.bin")
.unwrap();
f.write_all(&read_buf[..size]).unwrap();
}
dbg!(&read_buf[..size]);
let (rtid, response) = Response::parse(&read_buf[..size]).unwrap();
assert_eq!(tid, rtid);
match response {
Response::Announce(r) => {
dbg!(r);
}
other => panic!("unexpected response {other:?}"),
}
}
}

View file

@ -1,5 +1,8 @@
use std::net::SocketAddr;
use futures::Stream;
pub type BF = bitvec::vec::BitVec<u8, bitvec::order::Msb0>;
pub type PeerHandle = SocketAddr;
pub type PeerStream = Box<dyn Stream<Item = SocketAddr> + Unpin + Send + Sync + 'static>;

View file

@ -7,3 +7,5 @@ pub mod peer_id;
pub mod spawn_utils;
pub mod speed_estimator;
pub mod torrent_metainfo;
pub use hash_id::Id20;

View file

@ -4,7 +4,6 @@ use anyhow::Context;
use crate::hash_id::{Id20, Id32};
/// A parsed magnet link.
pub struct Magnet {
id20: Option<Id20>,
@ -45,7 +44,7 @@ impl Magnet {
} else {
anyhow::bail!("expected xt to start with btih or btmh");
}
},
}
"tr" => trackers.push(value.into()),
_ => {}
}
@ -93,7 +92,6 @@ impl std::fmt::Display for Magnet {
}
}
#[cfg(test)]
mod tests {
#[test]
@ -109,8 +107,10 @@ mod tests {
use std::str::FromStr;
let magnet = "magnet:?xt=urn:btmh:1220caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e&dn=bittorrent-v2-test
";
let info_hash = Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e").unwrap();
let m = Magnet::parse(&magnet).unwrap();
let info_hash =
Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e")
.unwrap();
let m = Magnet::parse(magnet).unwrap();
assert!(m.as_id32() == Some(info_hash));
}
}