From 6a23f311e1123971610c71fedeeea31ec630d018 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 7 Apr 2024 15:33:07 +0400 Subject: [PATCH] Send cancellations on steal --- crates/librqbit/src/torrent_state/live/mod.rs | 62 +++++++++++-------- .../src/torrent_state/live/peer/mod.rs | 18 +----- .../src/torrent_state/live/peers/mod.rs | 24 +++++++ crates/librqbit_core/src/lengths.rs | 2 +- 4 files changed, 62 insertions(+), 44 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index ece1090..012c0cc 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -96,7 +96,7 @@ use self::{ atomic::PeerCountersAtomic as AtomicPeerCounters, snapshot::{PeerStatsFilter, PeerStatsSnapshot}, }, - InflightRequest, PeerRx, PeerState, PeerTx, + PeerRx, PeerState, PeerTx, }, peers::PeerStates, stats::{atomic::AtomicStats, snapshot::StatsSnapshot}, @@ -894,11 +894,11 @@ impl PeerHandler { for req in live.inflight_requests { debug!( "peer dead, marking chunk request cancelled, index={}, chunk={}", - req.piece.get(), - req.chunk + req.piece_index.get(), + req.chunk_index ); g.get_chunks_mut()? - .mark_chunk_request_cancelled(req.piece, req.chunk); + .mark_chunk_request_cancelled(req.piece_index, req.chunk_index); } } PeerState::NotNeeded => { @@ -1030,26 +1030,37 @@ impl PeerHandler { None => return None, }; - let mut g = self.state.lock_write("try_steal_old_slow_piece"); - let (idx, elapsed, piece_req) = g - .inflight_pieces - .iter_mut() - // don't steal from myself - .filter(|(_, r)| r.peer != self.addr) - .map(|(p, r)| (p, r.started.elapsed(), r)) - .max_by_key(|(_, e, _)| *e)?; + let (stolen_idx, from_peer) = { + let mut g = self.state.lock_write("try_steal_old_slow_piece"); + let (idx, elapsed, piece_req) = g + .inflight_pieces + .iter_mut() + // don't steal from myself + .filter(|(_, r)| r.peer != self.addr) + .map(|(p, r)| (p, r.started.elapsed(), r)) + .max_by_key(|(_, e, _)| *e)?; - // heuristic for "too slow peer" - if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold { - debug!( - "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}", - idx, piece_req.peer, elapsed, my_avg_time - ); - piece_req.peer = self.addr; - piece_req.started = Instant::now(); - return Some(*idx); + // heuristic for "too slow peer" + if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold { + debug!( + "will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}", + idx, piece_req.peer, elapsed, my_avg_time + ); + let old = piece_req.peer; + piece_req.peer = self.addr; + piece_req.started = Instant::now(); + (*idx, old) + } else { + return None; + } + }; + + // Send cancellations to old peer. + { + self.state.peers.send_cancellations(from_peer, stolen_idx); } - None + + Some(stolen_idx) } fn on_download_request(&self, request: Request) -> anyhow::Result<()> { @@ -1225,7 +1236,7 @@ impl PeerHandler { .state .peers .with_live_mut(handle, "add chunk request", |live| { - live.inflight_requests.insert(InflightRequest::from(&chunk)) + live.inflight_requests.insert(chunk) }) { Some(true) => {} Some(false) => { @@ -1310,10 +1321,7 @@ impl PeerHandler { self.state .peers .with_live_mut(self.addr, "inflight_requests.remove", |h| { - if !h - .inflight_requests - .remove(&InflightRequest::from(&chunk_info)) - { + if !h.inflight_requests.remove(&chunk_info) { anyhow::bail!( "peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}", &h.inflight_requests, diff --git a/crates/librqbit/src/torrent_state/live/peer/mod.rs b/crates/librqbit/src/torrent_state/live/peer/mod.rs index 50c95da..c96d846 100644 --- a/crates/librqbit/src/torrent_state/live/peer/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peer/mod.rs @@ -3,7 +3,7 @@ pub mod stats; use std::collections::HashSet; use librqbit_core::hash_id::Id20; -use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; +use librqbit_core::lengths::ChunkInfo; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; @@ -12,21 +12,7 @@ use crate::type_aliases::BF; use super::peers::stats::atomic::AggregatePeerStatsAtomic; -#[derive(Debug, Hash, PartialEq, Eq)] -pub(crate) struct InflightRequest { - pub piece: ValidPieceIndex, - pub chunk: u32, -} - -impl From<&ChunkInfo> for InflightRequest { - fn from(c: &ChunkInfo) -> Self { - Self { - piece: c.piece_index, - chunk: c.chunk_index, - } - } -} - +pub(crate) type InflightRequest = ChunkInfo; pub(crate) type PeerRx = UnboundedReceiver; pub(crate) type PeerTx = UnboundedSender; diff --git a/crates/librqbit/src/torrent_state/live/peers/mod.rs b/crates/librqbit/src/torrent_state/live/peers/mod.rs index 4e51d7b..d258e4f 100644 --- a/crates/librqbit/src/torrent_state/live/peers/mod.rs +++ b/crates/librqbit/src/torrent_state/live/peers/mod.rs @@ -3,8 +3,11 @@ use std::net::SocketAddr; use anyhow::Context; use backoff::backoff::Backoff; use dashmap::DashMap; +use librqbit_core::lengths::ValidPieceIndex; +use peer_binary_protocol::{Message, Request}; use crate::{ + peer_connection::WriterRequest, torrent_state::utils::{atomic_inc, TimedExistence}, type_aliases::{PeerHandle, BF}, }; @@ -109,4 +112,25 @@ impl PeerStates { })?; Some(prev) } + + pub(crate) fn send_cancellations(&self, from_peer: SocketAddr, stolen_idx: ValidPieceIndex) { + self.with_live_mut(from_peer, "send_cancellations", |live| { + let to_remove = live + .inflight_requests + .iter() + .filter(|r| r.piece_index == stolen_idx) + .copied() + .collect::>(); + for req in to_remove { + let _ = live + .tx + .send(WriterRequest::Message(Message::Cancel(Request { + index: stolen_idx.get(), + begin: req.offset, + length: req.size, + }))); + live.inflight_requests.remove(&req); + } + }); + } } diff --git a/crates/librqbit_core/src/lengths.rs b/crates/librqbit_core/src/lengths.rs index 3ebab9b..3e76463 100644 --- a/crates/librqbit_core/src/lengths.rs +++ b/crates/librqbit_core/src/lengths.rs @@ -19,7 +19,7 @@ pub struct PieceInfo { pub len: u32, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct ChunkInfo { pub piece_index: ValidPieceIndex,