diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index a0ea3bc..e223d06 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -8,8 +8,9 @@ pub struct ChunkTracker { // This forms the basis of a "queue" to pull from. // It's set to 1 if we need a piece, but the moment we start requesting a peer, // it's set to 0. - - // Better to rename into piece_queue or smth, and maybe use some other form of a queue. + // + // Initially this is the opposite of "have", until we start making requests. + // An in-flight request is not in "needed", and not in "have". needed_pieces: BF, // This has a bit set per each chunk (block) that we have written to the output file. @@ -21,6 +22,7 @@ pub struct ChunkTracker { lengths: Lengths, + // What pieces to download first. priority_piece_ids: Vec, } @@ -168,17 +170,6 @@ impl ChunkTracker { piece.index, chunk_info, chunk_range, ); - // TODO: remove me, it's for debugging - // { - // use std::io::Write; - // let mut f = std::fs::OpenOptions::new() - // .write(true) - // .create(true) - // .open("/tmp/chunks") - // .unwrap(); - // write!(f, "{:?}", &self.have).unwrap(); - // } - if chunk_range.all() { return Some(ChunkMarkingResult::Completed); } diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 874146e..d076e2f 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -224,7 +224,7 @@ pub struct LivePeerState { pub i_am_choked: bool, pub peer_interested: bool, - // This is used to limit the number of requests we send to a peer at a time. + // This is used to limit the number of chunk requests we send to a peer at a time. pub requests_sem: Arc, // This is used to unpause processes after we were choked. 
diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index ee7affd..b5aa72b 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -170,9 +170,8 @@ impl PeerStates { } pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec) -> Option<()> { self.with_live_mut(handle, "update_bitfield_from_vec", |live| { - let bitfield = BF::from_vec(bitfield); - live.previously_requested_pieces = BF::with_capacity(bitfield.len()); - live.bitfield = bitfield; + live.previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]); + live.bitfield = BF::from_vec(bitfield); }) } pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { @@ -205,14 +204,12 @@ impl PeerStates { } pub struct TorrentStateLocked { + // What chunks we have and need. pub chunks: ChunkTracker, - pub inflight_pieces: HashMap, -} -impl TorrentStateLocked { - pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { - self.inflight_pieces.remove(&piece) - } + // At a moment in time, we are expecting a piece from only one peer. + // inflight_pieces stores this information. + pub inflight_pieces: HashMap, } #[derive(Default, Debug)] @@ -623,15 +620,22 @@ impl TorrentState { None } - // TODO: need to throttle this or make it smarter as it may loop and steal pieces forever from each other. + // NOTE: this doesn't actually "steal" it, but only returns an id we might steal. 
fn try_steal_piece(&self, handle: PeerHandle) -> Option { let mut rng = rand::thread_rng(); use rand::seq::IteratorRandom; + self.peers .with_live(handle, |live| { let g = self.lock_read("try_steal_piece"); g.inflight_pieces .keys() + .filter(|p| { + live.previously_requested_pieces + .get(p.get() as usize) + .map(|r| *r) + == Some(false) + }) .filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p)) .choose(&mut rng) .copied() @@ -1102,45 +1106,46 @@ impl PeerHandler { }, }; - let (tx, sem) = match self - .state - .peers - .with_live(handle, |l| (l.tx.clone(), l.requests_sem.clone())) - { - Some((tx, sem)) => (tx, sem), - None => return Ok(()), - }; - - for chunk in self.state.lengths.iter_chunk_infos(next) { - if self - .state - .lock_read("is_chunk_downloaded") - .chunks - .is_chunk_downloaded(&chunk) - { - continue; - } - + let (tx, sem) = match self .state .peers - .with_live_mut(handle, "inflight_requests.insert", |l| { - l.inflight_requests.insert(InflightRequest::from(&chunk)) + .with_live_mut(handle, "peer_setup_for_piece_request", |l| { + l.previously_requested_pieces.set(next.get() as usize, true); + (l.tx.clone(), l.requests_sem.clone()) }) { - Some(true) => {} - Some(false) => { - warn!("probably a bug, we already requested {:?}", chunk); - continue; - } + Some(res) => res, None => return Ok(()), - } + }; + for chunk in self.state.lengths.iter_chunk_infos(next) { let request = Request { index: next.get(), begin: chunk.offset, length: chunk.size, }; + match self + .state + .peers + .with_live_mut(handle, "add chunk request", |live| { + live.inflight_requests.insert(InflightRequest::from(&chunk)) + }) { + Some(true) => {} + Some(false) => { + // This request was already in-flight for this peer for this chunk. + // This might happen in theory, but not very likely. + // + // Example: + // someone stole a piece from us, and then died, the piece became "needed" again, and we reserved it + // all before the piece request was processed by us. 
+ warn!("we already requested {:?} previously", chunk); + continue; + } + // peer died + None => return Ok(()), + }; + loop { match timeout(Duration::from_secs(10), sem.acquire()).await { Ok(acq) => break acq?.forget(), @@ -1241,12 +1246,33 @@ impl PeerHandler { let full_piece_download_time = { let mut g = self.state.lock_write("mark_chunk_downloaded"); + match g.inflight_pieces.get(&chunk_info.piece_index) { + Some(InflightPiece { peer, .. }) if *peer == handle => {} + Some(InflightPiece { peer, .. }) => { + debug!( + "in-flight piece {} was stolen by {}, ignoring", + chunk_info.piece_index, peer + ); + return Ok(()); + } + None => { + debug!( + "in-flight piece {} not found. it was probably completed by someone else", + chunk_info.piece_index + ); + return Ok(()); + } + }; + match g.chunks.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { debug!("piece={} done, will write and checksum", piece.index,); // This will prevent others from stealing it. - g.remove_inflight_piece(chunk_info.piece_index) - .map(|t| t.started.elapsed()) + { + let piece = chunk_info.piece_index; + g.inflight_pieces.remove(&piece) + } + .map(|t| t.started.elapsed()) } Some(ChunkMarkingResult::PreviouslyCompleted) => { // TODO: we might need to send cancellations here. @@ -1263,6 +1289,9 @@ impl PeerHandler { } }; + // By the time we get here, no other peer can reach this point for this piece. All others, even if they steal pieces, would + // have fallen off above in one of the defensive checks. + self.spawner .spawn_block_in_place(move || { let index = piece.index;