1/n Added peer store

This commit is contained in:
Igor Katson 2023-11-30 19:33:02 +00:00
parent 80b153dbca
commit 261ad3cc7c
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 213 additions and 5 deletions

View file

@ -327,6 +327,15 @@ pub struct PingRequest {
pub id: Id20,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct AnnouncePeer<BufT> {
pub id: Id20,
pub implied_port: u8,
pub info_hash: Id20,
pub port: u16,
pub token: BufT,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "BufT: AsRef<[u8]> + Serialize"))]
#[serde(bound(deserialize = "BufT: From<&'de [u8]> + Deserialize<'de>"))]
@ -364,6 +373,7 @@ pub enum MessageKind<BufT> {
FindNodeRequest(FindNodeRequest),
Response(Response<BufT>),
PingRequest(PingRequest),
AnnouncePeer(AnnouncePeer<BufT>),
}
impl<BufT: core::fmt::Debug> core::fmt::Debug for MessageKind<BufT> {
@ -374,6 +384,7 @@ impl<BufT: core::fmt::Debug> core::fmt::Debug for MessageKind<BufT> {
Self::FindNodeRequest(r) => write!(f, "{r:?}"),
Self::Response(r) => write!(f, "{r:?}"),
Self::PingRequest(r) => write!(f, "{r:?}"),
Self::AnnouncePeer(r) => write!(f, "{r:?}"),
}
}
}
@ -452,6 +463,19 @@ pub fn serialize_message<'a, W: Write, BufT: Serialize + From<&'a [u8]>>(
};
Ok(bencode::bencode_serialize_to_writer(msg, writer)?)
}
MessageKind::AnnouncePeer(announce) => {
let msg: RawMessage<BufT, _, ()> = RawMessage {
message_type: MessageType::Request,
transaction_id,
error: None,
response: None,
method_name: Some(BufT::from(b"announce_peer")),
arguments: Some(announce),
ip,
version,
};
Ok(bencode::bencode_serialize_to_writer(msg, writer)?)
}
}
}
@ -490,6 +514,15 @@ where
kind: MessageKind::PingRequest(de.arguments.unwrap()),
})
}
b"announce_peer" => {
let de: RawMessage<BufT, AnnouncePeer<BufT>> = bencode::from_bytes(buf)?;
Ok(Message {
transaction_id: de.transaction_id,
version: de.version,
ip: de.ip.map(|c| c.addr),
kind: MessageKind::AnnouncePeer(de.arguments.unwrap())
})
}
other => anyhow::bail!("unsupported method {:?}", ByteBuf(other)),
},
_ => anyhow::bail!(
@ -652,6 +685,22 @@ mod tests {
test_deserialize_then_serialize_hex(WHAT_IS_THAT, "what_is_that")
}
#[test]
fn test_announce() {
let ann = b"d1:ad2:id20:abcdefghij012345678912:implied_porti1e9:info_hash20:mnopqrstuvwxyz1234564:porti6881e5:token8:aoeusnthe1:q13:announce_peer1:t2:aa1:y1:qe";
let msg = bprotocol::deserialize_message::<ByteBuf>(ann).unwrap();
match &msg.kind {
bprotocol::MessageKind::AnnouncePeer(ann) => {
dbg!(&ann);
}
_ => panic!("wrong kind"),
}
let mut buf = Vec::new();
bprotocol::serialize_message(&mut buf, msg.transaction_id, msg.version, msg.ip, msg.kind)
.unwrap();
assert_eq!(ann[..], buf[..]);
}
#[test]
fn deserialize_bencode_packets_captured_from_wireshark() {
debug_hex_bencode("req: find_node", FIND_NODE_REQUEST);

View file

@ -14,6 +14,7 @@ use crate::{
self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message,
MessageKind, Node, PingRequest, Response,
},
peer_store::PeerStore,
routing_table::{InsertResult, NodeStatus, RoutingTable},
INACTIVITY_TIMEOUT, REQUERY_INTERVAL, RESPONSE_TIMEOUT,
};
@ -488,6 +489,8 @@ pub struct DhtState {
rate_limiter: RateLimiter,
// This is to send raw messages
worker_sender: UnboundedSender<WorkerSendRequest>,
peer_store: PeerStore,
}
impl DhtState {
@ -506,6 +509,7 @@ impl DhtState {
worker_sender: sender,
listen_addr,
rate_limiter: make_rate_limiter(),
peer_store: PeerStore::new(id),
}
}
@ -660,9 +664,29 @@ impl DhtState {
})?;
Ok(())
}
MessageKind::AnnouncePeer(ann) => {
self.routing_table.write().mark_last_query(&ann.id);
let added = self.peer_store.store_peer(ann, addr);
trace!("{addr}: added_peer={added}, announce={ann:?}");
let message = Message {
transaction_id: msg.transaction_id,
version: None,
ip: None,
kind: MessageKind::Response(bprotocol::Response {
id: self.id,
..Default::default()
}),
};
self.worker_sender.send(WorkerSendRequest {
our_tid: None,
message,
addr,
})?;
Ok(())
}
MessageKind::GetPeersRequest(req) => {
// TODO: respond with peer info, for now sending an empty response.
let compact_node_info = generate_compact_nodes(req.info_hash);
let compact_peer_info = self.peer_store.get_for_info_hash(req.info_hash);
self.routing_table.write().mark_last_query(&req.id);
let message = Message {
transaction_id: msg.transaction_id,
@ -671,7 +695,10 @@ impl DhtState {
kind: MessageKind::Response(bprotocol::Response {
id: self.id,
nodes: Some(compact_node_info),
..Default::default()
values: Some(compact_peer_info),
token: Some(ByteString(
self.peer_store.gen_token_for(req.id, addr).to_vec(),
)),
}),
};
self.worker_sender.send(WorkerSendRequest {

View file

@ -1,5 +1,6 @@
mod bprotocol;
mod dht;
mod peer_store;
mod persistence;
mod routing_table;
mod utils;

View file

@ -0,0 +1,126 @@
use std::{
collections::VecDeque,
net::{SocketAddr, SocketAddrV4},
str::FromStr,
sync::atomic::AtomicU64,
time::Instant,
};
use bencode::ByteString;
use librqbit_core::id20::Id20;
use parking_lot::RwLock;
use rand::RngCore;
use tracing::trace;
use crate::bprotocol::{AnnouncePeer, CompactPeerInfo, Response};
struct StoredToken {
token: [u8; 4],
node_id: Id20,
addr: SocketAddr,
}
struct StoredPeer {
addr: SocketAddrV4,
time: Instant,
}
pub struct PeerStore {
self_id: Id20,
max_remembered_tokens: usize,
max_remembered_peers: usize,
max_distance: Id20,
tokens: RwLock<VecDeque<StoredToken>>,
peers: dashmap::DashMap<Id20, Vec<StoredPeer>>,
peers_len: AtomicU64,
}
impl PeerStore {
pub fn new(self_id: Id20) -> Self {
Self {
self_id,
max_remembered_tokens: 1000,
max_remembered_peers: 1000,
max_distance: Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap(),
tokens: RwLock::new(VecDeque::new()),
peers: dashmap::DashMap::new(),
peers_len: AtomicU64::new(0),
}
}
pub fn gen_token_for(&self, node_id: Id20, addr: SocketAddr) -> [u8; 4] {
let mut token = [0u8; 4];
rand::thread_rng().fill_bytes(&mut token);
let mut tokens = self.tokens.write();
tokens.push_back(StoredToken {
token,
node_id,
addr,
});
if tokens.len() > self.max_remembered_tokens {
tokens.pop_front();
}
token
}
pub fn store_peer(&self, announce: &AnnouncePeer<ByteString>, addr: SocketAddr) -> bool {
// If the info_hash in announce is too far away from us, don't store it.
// If the token doesn't match, don't store it.
// If we are out of capacity, don't store it.
// Otherwise, store it.
let mut addr = match addr {
SocketAddr::V4(addr) => addr,
SocketAddr::V6(_) => {
trace!("peer store: IPv6 not supported");
return false;
}
};
if self.peers_len.load(std::sync::atomic::Ordering::SeqCst)
>= self.max_remembered_peers as u64
{
trace!("peer store: out of capacity");
return false;
}
if announce.info_hash.distance(&self.self_id) > self.max_distance {
trace!("peer store: info_hash too far to store");
return false;
}
if !self
.tokens
.read()
.iter()
.any(|t| t.token[..] == announce.token[..] && t.addr == std::net::SocketAddr::V4(addr))
{
trace!("peer store: can't find this token / addr combination");
return false;
}
if announce.implied_port == 0 {
addr.set_port(announce.port);
}
self.peers
.entry(announce.info_hash)
.or_default()
.push(StoredPeer {
addr,
time: Instant::now(),
});
self.peers_len
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
true
}
pub fn get_for_info_hash(&self, info_hash: Id20) -> Vec<CompactPeerInfo> {
if let Some(stored_peers) = self.peers.get(&info_hash) {
return stored_peers
.iter()
.map(|p| CompactPeerInfo { addr: p.addr })
.collect();
}
Vec::new()
}
pub fn garbage_collect_peers(&self) {
todo!()
}
}