diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 6d90908..fb8308c 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -179,6 +179,7 @@ impl DhtState { MessageKind::Response(r) => r, _ => unreachable!(), }; + self.routing_table.mark_response(&response.id); match outstanding.request { Request::FindNode(id) => { let nodes = response @@ -243,9 +244,6 @@ impl DhtState { } } } - InsertResult::WasExisting => { - self.routing_table.mark_response(&source); - } _ => {} }; for node in nodes.nodes { @@ -276,6 +274,8 @@ impl DhtState { data: bprotocol::Response, ) -> anyhow::Result<()> { self.routing_table.add_node(source, source_addr); + self.routing_table.mark_response(&source); + if let Some(peers) = data.values { let subscribers = match self.subscribers.get(&target) { Some(subscribers) => subscribers, diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index 27ced68..ce74841 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -1,5 +1,6 @@ use std::{collections::HashSet, net::SocketAddr}; +use anyhow::Context; use buffers::ByteString; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; @@ -32,10 +33,22 @@ pub async fn read_metainfo_from_peer_receiver + }; seen.insert(first_addr); + let semaphore = tokio::sync::Semaphore::new(128); + + let read_info_guarded = |addr| { + let semaphore = &semaphore; + async move { + let token = semaphore.acquire().await?; + let ret = peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash) + .await + .with_context(|| format!("error reading metainfo from {}", addr)); + drop(token); + ret + } + }; + let mut unordered = FuturesUnordered::new(); - unordered.push(peer_info_reader::read_metainfo_from_peer( - first_addr, peer_id, info_hash, - )); + unordered.push(read_info_guarded(first_addr)); loop { tokio::select! { @@ -43,7 +56,7 @@ pub async fn read_metainfo_from_peer_receiver + match next_addr { Some(addr) => { if seen.insert(addr) { - unordered.push(peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)); + unordered.push(read_info_guarded(addr)); } }, None => return ReadMetainfoResult::ChannelClosed { seen }, @@ -53,7 +66,7 @@ pub async fn read_metainfo_from_peer_receiver + match done { Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs }, Some(Err(e)) => { - debug!("error in peer_info_reader::read_metainfo_from_peer: {}", e); + debug!("{:#}", e); }, None => unreachable!() } @@ -85,8 +98,8 @@ mod tests { let peer_rx = dht.get_peers(info_hash).await; let peer_id = generate_peer_id(); match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await { - ReadMetainfoResult::Found { info, rx, seen } => dbg!(info), - ReadMetainfoResult::ChannelClosed { seen } => todo!("should not have happened"), + ReadMetainfoResult::Found { info, .. } => dbg!(info), + ReadMetainfoResult::ChannelClosed { .. } => todo!("should not have happened"), }; } } diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 8236016..6f1fcfe 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -143,9 +143,9 @@ impl PeerConnection { debug!( "connected peer {}: {:?}", self.addr, - try_decode_peer_id(h.peer_id) + try_decode_peer_id(Id20(h.peer_id)) ); - if h.info_hash != self.info_hash { + if h.info_hash != self.info_hash.0 { anyhow::bail!("info hash does not match"); } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 8432195..04d75f4 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -422,7 +422,7 @@ impl TorrentState { let mut g = self.locked.write(); match g.peers.states.get_mut(&handle) { Some(s @ &mut PeerState::Connecting) => { - *s = PeerState::Live(LivePeerState::new(h.peer_id)); + *s = PeerState::Live(LivePeerState::new(Id20(h.peer_id))); } _ => { warn!("peer {} was in wrong state", handle); diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 5e9ef09..db41803 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -30,7 +30,7 @@ pub struct ExtendedHandshake { #[serde(skip_serializing_if = "Option::is_none")] pub metadata_size: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub complete_ago: Option, + pub complete_ago: Option, #[serde(skip_serializing_if = "Option::is_none")] pub upload_only: Option, } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 675c497..514a4da 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -474,8 +474,8 @@ where pub struct Handshake<'a> { pub pstr: &'a str, pub reserved: [u8; 8], - pub info_hash: Id20, - pub peer_id: Id20, + pub info_hash: [u8; 20], + pub peer_id: [u8; 20], } fn bopts() -> impl bincode::Options { @@ -497,8 +497,8 @@ impl<'a> Handshake<'a> { Handshake { pstr: PSTR_BT1, reserved: reserved_arr, - info_hash, - peer_id, + info_hash: info_hash.0, + peer_id: peer_id.0, } } pub fn supports_extended(&self) -> bool {