Send cancellations on steal

This commit is contained in:
Igor Katson 2024-04-07 15:33:07 +04:00
parent 841c84ff25
commit 6a23f311e1
4 changed files with 62 additions and 44 deletions

View file

@ -96,7 +96,7 @@ use self::{
atomic::PeerCountersAtomic as AtomicPeerCounters,
snapshot::{PeerStatsFilter, PeerStatsSnapshot},
},
InflightRequest, PeerRx, PeerState, PeerTx,
PeerRx, PeerState, PeerTx,
},
peers::PeerStates,
stats::{atomic::AtomicStats, snapshot::StatsSnapshot},
@ -894,11 +894,11 @@ impl PeerHandler {
for req in live.inflight_requests {
debug!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
req.piece.get(),
req.chunk
req.piece_index.get(),
req.chunk_index
);
g.get_chunks_mut()?
.mark_chunk_request_cancelled(req.piece, req.chunk);
.mark_chunk_request_cancelled(req.piece_index, req.chunk_index);
}
}
PeerState::NotNeeded => {
@ -1030,26 +1030,37 @@ impl PeerHandler {
None => return None,
};
let mut g = self.state.lock_write("try_steal_old_slow_piece");
let (idx, elapsed, piece_req) = g
.inflight_pieces
.iter_mut()
// don't steal from myself
.filter(|(_, r)| r.peer != self.addr)
.map(|(p, r)| (p, r.started.elapsed(), r))
.max_by_key(|(_, e, _)| *e)?;
let (stolen_idx, from_peer) = {
let mut g = self.state.lock_write("try_steal_old_slow_piece");
let (idx, elapsed, piece_req) = g
.inflight_pieces
.iter_mut()
// don't steal from myself
.filter(|(_, r)| r.peer != self.addr)
.map(|(p, r)| (p, r.started.elapsed(), r))
.max_by_key(|(_, e, _)| *e)?;
// heuristic for "too slow peer"
if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold {
debug!(
"will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}",
idx, piece_req.peer, elapsed, my_avg_time
);
piece_req.peer = self.addr;
piece_req.started = Instant::now();
return Some(*idx);
// heuristic for "too slow peer"
if elapsed.as_secs_f64() > my_avg_time.as_secs_f64() * threshold {
debug!(
"will steal piece {} from {}: elapsed time {:?}, my avg piece time: {:?}",
idx, piece_req.peer, elapsed, my_avg_time
);
let old = piece_req.peer;
piece_req.peer = self.addr;
piece_req.started = Instant::now();
(*idx, old)
} else {
return None;
}
};
// Send cancellations to old peer.
{
self.state.peers.send_cancellations(from_peer, stolen_idx);
}
None
Some(stolen_idx)
}
fn on_download_request(&self, request: Request) -> anyhow::Result<()> {
@ -1225,7 +1236,7 @@ impl PeerHandler {
.state
.peers
.with_live_mut(handle, "add chunk request", |live| {
live.inflight_requests.insert(InflightRequest::from(&chunk))
live.inflight_requests.insert(chunk)
}) {
Some(true) => {}
Some(false) => {
@ -1310,10 +1321,7 @@ impl PeerHandler {
self.state
.peers
.with_live_mut(self.addr, "inflight_requests.remove", |h| {
if !h
.inflight_requests
.remove(&InflightRequest::from(&chunk_info))
{
if !h.inflight_requests.remove(&chunk_info) {
anyhow::bail!(
"peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}",
&h.inflight_requests,

View file

@ -3,7 +3,7 @@ pub mod stats;
use std::collections::HashSet;
use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use librqbit_core::lengths::ChunkInfo;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@ -12,21 +12,7 @@ use crate::type_aliases::BF;
use super::peers::stats::atomic::AggregatePeerStatsAtomic;
#[derive(Debug, Hash, PartialEq, Eq)]
pub(crate) struct InflightRequest {
pub piece: ValidPieceIndex,
pub chunk: u32,
}
impl From<&ChunkInfo> for InflightRequest {
fn from(c: &ChunkInfo) -> Self {
Self {
piece: c.piece_index,
chunk: c.chunk_index,
}
}
}
pub(crate) type InflightRequest = ChunkInfo;
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
pub(crate) type PeerTx = UnboundedSender<WriterRequest>;

View file

@ -3,8 +3,11 @@ use std::net::SocketAddr;
use anyhow::Context;
use backoff::backoff::Backoff;
use dashmap::DashMap;
use librqbit_core::lengths::ValidPieceIndex;
use peer_binary_protocol::{Message, Request};
use crate::{
peer_connection::WriterRequest,
torrent_state::utils::{atomic_inc, TimedExistence},
type_aliases::{PeerHandle, BF},
};
@ -109,4 +112,25 @@ impl PeerStates {
})?;
Some(prev)
}
pub(crate) fn send_cancellations(&self, from_peer: SocketAddr, stolen_idx: ValidPieceIndex) {
self.with_live_mut(from_peer, "send_cancellations", |live| {
let to_remove = live
.inflight_requests
.iter()
.filter(|r| r.piece_index == stolen_idx)
.copied()
.collect::<Vec<_>>();
for req in to_remove {
let _ = live
.tx
.send(WriterRequest::Message(Message::Cancel(Request {
index: stolen_idx.get(),
begin: req.offset,
length: req.size,
})));
live.inflight_requests.remove(&req);
}
});
}
}

View file

@ -19,7 +19,7 @@ pub struct PieceInfo {
pub len: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ChunkInfo {
pub piece_index: ValidPieceIndex,