From ef441b18e6c446f568190efc773dea1ead3437e2 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 20 Nov 2023 12:24:28 +0000 Subject: [PATCH] Refactor stealing logic, make it simpler and less bugged (hopefully). Seems to work like a charm --- crates/librqbit/src/peer_state.rs | 10 +- crates/librqbit/src/torrent_state.rs | 351 ++++++++++++--------------- 2 files changed, 161 insertions(+), 200 deletions(-) diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 949736e..12c7a68 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,6 +1,6 @@ +use std::collections::HashSet; use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; -use std::{collections::HashSet, sync::Arc}; use anyhow::Context; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; @@ -8,7 +8,6 @@ use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use serde::Serialize; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::{Notify, Semaphore}; use crate::peer_connection::WriterRequest; use crate::type_aliases::BF; @@ -225,16 +224,11 @@ impl PeerStateNoMut { #[derive(Debug)] pub struct LivePeerState { pub peer_id: Id20, - pub i_am_choked: bool, pub peer_interested: bool, // This is used to track the pieces the peer has. pub bitfield: BF, - // This is used to only request a piece from a peer once when stealing from others. - // So that you don't steal then re-steal the same piece in a loop. - pub previously_requested_pieces: BF, - // When the peer sends us data this is used to track if we asked for it. pub inflight_requests: HashSet, @@ -246,10 +240,8 @@ impl LivePeerState { pub fn new(peer_id: Id20, tx: PeerTx) -> Self { LivePeerState { peer_id, - i_am_choked: true, peer_interested: false, bitfield: BF::new(), - previously_requested_pieces: BF::new(), inflight_requests: Default::default(), tx, } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index f5dcd0c..c02b33e 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -17,11 +17,11 @@ // - spawns new peers as they become known. It pulls them from a queue. The queue is filled in by DHT and torrent trackers. // Also gets updated when peers are reconnecting after errors. // -// Each peer has at least 2 tasks: +// Each peer has one main task "manage_peer". It's composed of 2 futures running as one task through tokio::select: // - "manage_peer" - this talks to the peer over network and calls callbacks on PeerHandler. The callbacks are not async, // and are supposed to finish quickly (apart from writing to disk, which is accounted for as "spawn_blocking"). // - "peer_chunk_requester" - this continuously sends requests for chunks to the peer. -// it MAY steal chunks/pieces from other peers, which +// it may steal chunks/pieces from other peers. // // ## Peer lifecycle // State transitions: @@ -185,13 +185,7 @@ impl PeerStates { self.stats.dec(p.state.get()); Some(p) } - pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option { - self.with_live_mut(handle, "mark_i_am_choked", |live| { - let prev = live.i_am_choked; - live.i_am_choked = is_choked; - prev - }) - } + pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { self.with_live_mut(handle, "mark_peer_interested", |live| { let prev = live.peer_interested; @@ -201,7 +195,6 @@ impl PeerStates { } pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec) -> Option<()> { self.with_live_mut(handle, "update_bitfield_from_vec", |live| { - live.previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]); live.bitfield = BF::from_vec(bitfield); }) } @@ -475,7 +468,11 @@ impl TorrentState { let handler = PeerHandler { addr, on_bitfield_notify: Default::default(), - have_notify: Default::default(), + unchoke_notify: Default::default(), + locked: RwLock::new(PeerHandlerLocked { + i_am_choked: true, + previously_requested_pieces: BF::new(), + }), requests_sem: Semaphore::new(0), state: state.clone(), tx, @@ -496,7 +493,6 @@ impl TorrentState { spawner, ); let requester = handler.task_peer_chunk_requester(addr); - let res = tokio::select! { r = requester => {r} r = peer_connection.manage_peer(rx) => {r} @@ -584,103 +580,10 @@ impl TorrentState { })? } - fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { - self.peers.with_live(peer_handle, |l| l.i_am_choked) - } - - fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { - // TODO: locking one inside the other in different order results in deadlocks. - self.peers - .with_live_mut(peer_handle, "reserve_next_needed_piece", |live| { - if live.i_am_choked { - debug!("we are choked, can't reserve next piece"); - return None; - } - let mut g = self.lock_write("reserve_next_needed_piece"); - - let n = { - let mut n_opt = None; - let bf = &live.bitfield; - for n in g.chunks.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - n_opt = Some(n); - break; - } - } - - self.lengths.validate_piece_index(n_opt? as u32)? - }; - g.inflight_pieces.insert( - n, - InflightPiece { - peer: peer_handle, - started: Instant::now(), - }, - ); - g.chunks.reserve_needed_piece(n); - Some(n) - }) - .flatten() - } - fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { self.get_next_needed_piece(handle).is_some() } - fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option { - let total = self.stats.downloaded_pieces.load(Ordering::Relaxed); - - // heuristic for not enough precision in average time - if total < 20 { - return None; - } - let avg_time = self.stats.average_piece_download_time()?; - - let mut g = self.lock_write("try_steal_old_slow_piece"); - let (idx, elapsed, piece_req) = g - .inflight_pieces - .iter_mut() - // don't steal from myself - .filter(|(_, r)| r.peer != handle) - .map(|(p, r)| (p, r.started.elapsed(), r)) - .max_by_key(|(_, e, _)| *e)?; - - // heuristic for "too slow peer" - if elapsed > avg_time * 10 { - debug!( - "will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", - idx, piece_req.peer, elapsed, avg_time - ); - piece_req.peer = handle; - piece_req.started = Instant::now(); - return Some(*idx); - } - None - } - - // NOTE: this doesn't actually "steal" it, but only returns an id we might steal. - fn try_steal_piece(&self, handle: PeerHandle) -> Option { - let mut rng = rand::thread_rng(); - use rand::seq::IteratorRandom; - - self.peers - .with_live(handle, |live| { - let g = self.lock_read("try_steal_piece"); - g.inflight_pieces - .keys() - .filter(|p| { - live.previously_requested_pieces - .get(p.get() as usize) - .map(|r| *r) - == Some(false) - }) - .filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p)) - .choose(&mut rng) - .copied() - }) - .flatten() - } - fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { p.state @@ -901,14 +804,31 @@ impl TorrentState { } } +struct PeerHandlerLocked { + pub i_am_choked: bool, + + // This is used to only request a piece from a peer once when stealing from others. + // So that you don't steal then re-steal the same piece in a loop. + pub previously_requested_pieces: BF, +} + +// All peer state that would never be used by other actors should pe put here. struct PeerHandler { state: Arc, + + // Semantically, we don't need an RwLock here, as this is only requested from + // one future (requester + manage_peer). + // + // However as PeerConnectionHandler takes &self everywhere, we need shared mutability. + // RefCell would do, but tokio is unhappy when we use it. + locked: RwLock, + // This is used to unpause chunk requester once the bitfield // is received. on_bitfield_notify: Notify, // This is used to unpause after we were choked. - have_notify: Notify, + unchoke_notify: Notify, // This is used to limit the number of chunk requests we send to a peer at a time. requests_sem: Semaphore, @@ -923,22 +843,20 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn on_received_message(&self, message: Message>) -> anyhow::Result<()> { match message { Message::Request(request) => { - self.on_download_request(self.addr, request) + self.on_download_request(request) .context("on_download_request")?; } Message::Bitfield(b) => self - .on_bitfield(self.addr, b.clone_to_owned()) + .on_bitfield(b.clone_to_owned()) .context("on_bitfield")?, - Message::Choke => self.on_i_am_choked(self.addr), - Message::Unchoke => self.on_i_am_unchoked(self.addr), - Message::Interested => self.on_peer_interested(self.addr), - Message::Piece(piece) => self - .on_received_piece(self.addr, piece) - .context("on_received_piece")?, + Message::Choke => self.on_i_am_choked(), + Message::Unchoke => self.on_i_am_unchoked(), + Message::Interested => self.on_peer_interested(), + Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?, Message::KeepAlive => { debug!("keepalive received"); } - Message::Have(h) => self.on_have(self.addr, h), + Message::Have(h) => self.on_have(h), Message::NotInterested => { info!("received \"not interested\", but we don't care yet") } @@ -983,7 +901,74 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { } impl PeerHandler { - fn on_download_request(&self, peer_handle: PeerHandle, request: Request) -> anyhow::Result<()> { + fn reserve_next_needed_piece(&self) -> Option { + // TODO: locking one inside the other in different order results in deadlocks. + self.state + .peers + .with_live_mut(self.addr, "reserve_next_needed_piece", |live| { + if self.locked.read().i_am_choked { + debug!("we are choked, can't reserve next piece"); + return None; + } + let mut g = self.state.lock_write("reserve_next_needed_piece"); + + let n = { + let mut n_opt = None; + let bf = &live.bitfield; + for n in g.chunks.iter_needed_pieces() { + if bf.get(n).map(|v| *v) == Some(true) { + n_opt = Some(n); + break; + } + } + + self.state.lengths.validate_piece_index(n_opt? as u32)? + }; + g.inflight_pieces.insert( + n, + InflightPiece { + peer: self.addr, + started: Instant::now(), + }, + ); + g.chunks.reserve_needed_piece(n); + Some(n) + }) + .flatten() + } + + fn try_steal_old_slow_piece(&self, threshold: f64) -> Option { + let total = self.state.stats.downloaded_pieces.load(Ordering::Relaxed); + + // heuristic for not enough precision in average time + if total < 20 { + return None; + } + let avg_time = self.state.stats.average_piece_download_time()?; + + let mut g = self.state.lock_write("try_steal_old_slow_piece"); + let (idx, elapsed, piece_req) = g + .inflight_pieces + .iter_mut() + // don't steal from myself + .filter(|(_, r)| r.peer != self.addr) + .map(|(p, r)| (p, r.started.elapsed(), r)) + .max_by_key(|(_, e, _)| *e)?; + + // heuristic for "too slow peer" + if elapsed.as_secs_f64() > avg_time.as_secs_f64() * threshold { + debug!( + "will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}", + idx, piece_req.peer, elapsed, avg_time + ); + piece_req.peer = self.addr; + piece_req.started = Instant::now(); + return Some(*idx); + } + None + } + + fn on_download_request(&self, request: Request) -> anyhow::Result<()> { let piece_index = match self.state.lengths.validate_piece_index(request.index) { Some(p) => p, None => { @@ -1027,14 +1012,16 @@ impl PeerHandler { Ok::<_, anyhow::Error>(self.tx.send(request)?) } - fn on_have(&self, handle: PeerHandle, have: u32) { - self.state.peers.with_live_mut(handle, "on_have", |live| { - live.bitfield.set(have as usize, true); - debug!("updated bitfield with have={}", have); - }); + fn on_have(&self, have: u32) { + self.state + .peers + .with_live_mut(self.addr, "on_have", |live| { + live.bitfield.set(have as usize, true); + debug!("updated bitfield with have={}", have); + }); } - fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> { + fn on_bitfield(&self, bitfield: ByteString) -> anyhow::Result<()> { if bitfield.len() != self.state.lengths.piece_bitfield_bytes() { anyhow::bail!( "dropping peer as its bitfield has unexpected size. Got {}, expected {}", @@ -1042,11 +1029,12 @@ impl PeerHandler { self.state.lengths.piece_bitfield_bytes(), ); } + self.locked.write().previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]); self.state .peers - .update_bitfield_from_vec(handle, bitfield.0); + .update_bitfield_from_vec(self.addr, bitfield.0); - if !self.state.am_i_interested_in_peer(handle) { + if !self.state.am_i_interested_in_peer(self.addr) { self.tx .send(WriterRequest::Message(MessageOwned::Unchoke))?; self.tx @@ -1068,64 +1056,49 @@ impl PeerHandler { WriterRequest::Message(MessageOwned::Interested), ])?; - let notify = &self.have_notify; #[allow(unused_must_use)] { - timeout(Duration::from_secs(60), notify.notified()).await; + timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await; } loop { - match self.state.am_i_choked(handle) { - Some(true) => { - debug!("we are choked, can't reserve next piece"); - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), notify.notified()).await; - } - continue; + if self.locked.read().i_am_choked { + debug!("we are choked, can't reserve next piece"); + #[allow(unused_must_use)] + { + timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await; } - Some(false) => {} - None => return Ok(()), + continue; } - // Try steal a pice from a very slow peer first. - let next = match self.state.try_steal_old_slow_piece(handle) { - Some(next) => next, - None => match self.state.reserve_next_needed_piece(handle) { - Some(next) => next, - None => { - if self.state.is_finished() { - debug!("nothing left to download, closing requester"); - return Ok(()); - } + if self.state.is_finished() { + debug!("nothing left to download, looping forever until manage_peer quits"); + loop { + tokio::time::sleep(Duration::from_secs(86400)).await; + } + } - if let Some(piece) = self.state.try_steal_piece(handle) { - debug!("stole a piece {}", piece); - piece - } else { - debug!("no pieces to request"); - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), notify.notified()).await; - } - continue; - } - } - }, + // Try steal a pice from a very slow peer first. Otherwise we might wait too long + // to download early pieces. + // Then try get the next one in queue. + // Afterwards means we are close to completion, try stealing more aggressively. + let next = match self + .try_steal_old_slow_piece(10.) + .or_else(|| self.reserve_next_needed_piece()) + .or_else(|| self.try_steal_old_slow_piece(2.)) + { + Some(next) => next, + None => { + debug!("no pieces to request"); + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } }; - let sem = &self.requests_sem; - let tx = - match self - .state - .peers - .with_live_mut(handle, "peer_setup_for_piece_request", |l| { - l.previously_requested_pieces.set(next.get() as usize, true); - l.tx.clone() - }) { - Some(res) => res, - None => return Ok(()), - }; + self.locked + .write() + .previously_requested_pieces + .set(next.get() as usize, true); for chunk in self.state.lengths.iter_chunk_infos(next) { let request = Request { @@ -1156,13 +1129,14 @@ impl PeerHandler { }; loop { - match timeout(Duration::from_secs(10), sem.acquire()).await { + match timeout(Duration::from_secs(10), self.requests_sem.acquire()).await { Ok(acq) => break acq?.forget(), Err(_) => continue, }; } - if tx + if self + .tx .send(WriterRequest::Message(MessageOwned::Request(request))) .is_err() { @@ -1172,14 +1146,13 @@ impl PeerHandler { } } - fn on_i_am_choked(&self, handle: PeerHandle) { - debug!("we are choked"); - self.state.peers.mark_i_am_choked(handle, true); + fn on_i_am_choked(&self) { + self.locked.write().i_am_choked = true; } - fn on_peer_interested(&self, handle: PeerHandle) { + fn on_peer_interested(&self) { debug!("peer is interested"); - self.state.peers.mark_peer_interested(handle, true); + self.state.peers.mark_peer_interested(self.addr, true); } fn reopen_read_only(&self) -> anyhow::Result<()> { @@ -1215,18 +1188,14 @@ impl PeerHandler { Ok(()) } - fn on_i_am_unchoked(&self, handle: PeerHandle) { + fn on_i_am_unchoked(&self) { debug!("we are unchoked"); - self.have_notify.notify_waiters(); + self.locked.write().i_am_choked = false; + self.unchoke_notify.notify_waiters(); self.requests_sem.add_permits(16); - self.state - .peers - .with_live_mut(handle, "on_i_am_unchoked", |live| { - live.i_am_choked = false; - }); } - fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> anyhow::Result<()> { + fn on_received_piece(&self, piece: Piece) -> anyhow::Result<()> { let chunk_info = match self.state.lengths.chunk_info_from_received_piece( piece.index, piece.begin, @@ -1242,7 +1211,7 @@ impl PeerHandler { self.state .peers - .with_live_mut(handle, "inflight_requests.remove", |h| { + .with_live_mut(self.addr, "inflight_requests.remove", |h| { self.state .stats .fetched_bytes @@ -1266,7 +1235,7 @@ impl PeerHandler { let mut g = self.state.lock_write("mark_chunk_downloaded"); match g.inflight_pieces.get(&chunk_info.piece_index) { - Some(InflightPiece { peer, .. }) if *peer == handle => {} + Some(InflightPiece { peer, .. }) if *peer == self.addr => {} Some(InflightPiece { peer, .. }) => { debug!( "in-flight piece {} was stolen by {}, ignoring", @@ -1322,7 +1291,7 @@ impl PeerHandler { match self .state .file_ops() - .write_chunk(handle, &piece, &chunk_info) + .write_chunk(self.addr, &piece, &chunk_info) { Ok(()) => {} Err(e) => { @@ -1339,7 +1308,7 @@ impl PeerHandler { match self .state .file_ops() - .check_piece(handle, chunk_info.piece_index, &chunk_info) + .check_piece(self.addr, chunk_info.piece_index, &chunk_info) .with_context(|| format!("error checking piece={index}"))? { true => { @@ -1372,7 +1341,7 @@ impl PeerHandler { g.chunks.mark_piece_downloaded(chunk_info.piece_index); } - self.state.peers.reset_peer_backoff(handle); + self.state.peers.reset_peer_backoff(self.addr); debug!("piece={} successfully downloaded and verified", index);