This commit is contained in:
Igor Katson 2021-07-13 13:16:59 +01:00
parent 951f610cfd
commit 1cd6caee76
6 changed files with 31 additions and 18 deletions

View file

@ -179,6 +179,7 @@ impl DhtState {
MessageKind::Response(r) => r, MessageKind::Response(r) => r,
_ => unreachable!(), _ => unreachable!(),
}; };
self.routing_table.mark_response(&response.id);
match outstanding.request { match outstanding.request {
Request::FindNode(id) => { Request::FindNode(id) => {
let nodes = response let nodes = response
@ -243,9 +244,6 @@ impl DhtState {
} }
} }
} }
InsertResult::WasExisting => {
self.routing_table.mark_response(&source);
}
_ => {} _ => {}
}; };
for node in nodes.nodes { for node in nodes.nodes {
@ -276,6 +274,8 @@ impl DhtState {
data: bprotocol::Response<ByteString>, data: bprotocol::Response<ByteString>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
self.routing_table.add_node(source, source_addr); self.routing_table.add_node(source, source_addr);
self.routing_table.mark_response(&source);
if let Some(peers) = data.values { if let Some(peers) = data.values {
let subscribers = match self.subscribers.get(&target) { let subscribers = match self.subscribers.get(&target) {
Some(subscribers) => subscribers, Some(subscribers) => subscribers,

View file

@ -1,5 +1,6 @@
use std::{collections::HashSet, net::SocketAddr}; use std::{collections::HashSet, net::SocketAddr};
use anyhow::Context;
use buffers::ByteString; use buffers::ByteString;
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
@ -32,10 +33,22 @@ pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> +
}; };
seen.insert(first_addr); seen.insert(first_addr);
let semaphore = tokio::sync::Semaphore::new(128);
let read_info_guarded = |addr| {
let semaphore = &semaphore;
async move {
let token = semaphore.acquire().await?;
let ret = peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)
.await
.with_context(|| format!("error reading metainfo from {}", addr));
drop(token);
ret
}
};
let mut unordered = FuturesUnordered::new(); let mut unordered = FuturesUnordered::new();
unordered.push(peer_info_reader::read_metainfo_from_peer( unordered.push(read_info_guarded(first_addr));
first_addr, peer_id, info_hash,
));
loop { loop {
tokio::select! { tokio::select! {
@ -43,7 +56,7 @@ pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> +
match next_addr { match next_addr {
Some(addr) => { Some(addr) => {
if seen.insert(addr) { if seen.insert(addr) {
unordered.push(peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)); unordered.push(read_info_guarded(addr));
} }
}, },
None => return ReadMetainfoResult::ChannelClosed { seen }, None => return ReadMetainfoResult::ChannelClosed { seen },
@ -53,7 +66,7 @@ pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> +
match done { match done {
Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs }, Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs },
Some(Err(e)) => { Some(Err(e)) => {
debug!("error in peer_info_reader::read_metainfo_from_peer: {}", e); debug!("{:#}", e);
}, },
None => unreachable!() None => unreachable!()
} }
@ -85,8 +98,8 @@ mod tests {
let peer_rx = dht.get_peers(info_hash).await; let peer_rx = dht.get_peers(info_hash).await;
let peer_id = generate_peer_id(); let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await { match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await {
ReadMetainfoResult::Found { info, rx, seen } => dbg!(info), ReadMetainfoResult::Found { info, .. } => dbg!(info),
ReadMetainfoResult::ChannelClosed { seen } => todo!("should not have happened"), ReadMetainfoResult::ChannelClosed { .. } => todo!("should not have happened"),
}; };
} }
} }

View file

@ -143,9 +143,9 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
debug!( debug!(
"connected peer {}: {:?}", "connected peer {}: {:?}",
self.addr, self.addr,
try_decode_peer_id(h.peer_id) try_decode_peer_id(Id20(h.peer_id))
); );
if h.info_hash != self.info_hash { if h.info_hash != self.info_hash.0 {
anyhow::bail!("info hash does not match"); anyhow::bail!("info hash does not match");
} }

View file

@ -422,7 +422,7 @@ impl TorrentState {
let mut g = self.locked.write(); let mut g = self.locked.write();
match g.peers.states.get_mut(&handle) { match g.peers.states.get_mut(&handle) {
Some(s @ &mut PeerState::Connecting) => { Some(s @ &mut PeerState::Connecting) => {
*s = PeerState::Live(LivePeerState::new(h.peer_id)); *s = PeerState::Live(LivePeerState::new(Id20(h.peer_id)));
} }
_ => { _ => {
warn!("peer {} was in wrong state", handle); warn!("peer {} was in wrong state", handle);

View file

@ -30,7 +30,7 @@ pub struct ExtendedHandshake<ByteBuf: Eq + std::hash::Hash> {
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub metadata_size: Option<u32>, pub metadata_size: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub complete_ago: Option<u32>, pub complete_ago: Option<i32>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub upload_only: Option<u32>, pub upload_only: Option<u32>,
} }

View file

@ -474,8 +474,8 @@ where
pub struct Handshake<'a> { pub struct Handshake<'a> {
pub pstr: &'a str, pub pstr: &'a str,
pub reserved: [u8; 8], pub reserved: [u8; 8],
pub info_hash: Id20, pub info_hash: [u8; 20],
pub peer_id: Id20, pub peer_id: [u8; 20],
} }
fn bopts() -> impl bincode::Options { fn bopts() -> impl bincode::Options {
@ -497,8 +497,8 @@ impl<'a> Handshake<'a> {
Handshake { Handshake {
pstr: PSTR_BT1, pstr: PSTR_BT1,
reserved: reserved_arr, reserved: reserved_arr,
info_hash, info_hash: info_hash.0,
peer_id, peer_id: peer_id.0,
} }
} }
pub fn supports_extended(&self) -> bool { pub fn supports_extended(&self) -> bool {