use std::{ collections::VecDeque, net::{SocketAddr, SocketAddrV4}, str::FromStr, sync::atomic::AtomicU32, }; use bencode::ByteBufOwned; use chrono::{DateTime, Utc}; use librqbit_core::hash_id::Id20; use parking_lot::RwLock; use rand::RngCore; use serde::{ ser::{SerializeMap, SerializeStruct}, Deserialize, Serialize, }; use tracing::trace; use crate::bprotocol::{AnnouncePeer, CompactPeerInfo}; #[derive(Serialize, Deserialize)] struct StoredToken { token: [u8; 4], #[serde(serialize_with = "crate::utils::serialize_id20")] node_id: Id20, addr: SocketAddr, } #[derive(Serialize, Deserialize)] struct StoredPeer { addr: SocketAddrV4, time: DateTime, } pub struct PeerStore { self_id: Id20, max_remembered_tokens: u32, max_remembered_peers: u32, max_distance: Id20, tokens: RwLock>, peers: dashmap::DashMap>, peers_len: AtomicU32, } impl Serialize for PeerStore { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { struct SerializePeers<'a> { peers: &'a dashmap::DashMap>, } impl<'a> Serialize for SerializePeers<'a> { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, { let mut m = serializer.serialize_map(None)?; for entry in self.peers.iter() { m.serialize_entry(&entry.key().as_string(), &entry.value())?; } m.end() } } let mut s = serializer.serialize_struct("PeerStore", 7)?; s.serialize_field("self_id", &self.self_id.as_string())?; s.serialize_field("max_remembered_tokens", &self.max_remembered_tokens)?; s.serialize_field("max_remembered_peers", &self.max_remembered_peers)?; s.serialize_field("max_distance", &self.max_distance.as_string())?; s.serialize_field("tokens", &*self.tokens.read())?; s.serialize_field("peers", &SerializePeers { peers: &self.peers })?; s.serialize_field( "peers_len", &self.peers_len.load(std::sync::atomic::Ordering::SeqCst), )?; s.end() } } impl<'de> Deserialize<'de> for PeerStore { fn deserialize(deserializer: D) -> Result where D: serde::Deserializer<'de>, { #[derive(Deserialize)] struct Tmp { self_id: Id20, max_remembered_tokens: u32, max_remembered_peers: u32, max_distance: Id20, tokens: VecDeque, peers: dashmap::DashMap>, } Tmp::deserialize(deserializer).map(|tmp| Self { self_id: tmp.self_id, max_remembered_tokens: tmp.max_remembered_tokens, max_remembered_peers: tmp.max_remembered_peers, max_distance: tmp.max_distance, tokens: RwLock::new(tmp.tokens), peers_len: AtomicU32::new(tmp.peers.iter().map(|e| e.value().len() as u32).sum()), peers: tmp.peers, }) } } impl PeerStore { pub fn new(self_id: Id20) -> Self { Self { self_id, max_remembered_tokens: 1000, max_remembered_peers: 1000, max_distance: Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap(), tokens: RwLock::new(VecDeque::new()), peers: dashmap::DashMap::new(), peers_len: AtomicU32::new(0), } } pub fn gen_token_for(&self, node_id: Id20, addr: SocketAddr) -> [u8; 4] { let mut token = [0u8; 4]; rand::thread_rng().fill_bytes(&mut token); let mut tokens = self.tokens.write(); tokens.push_back(StoredToken { token, addr, node_id, }); if tokens.len() > self.max_remembered_tokens as usize { tokens.pop_front(); } token } pub fn store_peer(&self, announce: &AnnouncePeer, addr: SocketAddr) -> bool { // If the info_hash in announce is too far away from us, don't store it. // If the token doesn't match, don't store it. // If we are out of capacity, don't store it. // Otherwise, store it. let mut addr = match addr { SocketAddr::V4(addr) => addr, SocketAddr::V6(_) => { trace!("peer store: IPv6 not supported"); return false; } }; if announce.info_hash.distance(&self.self_id) > self.max_distance { trace!("peer store: info_hash too far to store"); return false; } if !self.tokens.read().iter().any(|t| { t.token[..] == announce.token[..] && t.addr == std::net::SocketAddr::V4(addr) && t.node_id == announce.id }) { trace!("peer store: can't find this token / addr combination"); return false; } if announce.implied_port == 0 { addr.set_port(announce.port); } use dashmap::mapref::entry::Entry; let peers_entry = self.peers.entry(announce.info_hash); let peers_len = self.peers_len.load(std::sync::atomic::Ordering::SeqCst); match peers_entry { Entry::Occupied(mut occ) => { if let Some(s) = occ.get_mut().iter_mut().find(|s| s.addr == addr) { s.time = Utc::now(); return true; } if peers_len >= self.max_remembered_peers { trace!("peer store: out of capacity"); return false; } occ.get_mut().push(StoredPeer { addr, time: Utc::now(), }); } Entry::Vacant(vac) => { if peers_len >= self.max_remembered_peers { trace!("peer store: out of capacity"); return false; } vac.insert(vec![StoredPeer { addr, time: Utc::now(), }]); } } self.peers_len .fetch_add(1, std::sync::atomic::Ordering::SeqCst); true } pub fn get_for_info_hash(&self, info_hash: Id20) -> Vec { if let Some(stored_peers) = self.peers.get(&info_hash) { return stored_peers .iter() .map(|p| CompactPeerInfo { addr: p.addr }) .collect(); } Vec::new() } #[allow(dead_code)] pub fn garbage_collect_peers(&self) { todo!() } }