From 6f3383050e2ff4f474db63114d9c104c9236c462 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 11 Feb 2024 12:12:37 +0000 Subject: [PATCH] Implement ser/de for UDP tracker protocol --- .../test/udp-tracker-announce-response.bin | Bin 0 -> 1220 bytes crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/tracker_comms_udp.rs | 258 ++++++++++++++++++ 3 files changed, 259 insertions(+) create mode 100644 crates/librqbit/resources/test/udp-tracker-announce-response.bin create mode 100644 crates/librqbit/src/tracker_comms_udp.rs diff --git a/crates/librqbit/resources/test/udp-tracker-announce-response.bin b/crates/librqbit/resources/test/udp-tracker-announce-response.bin new file mode 100644 index 0000000000000000000000000000000000000000..4b1bc3aa41c1c1041424a3e05d07afc867a90614 GIT binary patch literal 1220 zcmV;#1Uvfx0005yl-(2n00wjb0099201XW;06%FO;aa^eBT~JI)Uu;p4zg9SG!TmLd0%M0?s}SDDZaHjGh6NClg6@HZ7+t7PE(7s zImp#dp>-^V1*FIB4nUbD<4I1H*;A?*x;cd9UF^5|P_mz-SK64$b-86BNA9bSV41RCL7|N2cT zDOn_>T6i)~i{CGYc0P>sJ29&+@-r6)v?YueV6R>U14pb2Z5>!KQht(K` zDoOvmq6|_nh`a!)Ro=X^%={`wTC!QJ3QH)mLW3rMNU+pj`B9g!z?qz8d2Rekgz@ZO zns#P`j9(hlva)1dt7g77@?3n)h|^?Q7~{w?W>D>17goMct8tlXmQ8HcNvH)bioW9- z;U(5cCK}-aKV_$@4AtJ^cXX4&yOp;_6uiF`GWg8MS6#r2bJT6MxlTE?eAJSgN8|b(*-&=2UGxpR1};qInr~C{R$u7XGR}*}fQJqb>k62v`s$ zaK6?tXeBw3#VA$<|G$ywh6Rh_yoTP?+~q-IxYQuP$f`TM^oJwfDZC-+X#Z5KZ>;Ue z)ouLNSVQMu-NkhZL0?WV75ar=JHuYY5?>`pS}Y)68cM<7EngY1-GwJ!8RPm@2woII zH7fpF)`^;jc3Js=%Dc)~ljwt>`&Enq9i2s0bzMGL6I3VkZ(B%Di9K8O1l*2*165+*L zp}91T=Z5*Rn-O{7$*^>#+mf2Fb7Al4RIp}5)F)eQ{E(Q=VQr(>-;pR^8PE8FuU{D? zT-Bpq6Wq|J99%v-F-Y=UC62OH`d9fq(!wuPHU)Thl23j}PN*d=@(VISqAsm8rdKpB ziV-483oa@+lgQO2PFajCTLt7%>l)z&rH_z48(Y7EWxt46@p1RA?Zwn;u`&vaoRYDMU)KsYl)b literal 0 HcmV?d00001 diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 38091bf..a1897cb 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -37,6 +37,7 @@ mod spawn_utils; mod torrent_state; pub mod tracing_subscriber_config_utils; mod tracker_comms; +pub mod tracker_comms_udp; mod type_aliases; pub use api::Api; diff --git a/crates/librqbit/src/tracker_comms_udp.rs b/crates/librqbit/src/tracker_comms_udp.rs new file mode 100644 index 0000000..306076b --- /dev/null +++ b/crates/librqbit/src/tracker_comms_udp.rs @@ -0,0 +1,258 @@ +use std::net::{Ipv4Addr, SocketAddrV4}; + +use anyhow::{bail, Context}; +use librqbit_core::hash_id::Id20; +use rand::Rng; + +const ACTION_CONNECT: u32 = 0; +const ACTION_ANNOUNCE: u32 = 1; +// const ACTION_SCRAPE: u32 = 2; +// const ACTION_ERROR: u32 = 3; + +pub const EVENT_NONE: u32 = 0; +pub const EVENT_COMPLETED: u32 = 1; +pub const EVENT_STARTED: u32 = 2; +pub const EVENT_STOPPED: u32 = 3; + +pub type ConnectionId = u64; +const CONNECTION_ID_MAGIC: ConnectionId = 0x41727101980; + +pub type TransactionId = u32; + +pub fn new_transaction_id() -> TransactionId { + rand::thread_rng().gen() +} + +#[derive(Debug)] +pub struct AnnounceFields { + pub info_hash: Id20, + pub peer_id: Id20, + pub downloaded: u64, + pub left: u64, + pub uploaded: u64, + pub event: u32, + pub key: u32, + pub port: u16, +} + +#[derive(Debug)] +pub enum Request { + Connect, + Announce(ConnectionId, AnnounceFields), +} + +impl Request { + pub fn serialize(&self, transaction_id: TransactionId, buf: &mut Vec) -> usize { + let cur_len = buf.len(); + match self { + Request::Connect => { + buf.extend_from_slice(&CONNECTION_ID_MAGIC.to_be_bytes()); + buf.extend_from_slice(&ACTION_CONNECT.to_be_bytes()); + buf.extend_from_slice(&transaction_id.to_be_bytes()); + } + Request::Announce(connection_id, fields) => { + buf.extend_from_slice(&connection_id.to_be_bytes()); + buf.extend_from_slice(&ACTION_ANNOUNCE.to_be_bytes()); + buf.extend_from_slice(&transaction_id.to_be_bytes()); + buf.extend_from_slice(&fields.info_hash.0); + buf.extend_from_slice(&fields.peer_id.0); + buf.extend_from_slice(&fields.downloaded.to_be_bytes()); + buf.extend_from_slice(&fields.left.to_be_bytes()); + buf.extend_from_slice(&fields.uploaded.to_be_bytes()); + buf.extend_from_slice(&fields.event.to_be_bytes()); + buf.extend_from_slice(&0u32.to_be_bytes()); // ip address 0 + buf.extend_from_slice(&fields.key.to_be_bytes()); + buf.extend_from_slice(&(-1i32).to_be_bytes()); // num want -1 + buf.extend_from_slice(&fields.port.to_be_bytes()); + } + } + buf.len() - cur_len + } +} + +#[derive(Debug)] +pub enum Response { + Connect(ConnectionId), + Announce { + interval: u32, + leechers: u32, + seeders: u32, + addrs: Vec, + }, +} + +fn split_slice(s: &[u8], first_len: usize) -> Option<(&[u8], &[u8])> { + if s.len() < first_len { + return None; + } + Some(s.split_at(first_len)) +} + +fn s_to_arr(buf: &[u8]) -> [u8; T] { + let mut arr = [0u8; T]; + arr.copy_from_slice(buf); + arr +} + +trait ParseNum: Sized { + fn parse_num(buf: &[u8]) -> anyhow::Result<(Self, &[u8])>; +} + +macro_rules! parse_impl { + ($ty:tt, $size:expr) => { + impl ParseNum for $ty { + fn parse_num(buf: &[u8]) -> anyhow::Result<($ty, &[u8])> { + let (bytes, rest) = + split_slice(buf, $size).with_context(|| format!("expected {} bytes", $size))?; + let num = $ty::from_be_bytes(s_to_arr(bytes)); + Ok((num, rest)) + } + } + }; +} + +parse_impl!(u32, 4); +parse_impl!(u64, 8); +parse_impl!(u16, 2); +parse_impl!(i32, 4); +parse_impl!(i64, 8); +parse_impl!(i16, 2); + +impl Response { + pub fn parse(buf: &[u8]) -> anyhow::Result<(TransactionId, Self)> { + let (action, buf) = u32::parse_num(buf).context("can't parse action")?; + let (tid, mut buf) = u32::parse_num(buf).context("can't parse transaction id")?; + let response = match action { + ACTION_CONNECT => { + let (connection_id, b) = + u64::parse_num(buf).context("can't parse connection id")?; + buf = b; + Response::Connect(connection_id) + } + ACTION_ANNOUNCE => { + let (interval, b) = u32::parse_num(buf).context("can't parse interval")?; + let (leechers, b) = u32::parse_num(b).context("can't parse leechers")?; + let (seeders, mut b) = u32::parse_num(b).context("can't parse seeders")?; + let mut addrs = Vec::new(); + while !b.is_empty() { + let (ip, b2) = u32::parse_num(b)?; + let ip = Ipv4Addr::from(ip); + b = b2; + + let (port, b2) = u16::parse_num(b)?; + b = b2; + addrs.push(SocketAddrV4::new(ip, port)); + } + buf = b; + Response::Announce { + interval, + leechers, + seeders, + addrs, + } + } + _ => bail!("unsupported action {action}"), + }; + + if !buf.is_empty() { + bail!( + "parsed {response:?} so far, but got {} remaining bytes", + buf.len() + ); + } + + Ok((tid, response)) + } +} + +#[cfg(test)] +mod tests { + use std::{io::Write, str::FromStr}; + + use librqbit_core::{hash_id::Id20, peer_id::generate_peer_id}; + pub use rand::Rng; + + use crate::tracker_comms_udp::{ + new_transaction_id, AnnounceFields, Request, Response, EVENT_NONE, + }; + + #[test] + fn test_parse_announce() { + let b = include_bytes!("../resources/test/udp-tracker-announce-response.bin"); + let (tid, response) = Response::parse(b).unwrap(); + dbg!(tid, response); + } + + #[ignore] + #[tokio::test] + async fn test_announce() { + let sock = tokio::net::UdpSocket::bind("0.0.0.0:0").await.unwrap(); + sock.connect("opentor.net:6969").await.unwrap(); + + let tid = new_transaction_id(); + let mut write_buf = Vec::new(); + let mut read_buf = vec![0u8; 4096]; + + Request::Connect.serialize(tid, &mut write_buf); + + sock.send(&write_buf).await.unwrap(); + + let size = sock.recv(&mut read_buf).await.unwrap(); + + let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + let connection_id = match response { + Response::Connect(connection_id) => { + dbg!(connection_id) + } + other => panic!("unexpected response {other:?}"), + }; + + let hash = Id20::from_str("775459190aa65566591634203f8d9f17d341f969").unwrap(); + + let tid = new_transaction_id(); + let request = Request::Announce( + connection_id, + AnnounceFields { + info_hash: hash, + peer_id: generate_peer_id(), + downloaded: 0, + left: 0, + uploaded: 0, + event: EVENT_NONE, + key: 0, // whatever that is? + port: 24563, + }, + ); + write_buf.clear(); + let size = request.serialize(tid, &mut write_buf); + + sock.send(&write_buf[..size]).await.unwrap(); + let size = sock.recv(&mut read_buf).await.unwrap(); + + { + let mut f = std::fs::OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open("/tmp/proto.bin") + .unwrap(); + f.write_all(&read_buf[..size]).unwrap(); + } + + dbg!(&read_buf[..size]); + let (rtid, response) = Response::parse(&read_buf[..size]).unwrap(); + assert_eq!(tid, rtid); + match response { + Response::Announce { + interval, + leechers, + seeders, + addrs, + } => { + dbg!(interval, leechers, seeders, addrs); + } + other => panic!("unexpected response {other:?}"), + } + } +}