diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 217fd9c..3105a99 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -6,6 +6,7 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, + time::Instant, }; use futures::{stream::FuturesUnordered, StreamExt}; @@ -25,11 +26,16 @@ use crate::{ type_aliases::{PeerHandle, BF}, }; +pub struct InflightPiece { + pub peer: PeerHandle, + pub started: Instant, +} + #[derive(Default)] pub struct PeerStates { states: HashMap, seen: HashSet, - inflight_pieces: HashSet, + inflight_pieces: HashMap, tx: HashMap>>, } @@ -130,7 +136,7 @@ impl PeerStates { pub fn clone_tx(&self, handle: PeerHandle) -> Option>> { Some(self.tx.get(&handle)?.clone()) } - pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> bool { + pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { self.inflight_pieces.remove(&piece) } } @@ -201,7 +207,13 @@ impl TorrentState { self.lengths.validate_piece_index(n_opt? as u32)? }; - g.peers.inflight_pieces.insert(n); + g.peers.inflight_pieces.insert( + n, + InflightPiece { + peer: peer_handle, + started: Instant::now(), + }, + ); g.chunks.reserve_needed_piece(n); Some(n) } @@ -217,7 +229,7 @@ impl TorrentState { let pl = g.peers.get_live(handle)?; g.peers .inflight_pieces - .iter() + .keys() .filter(|p| !pl.inflight_requests.iter().any(|req| req.piece == **p)) .choose(&mut rng) .copied()