Refactor stealing logic, make it simpler and less bugged (hopefully). Seems to work like a charm

This commit is contained in:
Igor Katson 2023-11-20 12:24:28 +00:00
parent 2693c0eb71
commit ef441b18e6
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 161 additions and 200 deletions

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;
use std::{collections::HashSet, sync::Arc};
use anyhow::Context;
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
@ -8,7 +8,6 @@ use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use serde::Serialize;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::{Notify, Semaphore};
use crate::peer_connection::WriterRequest;
use crate::type_aliases::BF;
@ -225,16 +224,11 @@ impl PeerStateNoMut {
#[derive(Debug)]
pub struct LivePeerState {
pub peer_id: Id20,
pub i_am_choked: bool,
pub peer_interested: bool,
// This is used to track the pieces the peer has.
pub bitfield: BF,
// This is used to only request a piece from a peer once when stealing from others.
// So that you don't steal then re-steal the same piece in a loop.
pub previously_requested_pieces: BF,
// When the peer sends us data this is used to track if we asked for it.
pub inflight_requests: HashSet<InflightRequest>,
@ -246,10 +240,8 @@ impl LivePeerState {
pub fn new(peer_id: Id20, tx: PeerTx) -> Self {
LivePeerState {
peer_id,
i_am_choked: true,
peer_interested: false,
bitfield: BF::new(),
previously_requested_pieces: BF::new(),
inflight_requests: Default::default(),
tx,
}

View file

@ -17,11 +17,11 @@
// - spawns new peers as they become known. It pulls them from a queue. The queue is filled in by DHT and torrent trackers.
// Also gets updated when peers are reconnecting after errors.
//
// Each peer has at least 2 tasks:
// Each peer has one main task "manage_peer". It's composed of 2 futures running as one task through tokio::select:
// - "manage_peer" - this talks to the peer over network and calls callbacks on PeerHandler. The callbacks are not async,
// and are supposed to finish quickly (apart from writing to disk, which is accounted for as "spawn_blocking").
// - "peer_chunk_requester" - this continuously sends requests for chunks to the peer.
// it MAY steal chunks/pieces from other peers, which
// it may steal chunks/pieces from other peers.
//
// ## Peer lifecycle
// State transitions:
@ -185,13 +185,7 @@ impl PeerStates {
self.stats.dec(p.state.get());
Some(p)
}
pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_i_am_choked", |live| {
let prev = live.i_am_choked;
live.i_am_choked = is_choked;
prev
})
}
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
@ -201,7 +195,6 @@ impl PeerStates {
}
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec<u8>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
live.previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]);
live.bitfield = BF::from_vec(bitfield);
})
}
@ -475,7 +468,11 @@ impl TorrentState {
let handler = PeerHandler {
addr,
on_bitfield_notify: Default::default(),
have_notify: Default::default(),
unchoke_notify: Default::default(),
locked: RwLock::new(PeerHandlerLocked {
i_am_choked: true,
previously_requested_pieces: BF::new(),
}),
requests_sem: Semaphore::new(0),
state: state.clone(),
tx,
@ -496,7 +493,6 @@ impl TorrentState {
spawner,
);
let requester = handler.task_peer_chunk_requester(addr);
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer(rx) => {r}
@ -584,103 +580,10 @@ impl TorrentState {
})?
}
fn am_i_choked(&self, peer_handle: PeerHandle) -> Option<bool> {
self.peers.with_live(peer_handle, |l| l.i_am_choked)
}
fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
// TODO: locking one inside the other in different order results in deadlocks.
self.peers
.with_live_mut(peer_handle, "reserve_next_needed_piece", |live| {
if live.i_am_choked {
debug!("we are choked, can't reserve next piece");
return None;
}
let mut g = self.lock_write("reserve_next_needed_piece");
let n = {
let mut n_opt = None;
let bf = &live.bitfield;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;
}
}
self.lengths.validate_piece_index(n_opt? as u32)?
};
g.inflight_pieces.insert(
n,
InflightPiece {
peer: peer_handle,
started: Instant::now(),
},
);
g.chunks.reserve_needed_piece(n);
Some(n)
})
.flatten()
}
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
self.get_next_needed_piece(handle).is_some()
}
fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let total = self.stats.downloaded_pieces.load(Ordering::Relaxed);
// heuristic for not enough precision in average time
if total < 20 {
return None;
}
let avg_time = self.stats.average_piece_download_time()?;
let mut g = self.lock_write("try_steal_old_slow_piece");
let (idx, elapsed, piece_req) = g
.inflight_pieces
.iter_mut()
// don't steal from myself
.filter(|(_, r)| r.peer != handle)
.map(|(p, r)| (p, r.started.elapsed(), r))
.max_by_key(|(_, e, _)| *e)?;
// heuristic for "too slow peer"
if elapsed > avg_time * 10 {
debug!(
"will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}",
idx, piece_req.peer, elapsed, avg_time
);
piece_req.peer = handle;
piece_req.started = Instant::now();
return Some(*idx);
}
None
}
// NOTE: this doesn't actually "steal" it, but only returns an id we might steal.
fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom;
self.peers
.with_live(handle, |live| {
let g = self.lock_read("try_steal_piece");
g.inflight_pieces
.keys()
.filter(|p| {
live.previously_requested_pieces
.get(p.get() as usize)
.map(|r| *r)
== Some(false)
})
.filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p))
.choose(&mut rng)
.copied()
})
.flatten()
}
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state
@ -901,14 +804,31 @@ impl TorrentState {
}
}
struct PeerHandlerLocked {
pub i_am_choked: bool,
// This is used to only request a piece from a peer once when stealing from others.
// So that you don't steal then re-steal the same piece in a loop.
pub previously_requested_pieces: BF,
}
// All peer state that would never be used by other actors should pe put here.
struct PeerHandler {
state: Arc<TorrentState>,
// Semantically, we don't need an RwLock here, as this is only requested from
// one future (requester + manage_peer).
//
// However as PeerConnectionHandler takes &self everywhere, we need shared mutability.
// RefCell would do, but tokio is unhappy when we use it.
locked: RwLock<PeerHandlerLocked>,
// This is used to unpause chunk requester once the bitfield
// is received.
on_bitfield_notify: Notify,
// This is used to unpause after we were choked.
have_notify: Notify,
unchoke_notify: Notify,
// This is used to limit the number of chunk requests we send to a peer at a time.
requests_sem: Semaphore,
@ -923,22 +843,20 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
match message {
Message::Request(request) => {
self.on_download_request(self.addr, request)
self.on_download_request(request)
.context("on_download_request")?;
}
Message::Bitfield(b) => self
.on_bitfield(self.addr, b.clone_to_owned())
.on_bitfield(b.clone_to_owned())
.context("on_bitfield")?,
Message::Choke => self.on_i_am_choked(self.addr),
Message::Unchoke => self.on_i_am_unchoked(self.addr),
Message::Interested => self.on_peer_interested(self.addr),
Message::Piece(piece) => self
.on_received_piece(self.addr, piece)
.context("on_received_piece")?,
Message::Choke => self.on_i_am_choked(),
Message::Unchoke => self.on_i_am_unchoked(),
Message::Interested => self.on_peer_interested(),
Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?,
Message::KeepAlive => {
debug!("keepalive received");
}
Message::Have(h) => self.on_have(self.addr, h),
Message::Have(h) => self.on_have(h),
Message::NotInterested => {
info!("received \"not interested\", but we don't care yet")
}
@ -983,7 +901,74 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}
impl PeerHandler {
fn on_download_request(&self, peer_handle: PeerHandle, request: Request) -> anyhow::Result<()> {
fn reserve_next_needed_piece(&self) -> Option<ValidPieceIndex> {
// TODO: locking one inside the other in different order results in deadlocks.
self.state
.peers
.with_live_mut(self.addr, "reserve_next_needed_piece", |live| {
if self.locked.read().i_am_choked {
debug!("we are choked, can't reserve next piece");
return None;
}
let mut g = self.state.lock_write("reserve_next_needed_piece");
let n = {
let mut n_opt = None;
let bf = &live.bitfield;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;
}
}
self.state.lengths.validate_piece_index(n_opt? as u32)?
};
g.inflight_pieces.insert(
n,
InflightPiece {
peer: self.addr,
started: Instant::now(),
},
);
g.chunks.reserve_needed_piece(n);
Some(n)
})
.flatten()
}
fn try_steal_old_slow_piece(&self, threshold: f64) -> Option<ValidPieceIndex> {
let total = self.state.stats.downloaded_pieces.load(Ordering::Relaxed);
// heuristic for not enough precision in average time
if total < 20 {
return None;
}
let avg_time = self.state.stats.average_piece_download_time()?;
let mut g = self.state.lock_write("try_steal_old_slow_piece");
let (idx, elapsed, piece_req) = g
.inflight_pieces
.iter_mut()
// don't steal from myself
.filter(|(_, r)| r.peer != self.addr)
.map(|(p, r)| (p, r.started.elapsed(), r))
.max_by_key(|(_, e, _)| *e)?;
// heuristic for "too slow peer"
if elapsed.as_secs_f64() > avg_time.as_secs_f64() * threshold {
debug!(
"will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}",
idx, piece_req.peer, elapsed, avg_time
);
piece_req.peer = self.addr;
piece_req.started = Instant::now();
return Some(*idx);
}
None
}
fn on_download_request(&self, request: Request) -> anyhow::Result<()> {
let piece_index = match self.state.lengths.validate_piece_index(request.index) {
Some(p) => p,
None => {
@ -1027,14 +1012,16 @@ impl PeerHandler {
Ok::<_, anyhow::Error>(self.tx.send(request)?)
}
fn on_have(&self, handle: PeerHandle, have: u32) {
self.state.peers.with_live_mut(handle, "on_have", |live| {
live.bitfield.set(have as usize, true);
debug!("updated bitfield with have={}", have);
});
fn on_have(&self, have: u32) {
self.state
.peers
.with_live_mut(self.addr, "on_have", |live| {
live.bitfield.set(have as usize, true);
debug!("updated bitfield with have={}", have);
});
}
fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> {
fn on_bitfield(&self, bitfield: ByteString) -> anyhow::Result<()> {
if bitfield.len() != self.state.lengths.piece_bitfield_bytes() {
anyhow::bail!(
"dropping peer as its bitfield has unexpected size. Got {}, expected {}",
@ -1042,11 +1029,12 @@ impl PeerHandler {
self.state.lengths.piece_bitfield_bytes(),
);
}
self.locked.write().previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]);
self.state
.peers
.update_bitfield_from_vec(handle, bitfield.0);
.update_bitfield_from_vec(self.addr, bitfield.0);
if !self.state.am_i_interested_in_peer(handle) {
if !self.state.am_i_interested_in_peer(self.addr) {
self.tx
.send(WriterRequest::Message(MessageOwned::Unchoke))?;
self.tx
@ -1068,64 +1056,49 @@ impl PeerHandler {
WriterRequest::Message(MessageOwned::Interested),
])?;
let notify = &self.have_notify;
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await;
}
loop {
match self.state.am_i_choked(handle) {
Some(true) => {
debug!("we are choked, can't reserve next piece");
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
}
continue;
if self.locked.read().i_am_choked {
debug!("we are choked, can't reserve next piece");
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await;
}
Some(false) => {}
None => return Ok(()),
continue;
}
// Try steal a pice from a very slow peer first.
let next = match self.state.try_steal_old_slow_piece(handle) {
Some(next) => next,
None => match self.state.reserve_next_needed_piece(handle) {
Some(next) => next,
None => {
if self.state.is_finished() {
debug!("nothing left to download, closing requester");
return Ok(());
}
if self.state.is_finished() {
debug!("nothing left to download, looping forever until manage_peer quits");
loop {
tokio::time::sleep(Duration::from_secs(86400)).await;
}
}
if let Some(piece) = self.state.try_steal_piece(handle) {
debug!("stole a piece {}", piece);
piece
} else {
debug!("no pieces to request");
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
}
continue;
}
}
},
// Try steal a pice from a very slow peer first. Otherwise we might wait too long
// to download early pieces.
// Then try get the next one in queue.
// Afterwards means we are close to completion, try stealing more aggressively.
let next = match self
.try_steal_old_slow_piece(10.)
.or_else(|| self.reserve_next_needed_piece())
.or_else(|| self.try_steal_old_slow_piece(2.))
{
Some(next) => next,
None => {
debug!("no pieces to request");
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
};
let sem = &self.requests_sem;
let tx =
match self
.state
.peers
.with_live_mut(handle, "peer_setup_for_piece_request", |l| {
l.previously_requested_pieces.set(next.get() as usize, true);
l.tx.clone()
}) {
Some(res) => res,
None => return Ok(()),
};
self.locked
.write()
.previously_requested_pieces
.set(next.get() as usize, true);
for chunk in self.state.lengths.iter_chunk_infos(next) {
let request = Request {
@ -1156,13 +1129,14 @@ impl PeerHandler {
};
loop {
match timeout(Duration::from_secs(10), sem.acquire()).await {
match timeout(Duration::from_secs(10), self.requests_sem.acquire()).await {
Ok(acq) => break acq?.forget(),
Err(_) => continue,
};
}
if tx
if self
.tx
.send(WriterRequest::Message(MessageOwned::Request(request)))
.is_err()
{
@ -1172,14 +1146,13 @@ impl PeerHandler {
}
}
fn on_i_am_choked(&self, handle: PeerHandle) {
debug!("we are choked");
self.state.peers.mark_i_am_choked(handle, true);
fn on_i_am_choked(&self) {
self.locked.write().i_am_choked = true;
}
fn on_peer_interested(&self, handle: PeerHandle) {
fn on_peer_interested(&self) {
debug!("peer is interested");
self.state.peers.mark_peer_interested(handle, true);
self.state.peers.mark_peer_interested(self.addr, true);
}
fn reopen_read_only(&self) -> anyhow::Result<()> {
@ -1215,18 +1188,14 @@ impl PeerHandler {
Ok(())
}
fn on_i_am_unchoked(&self, handle: PeerHandle) {
fn on_i_am_unchoked(&self) {
debug!("we are unchoked");
self.have_notify.notify_waiters();
self.locked.write().i_am_choked = false;
self.unchoke_notify.notify_waiters();
self.requests_sem.add_permits(16);
self.state
.peers
.with_live_mut(handle, "on_i_am_unchoked", |live| {
live.i_am_choked = false;
});
}
fn on_received_piece(&self, handle: PeerHandle, piece: Piece<ByteBuf>) -> anyhow::Result<()> {
fn on_received_piece(&self, piece: Piece<ByteBuf>) -> anyhow::Result<()> {
let chunk_info = match self.state.lengths.chunk_info_from_received_piece(
piece.index,
piece.begin,
@ -1242,7 +1211,7 @@ impl PeerHandler {
self.state
.peers
.with_live_mut(handle, "inflight_requests.remove", |h| {
.with_live_mut(self.addr, "inflight_requests.remove", |h| {
self.state
.stats
.fetched_bytes
@ -1266,7 +1235,7 @@ impl PeerHandler {
let mut g = self.state.lock_write("mark_chunk_downloaded");
match g.inflight_pieces.get(&chunk_info.piece_index) {
Some(InflightPiece { peer, .. }) if *peer == handle => {}
Some(InflightPiece { peer, .. }) if *peer == self.addr => {}
Some(InflightPiece { peer, .. }) => {
debug!(
"in-flight piece {} was stolen by {}, ignoring",
@ -1322,7 +1291,7 @@ impl PeerHandler {
match self
.state
.file_ops()
.write_chunk(handle, &piece, &chunk_info)
.write_chunk(self.addr, &piece, &chunk_info)
{
Ok(()) => {}
Err(e) => {
@ -1339,7 +1308,7 @@ impl PeerHandler {
match self
.state
.file_ops()
.check_piece(handle, chunk_info.piece_index, &chunk_info)
.check_piece(self.addr, chunk_info.piece_index, &chunk_info)
.with_context(|| format!("error checking piece={index}"))?
{
true => {
@ -1372,7 +1341,7 @@ impl PeerHandler {
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
}
self.state.peers.reset_peer_backoff(handle);
self.state.peers.reset_peer_backoff(self.addr);
debug!("piece={} successfully downloaded and verified", index);