diff --git a/Makefile b/Makefile index 2e69dc6..751fb44 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,7 @@ webui-build: webui-deps @PHONY: devserver devserver: - echo -n '' > /tmp/rqbit-log - cargo run --release -- \ + echo -n '' > /tmp/rqbit-log && cargo run --release -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ server start /tmp/scratch/ diff --git a/crates/librqbit/resources/test/udp-tracker-announce-response.bin b/crates/librqbit/resources/test/udp-tracker-announce-response.bin new file mode 100644 index 0000000..4b1bc3a Binary files /dev/null and b/crates/librqbit/resources/test/udp-tracker-announce-response.bin differ diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 38091bf..817c086 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -36,7 +36,9 @@ mod session; mod spawn_utils; mod torrent_state; pub mod tracing_subscriber_config_utils; -mod tracker_comms; +pub mod tracker_comms; +pub mod tracker_comms_http; +pub mod tracker_comms_udp; mod type_aliases; pub use api::Api; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 998befb..95d65ea 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -13,10 +13,8 @@ use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; -use dht::{ - Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, -}; -use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; +use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; +use futures::{stream::FuturesUnordered, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -28,11 +26,11 @@ use librqbit_core::{ }; use parking_lot::RwLock; use peer_binary_protocol::Handshake; -use reqwest::Url; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; use tokio::net::{TcpListener, TcpStream}; -use tokio_util::sync::CancellationToken; +use tokio_stream::StreamExt; +use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ @@ -43,6 +41,8 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, + tracker_comms::TrackerComms, + type_aliases::PeerStream, }; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; @@ -172,6 +172,9 @@ pub struct Session { tcp_listen_port: Option, cancellation_token: CancellationToken, + + // This is stored for all tasks to stop when session is dropped. + _cancellation_token_drop_guard: DropGuard, } async fn torrent_from_url(url: &str) -> anyhow::Result { @@ -440,6 +443,7 @@ impl Session { spawner, output_folder, db: RwLock::new(Default::default()), + _cancellation_token_drop_guard: token.clone().drop_guard(), cancellation_token: token, tcp_listen_port, }); @@ -505,8 +509,6 @@ impl Session { addr: SocketAddr, mut stream: TcpStream, ) -> anyhow::Result<(Arc, CheckedIncomingConnection)> { - // TODO: move buffer handling to peer_connection - let rwtimeout = self .peer_opts .read_write_timeout @@ -548,7 +550,10 @@ impl Session { )); } - bail!("didn't find a matching torrent for {:?}", Id20::new(h.info_hash)) + bail!( + "didn't find a matching torrent for {:?}", + Id20::new(h.info_hash) + ) } async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { @@ -659,7 +664,7 @@ impl Session { .collect(); let info = TorrentMetaV1Owned { announce: trackers - .get(0) + .first() .cloned() .unwrap_or_else(|| ByteString(b"http://retracker.local/announce".to_vec())), announce_list: vec![trackers], @@ -751,34 +756,41 @@ impl Session { self.tcp_listen_port }; - let (info_hash, info, dht_rx, trackers, initial_peers) = match add { + let cancellation_token = self.cancellation_token.child_token(); + let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); + let paused = opts.list_only || opts.paused; + + // The main difference between magnet link and torrent file, is that we need to resolve the magnet link + // into a torrent file by connecting to peers that support extended handshakes. + // So we must discover at least one peer and connect to it to be able to proceed further. + + let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { - let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; - let info_hash = magnet.as_id20().context("magnet link didn't contain a BTv1 infohash")?; + let magnet = + Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; + let info_hash = magnet + .as_id20() + .context("magnet link didn't contain a BTv1 infohash")?; - let dht_rx = self - .dht - .as_ref() - .context("magnet links without DHT are not supported")? - .get_peers(info_hash, announce_port)?; - - let trackers = magnet.trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); + let peer_token = cancellation_token.child_token(); + let peer_rx = self.make_peer_rx( + info_hash, + magnet.trackers.clone(), + peer_token.clone(), + announce_port, + opts.force_tracker_interval, + )?; + let peer_rx = match peer_rx { + Some(peer_rx) => peer_rx, + None => bail!("can't find peers: DHT disabled and no trackers in magnet"), + }; debug!(?info_hash, "querying DHT"); - let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( self.peer_id, info_hash, opts.initial_peers.clone().unwrap_or_default(), - dht_rx, + peer_rx, Some(self.merge_peer_opts(opts.peer_opts)), ) .await @@ -788,17 +800,17 @@ impl Session { anyhow::bail!("DHT died, no way to discover torrent metainfo") } }; + if paused { + peer_token.cancel(); + } debug!(?info, "received result from DHT"); ( info_hash, info, - if opts.paused || opts.list_only { - None - } else { - Some(dht_rx) - }, - trackers, + magnet.trackers, + Some(peer_rx), initial_peers, + cancellation_token, ) } other => { @@ -820,53 +832,54 @@ impl Session { AddTorrent::TorrentInfo(t) => *t, }; - let dht_rx = match self.dht.as_ref() { - Some(dht) if !opts.paused && !opts.list_only => { - debug!(info_hash=?torrent.info_hash, "reading peers from DHT"); - Some(dht.get_peers(torrent.info_hash, announce_port)?) - } - _ => None, - }; let trackers = torrent .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { - Ok(url) => Some(url), - Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); - None - } + .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => Some(url.to_owned()), + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + None } }) .collect::>(); + + let peer_rx = if paused { + None + } else { + self.make_peer_rx( + torrent.info_hash, + trackers.clone(), + cancellation_token.clone(), + announce_port, + opts.force_tracker_interval, + )? + }; + ( torrent.info_hash, torrent.info, - dht_rx, trackers, + peer_rx, opts.initial_peers .clone() .unwrap_or_default() .into_iter() .collect(), + cancellation_token, ) } }; + cancellation_token_drop_guard.disarm(); + self.main_torrent_info( info_hash, info, - dht_rx, - initial_peers.into_iter().collect(), trackers, + peer_rx, + initial_peers.into_iter().collect(), opts, + cancellation_token, ) .await } @@ -876,13 +889,16 @@ impl Session { &self, info_hash: Id20, info: TorrentMetaV1Info, - dht_peer_rx: Option, + trackers: Vec, + peer_rx: Option, initial_peers: Vec, - trackers: Vec, opts: AddTorrentOptions, + cancellation_token: CancellationToken, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); + let drop_guard = cancellation_token.clone().drop_guard(); + let get_only_files = |only_files: Option>, only_files_regex: Option, list_only: bool| { match (only_files, only_files_regex) { @@ -963,13 +979,9 @@ impl Session { builder .overwrite(opts.overwrite) .spawner(self.spawner) - .cancellation_token(self.cancellation_token.child_token()) + .trackers(trackers) .peer_id(self.peer_id); - if opts.disable_trackers { - builder.trackers(trackers); - } - if let Some(only_files) = only_files { builder.only_files(only_files); } @@ -1003,11 +1015,21 @@ impl Session { { let span = managed_torrent.info.span.clone(); let _ = span.enter(); + + // Just in case, cancel all tasks started for this torrent so far. + // This is defensive, and not proven necessary. + let token = if opts.paused { + cancellation_token.cancel(); + self.cancellation_token.child_token() + } else { + cancellation_token + }; managed_torrent - .start(initial_peers, dht_peer_rx, opts.paused) + .start(initial_peers, peer_rx, opts.paused, token) .context("error starting torrent")?; } + drop_guard.disarm(); Ok(AddTorrentResponse::Added(id, managed_torrent)) } @@ -1053,13 +1075,51 @@ impl Session { } } - pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let peer_rx = self + // Get a peer stream from both DHT and trackers. + fn make_peer_rx( + &self, + info_hash: Id20, + trackers: Vec, + cancel: CancellationToken, + announce_port: Option, + force_tracker_interval: Option, + ) -> anyhow::Result> { + let announce_port = announce_port.or(self.tcp_listen_port); + let dht_rx = self .dht .as_ref() - .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) + .map(|dht| dht.get_peers(info_hash, announce_port)) .transpose()?; - handle.start(Default::default(), peer_rx, false)?; + let peer_rx = TrackerComms::start( + info_hash, + self.peer_id, + trackers, + // TODO: report actual bytes, not zeroes. + Box::new(()), + force_tracker_interval, + cancel, + announce_port, + ); + + // Merge DHT rx and tracker comms peer rx. + match (dht_rx, peer_rx) { + (Some(dht_rx), None) => Ok(Some(Box::new(dht_rx))), + (None, Some(peer_rx)) => Ok(Some(Box::new(peer_rx))), + (None, None) => Ok(None), + (Some(dht_rx), Some(peer_rx)) => Ok(Some(Box::new(dht_rx.merge(peer_rx)))), + } + } + + pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + let token = self.cancellation_token.child_token(); + let peer_rx = self.make_peer_rx( + handle.info_hash(), + handle.info().trackers.clone().into_iter().collect(), + token.clone(), + self.tcp_listen_port, + handle.info().options.force_tracker_interval, + )?; + handle.start(Default::default(), peer_rx, false, token)?; Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 859d7f2..3de7f0c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -57,7 +57,6 @@ use std::{ use anyhow::{bail, Context}; use backoff::backoff::Backoff; -use bencode::from_bytes; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; @@ -83,7 +82,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{debug, error, error_span, info, trace, warn}; -use url::Url; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker}, @@ -93,7 +91,6 @@ use crate::{ }, session::CheckedIncomingConnection, torrent_state::{peer::Peer, utils::atomic_inc}, - tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, type_aliases::{PeerHandle, BF}, }; @@ -237,13 +234,6 @@ impl TorrentStateLive { cancellation_token, }); - for tracker in state.meta.trackers.iter() { - state.spawn( - error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()), - state.clone().task_single_tracker_monitor(tracker.clone()), - ); - } - state.spawn( error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), { @@ -297,74 +287,6 @@ impl TorrentStateLive { &self.up_speed_estimator } - async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { - let response: reqwest::Response = reqwest::get(tracker_url).await?; - if !response.status().is_success() { - anyhow::bail!("tracker responded with {:?}", response.status()); - } - let bytes = response.bytes().await?; - if let Ok(error) = from_bytes::(&bytes) { - anyhow::bail!( - "tracker returned failure. Failure reason: {}", - error.failure_reason - ) - }; - let response = from_bytes::(&bytes)?; - - for peer in response.peers.iter_sockaddrs() { - self.add_peer_if_not_seen(peer)?; - } - Ok(response.interval) - } - - async fn task_single_tracker_monitor( - self: Arc, - mut tracker_url: Url, - ) -> anyhow::Result<()> { - let mut event = Some(TrackerRequestEvent::Started); - loop { - let request = TrackerRequest { - info_hash: self.info_hash(), - peer_id: self.peer_id(), - port: 6778, - uploaded: self.get_uploaded_bytes(), - downloaded: self.get_downloaded_bytes(), - left: self.get_left_to_download_bytes(), - compact: true, - no_peer_id: false, - event, - ip: None, - numwant: None, - key: None, - trackerid: None, - }; - - let request_query = request.as_querystring(); - tracker_url.set_query(Some(&request_query)); - - match self.tracker_one_request(tracker_url.clone()).await { - Ok(interval) => { - event = None; - let interval = self - .meta - .options - .force_tracker_interval - .unwrap_or_else(|| Duration::from_secs(interval)); - debug!( - "sleeping for {:?} after calling tracker {}", - interval, - tracker_url.host().unwrap() - ); - tokio::time::sleep(interval).await; - } - Err(e) => { - debug!("error calling the tracker {}: {:#}", tracker_url, e); - tokio::time::sleep(Duration::from_secs(60)).await; - } - }; - } - } - pub(crate) fn add_incoming_peer( self: &Arc, checked_peer: CheckedIncomingConnection, diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 7d29b79..90bfd8e 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -15,7 +15,6 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; -use dht::RequestPeersStream; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; use librqbit_core::peer_id::generate_peer_id; @@ -32,11 +31,11 @@ use tracing::debug; use tracing::error_span; use tracing::trace; use tracing::warn; -use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::BlockingSpawner; use crate::torrent_state::stats::LiveStats; +use crate::type_aliases::PeerStream; use initializing::TorrentStateInitializing; @@ -83,7 +82,7 @@ pub struct ManagedTorrentInfo { pub info_hash: Id20, pub out_dir: PathBuf, pub(crate) spawner: BlockingSpawner, - pub trackers: HashSet, + pub trackers: HashSet, pub peer_id: Id20, pub lengths: Lengths, pub span: tracing::Span, @@ -92,7 +91,6 @@ pub struct ManagedTorrentInfo { pub struct ManagedTorrent { pub info: Arc, - pub cancellation_token: CancellationToken, pub(crate) only_files: Option>, locked: RwLock, } @@ -173,15 +171,17 @@ impl ManagedTorrent { pub(crate) fn start( self: &Arc, initial_peers: Vec, - peer_rx: Option, + peer_rx: Option, start_paused: bool, + live_cancellation_token: CancellationToken, ) -> anyhow::Result<()> { let mut g = self.locked.write(); let spawn_fatal_errors_receiver = - |state: &Arc, rx: tokio::sync::oneshot::Receiver| { + |state: &Arc, + rx: tokio::sync::oneshot::Receiver, + token: CancellationToken| { let span = state.info.span.clone(); - let token = state.cancellation_token.clone(); let state = Arc::downgrade(state); spawn_with_cancel( error_span!(parent: span, "fatal_errors_receiver"), @@ -204,7 +204,7 @@ impl ManagedTorrent { fn spawn_peer_adder( live: &Arc, initial_peers: Vec, - peer_rx: Option, + peer_rx: Option, ) { live.spawn( error_span!(parent: live.meta().span.clone(), "external_peer_adder"), @@ -257,7 +257,7 @@ impl ManagedTorrent { drop(g); let t = self.clone(); let span = self.info().span.clone(); - let token = self.cancellation_token.clone(); + let token = live_cancellation_token.clone(); spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), @@ -277,10 +277,11 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, token.child_token()); + let live = + TorrentStateLive::new(paused, tx, live_cancellation_token); g.state = ManagedTorrentState::Live(live.clone()); - spawn_fatal_errors_receiver(&t, rx); + spawn_fatal_errors_receiver(&t, rx, token); spawn_peer_adder(&live, initial_peers, peer_rx); Ok(()) @@ -298,13 +299,9 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - self.cancellation_token.child_token().clone(), - ); + let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone()); g.state = ManagedTorrentState::Live(live.clone()); - spawn_fatal_errors_receiver(self, rx); + spawn_fatal_errors_receiver(self, rx, live_cancellation_token); spawn_peer_adder(&live, initial_peers, peer_rx); Ok(()) } @@ -317,7 +314,12 @@ impl ManagedTorrent { drop(g); // Recurse. - self.start(initial_peers, peer_rx, start_paused) + self.start( + initial_peers, + peer_rx, + start_paused, + live_cancellation_token, + ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } @@ -421,11 +423,10 @@ pub struct ManagedTorrentBuilder { peer_connect_timeout: Option, peer_read_write_timeout: Option, only_files: Option>, - trackers: Vec, + trackers: Vec, peer_id: Option, overwrite: bool, spawner: Option, - cancellation_token: Option, } impl ManagedTorrentBuilder { @@ -446,21 +447,15 @@ impl ManagedTorrentBuilder { trackers: Default::default(), peer_id: None, overwrite: false, - cancellation_token: None, } } - pub fn cancellation_token(&mut self, token: CancellationToken) -> &mut Self { - self.cancellation_token = Some(token); - self - } - pub fn only_files(&mut self, only_files: Vec) -> &mut Self { self.only_files = Some(only_files); self } - pub fn trackers(&mut self, trackers: Vec) -> &mut Self { + pub fn trackers(&mut self, trackers: Vec) -> &mut Self { self.trackers = trackers; self } @@ -495,7 +490,7 @@ impl ManagedTorrentBuilder { self } - pub(crate) fn build(mut self, span: tracing::Span) -> anyhow::Result { + pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result { let lengths = Lengths::from_torrent(&self.info)?; let info = Arc::new(ManagedTorrentInfo { span, @@ -522,7 +517,6 @@ impl ManagedTorrentBuilder { locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), }), - cancellation_token: self.cancellation_token.take().unwrap_or_default(), info, })) } diff --git a/crates/librqbit/src/tracing_subscriber_config_utils.rs b/crates/librqbit/src/tracing_subscriber_config_utils.rs index cc595b2..7cac1a2 100644 --- a/crates/librqbit/src/tracing_subscriber_config_utils.rs +++ b/crates/librqbit/src/tracing_subscriber_config_utils.rs @@ -96,7 +96,6 @@ pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result, + force_tracker_interval: Option, + cancellation_token: CancellationToken, + tx: Sender, + tcp_listen_port: Option, } -pub struct TrackerRequest { - pub info_hash: Id20, - pub peer_id: Id20, - pub event: Option, - pub port: u16, - pub uploaded: u64, - pub downloaded: u64, - pub left: u64, - pub compact: bool, - pub no_peer_id: bool, - - pub ip: Option, - pub numwant: Option, - pub key: Option, - pub trackerid: Option, +#[derive(Default)] +pub struct TrackerCommsStats { + pub uploaded_bytes: u64, + pub downloaded_bytes: u64, + pub total_bytes: u64, } -#[derive(Deserialize, Debug)] -pub struct TrackerError<'a> { - #[serde(rename = "failure reason", borrow)] - pub failure_reason: ByteBuf<'a>, -} - -#[derive(Deserialize, Debug)] -pub struct DictPeer<'a> { - #[serde(deserialize_with = "deserialize_ip_string")] - ip: IpAddr, - #[serde(borrow)] - #[allow(dead_code)] - peer_id: Option>, - port: u16, -} - -impl<'a> DictPeer<'a> { - fn as_sockaddr(&self) -> SocketAddr { - SocketAddr::new(self.ip, self.port) +impl TrackerCommsStats { + pub fn get_left_to_download_bytes(&self) -> u64 { + let total = self.total_bytes; + let down = self.downloaded_bytes; + if total >= down { + return total - down; + } + 0 } } -#[derive(Debug)] -pub struct Peers { - addrs: Vec, +pub trait TorrentStatsProvider: Send + Sync { + fn get(&self) -> TrackerCommsStats; } -impl Peers { - pub fn iter_sockaddrs(&self) -> impl Iterator + '_ { - self.addrs.iter().copied() +impl TorrentStatsProvider for () { + fn get(&self) -> TrackerCommsStats { + Default::default() } } -impl<'de> serde::de::Deserialize<'de> for Peers { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct Visitor<'de> { - phantom: std::marker::PhantomData<&'de ()>, - } - impl<'de> serde::de::Visitor<'de> for Visitor<'de> { - type Value = Peers; +type Sender = tokio::sync::mpsc::Sender; - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a list of peers in dict or binary format") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'de>, - { - let mut peers = Vec::new(); - while let Some(peer) = seq.next_element::()? { - peers.push(peer.as_sockaddr()) - } - Ok(Peers { addrs: peers }) - } - - fn visit_bytes(self, v: &[u8]) -> Result - where - E: serde::de::Error, - { - Ok(Peers { - addrs: parse_compact_peers(v) - .into_iter() - .map(|v| v.into()) - .collect(), - }) - } - } - deserializer.deserialize_any(Visitor { - phantom: PhantomData, - }) - } -} - -fn deserialize_ip_string<'de, D>(de: D) -> Result -where - D: Deserializer<'de>, -{ - struct Visitor; - impl<'de> serde::de::Visitor<'de> for Visitor { - type Value = IpAddr; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("expecting an IPv4 address") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {e}"))) - } - } - de.deserialize_str(Visitor {}) -} - -fn parse_compact_peers(b: &[u8]) -> Vec { - let mut ips = Vec::new(); - for chunk in b.chunks_exact(6) { - let ip_chunk = &chunk[..4]; - let port_chunk = &chunk[4..6]; - let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]); - let port = byteorder::BigEndian::read_u16(port_chunk); - ips.push(SocketAddrV4::new(ipaddr, port)); - } - ips -} - -#[derive(Deserialize, Debug)] -pub struct TrackerResponse<'a> { - #[serde(rename = "warning message", borrow)] - pub warning_message: Option>, - pub complete: u64, - pub interval: u64, - #[serde(rename = "min interval")] - pub min_interval: Option, - pub tracker_id: Option>, - pub incomplete: u64, - pub peers: Peers, -} - -impl TrackerRequest { - pub fn as_querystring(&self) -> String { - use urlencoding as u; - let mut s = String::new(); - s.push_str("info_hash="); - s.push_str(u::encode_binary(&self.info_hash.0).as_ref()); - s.push_str("&peer_id="); - s.push_str(u::encode_binary(&self.peer_id.0).as_ref()); - if let Some(event) = self.event { - write!( - s, - "&event={}", - match event { - TrackerRequestEvent::Started => "started", - TrackerRequestEvent::Stopped => "stopped", - TrackerRequestEvent::Completed => "completed", - } - ) - .unwrap(); - } - write!(s, "&port={}", self.port).unwrap(); - write!(s, "&uploaded={}", self.uploaded).unwrap(); - write!(s, "&downloaded={}", self.downloaded).unwrap(); - write!(s, "&left={}", self.left).unwrap(); - write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap(); - write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap(); - if let Some(ip) = &self.ip { - write!(s, "&ip={ip}").unwrap(); - } - if let Some(numwant) = &self.numwant { - write!(s, "&numwant={numwant}").unwrap(); - } - if let Some(key) = &self.key { - write!(s, "&key={key}").unwrap(); - } - if let Some(trackerid) = &self.trackerid { - write!(s, "&trackerid={trackerid}").unwrap(); - } - s - } -} - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_serialize() { - let info_hash = Id20::new([ - 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]); - let peer_id = Id20::new([ - 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]); - let request = TrackerRequest { +impl TrackerComms { + pub fn start( + info_hash: Id20, + peer_id: Id20, + trackers: Vec, + stats: Box, + force_interval: Option, + cancellation_token: CancellationToken, + tcp_listen_port: Option, + ) -> Option + Send + Sync + Unpin + 'static> { + let (tx, rx) = tokio::sync::mpsc::channel::(16); + let comms = Arc::new(Self { info_hash, peer_id, - port: 6881, - uploaded: 0, - downloaded: 0, - left: 1024 * 1024, - compact: true, - no_peer_id: false, - event: Some(TrackerRequestEvent::Started), - ip: Some("127.0.0.1".parse().unwrap()), - numwant: None, - key: None, - trackerid: None, + stats, + force_tracker_interval: force_interval, + cancellation_token, + tx, + tcp_listen_port, + }); + let mut added = false; + for tracker in trackers { + if let Err(e) = comms.clone().add_tracker(&tracker) { + info!(tracker = tracker, "error adding tracker: {:#}", e) + } else { + added = true; + } + } + if !added { + return None; + } + Some(tokio_stream::wrappers::ReceiverStream::new(rx)) + } + + fn add_tracker(self: Arc, tracker: &str) -> anyhow::Result<()> { + if tracker.starts_with("http://") || tracker.starts_with("https://") { + spawn_with_cancel( + error_span!( + parent: None, + "http_tracker", + tracker = tracker, + info_hash = ?self.info_hash + ), + self.cancellation_token.clone(), + { + let comms = self; + let url = Url::parse(tracker).context("can't parse URL")?; + async move { comms.task_single_tracker_monitor_http(url).await } + }, + ); + } else if tracker.starts_with("udp://") { + spawn_with_cancel( + error_span!(parent: None, "udp_tracker", tracker = tracker, info_hash = ?self.info_hash), + self.cancellation_token.clone(), + { + let comms = self; + let url = Url::parse(tracker).context("can't parse URL")?; + async move { comms.task_single_tracker_monitor_udp(url).await } + }, + ); + } else { + bail!("unsupported tracker url {}", tracker) + } + Ok(()) + } + + async fn task_single_tracker_monitor_http( + self: Arc, + mut tracker_url: Url, + ) -> anyhow::Result<()> { + let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started); + loop { + let stats = self.stats.get(); + let request = tracker_comms_http::TrackerRequest { + info_hash: self.info_hash, + peer_id: self.peer_id, + port: 6778, + uploaded: stats.uploaded_bytes, + downloaded: stats.downloaded_bytes, + left: stats.get_left_to_download_bytes(), + compact: true, + no_peer_id: false, + event, + ip: None, + numwant: None, + key: None, + trackerid: None, + }; + + let request_query = request.as_querystring(); + tracker_url.set_query(Some(&request_query)); + + match self.tracker_one_request_http(tracker_url.clone()).await { + Ok(interval) => { + event = None; + let interval = self + .force_tracker_interval + .unwrap_or_else(|| Duration::from_secs(interval)); + debug!( + "sleeping for {:?} after calling tracker {}", + interval, + tracker_url.host().unwrap() + ); + tokio::time::sleep(interval).await; + } + Err(e) => { + debug!("error calling the tracker {}: {:#}", tracker_url, e); + tokio::time::sleep(Duration::from_secs(60)).await; + } + }; + } + } + + async fn tracker_one_request_http(&self, tracker_url: Url) -> anyhow::Result { + let response: reqwest::Response = reqwest::get(tracker_url).await?; + if !response.status().is_success() { + anyhow::bail!("tracker responded with {:?}", response.status()); + } + let bytes = response.bytes().await?; + if let Ok(error) = bencode::from_bytes::(&bytes) { + anyhow::bail!( + "tracker returned failure. Failure reason: {}", + error.failure_reason + ) }; - dbg!(request.as_querystring()); + let response = bencode::from_bytes::(&bytes)?; + + for peer in response.peers.iter_sockaddrs() { + self.tx.send(peer).await?; + } + Ok(response.interval) + } + + async fn task_single_tracker_monitor_udp(&self, url: Url) -> anyhow::Result<()> { + use tracker_comms_udp::*; + + if url.scheme() != "udp" { + bail!("expected UDP scheme in {}", url); + } + let hp: (&str, u16) = ( + url.host_str().context("missing host")?, + url.port().context("missing port")?, + ); + let mut requester = UdpTrackerRequester::new(hp) + .await + .context("error creating UDP tracker requester")?; + + let mut sleep_interval: Option = None; + loop { + if let Some(i) = sleep_interval { + trace!(interval=?sleep_interval, "sleeping"); + tokio::time::sleep(i).await; + } + + let stats = self.stats.get(); + let request = AnnounceFields { + info_hash: self.info_hash, + peer_id: self.peer_id, + downloaded: stats.downloaded_bytes, + left: stats.get_left_to_download_bytes(), + uploaded: stats.uploaded_bytes, + event: EVENT_NONE, + key: 0, // whatever that is? + port: self.tcp_listen_port.unwrap_or(0), + }; + + match requester.announce(request).await { + Ok(response) => { + trace!(len = response.addrs.len(), "received announce response"); + for addr in response.addrs { + self.tx + .send(SocketAddr::V4(addr)) + .await + .context("rx closed")?; + } + let new_interval = response.interval.max(5); + let new_interval = Duration::from_secs(new_interval as u64); + sleep_interval = Some(self.force_tracker_interval.unwrap_or(new_interval)); + } + Err(e) => { + debug!(url = ?url, "error reading announce response: {e:#}"); + if sleep_interval.is_none() { + sleep_interval = Some( + self.force_tracker_interval + .unwrap_or(Duration::from_secs(60)), + ); + } + } + } + } } } diff --git a/crates/librqbit/src/tracker_comms_http.rs b/crates/librqbit/src/tracker_comms_http.rs new file mode 100644 index 0000000..e263be7 --- /dev/null +++ b/crates/librqbit/src/tracker_comms_http.rs @@ -0,0 +1,233 @@ +use buffers::ByteBuf; +use byteorder::ByteOrder; +use serde::{Deserialize, Deserializer}; +use std::{ + fmt::Write, + marker::PhantomData, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, +}; + +use librqbit_core::hash_id::Id20; + +#[derive(Clone, Copy)] +pub enum TrackerRequestEvent { + Started, + #[allow(dead_code)] + Stopped, + #[allow(dead_code)] + Completed, +} + +pub struct TrackerRequest { + pub info_hash: Id20, + pub peer_id: Id20, + pub event: Option, + pub port: u16, + pub uploaded: u64, + pub downloaded: u64, + pub left: u64, + pub compact: bool, + pub no_peer_id: bool, + + pub ip: Option, + pub numwant: Option, + pub key: Option, + pub trackerid: Option, +} + +#[derive(Deserialize, Debug)] +pub struct TrackerError<'a> { + #[serde(rename = "failure reason", borrow)] + pub failure_reason: ByteBuf<'a>, +} + +#[derive(Deserialize, Debug)] +pub struct DictPeer<'a> { + #[serde(deserialize_with = "deserialize_ip_string")] + ip: IpAddr, + #[serde(borrow)] + #[allow(dead_code)] + peer_id: Option>, + port: u16, +} + +impl<'a> DictPeer<'a> { + fn as_sockaddr(&self) -> SocketAddr { + SocketAddr::new(self.ip, self.port) + } +} + +#[derive(Debug)] +pub struct Peers { + addrs: Vec, +} + +impl Peers { + pub fn iter_sockaddrs(&self) -> impl Iterator + '_ { + self.addrs.iter().copied() + } +} + +impl<'de> serde::de::Deserialize<'de> for Peers { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor<'de> { + phantom: std::marker::PhantomData<&'de ()>, + } + impl<'de> serde::de::Visitor<'de> for Visitor<'de> { + type Value = Peers; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a list of peers in dict or binary format") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut peers = Vec::new(); + while let Some(peer) = seq.next_element::()? { + peers.push(peer.as_sockaddr()) + } + Ok(Peers { addrs: peers }) + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(Peers { + addrs: parse_compact_peers(v) + .into_iter() + .map(|v| v.into()) + .collect(), + }) + } + } + deserializer.deserialize_any(Visitor { + phantom: PhantomData, + }) + } +} + +fn deserialize_ip_string<'de, D>(de: D) -> Result +where + D: Deserializer<'de>, +{ + struct Visitor; + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = IpAddr; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expecting an IPv4 address") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {e}"))) + } + } + de.deserialize_str(Visitor {}) +} + +fn parse_compact_peers(b: &[u8]) -> Vec { + let mut ips = Vec::new(); + for chunk in b.chunks_exact(6) { + let ip_chunk = &chunk[..4]; + let port_chunk = &chunk[4..6]; + let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]); + let port = byteorder::BigEndian::read_u16(port_chunk); + ips.push(SocketAddrV4::new(ipaddr, port)); + } + ips +} + +#[derive(Deserialize, Debug)] +pub struct TrackerResponse<'a> { + #[serde(rename = "warning message", borrow)] + pub warning_message: Option>, + pub complete: u64, + pub interval: u64, + #[serde(rename = "min interval")] + pub min_interval: Option, + pub tracker_id: Option>, + pub incomplete: u64, + pub peers: Peers, +} + +impl TrackerRequest { + pub fn as_querystring(&self) -> String { + use urlencoding as u; + let mut s = String::new(); + s.push_str("info_hash="); + s.push_str(u::encode_binary(&self.info_hash.0).as_ref()); + s.push_str("&peer_id="); + s.push_str(u::encode_binary(&self.peer_id.0).as_ref()); + if let Some(event) = self.event { + write!( + s, + "&event={}", + match event { + TrackerRequestEvent::Started => "started", + TrackerRequestEvent::Stopped => "stopped", + TrackerRequestEvent::Completed => "completed", + } + ) + .unwrap(); + } + write!(s, "&port={}", self.port).unwrap(); + write!(s, "&uploaded={}", self.uploaded).unwrap(); + write!(s, "&downloaded={}", self.downloaded).unwrap(); + write!(s, "&left={}", self.left).unwrap(); + write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap(); + write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap(); + if let Some(ip) = &self.ip { + write!(s, "&ip={ip}").unwrap(); + } + if let Some(numwant) = &self.numwant { + write!(s, "&numwant={numwant}").unwrap(); + } + if let Some(key) = &self.key { + write!(s, "&key={key}").unwrap(); + } + if let Some(trackerid) = &self.trackerid { + write!(s, "&trackerid={trackerid}").unwrap(); + } + s + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_serialize() { + let info_hash = Id20::new([ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]); + let peer_id = Id20::new([ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]); + let request = TrackerRequest { + info_hash, + peer_id, + port: 6881, + uploaded: 0, + downloaded: 0, + left: 1024 * 1024, + compact: true, + no_peer_id: false, + event: Some(TrackerRequestEvent::Started), + ip: Some("127.0.0.1".parse().unwrap()), + numwant: None, + key: None, + trackerid: None, + }; + dbg!(request.as_querystring()); + } +} diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/librqbit/src/tracker_comms_udp.rs new file mode 100644 index 0000000..1e72ae8 --- /dev/null +++ b/crates/librqbit/src/tracker_comms_udp.rs @@ -0,0 +1,341 @@ +use std::net::{Ipv4Addr, SocketAddrV4}; + +use anyhow::{bail, Context}; +use librqbit_core::hash_id::Id20; +use rand::Rng; +use tokio::net::ToSocketAddrs; +use tracing::trace; + +const ACTION_CONNECT: u32 = 0; +const ACTION_ANNOUNCE: u32 = 1; +// const ACTION_SCRAPE: u32 = 2; +// const ACTION_ERROR: u32 = 3; + +pub const EVENT_NONE: u32 = 0; +pub const EVENT_COMPLETED: u32 = 1; +pub const EVENT_STARTED: u32 = 2; +pub const EVENT_STOPPED: u32 = 3; + +pub type ConnectionId = u64; +const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980; + +pub type TransactionId = u32; + +pub fn new_transaction_id() -> TransactionId { + rand::thread_rng().gen() +} + +#[derive(Debug)] +pub struct AnnounceFields { + pub info_hash: Id20, + pub peer_id: Id20, + pub downloaded: u64, + pub left: u64, + pub uploaded: u64, + pub event: u32, + pub key: u32, + pub port: u16, +} + +#[derive(Debug)] +pub enum Request { + Connect, + Announce(ConnectionId, AnnounceFields), +} + +impl Request { + pub fn serialize(&self, transaction_id: TransactionId, buf: &mut Vec) -> usize { + let cur_len = buf.len(); + match self { + Request::Connect => { + buf.extend_from_slice(&CONNECTION_ID_MAGIC.to_be_bytes()); + buf.extend_from_slice(&ACTION_CONNECT.to_be_bytes()); + buf.extend_from_slice(&transaction_id.to_be_bytes()); + } + Request::Announce(connection_id, fields) => { + buf.extend_from_slice(&connection_id.to_be_bytes()); + buf.extend_from_slice(&ACTION_ANNOUNCE.to_be_bytes()); + buf.extend_from_slice(&transaction_id.to_be_bytes()); + buf.extend_from_slice(&fields.info_hash.0); + buf.extend_from_slice(&fields.peer_id.0); + buf.extend_from_slice(&fields.downloaded.to_be_bytes()); + buf.extend_from_slice(&fields.left.to_be_bytes()); + buf.extend_from_slice(&fields.uploaded.to_be_bytes()); + buf.extend_from_slice(&fields.event.to_be_bytes()); + buf.extend_from_slice(&0u32.to_be_bytes()); // ip address 0 + buf.extend_from_slice(&fields.key.to_be_bytes()); + buf.extend_from_slice(&(-1i32).to_be_bytes()); // num want -1 + buf.extend_from_slice(&fields.port.to_be_bytes()); + } + } + buf.len() - cur_len + } +} + +#[derive(Debug)] +pub struct AnnounceResponse { + pub interval: u32, + pub leechers: u32, + pub seeders: u32, + pub addrs: Vec, +} + +#[derive(Debug)] +pub enum Response { + Connect(ConnectionId), + Announce(AnnounceResponse), +} + +fn split_slice(s: &[u8], first_len: usize) -> Option<(&[u8], &[u8])> { + if s.len() < first_len { + return None; + } + Some(s.split_at(first_len)) +} + +fn s_to_arr(buf: &[u8]) -> [u8; T] { + let mut arr = [0u8; T]; + arr.copy_from_slice(buf); + arr +} + +trait ParseNum: Sized { + fn parse_num(buf: &[u8]) -> anyhow::Result<(Self, &[u8])>; +} + +macro_rules! parse_impl { + ($ty:tt, $size:expr) => { + impl ParseNum for $ty { + fn parse_num(buf: &[u8]) -> anyhow::Result<($ty, &[u8])> { + let (bytes, rest) = + split_slice(buf, $size).with_context(|| format!("expected {} bytes", $size))?; + let num = $ty::from_be_bytes(s_to_arr(bytes)); + Ok((num, rest)) + } + } + }; +} + +parse_impl!(u32, 4); +parse_impl!(u64, 8); +parse_impl!(u16, 2); +parse_impl!(i32, 4); +parse_impl!(i64, 8); +parse_impl!(i16, 2); + +impl Response { + pub fn parse(buf: &[u8]) -> anyhow::Result<(TransactionId, Self)> { + let (action, buf) = u32::parse_num(buf).context("can't parse action")?; + let (tid, mut buf) = u32::parse_num(buf).context("can't parse transaction id")?; + let response = match action { + ACTION_CONNECT => { + let (connection_id, b) = + u64::parse_num(buf).context("can't parse connection id")?; + buf = b; + Response::Connect(connection_id) + } + ACTION_ANNOUNCE => { + let (interval, b) = u32::parse_num(buf).context("can't parse interval")?; + let (leechers, b) = u32::parse_num(b).context("can't parse leechers")?; + let (seeders, mut b) = u32::parse_num(b).context("can't parse seeders")?; + let mut addrs = Vec::new(); + while !b.is_empty() { + let (ip, b2) = u32::parse_num(b)?; + let ip = Ipv4Addr::from(ip); + b = b2; + + let (port, b2) = u16::parse_num(b)?; + b = b2; + addrs.push(SocketAddrV4::new(ip, port)); + } + buf = b; + Response::Announce(AnnounceResponse { + interval, + leechers, + seeders, + addrs, + }) + } + _ => bail!("unsupported action {action}"), + }; + + if !buf.is_empty() { + bail!( + "parsed {response:?} so far, but got {} remaining bytes", + buf.len() + ); + } + + Ok((tid, response)) + } +} + +pub struct UdpTrackerRequester { + sock: tokio::net::UdpSocket, + connection_id: ConnectionId, + read_buf: Vec, + write_buf: Vec, +} + +impl UdpTrackerRequester { + // Addr is "host:port" + pub async fn new(addr: impl ToSocketAddrs) -> anyhow::Result { + let sock = tokio::net::UdpSocket::bind("0.0.0.0:0") + .await + .context("error binding UDP socket")?; + sock.connect(addr) + .await + .context("error connecting UDP socket")?; + + let tid = new_transaction_id(); + let mut write_buf = Vec::new(); + let mut read_buf = vec![0u8; 4096]; + + trace!("sending connect request"); + Request::Connect.serialize(tid, &mut write_buf); + + sock.send(&write_buf) + .await + .context("error sending to socket")?; + + let size = sock + .recv(&mut read_buf) + .await + .context("error receiving from socket")?; + + let (rtid, response) = + Response::parse(&read_buf[..size]).context("error parsing response")?; + if tid != rtid { + bail!("expected transaction id {} == {}", tid, rtid); + } + trace!(response=?response, "received"); + + let connection_id = match response { + Response::Connect(connection_id) => connection_id, + other => bail!("unexpected response {other:?}"), + }; + + trace!(connection_id); + + Ok(Self { + sock, + connection_id, + read_buf, + write_buf, + }) + } + + pub async fn announce(&mut self, fields: AnnounceFields) -> anyhow::Result { + let request = Request::Announce(self.connection_id, fields); + let response = self.request(request).await?; + match response { + Response::Announce(r) => Ok(r), + other => bail!("unexpected response {other:?}, expected announce"), + } + } + + pub async fn request(&mut self, request: Request) -> anyhow::Result { + let tid = new_transaction_id(); + self.write_buf.clear(); + let size = request.serialize(tid, &mut self.write_buf); + trace!(request=?request, tid, "sending"); + self.sock + .send(&self.write_buf[..size]) + .await + .context("error sending")?; + let size = self.sock.recv(&mut self.read_buf).await.unwrap(); + + let (rtid, response) = Response::parse(&self.read_buf[..size]).unwrap(); + trace!("received response"); + if tid != rtid { + bail!("unexpected transaction id"); + } + Ok(response) + } +} + +#[cfg(test)] +mod tests { + use std::{io::Write, str::FromStr}; + + use librqbit_core::{hash_id::Id20, peer_id::generate_peer_id}; + + use crate::tracker_comms_udp::{ + new_transaction_id, AnnounceFields, Request, Response, EVENT_NONE, + }; + + #[test] + fn test_parse_announce() { + let b = include_bytes!("../resources/test/udp-tracker-announce-response.bin"); + let (tid, response) = Response::parse(b).unwrap(); + dbg!(tid, response); + } + + #[ignore] + #[tokio::test] + async fn test_announce() { + let sock = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap(); + sock.connect("opentor.net:6969").await.unwrap(); + + let tid = new_transaction_id(); + let mut write_buf = Vec::new(); + let mut read_buf = vec![0u8; 4096]; + + Request::Connect.serialize(tid, &mut write_buf); + + sock.send(&write_buf).await.unwrap(); + + let size = sock.recv(&mut read_buf).await.unwrap(); + + let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + let connection_id = match response { + Response::Connect(connection_id) => { + dbg!(connection_id) + } + other => panic!("unexpected response {other:?}"), + }; + + let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap(); + + let tid = new_transaction_id(); + let request = Request::Announce( + connection_id, + AnnounceFields { + info_hash: hash, + peer_id: generate_peer_id(), + downloaded: 0, + left: 0, + uploaded: 0, + event: EVENT_NONE, + key: 0, // whatever that is? + port: 24563, + }, + ); + write_buf.clear(); + let size = request.serialize(tid, &mut write_buf); + + sock.send(&write_buf[..size]).await.unwrap(); + let size = sock.recv(&mut read_buf).await.unwrap(); + + { + let mut f = std::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open("/tmp/proto.bin") + .unwrap(); + f.write_all(&read_buf[..size]).unwrap(); + } + + dbg!(&read_buf[..size]); + let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + match response { + Response::Announce(r) => { + dbg!(r); + } + other => panic!("unexpected response {other:?}"), + } + } +} diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 29fa9df..2b6efa9 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -1,5 +1,8 @@ use std::net::SocketAddr; +use futures::Stream; + pub type BF = bitvec::vec::BitVec; pub type PeerHandle = SocketAddr; +pub type PeerStream = Box + Unpin + Send + Sync + 'static>; diff --git a/crates/librqbit_core/src/lib.rs b/crates/librqbit_core/src/lib.rs index 6086598..63577d6 100644 --- a/crates/librqbit_core/src/lib.rs +++ b/crates/librqbit_core/src/lib.rs @@ -7,3 +7,5 @@ pub mod peer_id; pub mod spawn_utils; pub mod speed_estimator; pub mod torrent_metainfo; + +pub use hash_id::Id20; diff --git a/crates/librqbit_core/src/magnet.rs b/crates/librqbit_core/src/magnet.rs index 477dbf6..f0942bb 100644 --- a/crates/librqbit_core/src/magnet.rs +++ b/crates/librqbit_core/src/magnet.rs @@ -4,7 +4,6 @@ use anyhow::Context; use crate::hash_id::{Id20, Id32}; - /// A parsed magnet link. pub struct Magnet { id20: Option, @@ -45,7 +44,7 @@ impl Magnet { } else { anyhow::bail!("expected xt to start with btih or btmh"); } - }, + } "tr" => trackers.push(value.into()), _ => {} } @@ -93,7 +92,6 @@ impl std::fmt::Display for Magnet { } } - #[cfg(test)] mod tests { #[test] @@ -109,8 +107,10 @@ mod tests { use std::str::FromStr; let magnet = "magnet:?xt=urn:btmh:1220caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e&dn=bittorrent-v2-test "; - let info_hash = Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e").unwrap(); - let m = Magnet::parse(&magnet).unwrap(); + let info_hash = + Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e") + .unwrap(); + let m = Magnet::parse(magnet).unwrap(); assert!(m.as_id32() == Some(info_hash)); } }