diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 5cff57f..7cdd5a6 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -15,7 +15,7 @@ use crate::{ MessageKind, Node, PingRequest, Response, }, routing_table::{InsertResult, RoutingTable}, - REQUERY_INTERVAL, RESPONSE_TIMEOUT, + INACTIVITY_TIMEOUT, REQUERY_INTERVAL, RESPONSE_TIMEOUT, }; use anyhow::{bail, Context}; use backoff::{backoff::Backoff, ExponentialBackoffBuilder}; @@ -190,10 +190,11 @@ impl Stream for RequestPeersStream { } impl RecursiveRequest { - async fn bootstrap(dht: Arc, target: Id20, hostname: &str) -> anyhow::Result<()> { - let addrs = tokio::net::lookup_host(hostname) - .await - .with_context(|| format!("error looking up {}", hostname))?; + async fn find_node_for_routing_table( + dht: Arc, + target: Id20, + addrs: impl Iterator, + ) -> anyhow::Result<()> { let (node_tx, mut node_rx) = unbounded_channel(); let req = RecursiveRequest { info_hash: target, @@ -728,9 +729,10 @@ impl DhtWorker { } async fn bootstrap_hostname(&self, hostname: &str) -> anyhow::Result<()> { - RecursiveRequest::bootstrap(self.dht.clone(), self.dht.id, hostname) - .instrument(error_span!("bootstrap", hostname = hostname)) + let addrs = tokio::net::lookup_host(hostname) .await + .with_context(|| format!("error looking up {}", hostname))?; + RecursiveRequest::find_node_for_routing_table(self.dht.clone(), self.dht.id, addrs).await } async fn bootstrap_hostname_with_backoff(&self, addr: &str) -> anyhow::Result<()> { @@ -742,7 +744,11 @@ impl DhtWorker { .build(); loop { - let backoff = match self.bootstrap_hostname(addr).await { + let backoff = match self + .bootstrap_hostname(addr) + .instrument(error_span!("bootstrap", hostname = addr)) + .await + { Ok(_) => return Ok(()), Err(e) => { warn!("error: {}", e); @@ -776,7 +782,48 @@ impl DhtWorker { } async fn bucket_refresher(&self) -> anyhow::Result<()> { - todo!() + let (tx, mut rx) = unbounded_channel(); + + let mut futs = FuturesUnordered::new(); + let filler = async { + let mut interval = tokio::time::interval(INACTIVITY_TIMEOUT); + tokio::time::sleep(INACTIVITY_TIMEOUT).await; + loop { + interval.tick().await; + for bucket in self.dht.routing_table.read().iter_buckets() { + if bucket.leaf.last_refreshed.elapsed() < INACTIVITY_TIMEOUT { + continue; + } + let random_id = bucket.random_within(); + tx.send(random_id).unwrap(); + } + } + }; + + tokio::pin!(filler); + + loop { + tokio::select! { + _ = &mut filler => {}, + random_id = rx.recv() => { + let random_id = random_id.unwrap(); + let addrs = self + .dht + .routing_table + .read() + .sorted_by_distance_from(random_id) + .iter() + .map(|n| n.addr()) + .take(8).collect::>(); + futs.push( + RecursiveRequest::find_node_for_routing_table( + self.dht.clone(), random_id, addrs.into_iter() + ).instrument(error_span!("refresh_bucket", random_id=format!("{:?}", random_id))) + ); + }, + _ = futs.next(), if !futs.is_empty() => {}, + } + } } async fn pinger(&self, mut rx: UnboundedReceiver<(Id20, SocketAddr)>) -> anyhow::Result<()> { @@ -902,11 +949,13 @@ impl DhtWorker { .instrument(debug_span!("dht_responese_reader")); let pinger = self.pinger(ping_rx); + let bucket_refresher = self.bucket_refresher(); tokio::pin!(framer); tokio::pin!(bootstrap); tokio::pin!(response_reader); tokio::pin!(pinger); + tokio::pin!(bucket_refresher); loop { tokio::select! { @@ -920,6 +969,9 @@ impl DhtWorker { err = &mut pinger => { anyhow::bail!("pinger quit: {:?}", err) }, + err = &mut bucket_refresher => { + anyhow::bail!("bucket_refresher quit: {:?}", err) + }, err = &mut response_reader => {anyhow::bail!("response reader quit: {:?}", err)} } } diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index db45de5..4223757 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -1,6 +1,7 @@ use std::{net::SocketAddr, time::Instant}; use librqbit_core::id20::Id20; +use rand::RngCore; use serde::{ ser::{SerializeMap, SerializeStruct}, Deserialize, Serialize, Serializer, @@ -10,9 +11,9 @@ use tracing::debug; use crate::INACTIVITY_TIMEOUT; #[derive(Clone, Debug)] -struct LeafBucket { - nodes: Vec, - last_refreshed: Instant, +pub struct LeafBucket { + pub nodes: Vec, + pub last_refreshed: Instant, } impl Serialize for LeafBucket { @@ -177,61 +178,70 @@ impl Serialize for BucketTree { } } -pub struct BucketTreeIterator<'a> { +pub struct BucketTreeIteratorItem<'a> { + pub bits: u8, + pub start: &'a Id20, + pub end_inclusive: &'a Id20, + pub leaf: &'a LeafBucket, +} + +impl<'a> BucketTreeIteratorItem<'a> { + pub fn random_within(&self) -> Id20 { + generate_random_id(self.start, self.bits) + } +} + +struct BucketTreeIterator<'a> { tree: &'a BucketTree, - current: std::slice::Iter<'a, RoutingTableNode>, queue: Vec, } impl<'a> BucketTreeIterator<'a> { fn new(tree: &'a BucketTree) -> Self { - let mut queue = Vec::new(); - let mut current = 0; - let current_slice = loop { - match &tree.data[current].data { - BucketTreeNodeData::Leaf(leaf) => break leaf.nodes.iter(), - BucketTreeNodeData::LeftRight(left, right) => { - queue.push(*right); - current = *left; - } - } - }; - BucketTreeIterator { - tree, - current: current_slice, - queue, - } + let queue = vec![0]; + BucketTreeIterator { tree, queue } } } impl<'a> Iterator for BucketTreeIterator<'a> { - type Item = &'a RoutingTableNode; + type Item = BucketTreeIteratorItem<'a>; fn next(&mut self) -> Option { - if let Some(v) = self.current.next() { - return Some(v); - }; - loop { let idx = self.queue.pop()?; - match &self.tree.data[idx].data { - BucketTreeNodeData::Leaf(leaf) => { - self.current = leaf.nodes.iter(); - match self.current.next() { - Some(v) => return Some(v), - None => continue, + match self.tree.data.get(idx) { + Some(node) => match &node.data { + BucketTreeNodeData::Leaf(leaf) => { + return Some(BucketTreeIteratorItem { + bits: node.bits, + start: &node.start, + end_inclusive: &node.end_inclusive, + leaf, + }); } - } - BucketTreeNodeData::LeftRight(left, right) => { - self.queue.push(*right); - self.queue.push(*left); - continue; - } + BucketTreeNodeData::LeftRight(left, right) => { + self.queue.push(*right); + self.queue.push(*left); + continue; + } + }, + None => continue, } } } } +pub fn generate_random_id(start: &Id20, bits: u8) -> Id20 { + let mut data = [0u8; 20]; + rand::thread_rng().fill_bytes(&mut data); + let mut data = Id20(data); + let remaining_bits = 160 - bits; + for bit in 0..remaining_bits { + data.set_bit(bit, start.get_bit(bit)); + } + data +} + fn compute_split_start_end( start: Id20, end_inclusive: Id20, @@ -297,10 +307,15 @@ impl BucketTree { }], } } - pub fn iter(&self) -> BucketTreeIterator<'_> { + + fn iter_leaves(&self) -> BucketTreeIterator<'_> { BucketTreeIterator::new(self) } + fn iter(&self) -> impl Iterator + '_ { + self.iter_leaves().flat_map(|l| l.leaf.nodes.iter()) + } + fn get_leaf(&self, id: &Id20) -> usize { let mut idx = 0; loop { @@ -602,6 +617,10 @@ impl RoutingTable { result } + pub fn iter_buckets(&self) -> impl Iterator> + '_ { + self.buckets.iter_leaves() + } + pub fn add_node( &mut self, id: Id20, diff --git a/crates/librqbit_core/src/id20.rs b/crates/librqbit_core/src/id20.rs index eee5adc..8562a3d 100644 --- a/crates/librqbit_core/src/id20.rs +++ b/crates/librqbit_core/src/id20.rs @@ -102,6 +102,12 @@ impl Id20 { } Id20(xor) } + pub fn get_bit(&self, bit: u8) -> bool { + let n = self.0[(bit / 8) as usize]; + let mask = !(1 << (7 - bit % 8)); + n & mask > 0 + } + pub fn set_bit(&mut self, bit: u8, value: bool) { let n = &mut self.0[(bit / 8) as usize]; if value {