From b3ab2c4d4cafad237f30ec66b6b252e91d556274 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 30 Nov 2023 20:50:49 +0000 Subject: [PATCH] Code to serialize peer store --- Cargo.lock | 64 +++++++++++++++ crates/dht/Cargo.toml | 5 +- crates/dht/src/dht.rs | 7 +- crates/dht/src/peer_store.rs | 147 +++++++++++++++++++++++++++------- crates/dht/src/persistence.rs | 26 ++++-- 5 files changed, 208 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3460a27..45a2730 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.4" @@ -275,6 +290,21 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "serde", + "wasm-bindgen", + "windows-targets", +] + [[package]] name = "clap" version = "4.4.8" @@ -462,6 +492,7 @@ dependencies = [ "lock_api", "once_cell", "parking_lot_core", + "serde", ] [[package]] @@ -886,6 +917,29 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "iana-time-zone" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.4.0" @@ -1090,6 +1144,7 @@ version = "3.2.0" dependencies = [ "anyhow", "backoff", + "chrono", "dashmap", "directories", "futures", @@ -2515,6 +2570,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index 81decc1..ff6dfe6 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -32,10 +32,11 @@ futures = "0.3" rand = "0.8" indexmap = "2" directories = "5" -dashmap = "5.5.3" +dashmap = {version = "5.5.3", features = ["serde"]} clone_to_owned = {path="../clone_to_owned", package="librqbit-clone-to-owned", version = "2.2.1"} librqbit-core = {path="../librqbit_core", version = "3.1.0"} +chrono = {version = "0.4.31", features = ["serde"]} [dev-dependencies] -tracing-subscriber = "0.3" \ No newline at end of file +tracing-subscriber = "0.3" diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index c7a9be3..a9d0594 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -490,7 +490,7 @@ pub struct DhtState { // This is to send raw messages worker_sender: UnboundedSender, - peer_store: PeerStore, + pub(crate) peer_store: PeerStore, } impl DhtState { @@ -499,6 +499,7 @@ impl DhtState { sender: UnboundedSender, routing_table: Option, listen_addr: SocketAddr, + peer_store: PeerStore, ) -> Self { let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None)); Self { @@ -509,7 +510,7 @@ impl DhtState { worker_sender: sender, listen_addr, rate_limiter: make_rate_limiter(), - peer_store: PeerStore::new(id), + peer_store, } } @@ -1056,6 +1057,7 @@ pub struct DhtConfig { pub bootstrap_addrs: Option>, pub routing_table: Option, pub listen_addr: Option, + pub(crate) peer_store: Option, } impl DhtState { @@ -1089,6 +1091,7 @@ impl DhtState { in_tx, config.routing_table, listen_addr, + config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)), )); spawn(error_span!("dht"), { diff --git a/crates/dht/src/peer_store.rs b/crates/dht/src/peer_store.rs index b7a5266..259dc23 100644 --- a/crates/dht/src/peer_store.rs +++ b/crates/dht/src/peer_store.rs @@ -2,37 +2,108 @@ use std::{ collections::VecDeque, net::{SocketAddr, SocketAddrV4}, str::FromStr, - sync::atomic::AtomicU64, - time::Instant, + sync::atomic::AtomicU32, }; use bencode::ByteString; +use chrono::{DateTime, Utc}; use librqbit_core::id20::Id20; use parking_lot::RwLock; use rand::RngCore; +use serde::{ + ser::{SerializeMap, SerializeStruct}, + Deserialize, Serialize, +}; use tracing::trace; -use crate::bprotocol::{AnnouncePeer, CompactPeerInfo, Response}; +use crate::bprotocol::{AnnouncePeer, CompactPeerInfo}; +#[derive(Serialize, Deserialize)] struct StoredToken { token: [u8; 4], + #[serde(serialize_with = "crate::utils::serialize_id20")] node_id: Id20, addr: SocketAddr, } +#[derive(Serialize, Deserialize)] struct StoredPeer { addr: SocketAddrV4, - time: Instant, + time: DateTime, } pub struct PeerStore { self_id: Id20, - max_remembered_tokens: usize, - max_remembered_peers: usize, + max_remembered_tokens: u32, + max_remembered_peers: u32, max_distance: Id20, tokens: RwLock>, peers: dashmap::DashMap>, - peers_len: AtomicU64, + peers_len: AtomicU32, +} + +impl Serialize for PeerStore { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + struct SerializePeers<'a> { + peers: &'a dashmap::DashMap>, + } + + impl<'a> Serialize for SerializePeers<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut m = serializer.serialize_map(None)?; + for entry in self.peers.iter() { + m.serialize_entry(&entry.key().as_string(), &entry.value())?; + } + m.end() + } + } + + let mut s = serializer.serialize_struct("PeerStore", 7)?; + s.serialize_field("self_id", &self.self_id.as_string())?; + s.serialize_field("max_remembered_tokens", &self.max_remembered_tokens)?; + s.serialize_field("max_remembered_peers", &self.max_remembered_peers)?; + s.serialize_field("max_distance", &self.max_distance.as_string())?; + s.serialize_field("tokens", &*self.tokens.read())?; + s.serialize_field("peers", &SerializePeers { peers: &self.peers })?; + s.serialize_field( + "peers_len", + &self.peers_len.load(std::sync::atomic::Ordering::SeqCst), + )?; + s.end() + } +} + +impl<'de> Deserialize<'de> for PeerStore { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(Deserialize)] + struct Tmp { + self_id: Id20, + max_remembered_tokens: u32, + max_remembered_peers: u32, + max_distance: Id20, + tokens: VecDeque, + peers: dashmap::DashMap>, + } + + Tmp::deserialize(deserializer).map(|tmp| Self { + self_id: tmp.self_id, + max_remembered_tokens: tmp.max_remembered_tokens, + max_remembered_peers: tmp.max_remembered_peers, + max_distance: tmp.max_distance, + tokens: RwLock::new(tmp.tokens), + peers_len: AtomicU32::new(tmp.peers.iter().map(|e| e.value().len() as u32).sum()), + peers: tmp.peers, + }) + } } impl PeerStore { @@ -44,7 +115,7 @@ impl PeerStore { max_distance: Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap(), tokens: RwLock::new(VecDeque::new()), peers: dashmap::DashMap::new(), - peers_len: AtomicU64::new(0), + peers_len: AtomicU32::new(0), } } @@ -54,10 +125,10 @@ impl PeerStore { let mut tokens = self.tokens.write(); tokens.push_back(StoredToken { token, - node_id, addr, + node_id, }); - if tokens.len() > self.max_remembered_tokens { + if tokens.len() > self.max_remembered_tokens as usize { tokens.pop_front(); } token @@ -75,36 +146,54 @@ impl PeerStore { return false; } }; - if self.peers_len.load(std::sync::atomic::Ordering::SeqCst) - >= self.max_remembered_peers as u64 - { - trace!("peer store: out of capacity"); - return false; - } if announce.info_hash.distance(&self.self_id) > self.max_distance { trace!("peer store: info_hash too far to store"); return false; } - if !self - .tokens - .read() - .iter() - .any(|t| t.token[..] == announce.token[..] && t.addr == std::net::SocketAddr::V4(addr)) - { + if !self.tokens.read().iter().any(|t| { + t.token[..] == announce.token[..] + && t.addr == std::net::SocketAddr::V4(addr) + && t.node_id == announce.id + }) { trace!("peer store: can't find this token / addr combination"); return false; } + if announce.implied_port == 0 { addr.set_port(announce.port); } - self.peers - .entry(announce.info_hash) - .or_default() - .push(StoredPeer { - addr, - time: Instant::now(), - }); + + use dashmap::mapref::entry::Entry; + let peers_entry = self.peers.entry(announce.info_hash); + let peers_len = self.peers_len.load(std::sync::atomic::Ordering::SeqCst); + match peers_entry { + Entry::Occupied(mut occ) => { + if let Some(s) = occ.get_mut().iter_mut().find(|s| s.addr == addr) { + s.time = Utc::now(); + return true; + } + if peers_len >= self.max_remembered_peers { + trace!("peer store: out of capacity"); + return false; + } + occ.get_mut().push(StoredPeer { + addr, + time: Utc::now(), + }); + } + Entry::Vacant(vac) => { + if peers_len >= self.max_remembered_peers { + trace!("peer store: out of capacity"); + return false; + } + vac.insert(vec![StoredPeer { + addr, + time: Utc::now(), + }]); + } + } + self.peers_len .fetch_add(1, std::sync::atomic::Ordering::SeqCst); true diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index f74b89f..08bcb8a 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -11,6 +11,7 @@ use std::time::Duration; use anyhow::Context; use tracing::{debug, error, error_span, info, trace, warn}; +use crate::peer_store::PeerStore; use crate::routing_table::RoutingTable; use crate::{Dht, DhtConfig, DhtState}; @@ -21,9 +22,10 @@ pub struct PersistentDhtConfig { } #[derive(Serialize, Deserialize)] -struct DhtSerialize { +struct DhtSerialize { addr: SocketAddr, table: Table, + peer_store: Option, } pub struct PersistentDht { @@ -40,9 +42,16 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result< let mut file = BufWriter::new(file); let addr = dht.listen_addr(); - match dht - .with_routing_table(|r| serde_json::to_writer(&mut file, &DhtSerialize { addr, table: r })) - { + match dht.with_routing_table(|r| { + serde_json::to_writer( + &mut file, + &DhtSerialize { + addr, + table: r, + peer_store: Some(&dht.peer_store), + }, + ) + }) { Ok(_) => { trace!("dumped DHT to {:?}", &tempfile_name); } @@ -79,7 +88,7 @@ impl PersistentDht { let de = match OpenOptions::new().read(true).open(&config_filename) { Ok(dht_json) => { let reader = BufReader::new(dht_json); - match serde_json::from_reader::<_, DhtSerialize>(reader) { + match serde_json::from_reader::<_, DhtSerialize>(reader) { Ok(r) => { info!("loaded DHT routing table from {:?}", &config_filename); Some(r) @@ -98,14 +107,15 @@ impl PersistentDht { _ => return Err(e).with_context(|| format!("error reading {config_filename:?}")), }, }; - let (listen_addr, routing_table) = de - .map(|de| (Some(de.addr), Some(de.table))) - .unwrap_or((None, None)); + let (listen_addr, routing_table, peer_store) = de + .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) + .unwrap_or((None, None, None)); let peer_id = routing_table.as_ref().map(|r| r.id()); let dht_config = DhtConfig { peer_id, routing_table, listen_addr, + peer_store, ..Default::default() }; let dht = DhtState::with_config(dht_config).await?;