From 6f3383050e2ff4f474db63114d9c104c9236c462 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 11 Feb 2024 12:12:37 +0000 Subject: [PATCH 01/12] Implement ser/de for UDP tracker protocol --- .../test/udp-tracker-announce-response.bin | Bin 0 -> 1220 bytes crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/tracker_comms_udp.rs | 258 ++++++++++++++++++ 3 files changed, 259 insertions(+) create mode 100644 crates/librqbit/resources/test/udp-tracker-announce-response.bin create mode 100644 crates/librqbit/src/tracker_comms_udp.rs diff --git a/crates/librqbit/resources/test/udp-tracker-announce-response.bin b/crates/librqbit/resources/test/udp-tracker-announce-response.bin new file mode 100644 index 0000000000000000000000000000000000000000..4b1bc3aa41c1c1041424a3e05d07afc867a90614 GIT binary patch literal 1220 zcmV;#1Uvfx0005yl-(2n00wjb0099201XW;06%FO;aa^eBT~JI)Uu;p4zg9SG!TmLd0%M0?s}SDDZaHjGh6NClg6@HZ7+t7PE(7s zImp#dp>-^V1*FIB4nUbD<4I1H*;A?*x;cd9UF^5|P_mz-SK64$b-86BNA9bSV41RCL7|N2cT zDOn_>T6i)~i{CGYc0P>sJ29&+@-r6)v?YueV6R>U14pb2Z5>!KQht(K` zDoOvmq6|_nh`a!)Ro=X^%={`wTC!QJ3QH)mLW3rMNU+pj`B9g!z?qz8d2Rekgz@ZO zns#P`j9(hlva)1dt7g77@?3n)h|^?Q7~{w?W>D>17goMct8tlXmQ8HcNvH)bioW9- z;U(5cCK}-aKV_$@4AtJ^cXX4&yOp;_6uiF`GWg8MS6#r2bJT6MxlTE?eAJSgN8|b(*-&=2UGxpR1};qInr~C{R$u7XGR}*}fQJqb>k62v`s$ zaK6?tXeBw3#VA$<|G$ywh6Rh_yoTP?+~q-IxYQuP$f`TM^oJwfDZC-+X#Z5KZ>;Ue z)ouLNSVQMu-NkhZL0?WV75ar=JHuYY5?>`pS}Y)68cM<7EngY1-GwJ!8RPm@2woII zH7fpF)`^;jc3Js=%Dc)~ljwt>`&Enq9i2s0bzMGL6I3VkZ(B%Di9K8O1l*2*165+*L zp}91T=Z5*Rn-O{7$*^>#+mf2Fb7Al4RIp}5)F)eQ{E(Q=VQr(>-;pR^8PE8FuU{D? zT-Bpq6Wq|J99%v-F-Y=UC62OH`d9fq(!wuPHU)Thl23j}PN*d=@(VISqAsm8rdKpB ziV-483oa@+lgQO2PFajCTLt7%>l)z&rH_z48(Y7EWxt46@p1RA?Zwn;u`&vaoRYDMU)KsYl)b literal 0 HcmV?d00001 diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 38091bf..a1897cb 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -37,6 +37,7 @@ mod spawn_utils; mod torrent_state; pub mod tracing_subscriber_config_utils; mod tracker_comms; +pub mod tracker_comms_udp; mod type_aliases; pub use api::Api; diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/librqbit/src/tracker_comms_udp.rs new file mode 100644 index 0000000..306076b --- /dev/null +++ b/crates/librqbit/src/tracker_comms_udp.rs @@ -0,0 +1,258 @@ +use std::net::{Ipv4Addr, SocketAddrV4}; + +use anyhow::{bail, Context}; +use librqbit_core::hash_id::Id20; +use rand::Rng; + +const ACTION_CONNECT: u32 = 0; +const ACTION_ANNOUNCE: u32 = 1; +// const ACTION_SCRAPE: u32 = 2; +// const ACTION_ERROR: u32 = 3; + +pub const EVENT_NONE: u32 = 0; +pub const EVENT_COMPLETED: u32 = 1; +pub const EVENT_STARTED: u32 = 2; +pub const EVENT_STOPPED: u32 = 3; + +pub type ConnectionId = u64; +const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980; + +pub type TransactionId = u32; + +pub fn new_transaction_id() -> TransactionId { + rand::thread_rng().gen() +} + +#[derive(Debug)] +pub struct AnnounceFields { + pub info_hash: Id20, + pub peer_id: Id20, + pub downloaded: u64, + pub left: u64, + pub uploaded: u64, + pub event: u32, + pub key: u32, + pub port: u16, +} + +#[derive(Debug)] +pub enum Request { + Connect, + Announce(ConnectionId, AnnounceFields), +} + +impl Request { + pub fn serialize(&self, transaction_id: TransactionId, buf: &mut Vec) -> usize { + let cur_len = buf.len(); + match self { + Request::Connect => { + buf.extend_from_slice(&CONNECTION_ID_MAGIC.to_be_bytes()); + buf.extend_from_slice(&ACTION_CONNECT.to_be_bytes()); + buf.extend_from_slice(&transaction_id.to_be_bytes()); + } + Request::Announce(connection_id, fields) => { + buf.extend_from_slice(&connection_id.to_be_bytes()); + buf.extend_from_slice(&ACTION_ANNOUNCE.to_be_bytes()); + buf.extend_from_slice(&transaction_id.to_be_bytes()); + buf.extend_from_slice(&fields.info_hash.0); + buf.extend_from_slice(&fields.peer_id.0); + buf.extend_from_slice(&fields.downloaded.to_be_bytes()); + buf.extend_from_slice(&fields.left.to_be_bytes()); + buf.extend_from_slice(&fields.uploaded.to_be_bytes()); + buf.extend_from_slice(&fields.event.to_be_bytes()); + buf.extend_from_slice(&0u32.to_be_bytes()); // ip address 0 + buf.extend_from_slice(&fields.key.to_be_bytes()); + buf.extend_from_slice(&(-1i32).to_be_bytes()); // num want -1 + buf.extend_from_slice(&fields.port.to_be_bytes()); + } + } + buf.len() - cur_len + } +} + +#[derive(Debug)] +pub enum Response { + Connect(ConnectionId), + Announce { + interval: u32, + leechers: u32, + seeders: u32, + addrs: Vec, + }, +} + +fn split_slice(s: &[u8], first_len: usize) -> Option<(&[u8], &[u8])> { + if s.len() < first_len { + return None; + } + Some(s.split_at(first_len)) +} + +fn s_to_arr(buf: &[u8]) -> [u8; T] { + let mut arr = [0u8; T]; + arr.copy_from_slice(buf); + arr +} + +trait ParseNum: Sized { + fn parse_num(buf: &[u8]) -> anyhow::Result<(Self, &[u8])>; +} + +macro_rules! parse_impl { + ($ty:tt, $size:expr) => { + impl ParseNum for $ty { + fn parse_num(buf: &[u8]) -> anyhow::Result<($ty, &[u8])> { + let (bytes, rest) = + split_slice(buf, $size).with_context(|| format!("expected {} bytes", $size))?; + let num = $ty::from_be_bytes(s_to_arr(bytes)); + Ok((num, rest)) + } + } + }; +} + +parse_impl!(u32, 4); +parse_impl!(u64, 8); +parse_impl!(u16, 2); +parse_impl!(i32, 4); +parse_impl!(i64, 8); +parse_impl!(i16, 2); + +impl Response { + pub fn parse(buf: &[u8]) -> anyhow::Result<(TransactionId, Self)> { + let (action, buf) = u32::parse_num(buf).context("can't parse action")?; + let (tid, mut buf) = u32::parse_num(buf).context("can't parse transaction id")?; + let response = match action { + ACTION_CONNECT => { + let (connection_id, b) = + u64::parse_num(buf).context("can't parse connection id")?; + buf = b; + Response::Connect(connection_id) + } + ACTION_ANNOUNCE => { + let (interval, b) = u32::parse_num(buf).context("can't parse interval")?; + let (leechers, b) = u32::parse_num(b).context("can't parse leechers")?; + let (seeders, mut b) = u32::parse_num(b).context("can't parse seeders")?; + let mut addrs = Vec::new(); + while !b.is_empty() { + let (ip, b2) = u32::parse_num(b)?; + let ip = Ipv4Addr::from(ip); + b = b2; + + let (port, b2) = u16::parse_num(b)?; + b = b2; + addrs.push(SocketAddrV4::new(ip, port)); + } + buf = b; + Response::Announce { + interval, + leechers, + seeders, + addrs, + } + } + _ => bail!("unsupported action {action}"), + }; + + if !buf.is_empty() { + bail!( + "parsed {response:?} so far, but got {} remaining bytes", + buf.len() + ); + } + + Ok((tid, response)) + } +} + +#[cfg(test)] +mod tests { + use std::{io::Write, str::FromStr}; + + use librqbit_core::{hash_id::Id20, peer_id::generate_peer_id}; + pub use rand::Rng; + + use crate::tracker_comms_udp::{ + new_transaction_id, AnnounceFields, Request, Response, EVENT_NONE, + }; + + #[test] + fn test_parse_announce() { + let b = include_bytes!("../resources/test/udp-tracker-announce-response.bin"); + let (tid, response) = Response::parse(b).unwrap(); + dbg!(tid, response); + } + + #[ignore] + #[tokio::test] + async fn test_announce() { + let sock = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap(); + sock.connect("opentor.net:6969").await.unwrap(); + + let tid = new_transaction_id(); + let mut write_buf = Vec::new(); + let mut read_buf = vec![0u8; 4096]; + + Request::Connect.serialize(tid, &mut write_buf); + + sock.send(&write_buf).await.unwrap(); + + let size = sock.recv(&mut read_buf).await.unwrap(); + + let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + let connection_id = match response { + Response::Connect(connection_id) => { + dbg!(connection_id) + } + other => panic!("unexpected response {other:?}"), + }; + + let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap(); + + let tid = new_transaction_id(); + let request = Request::Announce( + connection_id, + AnnounceFields { + info_hash: hash, + peer_id: generate_peer_id(), + downloaded: 0, + left: 0, + uploaded: 0, + event: EVENT_NONE, + key: 0, // whatever that is? + port: 24563, + }, + ); + write_buf.clear(); + let size = request.serialize(tid, &mut write_buf); + + sock.send(&write_buf[..size]).await.unwrap(); + let size = sock.recv(&mut read_buf).await.unwrap(); + + { + let mut f = std::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open("/tmp/proto.bin") + .unwrap(); + f.write_all(&read_buf[..size]).unwrap(); + } + + dbg!(&read_buf[..size]); + let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + match response { + Response::Announce { + interval, + leechers, + seeders, + addrs, + } => { + dbg!(interval, leechers, seeders, addrs); + } + other => panic!("unexpected response {other:?}"), + } + } +} From 8733538d8302ae7965e065c52e650d541686dbd9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 17 Feb 2024 10:51:09 +0000 Subject: [PATCH 02/12] Abstracting tracker comms --- crates/librqbit/src/lib.rs | 3 +- crates/librqbit/src/torrent_state/live/mod.rs | 78 ---- crates/librqbit/src/tracker_comms.rs | 429 +++++++++--------- crates/librqbit/src/tracker_comms_http.rs | 233 ++++++++++ crates/librqbit/src/tracker_comms_udp.rs | 106 ++++- crates/librqbit_core/src/lib.rs | 2 + 6 files changed, 539 insertions(+), 312 deletions(-) create mode 100644 crates/librqbit/src/tracker_comms_http.rs diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index a1897cb..817c086 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -36,7 +36,8 @@ mod session; mod spawn_utils; mod torrent_state; pub mod tracing_subscriber_config_utils; -mod tracker_comms; +pub mod tracker_comms; +pub mod tracker_comms_http; pub mod tracker_comms_udp; mod type_aliases; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 859d7f2..3de7f0c 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -57,7 +57,6 @@ use std::{ use anyhow::{bail, Context}; use backoff::backoff::Backoff; -use bencode::from_bytes; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; @@ -83,7 +82,6 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use tracing::{debug, error, error_span, info, trace, warn}; -use url::Url; use crate::{ chunk_tracker::{ChunkMarkingResult, ChunkTracker}, @@ -93,7 +91,6 @@ use crate::{ }, session::CheckedIncomingConnection, torrent_state::{peer::Peer, utils::atomic_inc}, - tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse}, type_aliases::{PeerHandle, BF}, }; @@ -237,13 +234,6 @@ impl TorrentStateLive { cancellation_token, }); - for tracker in state.meta.trackers.iter() { - state.spawn( - error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()), - state.clone().task_single_tracker_monitor(tracker.clone()), - ); - } - state.spawn( error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), { @@ -297,74 +287,6 @@ impl TorrentStateLive { &self.up_speed_estimator } - async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { - let response: reqwest::Response = reqwest::get(tracker_url).await?; - if !response.status().is_success() { - anyhow::bail!("tracker responded with {:?}", response.status()); - } - let bytes = response.bytes().await?; - if let Ok(error) = from_bytes::(&bytes) { - anyhow::bail!( - "tracker returned failure. Failure reason: {}", - error.failure_reason - ) - }; - let response = from_bytes::(&bytes)?; - - for peer in response.peers.iter_sockaddrs() { - self.add_peer_if_not_seen(peer)?; - } - Ok(response.interval) - } - - async fn task_single_tracker_monitor( - self: Arc, - mut tracker_url: Url, - ) -> anyhow::Result<()> { - let mut event = Some(TrackerRequestEvent::Started); - loop { - let request = TrackerRequest { - info_hash: self.info_hash(), - peer_id: self.peer_id(), - port: 6778, - uploaded: self.get_uploaded_bytes(), - downloaded: self.get_downloaded_bytes(), - left: self.get_left_to_download_bytes(), - compact: true, - no_peer_id: false, - event, - ip: None, - numwant: None, - key: None, - trackerid: None, - }; - - let request_query = request.as_querystring(); - tracker_url.set_query(Some(&request_query)); - - match self.tracker_one_request(tracker_url.clone()).await { - Ok(interval) => { - event = None; - let interval = self - .meta - .options - .force_tracker_interval - .unwrap_or_else(|| Duration::from_secs(interval)); - debug!( - "sleeping for {:?} after calling tracker {}", - interval, - tracker_url.host().unwrap() - ); - tokio::time::sleep(interval).await; - } - Err(e) => { - debug!("error calling the tracker {}: {:#}", tracker_url, e); - tokio::time::sleep(Duration::from_secs(60)).await; - } - }; - } - } - pub(crate) fn add_incoming_peer( self: &Arc, checked_peer: CheckedIncomingConnection, diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index e263be7..8482552 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -1,233 +1,226 @@ -use buffers::ByteBuf; -use byteorder::ByteOrder; -use serde::{Deserialize, Deserializer}; -use std::{ - fmt::Write, - marker::PhantomData, - net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, - str::FromStr, -}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; +use anyhow::bail; +use anyhow::Context; +use futures::Stream; +use librqbit_core::spawn_utils::spawn_with_cancel; +use tokio_util::sync::CancellationToken; +use tracing::debug; +use tracing::error_span; +use tracing::info; +use url::Url; + +use crate::tracker_comms_http; +use crate::tracker_comms_udp; use librqbit_core::hash_id::Id20; -#[derive(Clone, Copy)] -pub enum TrackerRequestEvent { - Started, - #[allow(dead_code)] - Stopped, - #[allow(dead_code)] - Completed, +pub struct TrackerComms { + info_hash: Id20, + peer_id: Id20, + stats: Box, + force_tracker_interval: Option, + cancellation_token: CancellationToken, + tx: Sender, + tcp_listen_port: Option, } -pub struct TrackerRequest { - pub info_hash: Id20, - pub peer_id: Id20, - pub event: Option, - pub port: u16, - pub uploaded: u64, - pub downloaded: u64, - pub left: u64, - pub compact: bool, - pub no_peer_id: bool, +pub trait TorrentStatsForTracker: Send + Sync { + fn get_uploaded_bytes(&self) -> u64; + fn get_downloaded_bytes(&self) -> u64; + fn get_total_bytes(&self) -> u64; - pub ip: Option, - pub numwant: Option, - pub key: Option, - pub trackerid: Option, -} - -#[derive(Deserialize, Debug)] -pub struct TrackerError<'a> { - #[serde(rename = "failure reason", borrow)] - pub failure_reason: ByteBuf<'a>, -} - -#[derive(Deserialize, Debug)] -pub struct DictPeer<'a> { - #[serde(deserialize_with = "deserialize_ip_string")] - ip: IpAddr, - #[serde(borrow)] - #[allow(dead_code)] - peer_id: Option>, - port: u16, -} - -impl<'a> DictPeer<'a> { - fn as_sockaddr(&self) -> SocketAddr { - SocketAddr::new(self.ip, self.port) + fn get_left_to_download_bytes(&self) -> u64 { + let total = self.get_total_bytes(); + let down = self.get_downloaded_bytes(); + if total >= down { + return total - down; + } + 0 } } -#[derive(Debug)] -pub struct Peers { - addrs: Vec, -} +type Sender = tokio::sync::mpsc::Sender; -impl Peers { - pub fn iter_sockaddrs(&self) -> impl Iterator + '_ { - self.addrs.iter().copied() - } -} - -impl<'de> serde::de::Deserialize<'de> for Peers { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct Visitor<'de> { - phantom: std::marker::PhantomData<&'de ()>, - } - impl<'de> serde::de::Visitor<'de> for Visitor<'de> { - type Value = Peers; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a list of peers in dict or binary format") - } - - fn visit_seq(self, mut seq: A) -> Result - where - A: serde::de::SeqAccess<'de>, - { - let mut peers = Vec::new(); - while let Some(peer) = seq.next_element::()? { - peers.push(peer.as_sockaddr()) - } - Ok(Peers { addrs: peers }) - } - - fn visit_bytes(self, v: &[u8]) -> Result - where - E: serde::de::Error, - { - Ok(Peers { - addrs: parse_compact_peers(v) - .into_iter() - .map(|v| v.into()) - .collect(), - }) - } - } - deserializer.deserialize_any(Visitor { - phantom: PhantomData, - }) - } -} - -fn deserialize_ip_string<'de, D>(de: D) -> Result -where - D: Deserializer<'de>, -{ - struct Visitor; - impl<'de> serde::de::Visitor<'de> for Visitor { - type Value = IpAddr; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("expecting an IPv4 address") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {e}"))) - } - } - de.deserialize_str(Visitor {}) -} - -fn parse_compact_peers(b: &[u8]) -> Vec { - let mut ips = Vec::new(); - for chunk in b.chunks_exact(6) { - let ip_chunk = &chunk[..4]; - let port_chunk = &chunk[4..6]; - let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]); - let port = byteorder::BigEndian::read_u16(port_chunk); - ips.push(SocketAddrV4::new(ipaddr, port)); - } - ips -} - -#[derive(Deserialize, Debug)] -pub struct TrackerResponse<'a> { - #[serde(rename = "warning message", borrow)] - pub warning_message: Option>, - pub complete: u64, - pub interval: u64, - #[serde(rename = "min interval")] - pub min_interval: Option, - pub tracker_id: Option>, - pub incomplete: u64, - pub peers: Peers, -} - -impl TrackerRequest { - pub fn as_querystring(&self) -> String { - use urlencoding as u; - let mut s = String::new(); - s.push_str("info_hash="); - s.push_str(u::encode_binary(&self.info_hash.0).as_ref()); - s.push_str("&peer_id="); - s.push_str(u::encode_binary(&self.peer_id.0).as_ref()); - if let Some(event) = self.event { - write!( - s, - "&event={}", - match event { - TrackerRequestEvent::Started => "started", - TrackerRequestEvent::Stopped => "stopped", - TrackerRequestEvent::Completed => "completed", - } - ) - .unwrap(); - } - write!(s, "&port={}", self.port).unwrap(); - write!(s, "&uploaded={}", self.uploaded).unwrap(); - write!(s, "&downloaded={}", self.downloaded).unwrap(); - write!(s, "&left={}", self.left).unwrap(); - write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap(); - write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap(); - if let Some(ip) = &self.ip { - write!(s, "&ip={ip}").unwrap(); - } - if let Some(numwant) = &self.numwant { - write!(s, "&numwant={numwant}").unwrap(); - } - if let Some(key) = &self.key { - write!(s, "&key={key}").unwrap(); - } - if let Some(trackerid) = &self.trackerid { - write!(s, "&trackerid={trackerid}").unwrap(); - } - s - } -} - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_serialize() { - let info_hash = Id20::new([ - 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]); - let peer_id = Id20::new([ - 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]); - let request = TrackerRequest { +impl TrackerComms { + pub fn start( + info_hash: Id20, + peer_id: Id20, + trackers: Vec, + stats: Box, + force_interval: Option, + cancellation_token: CancellationToken, + tcp_listen_port: Option, + ) -> anyhow::Result + Send + Sync + Unpin + 'static> { + let (tx, rx) = tokio::sync::mpsc::channel::(16); + let comms = Arc::new(Self { info_hash, peer_id, - port: 6881, - uploaded: 0, - downloaded: 0, - left: 1024 * 1024, - compact: true, - no_peer_id: false, - event: Some(TrackerRequestEvent::Started), - ip: Some("127.0.0.1".parse().unwrap()), - numwant: None, - key: None, - trackerid: None, + stats, + force_tracker_interval: force_interval, + cancellation_token, + tx, + tcp_listen_port, + }); + for tracker in trackers { + if let Err(e) = comms.clone().add_tracker(&tracker) { + info!(tracker = tracker, "error adding tracker: {:#}", e) + } + } + Ok(tokio_stream::wrappers::ReceiverStream::new(rx)) + } + + fn add_tracker(self: Arc, tracker: &str) -> anyhow::Result<()> { + if tracker.starts_with("http://") || tracker.starts_with("https://") { + spawn_with_cancel( + error_span!( + "http_tracker", + tracker = tracker, + info_hash = ?self.info_hash + ), + self.cancellation_token.clone(), + { + let comms = self; + let url = Url::parse(tracker).context("can't parse URL")?; + async move { comms.task_single_tracker_monitor_http(url).await } + }, + ); + } else if tracker.starts_with("udp://") { + spawn_with_cancel( + error_span!("udp_tracker", tracker = tracker), + self.cancellation_token.clone(), + { + let comms = self; + let url = Url::parse(tracker).context("can't parse URL")?; + async move { comms.task_single_tracker_monitor_udp(url).await } + }, + ); + } else { + bail!("unsupported tracker url {}", tracker) + } + Ok(()) + } + + async fn task_single_tracker_monitor_http( + self: Arc, + mut tracker_url: Url, + ) -> anyhow::Result<()> { + let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started); + loop { + let request = tracker_comms_http::TrackerRequest { + info_hash: self.info_hash, + peer_id: self.peer_id, + port: 6778, + uploaded: self.stats.get_uploaded_bytes(), + downloaded: self.stats.get_downloaded_bytes(), + left: self.stats.get_left_to_download_bytes(), + compact: true, + no_peer_id: false, + event, + ip: None, + numwant: None, + key: None, + trackerid: None, + }; + + let request_query = request.as_querystring(); + tracker_url.set_query(Some(&request_query)); + + match self.tracker_one_request_http(tracker_url.clone()).await { + Ok(interval) => { + event = None; + let interval = self + .force_tracker_interval + .unwrap_or_else(|| Duration::from_secs(interval)); + debug!( + "sleeping for {:?} after calling tracker {}", + interval, + tracker_url.host().unwrap() + ); + tokio::time::sleep(interval).await; + } + Err(e) => { + debug!("error calling the tracker {}: {:#}", tracker_url, e); + tokio::time::sleep(Duration::from_secs(60)).await; + } + }; + } + } + + async fn tracker_one_request_http(&self, tracker_url: Url) -> anyhow::Result { + let response: reqwest::Response = reqwest::get(tracker_url).await?; + if !response.status().is_success() { + anyhow::bail!("tracker responded with {:?}", response.status()); + } + let bytes = response.bytes().await?; + if let Ok(error) = bencode::from_bytes::(&bytes) { + anyhow::bail!( + "tracker returned failure. Failure reason: {}", + error.failure_reason + ) }; - dbg!(request.as_querystring()); + let response = bencode::from_bytes::(&bytes)?; + + for peer in response.peers.iter_sockaddrs() { + self.tx.send(peer).await?; + } + Ok(response.interval) + } + + async fn task_single_tracker_monitor_udp(&self, url: Url) -> anyhow::Result<()> { + use tracker_comms_udp::*; + + if url.scheme() != "udp" { + bail!("expected UDP scheme in {}", url); + } + let hp: (&str, u16) = ( + url.host_str().context("missing host")?, + url.port().context("missing port")?, + ); + let mut requester = UdpTrackerRequester::new(hp) + .await + .context("error creating UDP tracker requester")?; + + let mut sleep_interval: Option = None; + loop { + if let Some(i) = sleep_interval { + tokio::time::sleep(i).await; + } + + let request = AnnounceFields { + info_hash: self.info_hash, + peer_id: self.peer_id, + downloaded: self.stats.get_downloaded_bytes(), + left: self.stats.get_left_to_download_bytes(), + uploaded: self.stats.get_uploaded_bytes(), + event: EVENT_NONE, + key: 0, // whatever that is? + port: self.tcp_listen_port.unwrap_or(0), + }; + + match requester.announce(request).await { + Ok(response) => { + for addr in response.addrs { + self.tx + .send(SocketAddr::V4(addr)) + .await + .context("rx closed")?; + } + let new_interval = response.interval.max(5); + let new_interval = Duration::from_secs(new_interval as u64); + sleep_interval = Some(self.force_tracker_interval.unwrap_or(new_interval)); + } + Err(e) => { + debug!(url = ?url, "error reading announce response: {e:#}"); + if sleep_interval.is_none() { + sleep_interval = Some( + self.force_tracker_interval + .unwrap_or(Duration::from_secs(60)), + ); + } + } + } + } } } diff --git a/crates/librqbit/src/tracker_comms_http.rs b/crates/librqbit/src/tracker_comms_http.rs new file mode 100644 index 0000000..e263be7 --- /dev/null +++ b/crates/librqbit/src/tracker_comms_http.rs @@ -0,0 +1,233 @@ +use buffers::ByteBuf; +use byteorder::ByteOrder; +use serde::{Deserialize, Deserializer}; +use std::{ + fmt::Write, + marker::PhantomData, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, + str::FromStr, +}; + +use librqbit_core::hash_id::Id20; + +#[derive(Clone, Copy)] +pub enum TrackerRequestEvent { + Started, + #[allow(dead_code)] + Stopped, + #[allow(dead_code)] + Completed, +} + +pub struct TrackerRequest { + pub info_hash: Id20, + pub peer_id: Id20, + pub event: Option, + pub port: u16, + pub uploaded: u64, + pub downloaded: u64, + pub left: u64, + pub compact: bool, + pub no_peer_id: bool, + + pub ip: Option, + pub numwant: Option, + pub key: Option, + pub trackerid: Option, +} + +#[derive(Deserialize, Debug)] +pub struct TrackerError<'a> { + #[serde(rename = "failure reason", borrow)] + pub failure_reason: ByteBuf<'a>, +} + +#[derive(Deserialize, Debug)] +pub struct DictPeer<'a> { + #[serde(deserialize_with = "deserialize_ip_string")] + ip: IpAddr, + #[serde(borrow)] + #[allow(dead_code)] + peer_id: Option>, + port: u16, +} + +impl<'a> DictPeer<'a> { + fn as_sockaddr(&self) -> SocketAddr { + SocketAddr::new(self.ip, self.port) + } +} + +#[derive(Debug)] +pub struct Peers { + addrs: Vec, +} + +impl Peers { + pub fn iter_sockaddrs(&self) -> impl Iterator + '_ { + self.addrs.iter().copied() + } +} + +impl<'de> serde::de::Deserialize<'de> for Peers { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor<'de> { + phantom: std::marker::PhantomData<&'de ()>, + } + impl<'de> serde::de::Visitor<'de> for Visitor<'de> { + type Value = Peers; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a list of peers in dict or binary format") + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut peers = Vec::new(); + while let Some(peer) = seq.next_element::()? { + peers.push(peer.as_sockaddr()) + } + Ok(Peers { addrs: peers }) + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + Ok(Peers { + addrs: parse_compact_peers(v) + .into_iter() + .map(|v| v.into()) + .collect(), + }) + } + } + deserializer.deserialize_any(Visitor { + phantom: PhantomData, + }) + } +} + +fn deserialize_ip_string<'de, D>(de: D) -> Result +where + D: Deserializer<'de>, +{ + struct Visitor; + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = IpAddr; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expecting an IPv4 address") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + IpAddr::from_str(v).map_err(|e| E::custom(format!("cannot parse ip: {e}"))) + } + } + de.deserialize_str(Visitor {}) +} + +fn parse_compact_peers(b: &[u8]) -> Vec { + let mut ips = Vec::new(); + for chunk in b.chunks_exact(6) { + let ip_chunk = &chunk[..4]; + let port_chunk = &chunk[4..6]; + let ipaddr = Ipv4Addr::new(ip_chunk[0], ip_chunk[1], ip_chunk[2], ip_chunk[3]); + let port = byteorder::BigEndian::read_u16(port_chunk); + ips.push(SocketAddrV4::new(ipaddr, port)); + } + ips +} + +#[derive(Deserialize, Debug)] +pub struct TrackerResponse<'a> { + #[serde(rename = "warning message", borrow)] + pub warning_message: Option>, + pub complete: u64, + pub interval: u64, + #[serde(rename = "min interval")] + pub min_interval: Option, + pub tracker_id: Option>, + pub incomplete: u64, + pub peers: Peers, +} + +impl TrackerRequest { + pub fn as_querystring(&self) -> String { + use urlencoding as u; + let mut s = String::new(); + s.push_str("info_hash="); + s.push_str(u::encode_binary(&self.info_hash.0).as_ref()); + s.push_str("&peer_id="); + s.push_str(u::encode_binary(&self.peer_id.0).as_ref()); + if let Some(event) = self.event { + write!( + s, + "&event={}", + match event { + TrackerRequestEvent::Started => "started", + TrackerRequestEvent::Stopped => "stopped", + TrackerRequestEvent::Completed => "completed", + } + ) + .unwrap(); + } + write!(s, "&port={}", self.port).unwrap(); + write!(s, "&uploaded={}", self.uploaded).unwrap(); + write!(s, "&downloaded={}", self.downloaded).unwrap(); + write!(s, "&left={}", self.left).unwrap(); + write!(s, "&compact={}", if self.compact { 1 } else { 0 }).unwrap(); + write!(s, "&no_peer_id={}", if self.no_peer_id { 1 } else { 0 }).unwrap(); + if let Some(ip) = &self.ip { + write!(s, "&ip={ip}").unwrap(); + } + if let Some(numwant) = &self.numwant { + write!(s, "&numwant={numwant}").unwrap(); + } + if let Some(key) = &self.key { + write!(s, "&key={key}").unwrap(); + } + if let Some(trackerid) = &self.trackerid { + write!(s, "&trackerid={trackerid}").unwrap(); + } + s + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_serialize() { + let info_hash = Id20::new([ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]); + let peer_id = Id20::new([ + 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + ]); + let request = TrackerRequest { + info_hash, + peer_id, + port: 6881, + uploaded: 0, + downloaded: 0, + left: 1024 * 1024, + compact: true, + no_peer_id: false, + event: Some(TrackerRequestEvent::Started), + ip: Some("127.0.0.1".parse().unwrap()), + numwant: None, + key: None, + trackerid: None, + }; + dbg!(request.as_querystring()); + } +} diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/librqbit/src/tracker_comms_udp.rs index 306076b..c32a143 100644 --- a/crates/librqbit/src/tracker_comms_udp.rs +++ b/crates/librqbit/src/tracker_comms_udp.rs @@ -3,6 +3,7 @@ use std::net::{Ipv4Addr, SocketAddrV4}; use anyhow::{bail, Context}; use librqbit_core::hash_id::Id20; use rand::Rng; +use tokio::net::ToSocketAddrs; const ACTION_CONNECT: u32 = 0; const ACTION_ANNOUNCE: u32 = 1; @@ -70,15 +71,18 @@ impl Request { } } +#[derive(Debug)] +pub struct AnnounceResponse { + pub interval: u32, + pub leechers: u32, + pub seeders: u32, + pub addrs: Vec, +} + #[derive(Debug)] pub enum Response { Connect(ConnectionId), - Announce { - interval: u32, - leechers: u32, - seeders: u32, - addrs: Vec, - }, + Announce(AnnounceResponse), } fn split_slice(s: &[u8], first_len: usize) -> Option<(&[u8], &[u8])> { @@ -144,12 +148,12 @@ impl Response { addrs.push(SocketAddrV4::new(ip, port)); } buf = b; - Response::Announce { + Response::Announce(AnnounceResponse { interval, leechers, seeders, addrs, - } + }) } _ => bail!("unsupported action {action}"), }; @@ -165,6 +169,83 @@ impl Response { } } +pub struct UdpTrackerRequester { + sock: tokio::net::UdpSocket, + connection_id: ConnectionId, + read_buf: Vec, + write_buf: Vec, +} + +impl UdpTrackerRequester { + // Addr is "host:port" + pub async fn new(addr: impl ToSocketAddrs) -> anyhow::Result { + let sock = tokio::net::UdpSocket::bind("0.0.0.0:0") + .await + .context("error binding UDP socket")?; + sock.connect(addr) + .await + .context("error connecting UDP socket")?; + + let tid = new_transaction_id(); + let mut write_buf = Vec::new(); + let mut read_buf = vec![0u8; 4096]; + + Request::Connect.serialize(tid, &mut write_buf); + + sock.send(&write_buf) + .await + .context("error sending to socket")?; + + let size = sock + .recv(&mut read_buf) + .await + .context("error receiving from socket")?; + + let (rtid, response) = + Response::parse(&read_buf[..size]).context("error parsing response")?; + if tid != rtid { + bail!("expected transaction id {} == {}", tid, rtid); + } + + let connection_id = match response { + Response::Connect(connection_id) => connection_id, + other => bail!("unexpected response {other:?}"), + }; + + Ok(Self { + sock, + connection_id, + read_buf, + write_buf, + }) + } + + pub async fn announce(&mut self, fields: AnnounceFields) -> anyhow::Result { + let request = Request::Announce(self.connection_id, fields); + let response = self.request(request).await?; + match response { + Response::Announce(r) => Ok(r), + other => bail!("unexpected response {other:?}, expected announce"), + } + } + + pub async fn request(&mut self, request: Request) -> anyhow::Result { + let tid = new_transaction_id(); + self.write_buf.clear(); + let size = request.serialize(tid, &mut self.write_buf); + + self.sock + .send(&self.write_buf[..size]) + .await + .context("error sending")?; + let size = self.sock.recv(&mut self.read_buf).await.unwrap(); + + let (rtid, response) = Response::parse(&self.read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + Ok(response) + } +} + #[cfg(test)] mod tests { use std::{io::Write, str::FromStr}; @@ -244,13 +325,8 @@ mod tests { let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); assert_eq!(tid, rtid); match response { - Response::Announce { - interval, - leechers, - seeders, - addrs, - } => { - dbg!(interval, leechers, seeders, addrs); + Response::Announce(r) => { + dbg!(r); } other => panic!("unexpected response {other:?}"), } diff --git a/crates/librqbit_core/src/lib.rs b/crates/librqbit_core/src/lib.rs index 6086598..63577d6 100644 --- a/crates/librqbit_core/src/lib.rs +++ b/crates/librqbit_core/src/lib.rs @@ -7,3 +7,5 @@ pub mod peer_id; pub mod spawn_utils; pub mod speed_estimator; pub mod torrent_metainfo; + +pub use hash_id::Id20; From 95769cca6a42cabe2e16f25763ffe0ce30f81ff5 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 17 Feb 2024 11:14:40 +0000 Subject: [PATCH 03/12] Start calling trackers before going live --- crates/librqbit/src/session.rs | 118 ++++++++++++++--------- crates/librqbit/src/torrent_state/mod.rs | 5 +- crates/librqbit/src/tracker_comms.rs | 25 ++++- crates/librqbit/src/type_aliases.rs | 3 + 4 files changed, 99 insertions(+), 52 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 998befb..1bfd586 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -16,7 +16,7 @@ use clone_to_owned::CloneToOwned; use dht::{ Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, }; -use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; +use futures::{stream::FuturesUnordered, Stream, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -28,10 +28,10 @@ use librqbit_core::{ }; use parking_lot::RwLock; use peer_binary_protocol::Handshake; -use reqwest::Url; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; use tokio::net::{TcpListener, TcpStream}; +use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; @@ -43,6 +43,8 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, + tracker_comms::{TorrentStatsForTrackerDummy, TrackerComms}, + type_aliases::PeerStream, }; pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; @@ -366,6 +368,18 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } +fn merge_peer_rx( + dht_rx: Option, + peer_rx: Option + Unpin + Send + Sync + 'static>, +) -> Option { + match (dht_rx, peer_rx) { + (Some(dht_rx), None) => Some(Box::new(dht_rx)), + (None, Some(peer_rx)) => Some(Box::new(peer_rx)), + (None, None) => None, + (Some(dht_rx), Some(peer_rx)) => Some(Box::new(dht_rx.merge(peer_rx))), + } +} + pub(crate) struct CheckedIncomingConnection { pub addr: SocketAddr, pub stream: tokio::net::TcpStream, @@ -548,7 +562,10 @@ impl Session { )); } - bail!("didn't find a matching torrent for {:?}", Id20::new(h.info_hash)) + bail!( + "didn't find a matching torrent for {:?}", + Id20::new(h.info_hash) + ) } async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { @@ -751,34 +768,41 @@ impl Session { self.tcp_listen_port }; - let (info_hash, info, dht_rx, trackers, initial_peers) = match add { + let (info_hash, info, peer_rx, initial_peers) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { - let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; - let info_hash = magnet.as_id20().context("magnet link didn't contain a BTv1 infohash")?; + let magnet = + Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; + let info_hash = magnet + .as_id20() + .context("magnet link didn't contain a BTv1 infohash")?; - let dht_rx = self + let dht_peer_rx = self .dht .as_ref() - .context("magnet links without DHT are not supported")? - .get_peers(info_hash, announce_port)?; + .map(|d| d.get_peers(info_hash, announce_port)) + .transpose()?; - let trackers = magnet.trackers - .into_iter() - .filter_map(|url| match reqwest::Url::parse(&url) { - Ok(url) => Some(url), - Err(e) => { - warn!("error parsing tracker {} as url: {}", url, e); - None - } - }) - .collect(); + let tracker_peer_rx = TrackerComms::start( + info_hash, + self.peer_id, + magnet.trackers, + Box::new(TorrentStatsForTrackerDummy {}), + opts.force_tracker_interval, + self.cancellation_token().clone(), + self.tcp_listen_port, + ); + + let peer_rx = match merge_peer_rx(dht_peer_rx, tracker_peer_rx) { + Some(peer_rx) => peer_rx, + None => bail!("can't find peers: DHT disabled and no trackers in magnet"), + }; debug!(?info_hash, "querying DHT"); - let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver( + let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( self.peer_id, info_hash, opts.initial_peers.clone().unwrap_or_default(), - dht_rx, + peer_rx, Some(self.merge_peer_opts(opts.peer_opts)), ) .await @@ -795,9 +819,8 @@ impl Session { if opts.paused || opts.list_only { None } else { - Some(dht_rx) + Some(peer_rx) }, - trackers, initial_peers, ) } @@ -829,28 +852,31 @@ impl Session { }; let trackers = torrent .iter_announce() - .filter_map(|tracker| { - let url = match std::str::from_utf8(tracker.as_ref()) { - Ok(url) => url, - Err(_) => { - warn!("cannot parse tracker url as utf-8, ignoring"); - return None; - } - }; - match Url::parse(url) { - Ok(url) => Some(url), - Err(e) => { - warn!("cannot parse tracker URL {}: {}", url, e); - None - } + .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { + Ok(url) => Some(url.to_owned()), + Err(_) => { + warn!("cannot parse tracker url as utf-8, ignoring"); + return None; } }) .collect::>(); + + let tracker_peer_rx = TrackerComms::start( + torrent.info_hash, + self.peer_id, + trackers, + Box::new(TorrentStatsForTrackerDummy {}), + opts.force_tracker_interval, + self.cancellation_token().clone(), + self.tcp_listen_port, + ); + + let peer_rx = merge_peer_rx(dht_rx, tracker_peer_rx); + ( torrent.info_hash, torrent.info, - dht_rx, - trackers, + peer_rx, opts.initial_peers .clone() .unwrap_or_default() @@ -863,9 +889,8 @@ impl Session { self.main_torrent_info( info_hash, info, - dht_rx, + peer_rx, initial_peers.into_iter().collect(), - trackers, opts, ) .await @@ -876,9 +901,8 @@ impl Session { &self, info_hash: Id20, info: TorrentMetaV1Info, - dht_peer_rx: Option, + peer_rx: Option, initial_peers: Vec, - trackers: Vec, opts: AddTorrentOptions, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); @@ -966,10 +990,6 @@ impl Session { .cancellation_token(self.cancellation_token.child_token()) .peer_id(self.peer_id); - if opts.disable_trackers { - builder.trackers(trackers); - } - if let Some(only_files) = only_files { builder.only_files(only_files); } @@ -1004,7 +1024,7 @@ impl Session { let span = managed_torrent.info.span.clone(); let _ = span.enter(); managed_torrent - .start(initial_peers, dht_peer_rx, opts.paused) + .start(initial_peers, peer_rx, opts.paused) .context("error starting torrent")?; } @@ -1059,6 +1079,8 @@ impl Session { .as_ref() .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) .transpose()?; + todo!(); + let peer_rx = None; handle.start(Default::default(), peer_rx, false)?; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 7d29b79..da80053 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -37,6 +37,7 @@ use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::BlockingSpawner; use crate::torrent_state::stats::LiveStats; +use crate::type_aliases::PeerStream; use initializing::TorrentStateInitializing; @@ -173,7 +174,7 @@ impl ManagedTorrent { pub(crate) fn start( self: &Arc, initial_peers: Vec, - peer_rx: Option, + peer_rx: Option, start_paused: bool, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -204,7 +205,7 @@ impl ManagedTorrent { fn spawn_peer_adder( live: &Arc, initial_peers: Vec, - peer_rx: Option, + peer_rx: Option, ) { live.spawn( error_span!(parent: live.meta().span.clone(), "external_peer_adder"), diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index 8482552..61fe467 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -41,6 +41,21 @@ pub trait TorrentStatsForTracker: Send + Sync { } } +pub struct TorrentStatsForTrackerDummy {} +impl TorrentStatsForTracker for TorrentStatsForTrackerDummy { + fn get_uploaded_bytes(&self) -> u64 { + 0 + } + + fn get_downloaded_bytes(&self) -> u64 { + 0 + } + + fn get_total_bytes(&self) -> u64 { + 0 + } +} + type Sender = tokio::sync::mpsc::Sender; impl TrackerComms { @@ -52,7 +67,7 @@ impl TrackerComms { force_interval: Option, cancellation_token: CancellationToken, tcp_listen_port: Option, - ) -> anyhow::Result + Send + Sync + Unpin + 'static> { + ) -> Option + Send + Sync + Unpin + 'static> { let (tx, rx) = tokio::sync::mpsc::channel::(16); let comms = Arc::new(Self { info_hash, @@ -63,12 +78,18 @@ impl TrackerComms { tx, tcp_listen_port, }); + let mut added = false; for tracker in trackers { if let Err(e) = comms.clone().add_tracker(&tracker) { info!(tracker = tracker, "error adding tracker: {:#}", e) + } else { + added = true; } } - Ok(tokio_stream::wrappers::ReceiverStream::new(rx)) + if !added { + return None; + } + Some(tokio_stream::wrappers::ReceiverStream::new(rx)) } fn add_tracker(self: Arc, tracker: &str) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 29fa9df..2b6efa9 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -1,5 +1,8 @@ use std::net::SocketAddr; +use futures::Stream; + pub type BF = bitvec::vec::BitVec; pub type PeerHandle = SocketAddr; +pub type PeerStream = Box + Unpin + Send + Sync + 'static>; From 1582d16cc51c53d516e23cef3caef126ee45c2ad Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sat, 17 Feb 2024 21:13:57 +0000 Subject: [PATCH 04/12] Fix unpause --- crates/librqbit/src/session.rs | 30 +++++++++++++++++------- crates/librqbit/src/torrent_state/mod.rs | 6 ++--- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 1bfd586..4508e9c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -43,7 +43,7 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, - tracker_comms::{TorrentStatsForTrackerDummy, TrackerComms}, + tracker_comms::{self, TorrentStatsForTrackerDummy, TrackerComms}, type_aliases::PeerStream, }; @@ -768,7 +768,7 @@ impl Session { self.tcp_listen_port }; - let (info_hash, info, peer_rx, initial_peers) = match add { + let (info_hash, info, trackers, peer_rx, initial_peers) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; @@ -785,7 +785,7 @@ impl Session { let tracker_peer_rx = TrackerComms::start( info_hash, self.peer_id, - magnet.trackers, + magnet.trackers.clone(), Box::new(TorrentStatsForTrackerDummy {}), opts.force_tracker_interval, self.cancellation_token().clone(), @@ -816,6 +816,7 @@ impl Session { ( info_hash, info, + magnet.trackers, if opts.paused || opts.list_only { None } else { @@ -856,7 +857,7 @@ impl Session { Ok(url) => Some(url.to_owned()), Err(_) => { warn!("cannot parse tracker url as utf-8, ignoring"); - return None; + None } }) .collect::>(); @@ -864,7 +865,7 @@ impl Session { let tracker_peer_rx = TrackerComms::start( torrent.info_hash, self.peer_id, - trackers, + trackers.clone(), Box::new(TorrentStatsForTrackerDummy {}), opts.force_tracker_interval, self.cancellation_token().clone(), @@ -876,6 +877,7 @@ impl Session { ( torrent.info_hash, torrent.info, + trackers, peer_rx, opts.initial_peers .clone() @@ -889,6 +891,7 @@ impl Session { self.main_torrent_info( info_hash, info, + trackers, peer_rx, initial_peers.into_iter().collect(), opts, @@ -901,6 +904,7 @@ impl Session { &self, info_hash: Id20, info: TorrentMetaV1Info, + trackers: Vec, peer_rx: Option, initial_peers: Vec, opts: AddTorrentOptions, @@ -988,6 +992,7 @@ impl Session { .overwrite(opts.overwrite) .spawner(self.spawner) .cancellation_token(self.cancellation_token.child_token()) + .trackers(trackers) .peer_id(self.peer_id); if let Some(only_files) = only_files { @@ -1074,13 +1079,22 @@ impl Session { } pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let peer_rx = self + let dht_rx = self .dht .as_ref() .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) .transpose()?; - todo!(); - let peer_rx = None; + let trackers = handle.info().trackers.clone(); + let peer_rx = TrackerComms::start( + handle.info.info_hash, + handle.info.peer_id, + trackers.into_iter().collect(), + Box::new(tracker_comms::TorrentStatsForTrackerDummy {}), + None, + self.cancellation_token.clone(), + self.tcp_listen_port, + ); + let peer_rx = merge_peer_rx(dht_rx, peer_rx); handle.start(Default::default(), peer_rx, false)?; Ok(()) } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index da80053..0899750 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -84,7 +84,7 @@ pub struct ManagedTorrentInfo { pub info_hash: Id20, pub out_dir: PathBuf, pub(crate) spawner: BlockingSpawner, - pub trackers: HashSet, + pub trackers: HashSet, pub peer_id: Id20, pub lengths: Lengths, pub span: tracing::Span, @@ -422,7 +422,7 @@ pub struct ManagedTorrentBuilder { peer_connect_timeout: Option, peer_read_write_timeout: Option, only_files: Option>, - trackers: Vec, + trackers: Vec, peer_id: Option, overwrite: bool, spawner: Option, @@ -461,7 +461,7 @@ impl ManagedTorrentBuilder { self } - pub fn trackers(&mut self, trackers: Vec) -> &mut Self { + pub fn trackers(&mut self, trackers: Vec) -> &mut Self { self.trackers = trackers; self } From f5ccb8632b00069ef8d70aac1aabbc685d11ec23 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Feb 2024 20:11:12 +0000 Subject: [PATCH 05/12] Cancellation... It compiles now at least with latest changes, but not sure if they work or are correct --- crates/librqbit/src/session.rs | 98 +++++++++++++----------- crates/librqbit/src/torrent_state/mod.rs | 30 ++++---- 2 files changed, 71 insertions(+), 57 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 4508e9c..6250425 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -768,7 +768,10 @@ impl Session { self.tcp_listen_port }; - let (info_hash, info, trackers, peer_rx, initial_peers) = match add { + let cancellation_token = self.cancellation_token.child_token(); + let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); + + let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let magnet = Magnet::parse(&magnet).context("provided path is not a valid magnet URL")?; @@ -776,23 +779,13 @@ impl Session { .as_id20() .context("magnet link didn't contain a BTv1 infohash")?; - let dht_peer_rx = self - .dht - .as_ref() - .map(|d| d.get_peers(info_hash, announce_port)) - .transpose()?; - - let tracker_peer_rx = TrackerComms::start( + let peer_rx = self.make_peer_rx( info_hash, - self.peer_id, magnet.trackers.clone(), - Box::new(TorrentStatsForTrackerDummy {}), - opts.force_tracker_interval, - self.cancellation_token().clone(), - self.tcp_listen_port, - ); - - let peer_rx = match merge_peer_rx(dht_peer_rx, tracker_peer_rx) { + cancellation_token.clone(), + announce_port, + )?; + let peer_rx = match peer_rx { Some(peer_rx) => peer_rx, None => bail!("can't find peers: DHT disabled and no trackers in magnet"), }; @@ -823,6 +816,7 @@ impl Session { Some(peer_rx) }, initial_peers, + cancellation_token, ) } other => { @@ -844,13 +838,6 @@ impl Session { AddTorrent::TorrentInfo(t) => *t, }; - let dht_rx = match self.dht.as_ref() { - Some(dht) if !opts.paused && !opts.list_only => { - debug!(info_hash=?torrent.info_hash, "reading peers from DHT"); - Some(dht.get_peers(torrent.info_hash, announce_port)?) - } - _ => None, - }; let trackers = torrent .iter_announce() .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { @@ -862,17 +849,12 @@ impl Session { }) .collect::>(); - let tracker_peer_rx = TrackerComms::start( + let peer_rx = self.make_peer_rx( torrent.info_hash, - self.peer_id, trackers.clone(), - Box::new(TorrentStatsForTrackerDummy {}), - opts.force_tracker_interval, - self.cancellation_token().clone(), - self.tcp_listen_port, - ); - - let peer_rx = merge_peer_rx(dht_rx, tracker_peer_rx); + cancellation_token.clone(), + announce_port, + )?; ( torrent.info_hash, @@ -884,10 +866,13 @@ impl Session { .unwrap_or_default() .into_iter() .collect(), + cancellation_token, ) } }; + cancellation_token_drop_guard.disarm(); + self.main_torrent_info( info_hash, info, @@ -895,6 +880,7 @@ impl Session { peer_rx, initial_peers.into_iter().collect(), opts, + cancellation_token, ) .await } @@ -908,9 +894,12 @@ impl Session { peer_rx: Option, initial_peers: Vec, opts: AddTorrentOptions, + cancellation_token: CancellationToken, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); + let drop_guard = cancellation_token.clone().drop_guard(); + let get_only_files = |only_files: Option>, only_files_regex: Option, list_only: bool| { match (only_files, only_files_regex) { @@ -991,7 +980,7 @@ impl Session { builder .overwrite(opts.overwrite) .spawner(self.spawner) - .cancellation_token(self.cancellation_token.child_token()) + .cancellation_token(cancellation_token.clone()) .trackers(trackers) .peer_id(self.peer_id); @@ -1029,10 +1018,16 @@ impl Session { let span = managed_torrent.info.span.clone(); let _ = span.enter(); managed_torrent - .start(initial_peers, peer_rx, opts.paused) + .start( + initial_peers, + peer_rx, + opts.paused, + cancellation_token.child_token(), + ) .context("error starting torrent")?; } + drop_guard.disarm(); Ok(AddTorrentResponse::Added(id, managed_torrent)) } @@ -1078,24 +1073,41 @@ impl Session { } } - pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + fn make_peer_rx( + &self, + info_hash: Id20, + trackers: Vec, + cancel: CancellationToken, + announce_port: Option, + ) -> anyhow::Result> { + let announce_port = announce_port.or(self.tcp_listen_port); let dht_rx = self .dht .as_ref() - .map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port)) + .map(|dht| dht.get_peers(info_hash, announce_port)) .transpose()?; - let trackers = handle.info().trackers.clone(); let peer_rx = TrackerComms::start( - handle.info.info_hash, - handle.info.peer_id, - trackers.into_iter().collect(), + info_hash, + self.peer_id, + trackers, Box::new(tracker_comms::TorrentStatsForTrackerDummy {}), None, - self.cancellation_token.clone(), - self.tcp_listen_port, + cancel, + announce_port, ); let peer_rx = merge_peer_rx(dht_rx, peer_rx); - handle.start(Default::default(), peer_rx, false)?; + Ok(peer_rx) + } + + pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + let token = handle.cancellation_token.child_token(); + let peer_rx = self.make_peer_rx( + handle.info_hash(), + handle.info().trackers.clone().into_iter().collect(), + token.clone(), + self.tcp_listen_port, + )?; + handle.start(Default::default(), peer_rx, false, token)?; Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 0899750..d8267fa 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -15,7 +15,6 @@ use std::time::Duration; use anyhow::bail; use anyhow::Context; use buffers::ByteString; -use dht::RequestPeersStream; use librqbit_core::hash_id::Id20; use librqbit_core::lengths::Lengths; use librqbit_core::peer_id::generate_peer_id; @@ -32,7 +31,6 @@ use tracing::debug; use tracing::error_span; use tracing::trace; use tracing::warn; -use url::Url; use crate::chunk_tracker::ChunkTracker; use crate::spawn_utils::BlockingSpawner; @@ -176,13 +174,15 @@ impl ManagedTorrent { initial_peers: Vec, peer_rx: Option, start_paused: bool, + live_cancellation_token: CancellationToken, ) -> anyhow::Result<()> { let mut g = self.locked.write(); let spawn_fatal_errors_receiver = - |state: &Arc, rx: tokio::sync::oneshot::Receiver| { + |state: &Arc, + rx: tokio::sync::oneshot::Receiver, + token: CancellationToken| { let span = state.info.span.clone(); - let token = state.cancellation_token.clone(); let state = Arc::downgrade(state); spawn_with_cancel( error_span!(parent: span, "fatal_errors_receiver"), @@ -258,7 +258,7 @@ impl ManagedTorrent { drop(g); let t = self.clone(); let span = self.info().span.clone(); - let token = self.cancellation_token.clone(); + let token = live_cancellation_token.clone(); spawn_with_cancel( error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), @@ -278,10 +278,11 @@ impl ManagedTorrent { } let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new(paused, tx, token.child_token()); + let live = + TorrentStateLive::new(paused, tx, live_cancellation_token); g.state = ManagedTorrentState::Live(live.clone()); - spawn_fatal_errors_receiver(&t, rx); + spawn_fatal_errors_receiver(&t, rx, token); spawn_peer_adder(&live, initial_peers, peer_rx); Ok(()) @@ -299,13 +300,9 @@ impl ManagedTorrent { ManagedTorrentState::Paused(_) => { let paused = g.state.take().assert_paused(); let (tx, rx) = tokio::sync::oneshot::channel(); - let live = TorrentStateLive::new( - paused, - tx, - self.cancellation_token.child_token().clone(), - ); + let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone()); g.state = ManagedTorrentState::Live(live.clone()); - spawn_fatal_errors_receiver(self, rx); + spawn_fatal_errors_receiver(self, rx, live_cancellation_token); spawn_peer_adder(&live, initial_peers, peer_rx); Ok(()) } @@ -318,7 +315,12 @@ impl ManagedTorrent { drop(g); // Recurse. - self.start(initial_peers, peer_rx, start_paused) + self.start( + initial_peers, + peer_rx, + start_paused, + live_cancellation_token, + ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), } From 76b7d231499b60e9db99dc1d404646180d878bd4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Feb 2024 20:16:18 +0000 Subject: [PATCH 06/12] Clippy --- crates/librqbit/src/session.rs | 31 ++++++++++++------------------ crates/librqbit_core/src/magnet.rs | 10 +++++----- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 6250425..73641af 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -13,10 +13,8 @@ use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBuf, ByteBufT, ByteString}; use clone_to_owned::CloneToOwned; -use dht::{ - Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, -}; -use futures::{stream::FuturesUnordered, Stream, TryFutureExt}; +use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig}; +use futures::{stream::FuturesUnordered, TryFutureExt}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -43,7 +41,7 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, - tracker_comms::{self, TorrentStatsForTrackerDummy, TrackerComms}, + tracker_comms::{self, TrackerComms}, type_aliases::PeerStream, }; @@ -368,18 +366,6 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } -fn merge_peer_rx( - dht_rx: Option, - peer_rx: Option + Unpin + Send + Sync + 'static>, -) -> Option { - match (dht_rx, peer_rx) { - (Some(dht_rx), None) => Some(Box::new(dht_rx)), - (None, Some(peer_rx)) => Some(Box::new(peer_rx)), - (None, None) => None, - (Some(dht_rx), Some(peer_rx)) => Some(Box::new(dht_rx.merge(peer_rx))), - } -} - pub(crate) struct CheckedIncomingConnection { pub addr: SocketAddr, pub stream: tokio::net::TcpStream, @@ -1073,6 +1059,7 @@ impl Session { } } + // Get a peer stream from both DHT and trackers. fn make_peer_rx( &self, info_hash: Id20, @@ -1095,8 +1082,14 @@ impl Session { cancel, announce_port, ); - let peer_rx = merge_peer_rx(dht_rx, peer_rx); - Ok(peer_rx) + + // Merge DHT rx and tracker comms peer rx. + match (dht_rx, peer_rx) { + (Some(dht_rx), None) => Ok(Some(Box::new(dht_rx))), + (None, Some(peer_rx)) => Ok(Some(Box::new(peer_rx))), + (None, None) => Ok(None), + (Some(dht_rx), Some(peer_rx)) => Ok(Some(Box::new(dht_rx.merge(peer_rx)))), + } } pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { diff --git a/crates/librqbit_core/src/magnet.rs b/crates/librqbit_core/src/magnet.rs index 477dbf6..f0942bb 100644 --- a/crates/librqbit_core/src/magnet.rs +++ b/crates/librqbit_core/src/magnet.rs @@ -4,7 +4,6 @@ use anyhow::Context; use crate::hash_id::{Id20, Id32}; - /// A parsed magnet link. pub struct Magnet { id20: Option, @@ -45,7 +44,7 @@ impl Magnet { } else { anyhow::bail!("expected xt to start with btih or btmh"); } - }, + } "tr" => trackers.push(value.into()), _ => {} } @@ -93,7 +92,6 @@ impl std::fmt::Display for Magnet { } } - #[cfg(test)] mod tests { #[test] @@ -109,8 +107,10 @@ mod tests { use std::str::FromStr; let magnet = "magnet:?xt=urn:btmh:1220caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e&dn=bittorrent-v2-test "; - let info_hash = Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e").unwrap(); - let m = Magnet::parse(&magnet).unwrap(); + let info_hash = + Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e") + .unwrap(); + let m = Magnet::parse(magnet).unwrap(); assert!(m.as_id32() == Some(info_hash)); } } From 3a7207265fceaa6570503a7f2def08a7a41cb448 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Feb 2024 20:21:12 +0000 Subject: [PATCH 07/12] Make force_tracker_interval be used again --- crates/librqbit/src/session.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 73641af..f38c85d 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -30,7 +30,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; -use tokio_util::sync::CancellationToken; +use tokio_util::sync::{CancellationToken, DropGuard}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ @@ -172,6 +172,9 @@ pub struct Session { tcp_listen_port: Option, cancellation_token: CancellationToken, + + // This is stored for all tasks to stop when session is dropped. + _cancellation_token_drop_guard: DropGuard, } async fn torrent_from_url(url: &str) -> anyhow::Result { @@ -440,6 +443,7 @@ impl Session { spawner, output_folder, db: RwLock::new(Default::default()), + _cancellation_token_drop_guard: token.clone().drop_guard(), cancellation_token: token, tcp_listen_port, }); @@ -770,6 +774,7 @@ impl Session { magnet.trackers.clone(), cancellation_token.clone(), announce_port, + opts.force_tracker_interval, )?; let peer_rx = match peer_rx { Some(peer_rx) => peer_rx, @@ -840,6 +845,7 @@ impl Session { trackers.clone(), cancellation_token.clone(), announce_port, + opts.force_tracker_interval, )?; ( @@ -1066,6 +1072,7 @@ impl Session { trackers: Vec, cancel: CancellationToken, announce_port: Option, + force_tracker_interval: Option, ) -> anyhow::Result> { let announce_port = announce_port.or(self.tcp_listen_port); let dht_rx = self @@ -1077,8 +1084,9 @@ impl Session { info_hash, self.peer_id, trackers, + // TODO: report actual bytes, not zeroes. Box::new(tracker_comms::TorrentStatsForTrackerDummy {}), - None, + force_tracker_interval, cancel, announce_port, ); @@ -1099,6 +1107,7 @@ impl Session { handle.info().trackers.clone().into_iter().collect(), token.clone(), self.tcp_listen_port, + handle.info().options.force_tracker_interval, )?; handle.start(Default::default(), peer_rx, false, token)?; Ok(()) From 51dba8ab67195304a0389fdfc180a3147fbcca94 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 08:55:53 +0000 Subject: [PATCH 08/12] Remove unused field --- crates/librqbit/src/session.rs | 3 +-- crates/librqbit/src/torrent_state/mod.rs | 11 +---------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index f38c85d..3d81b66 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -972,7 +972,6 @@ impl Session { builder .overwrite(opts.overwrite) .spawner(self.spawner) - .cancellation_token(cancellation_token.clone()) .trackers(trackers) .peer_id(self.peer_id); @@ -1101,7 +1100,7 @@ impl Session { } pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { - let token = handle.cancellation_token.child_token(); + let token = self.cancellation_token.child_token(); let peer_rx = self.make_peer_rx( handle.info_hash(), handle.info().trackers.clone().into_iter().collect(), diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index d8267fa..90bfd8e 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -91,7 +91,6 @@ pub struct ManagedTorrentInfo { pub struct ManagedTorrent { pub info: Arc, - pub cancellation_token: CancellationToken, pub(crate) only_files: Option>, locked: RwLock, } @@ -428,7 +427,6 @@ pub struct ManagedTorrentBuilder { peer_id: Option, overwrite: bool, spawner: Option, - cancellation_token: Option, } impl ManagedTorrentBuilder { @@ -449,15 +447,9 @@ impl ManagedTorrentBuilder { trackers: Default::default(), peer_id: None, overwrite: false, - cancellation_token: None, } } - pub fn cancellation_token(&mut self, token: CancellationToken) -> &mut Self { - self.cancellation_token = Some(token); - self - } - pub fn only_files(&mut self, only_files: Vec) -> &mut Self { self.only_files = Some(only_files); self @@ -498,7 +490,7 @@ impl ManagedTorrentBuilder { self } - pub(crate) fn build(mut self, span: tracing::Span) -> anyhow::Result { + pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result { let lengths = Lengths::from_torrent(&self.info)?; let info = Arc::new(ManagedTorrentInfo { span, @@ -525,7 +517,6 @@ impl ManagedTorrentBuilder { locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), }), - cancellation_token: self.cancellation_token.take().unwrap_or_default(), info, })) } From e263441fb62a1360f80708bbf58862eb81873e5e Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 09:25:01 +0000 Subject: [PATCH 09/12] Better cancellation --- Makefile | 3 +-- crates/librqbit/src/session.rs | 49 ++++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 2e69dc6..751fb44 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,7 @@ webui-build: webui-deps @PHONY: devserver devserver: - echo -n '' > /tmp/rqbit-log - cargo run --release -- \ + echo -n '' > /tmp/rqbit-log && cargo run --release -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ server start /tmp/scratch/ diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 3d81b66..d7bf12c 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -666,7 +666,7 @@ impl Session { .collect(); let info = TorrentMetaV1Owned { announce: trackers - .get(0) + .first() .cloned() .unwrap_or_else(|| ByteString(b"http://retracker.local/announce".to_vec())), announce_list: vec![trackers], @@ -760,6 +760,7 @@ impl Session { let cancellation_token = self.cancellation_token.child_token(); let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); + let paused = opts.list_only || opts.paused; let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { @@ -769,10 +770,11 @@ impl Session { .as_id20() .context("magnet link didn't contain a BTv1 infohash")?; + let peer_token = cancellation_token.child_token(); let peer_rx = self.make_peer_rx( info_hash, magnet.trackers.clone(), - cancellation_token.clone(), + peer_token.clone(), announce_port, opts.force_tracker_interval, )?; @@ -796,16 +798,15 @@ impl Session { anyhow::bail!("DHT died, no way to discover torrent metainfo") } }; + if paused { + peer_token.cancel(); + } debug!(?info, "received result from DHT"); ( info_hash, info, magnet.trackers, - if opts.paused || opts.list_only { - None - } else { - Some(peer_rx) - }, + Some(peer_rx), initial_peers, cancellation_token, ) @@ -840,13 +841,17 @@ impl Session { }) .collect::>(); - let peer_rx = self.make_peer_rx( - torrent.info_hash, - trackers.clone(), - cancellation_token.clone(), - announce_port, - opts.force_tracker_interval, - )?; + let peer_rx = if paused { + None + } else { + self.make_peer_rx( + torrent.info_hash, + trackers.clone(), + cancellation_token.clone(), + announce_port, + opts.force_tracker_interval, + )? + }; ( torrent.info_hash, @@ -1008,13 +1013,17 @@ impl Session { { let span = managed_torrent.info.span.clone(); let _ = span.enter(); + + // Just in case, cancel all tasks started for this torrent so far. + // This is defensive, and not proven necessary. + let token = if opts.paused { + cancellation_token.cancel(); + self.cancellation_token.child_token() + } else { + cancellation_token + }; managed_torrent - .start( - initial_peers, - peer_rx, - opts.paused, - cancellation_token.child_token(), - ) + .start(initial_peers, peer_rx, opts.paused, token) .context("error starting torrent")?; } From b30ee1338881f14bf0d23dc00f0e700ae5219c28 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 09:42:06 +0000 Subject: [PATCH 10/12] Logging for UDP trackers --- .../librqbit/src/tracing_subscriber_config_utils.rs | 1 - crates/librqbit/src/tracker_comms.rs | 6 +++++- crates/librqbit/src/tracker_comms_udp.rs | 13 ++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/crates/librqbit/src/tracing_subscriber_config_utils.rs b/crates/librqbit/src/tracing_subscriber_config_utils.rs index cc595b2..7cac1a2 100644 --- a/crates/librqbit/src/tracing_subscriber_config_utils.rs +++ b/crates/librqbit/src/tracing_subscriber_config_utils.rs @@ -96,7 +96,6 @@ pub fn init_logging(opts: InitLoggingOptions) -> anyhow::Result = None; loop { if let Some(i) = sleep_interval { + trace!(interval=?sleep_interval, "sleeping"); tokio::time::sleep(i).await; } @@ -222,6 +225,7 @@ impl TrackerComms { match requester.announce(request).await { Ok(response) => { + trace!(len = response.addrs.len(), "received announce response"); for addr in response.addrs { self.tx .send(SocketAddr::V4(addr)) diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/librqbit/src/tracker_comms_udp.rs index c32a143..1e72ae8 100644 --- a/crates/librqbit/src/tracker_comms_udp.rs +++ b/crates/librqbit/src/tracker_comms_udp.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context}; use librqbit_core::hash_id::Id20; use rand::Rng; use tokio::net::ToSocketAddrs; +use tracing::trace; const ACTION_CONNECT: u32 = 0; const ACTION_ANNOUNCE: u32 = 1; @@ -190,6 +191,7 @@ impl UdpTrackerRequester { let mut write_buf = Vec::new(); let mut read_buf = vec![0u8; 4096]; + trace!("sending connect request"); Request::Connect.serialize(tid, &mut write_buf); sock.send(&write_buf) @@ -206,12 +208,15 @@ impl UdpTrackerRequester { if tid != rtid { bail!("expected transaction id {} == {}", tid, rtid); } + trace!(response=?response, "received"); let connection_id = match response { Response::Connect(connection_id) => connection_id, other => bail!("unexpected response {other:?}"), }; + trace!(connection_id); + Ok(Self { sock, connection_id, @@ -233,7 +238,7 @@ impl UdpTrackerRequester { let tid = new_transaction_id(); self.write_buf.clear(); let size = request.serialize(tid, &mut self.write_buf); - + trace!(request=?request, tid, "sending"); self.sock .send(&self.write_buf[..size]) .await @@ -241,7 +246,10 @@ impl UdpTrackerRequester { let size = self.sock.recv(&mut self.read_buf).await.unwrap(); let (rtid, response) = Response::parse(&self.read_buf[..size]).unwrap(); - assert_eq!(tid, rtid); + trace!("received response"); + if tid != rtid { + bail!("unexpected transaction id"); + } Ok(response) } } @@ -251,7 +259,6 @@ mod tests { use std::{io::Write, str::FromStr}; use librqbit_core::{hash_id::Id20, peer_id::generate_peer_id}; - pub use rand::Rng; use crate::tracker_comms_udp::{ new_transaction_id, AnnounceFields, Request, Response, EVENT_NONE, From 70c59834ba7d73c0491b2b734801c43f3478acd5 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 09:47:46 +0000 Subject: [PATCH 11/12] Simplify the trait for tracker comms --- crates/librqbit/src/session.rs | 4 +-- crates/librqbit/src/tracker_comms.rs | 52 ++++++++++++++-------------- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index d7bf12c..46c9694 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -41,7 +41,7 @@ use crate::{ torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, - tracker_comms::{self, TrackerComms}, + tracker_comms::TrackerComms, type_aliases::PeerStream, }; @@ -1093,7 +1093,7 @@ impl Session { self.peer_id, trackers, // TODO: report actual bytes, not zeroes. - Box::new(tracker_comms::TorrentStatsForTrackerDummy {}), + Box::new(()), force_tracker_interval, cancel, announce_port, diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index e5b83ef..d1a7813 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -20,21 +20,24 @@ use librqbit_core::hash_id::Id20; pub struct TrackerComms { info_hash: Id20, peer_id: Id20, - stats: Box, + stats: Box, force_tracker_interval: Option, cancellation_token: CancellationToken, tx: Sender, tcp_listen_port: Option, } -pub trait TorrentStatsForTracker: Send + Sync { - fn get_uploaded_bytes(&self) -> u64; - fn get_downloaded_bytes(&self) -> u64; - fn get_total_bytes(&self) -> u64; +#[derive(Default)] +pub struct TrackerCommsStats { + pub uploaded_bytes: u64, + pub downloaded_bytes: u64, + pub total_bytes: u64, +} - fn get_left_to_download_bytes(&self) -> u64 { - let total = self.get_total_bytes(); - let down = self.get_downloaded_bytes(); +impl TrackerCommsStats { + pub fn get_left_to_download_bytes(&self) -> u64 { + let total = self.total_bytes; + let down = self.downloaded_bytes; if total >= down { return total - down; } @@ -42,18 +45,13 @@ pub trait TorrentStatsForTracker: Send + Sync { } } -pub struct TorrentStatsForTrackerDummy {} -impl TorrentStatsForTracker for TorrentStatsForTrackerDummy { - fn get_uploaded_bytes(&self) -> u64 { - 0 - } +pub trait TorrentStatsProvider: Send + Sync { + fn get(&self) -> TrackerCommsStats; +} - fn get_downloaded_bytes(&self) -> u64 { - 0 - } - - fn get_total_bytes(&self) -> u64 { - 0 +impl TorrentStatsProvider for () { + fn get(&self) -> TrackerCommsStats { + Default::default() } } @@ -64,7 +62,7 @@ impl TrackerComms { info_hash: Id20, peer_id: Id20, trackers: Vec, - stats: Box, + stats: Box, force_interval: Option, cancellation_token: CancellationToken, tcp_listen_port: Option, @@ -131,13 +129,14 @@ impl TrackerComms { ) -> anyhow::Result<()> { let mut event = Some(tracker_comms_http::TrackerRequestEvent::Started); loop { + let stats = self.stats.get(); let request = tracker_comms_http::TrackerRequest { info_hash: self.info_hash, peer_id: self.peer_id, port: 6778, - uploaded: self.stats.get_uploaded_bytes(), - downloaded: self.stats.get_downloaded_bytes(), - left: self.stats.get_left_to_download_bytes(), + uploaded: stats.uploaded_bytes, + downloaded: stats.downloaded_bytes, + left: stats.get_left_to_download_bytes(), compact: true, no_peer_id: false, event, @@ -212,12 +211,13 @@ impl TrackerComms { tokio::time::sleep(i).await; } + let stats = self.stats.get(); let request = AnnounceFields { info_hash: self.info_hash, peer_id: self.peer_id, - downloaded: self.stats.get_downloaded_bytes(), - left: self.stats.get_left_to_download_bytes(), - uploaded: self.stats.get_uploaded_bytes(), + downloaded: stats.downloaded_bytes, + left: stats.get_left_to_download_bytes(), + uploaded: stats.uploaded_bytes, event: EVENT_NONE, key: 0, // whatever that is? port: self.tcp_listen_port.unwrap_or(0), From d3f017430be812b4c9d1230e62c1b5020e9d018b Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 26 Feb 2024 09:57:09 +0000 Subject: [PATCH 12/12] Comment updates --- crates/librqbit/src/session.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 46c9694..95d65ea 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -509,8 +509,6 @@ impl Session { addr: SocketAddr, mut stream: TcpStream, ) -> anyhow::Result<(Arc, CheckedIncomingConnection)> { - // TODO: move buffer handling to peer_connection - let rwtimeout = self .peer_opts .read_write_timeout @@ -762,6 +760,10 @@ impl Session { let cancellation_token_drop_guard = cancellation_token.clone().drop_guard(); let paused = opts.list_only || opts.paused; + // The main difference between magnet link and torrent file, is that we need to resolve the magnet link + // into a torrent file by connecting to peers that support extended handshakes. + // So we must discover at least one peer and connect to it to be able to proceed further. + let (info_hash, info, trackers, peer_rx, initial_peers, cancellation_token) = match add { AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => { let magnet =