diff --git a/TODO.md b/TODO.md index 0f929a8..ac2c650 100644 --- a/TODO.md +++ b/TODO.md @@ -18,13 +18,18 @@ - [x] many nodes in "Unknown" status, do smth about it - [x] for torrents with a few seeds might be cool to re-query DHT once in a while. - [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted) - - [ ] Routing table - is it balanced properly? - - [ ] Don't query Bad nodes + - [x] Routing table - is it balanced properly? + - [ ] + - [x] Don't query Bad nodes - [-] Buckets that have not been changed in 15 minutes should be "refreshed." (per RFC) - - [ ] Did it, but it's flawed: starts repeating the same queries again as neighboring refreshes + - [x] Did it, but it's flawed: starts repeating the same queries again as neighboring refreshes don't know about the other ones, and DHT returns the same nodes again and again. - [x] it's sending many requests now way too fast, locks up Mac OS UI annoyingly + - [x] store peers sent to us with "announce_peer" + - [ ] announced peers should be persisted - [ ] After the search is exhausted, the client then inserts the peer contact information for itself onto the responding nodes with IDs closest to the infohash of the torrent. + + To do this, a - [x] Ensure that if we query the "returned" nodes, they are even closer to our request than the responding node id was. 
someday: diff --git a/crates/dht/src/bprotocol.rs b/crates/dht/src/bprotocol.rs index 4f7875e..4e2e8eb 100644 --- a/crates/dht/src/bprotocol.rs +++ b/crates/dht/src/bprotocol.rs @@ -327,6 +327,15 @@ pub struct PingRequest { pub id: Id20, } +#[derive(Debug, Serialize, Deserialize)] +pub struct AnnouncePeer { + pub id: Id20, + pub implied_port: u8, + pub info_hash: Id20, + pub port: u16, + pub token: BufT, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(bound(serialize = "BufT: AsRef<[u8]> + Serialize"))] #[serde(bound(deserialize = "BufT: From<&'de [u8]> + Deserialize<'de>"))] @@ -364,6 +373,7 @@ pub enum MessageKind { FindNodeRequest(FindNodeRequest), Response(Response), PingRequest(PingRequest), + AnnouncePeer(AnnouncePeer), } impl core::fmt::Debug for MessageKind { @@ -374,6 +384,7 @@ impl core::fmt::Debug for MessageKind { Self::FindNodeRequest(r) => write!(f, "{r:?}"), Self::Response(r) => write!(f, "{r:?}"), Self::PingRequest(r) => write!(f, "{r:?}"), + Self::AnnouncePeer(r) => write!(f, "{r:?}"), } } } @@ -452,6 +463,19 @@ pub fn serialize_message<'a, W: Write, BufT: Serialize + From<&'a [u8]>>( }; Ok(bencode::bencode_serialize_to_writer(msg, writer)?) } + MessageKind::AnnouncePeer(announce) => { + let msg: RawMessage = RawMessage { + message_type: MessageType::Request, + transaction_id, + error: None, + response: None, + method_name: Some(BufT::from(b"announce_peer")), + arguments: Some(announce), + ip, + version, + }; + Ok(bencode::bencode_serialize_to_writer(msg, writer)?) 
+ } } } @@ -490,6 +514,15 @@ where kind: MessageKind::PingRequest(de.arguments.unwrap()), }) } + b"announce_peer" => { + let de: RawMessage> = bencode::from_bytes(buf)?; + Ok(Message { + transaction_id: de.transaction_id, + version: de.version, + ip: de.ip.map(|c| c.addr), + kind: MessageKind::AnnouncePeer(de.arguments.unwrap()) + }) + } other => anyhow::bail!("unsupported method {:?}", ByteBuf(other)), }, _ => anyhow::bail!( @@ -652,6 +685,22 @@ mod tests { test_deserialize_then_serialize_hex(WHAT_IS_THAT, "what_is_that") } + #[test] + fn test_announce() { + let ann = b"d1:ad2:id20:abcdefghij012345678912:implied_porti1e9:info_hash20:mnopqrstuvwxyz1234564:porti6881e5:token8:aoeusnthe1:q13:announce_peer1:t2:aa1:y1:qe"; + let msg = bprotocol::deserialize_message::(ann).unwrap(); + match &msg.kind { + bprotocol::MessageKind::AnnouncePeer(ann) => { + dbg!(&ann); + } + _ => panic!("wrong kind"), + } + let mut buf = Vec::new(); + bprotocol::serialize_message(&mut buf, msg.transaction_id, msg.version, msg.ip, msg.kind) + .unwrap(); + assert_eq!(ann[..], buf[..]); + } + #[test] fn deserialize_bencode_packets_captured_from_wireshark() { debug_hex_bencode("req: find_node", FIND_NODE_REQUEST); diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index b45f202..c7a9be3 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -14,6 +14,7 @@ use crate::{ self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message, MessageKind, Node, PingRequest, Response, }, + peer_store::PeerStore, routing_table::{InsertResult, NodeStatus, RoutingTable}, INACTIVITY_TIMEOUT, REQUERY_INTERVAL, RESPONSE_TIMEOUT, }; @@ -488,6 +489,8 @@ pub struct DhtState { rate_limiter: RateLimiter, // This is to send raw messages worker_sender: UnboundedSender, + + peer_store: PeerStore, } impl DhtState { @@ -506,6 +509,7 @@ impl DhtState { worker_sender: sender, listen_addr, rate_limiter: make_rate_limiter(), + peer_store: PeerStore::new(id), } } @@ -660,9 +664,29 @@ impl 
DhtState { })?; Ok(()) } + MessageKind::AnnouncePeer(ann) => { + self.routing_table.write().mark_last_query(&ann.id); + let added = self.peer_store.store_peer(ann, addr); + trace!("{addr}: added_peer={added}, announce={ann:?}"); + let message = Message { + transaction_id: msg.transaction_id, + version: None, + ip: None, + kind: MessageKind::Response(bprotocol::Response { + id: self.id, + ..Default::default() + }), + }; + self.worker_sender.send(WorkerSendRequest { + our_tid: None, + message, + addr, + })?; + Ok(()) + } MessageKind::GetPeersRequest(req) => { - // TODO: respond with peer info, for now sending an empty response. let compact_node_info = generate_compact_nodes(req.info_hash); + let compact_peer_info = self.peer_store.get_for_info_hash(req.info_hash); self.routing_table.write().mark_last_query(&req.id); let message = Message { transaction_id: msg.transaction_id, @@ -671,7 +695,10 @@ impl DhtState { kind: MessageKind::Response(bprotocol::Response { id: self.id, nodes: Some(compact_node_info), - ..Default::default() + values: Some(compact_peer_info), + token: Some(ByteString( + self.peer_store.gen_token_for(req.id, addr).to_vec(), + )), }), }; self.worker_sender.send(WorkerSendRequest { diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index a7c50cc..94188d0 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -1,5 +1,6 @@ mod bprotocol; mod dht; +mod peer_store; mod persistence; mod routing_table; mod utils; diff --git a/crates/dht/src/peer_store.rs b/crates/dht/src/peer_store.rs new file mode 100644 index 0000000..b7a5266 --- /dev/null +++ b/crates/dht/src/peer_store.rs @@ -0,0 +1,126 @@ +use std::{ + collections::VecDeque, + net::{SocketAddr, SocketAddrV4}, + str::FromStr, + sync::atomic::AtomicU64, + time::Instant, +}; + +use bencode::ByteString; +use librqbit_core::id20::Id20; +use parking_lot::RwLock; +use rand::RngCore; +use tracing::trace; + +use crate::bprotocol::{AnnouncePeer, CompactPeerInfo, Response}; + +struct 
StoredToken {
+    /// Random value handed out by `PeerStore::gen_token_for`.
+    token: [u8; 4],
+    /// Id of the node the token was generated for.
+    node_id: Id20,
+    /// Address the token was generated for; an announce must come from it.
+    addr: SocketAddr,
+}
+
+/// A peer remembered for one info hash.
+struct StoredPeer {
+    /// Address handed back by `PeerStore::get_for_info_hash`.
+    addr: SocketAddrV4,
+    /// When the most recent announce for this peer was accepted.
+    time: Instant,
+}
+
+/// In-memory store of peers announced to this node with `announce_peer`,
+/// together with the tokens that authorize such announces.
+pub struct PeerStore {
+    /// Our own node id; announces are compared against it by distance.
+    self_id: Id20,
+    /// Upper bound on the number of entries kept in `tokens`.
+    max_remembered_tokens: usize,
+    /// Upper bound on the total number of stored peers (see `peers_len`).
+    max_remembered_peers: usize,
+    /// Largest accepted distance between an announced info hash and `self_id`.
+    max_distance: Id20,
+    /// Issued tokens, oldest first.
+    tokens: RwLock<VecDeque<StoredToken>>,
+    /// Stored peers keyed by info hash.
+    peers: dashmap::DashMap<Id20, Vec<StoredPeer>>,
+    /// Number of peers stored across all info hashes in `peers`.
+    peers_len: AtomicU64,
+}
+
+impl PeerStore {
+    /// Creates an empty store for the node identified by `self_id`.
+    pub fn new(self_id: Id20) -> Self {
+        Self {
+            self_id,
+            max_remembered_tokens: 1000,
+            max_remembered_peers: 1000,
+            max_distance: Id20::from_str("00000fffffffffffffffffffffffffffffffffff")
+                .expect("hard-coded max distance must parse as an Id20"),
+            tokens: RwLock::new(VecDeque::new()),
+            peers: dashmap::DashMap::new(),
+            peers_len: AtomicU64::new(0),
+        }
+    }
+
+    /// Generates a random 4-byte token for `node_id` at `addr`, remembers it and
+    /// returns it.
+    ///
+    /// Once more than `max_remembered_tokens` tokens are remembered, the oldest
+    /// one is forgotten.
+    pub fn gen_token_for(&self, node_id: Id20, addr: SocketAddr) -> [u8; 4] {
+        let mut token = [0u8; 4];
+        rand::thread_rng().fill_bytes(&mut token);
+        let mut tokens = self.tokens.write();
+        tokens.push_back(StoredToken {
+            token,
+            node_id,
+            addr,
+        });
+        if tokens.len() > self.max_remembered_tokens {
+            tokens.pop_front();
+        }
+        token
+    }
+
+    /// Tries to remember the peer described by `announce`, which was received
+    /// from `addr`.
+    ///
+    /// Returns `true` when the peer is stored after the call (newly added, or
+    /// already known and refreshed) and `false` when the announce is rejected:
+    /// - `addr` is not IPv4,
+    /// - the info hash is further than `max_distance` from our id,
+    /// - no remembered token matches both the announced token and `addr`,
+    /// - the peer is new and the store already holds `max_remembered_peers`.
+    pub fn store_peer(&self, announce: &AnnouncePeer<ByteString>, addr: SocketAddr) -> bool {
+        let mut peer_addr = match addr {
+            SocketAddr::V4(v4) => v4,
+            SocketAddr::V6(_) => {
+                trace!("peer store: IPv6 not supported");
+                return false;
+            }
+        };
+
+        if announce.info_hash.distance(&self.self_id) > self.max_distance {
+            trace!("peer store: info_hash too far to store");
+            return false;
+        }
+
+        // The token must have been issued to the address the announce came from,
+        // so compare against the unmodified source address.
+        let token_known = self
+            .tokens
+            .read()
+            .iter()
+            .any(|t| t.token[..] == announce.token[..] && t.addr == addr);
+        if !token_known {
+            trace!("peer store: can't find this token / addr combination");
+            return false;
+        }
+
+        // With implied_port == 0 the announced port is used; otherwise the source
+        // port of the packet is kept.
+        if announce.implied_port == 0 {
+            peer_addr.set_port(announce.port);
+        }
+
+        // A repeated announce only refreshes the existing entry. Pushing it again
+        // would inflate `peers_len` and return duplicates to `get_peers` callers.
+        // The guard is dropped at the end of this `if let`, before `entry()` below
+        // touches the same key.
+        if let Some(mut known) = self.peers.get_mut(&announce.info_hash) {
+            if let Some(existing) = known.iter_mut().find(|p| p.addr == peer_addr) {
+                existing.time = Instant::now();
+                return true;
+            }
+        }
+
+        // Capacity is only checked for genuinely new peers, and before `entry()`
+        // so that a rejected announce does not leave an empty bucket behind.
+        if self.peers_len.load(std::sync::atomic::Ordering::SeqCst)
+            >= self.max_remembered_peers as u64
+        {
+            trace!("peer store: out of capacity");
+            return false;
+        }
+
+        self.peers
+            .entry(announce.info_hash)
+            .or_default()
+            .push(StoredPeer {
+                addr: peer_addr,
+                time: Instant::now(),
+            });
+        self.peers_len
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+        true
+    }
+
+    /// Returns the peers stored for `info_hash`, or an empty vector when none
+    /// are known.
+    pub fn get_for_info_hash(&self, info_hash: Id20) -> Vec<CompactPeerInfo> {
+        self.peers
+            .get(&info_hash)
+            .map(|stored_peers| {
+                stored_peers
+                    .iter()
+                    .map(|p| CompactPeerInfo { addr: p.addr })
+                    .collect()
+            })
+            .unwrap_or_default()
+    }
+
+    /// Not implemented yet: calling this panics via `todo!()`.
+    pub fn garbage_collect_peers(&self) {
+        todo!()
+    }
+}