diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index c10ab16..db03efe 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -102,7 +102,7 @@ use self::{ atomic::PeerCountersAtomic as AtomicPeerCounters, snapshot::{PeerStatsFilter, PeerStatsSnapshot}, }, - InflightRequest, PeerRx, PeerState, PeerTx, SendMany, + InflightRequest, PeerRx, PeerState, PeerTx, }, peers::PeerStates, stats::{atomic::AtomicStats, snapshot::StatsSnapshot}, @@ -571,30 +571,6 @@ impl TorrentStateLive { TimedExistence::new(timeit(reason, || self.locked.write()), reason) } - fn get_next_needed_piece( - &self, - peer_handle: PeerHandle, - ) -> anyhow::Result> { - self.peers - .with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| { - let g = self.lock_read("g(get_next_needed_piece)"); - let bf = &live.bitfield; - for n in g.get_chunks()?.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - // in theory it should be safe without validation, but whatever. - return Ok(self.lengths.validate_piece_index(n as u32)); - } - } - Ok(None) - }) - .transpose() - .map(|r| r.flatten()) - } - - fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { - matches!(self.get_next_needed_piece(handle), Ok(Some(_))) - } - fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state @@ -1172,16 +1148,27 @@ impl PeerHandler { let handle = self.addr; self.wait_for_bitfield().await; - if !self.state.am_i_interested_in_peer(self.addr) { + // TODO: this check needs to happen more often + if self.state.is_finished() { self.tx .send(WriterRequest::Message(MessageOwned::NotInterested))?; - if self.state.is_finished() { + + if self + .state + .peers + .with_live(self.addr, |l| { + l.has_full_torrent(self.state.lengths.total_pieces() as usize) + }) + .unwrap_or_default() + { + debug!("both peer and us have full torrent, disconnecting"); self.tx.send(WriterRequest::Disconnect)?; + return Ok(()); } - return Ok(()); } + self.tx - .send_many([WriterRequest::Message(MessageOwned::Interested)])?; + .send(WriterRequest::Message(MessageOwned::Interested))?; loop { self.wait_for_unchoke().await; diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 75d182b..37cfce4 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -2,8 +2,6 @@ pub mod stats; use std::collections::HashSet; -use anyhow::Context; - use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; @@ -29,23 +27,9 @@ impl From<&ChunkInfo> for InflightRequest { } } -// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak. pub(crate) type PeerRx = UnboundedReceiver; pub(crate) type PeerTx = UnboundedSender; -pub trait SendMany { - fn send_many(&self, requests: impl IntoIterator) -> anyhow::Result<()>; -} - -impl SendMany for PeerTx { - fn send_many(&self, requests: impl IntoIterator) -> anyhow::Result<()> { - requests - .into_iter() - .try_for_each(|r| self.send(r)) - .context("peer dropped") - } -} - #[derive(Debug, Default)] pub(crate) struct Peer { pub state: PeerStateNoMut, diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index e015708..502a180 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -338,6 +338,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { disable_dht: opts.disable_dht, disable_dht_persistence: opts.disable_dht_persistence, dht_config: None, + // This will be overriden by "server start" below if needed. persistence: false, persistence_filename: None, peer_id: None,