DHT announce compiles

This commit is contained in:
Igor Katson 2023-12-05 21:13:31 +00:00
parent 80df2c1001
commit 162afe3056
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -1,6 +1,7 @@
use std::{
cmp::Reverse,
net::SocketAddr,
net::{SocketAddr, SocketAddrV4},
str::FromStr,
sync::{
atomic::{AtomicU16, Ordering},
Arc,
@ -11,8 +12,8 @@ use std::{
use crate::{
bprotocol::{
self, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest, Message,
MessageKind, Node, PingRequest, Response,
self, AnnouncePeer, CompactNodeInfo, ErrorDescription, FindNodeRequest, GetPeersRequest,
Message, MessageKind, Node, PingRequest, Response,
},
peer_store::PeerStore,
routing_table::{InsertResult, NodeStatus, RoutingTable},
@ -93,17 +94,52 @@ trait RecursiveRequestCallbacks: Sized + Send + Sync + 'static {
);
}
struct RecursiveRequestCallbacksGetPeers {}
struct RecursiveRequestCallbacksGetPeers {
// Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap()
min_distance_to_announce: Id20,
}
impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
fn on_request_start(&self, _: &RecursiveRequest<Self>, _: Id20, _: SocketAddr) {}
fn on_request_end(
&self,
_: &RecursiveRequest<Self>,
_: Id20,
_: SocketAddr,
_: &anyhow::Result<ResponseOrError>,
req: &RecursiveRequest<Self>,
target_node: Id20,
addr: SocketAddr,
resp: &anyhow::Result<ResponseOrError>,
) {
let announce_addr = match req.dht.announce_addr {
Some(a) => a,
None => return,
};
let resp = match resp {
Ok(ResponseOrError::Response(resp)) => resp,
_ => return,
};
let token = match &resp.token {
Some(token) => token,
None => return,
};
if req.info_hash.distance(&target_node) > self.min_distance_to_announce {
trace!(
"not announcing, {:?} is too far from {:?}",
target_node,
req.info_hash
);
return;
}
let (tid, message) = req.dht.create_request(Request::Announce {
info_hash: req.info_hash,
token: token.clone(),
addr: announce_addr,
});
let _ = req.dht.worker_sender.send(WorkerSendRequest {
our_tid: Some(tid),
message,
addr,
});
}
}
@ -165,7 +201,12 @@ impl RequestPeersStream {
useful_nodes: RwLock::new(Vec::new()),
peer_tx,
node_tx,
callbacks: RecursiveRequestCallbacksGetPeers {},
callbacks: RecursiveRequestCallbacksGetPeers {
min_distance_to_announce: Id20::from_str(
"000000ffffffffffffffffffffffffffffffffff",
)
.unwrap(),
},
});
let join_handle = rp.request_peers_forever(node_rx);
Self {
@ -351,7 +392,7 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
self.callbacks.on_request_start(self, id, addr);
}
let response = self.dht.request(self.request, addr).await.map(|r| {
let response = self.dht.request(self.request.clone(), addr).await.map(|r| {
self.mark_node_responded(addr, &r);
r
});
@ -359,7 +400,7 @@ impl<C: RecursiveRequestCallbacks> RecursiveRequest<C> {
self.callbacks.on_request_end(self, id, addr, &response);
}
let response = match self.dht.request(self.request, addr).await {
let response = match self.dht.request(self.request.clone(), addr).await {
Ok(ResponseOrError::Response(r)) => r,
Ok(ResponseOrError::Error(e)) => bail!("error response: {:?}", e),
Err(e) => {
@ -493,6 +534,7 @@ pub struct DhtState {
worker_sender: UnboundedSender<WorkerSendRequest>,
pub(crate) peer_store: PeerStore,
announce_addr: Option<SocketAddrV4>,
}
impl DhtState {
@ -502,6 +544,7 @@ impl DhtState {
routing_table: Option<RoutingTable>,
listen_addr: SocketAddr,
peer_store: PeerStore,
announce_addr: Option<SocketAddrV4>,
) -> Self {
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None));
Self {
@ -513,6 +556,7 @@ impl DhtState {
listen_addr,
rate_limiter: make_rate_limiter(),
peer_store,
announce_addr,
}
}
@ -581,6 +625,22 @@ impl DhtState {
ip: None,
kind: MessageKind::PingRequest(PingRequest { id: self.id }),
},
Request::Announce {
info_hash,
token,
addr,
} => Message {
kind: MessageKind::AnnouncePeer(AnnouncePeer {
id: self.id,
implied_port: 0,
info_hash,
port: addr.port(),
token,
}),
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
version: None,
ip: None,
},
};
(transaction_id, message)
}
@ -744,10 +804,15 @@ impl DhtState {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum Request {
GetPeers(Id20),
FindNode(Id20),
Announce {
info_hash: Id20,
token: ByteString,
addr: SocketAddrV4,
},
Ping,
}
@ -1095,6 +1160,13 @@ impl DhtState {
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
config.announce_addr.and_then(|a| match a {
SocketAddr::V4(v4) => Some(v4),
SocketAddr::V6(_) => {
warn!("libqrqbit-dht doesn't support announcing IPv6 addresses");
None
}
}),
));
spawn(error_span!("dht"), {