Merge pull request #198 from ikatson/further-e2e-fixes

Further e2e fixes for reliability
This commit is contained in:
Igor Katson 2024-08-19 16:42:57 +01:00 committed by GitHub
commit e3ab7e2413
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 74 additions and 37 deletions

View file

@ -47,7 +47,7 @@ use std::{
collections::{HashMap, HashSet},
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::{Duration, Instant},
@ -371,6 +371,7 @@ impl TorrentStateLive {
state: self.clone(),
tx,
counters,
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: self.meta.options.peer_connect_timeout,
@ -434,6 +435,7 @@ impl TorrentStateLive {
state: state.clone(),
tx,
counters,
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: state.meta.options.peer_connect_timeout,
@ -486,7 +488,7 @@ impl TorrentStateLive {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.context("torrent closed")?;
if state.is_finished_and_dont_need_peers() {
if state.is_finished_and_no_active_streams() {
debug!("ignoring peer {} as we are finished", addr);
state.peers.mark_peer_not_needed(addr);
continue;
@ -656,6 +658,7 @@ impl TorrentStateLive {
Ok(())
}
// If we have all selected pieces but not necessarily all pieces.
pub(crate) fn is_finished(&self) -> bool {
self.get_hns().map(|h| h.finished()).unwrap_or_default()
}
@ -670,7 +673,9 @@ impl TorrentStateLive {
.any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id]))
}
fn is_finished_and_dont_need_peers(&self) -> bool {
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
// them being selected, we aren't fully "finished".
fn is_finished_and_no_active_streams(&self) -> bool {
self.is_finished()
&& !self.has_active_streams_unfinished_files(
&self.lock_read("is_finished_and_dont_need_peers"),
@ -766,6 +771,8 @@ struct PeerHandler {
addr: SocketAddr,
tx: PeerTx,
first_message_received: AtomicBool,
}
impl<'a> PeerConnectionHandler for &'a PeerHandler {
@ -780,6 +787,14 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}
async fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
// The first message must be "bitfield", but if it's not sent,
// assume the bitfield is all zeroes and was sent.
if !matches!(&message, Message::Bitfield(..))
&& !self.first_message_received.swap(true, Ordering::Relaxed)
{
self.on_bitfield_notify.notify_waiters();
}
match message {
Message::Request(request) => {
self.on_download_request(request)
@ -931,7 +946,7 @@ impl PeerHandler {
self.counters.errors.fetch_add(1, Ordering::Relaxed);
if self.state.is_finished_and_dont_need_peers() {
if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(());
@ -953,7 +968,9 @@ impl PeerHandler {
duration = format!("{dur:?}")
),
async move {
debug!("waiting to reconnect again");
tokio::time::sleep(dur).await;
debug!("finished waiting");
self.state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
@ -1184,43 +1201,50 @@ impl PeerHandler {
.await;
}
// The job of this is to request chunks and also to keep peer alive.
// The moment this ends, the peer is disconnected.
async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> {
let handle = self.addr;
self.wait_for_bitfield().await;
// TODO: this check needs to happen more often, we need to update our
// interested state with the other side, for now we send it only once.
if self.state.is_finished_and_dont_need_peers() {
self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self
.state
.peers
.with_live(self.addr, |l| {
l.has_full_torrent(self.state.lengths.total_pieces() as usize)
})
.unwrap_or_default()
{
debug!("both peer and us have full torrent, disconnecting");
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// Sleep a bit to ensure this gets written to the network by manage_peer
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
let mut update_interest = {
let mut current = false;
move |h: &PeerHandler, new_value: bool| -> anyhow::Result<()> {
if new_value != current {
h.tx.send(if new_value {
WriterRequest::Message(MessageOwned::Interested)
} else {
WriterRequest::Message(MessageOwned::NotInterested)
})?;
current = new_value;
}
Ok(())
}
} else {
self.tx
.send(WriterRequest::Message(MessageOwned::Interested))?;
}
};
loop {
aframe!(self.wait_for_unchoke()).await;
if self.state.is_finished_and_dont_need_peers() {
debug!("nothing left to do, disconnecting peer");
return Ok(());
// If we have full torrent, we don't need to request more pieces.
// However we might still need to seed them to the peer.
if self.state.is_finished_and_no_active_streams() {
update_interest(self, false)?;
if !self.state.peers.is_peer_interested(self.addr) {
debug!("nothing left to do, neither of us is interested, disconnecting peer");
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// wait until the receiver gets the message so that it doesn't finish with an error.
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
} else {
// TODO: wait for a notification of interest, e.g. update of selected files or new streams or change
// in peer interest.
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
}
update_interest(self, true)?;
// Try steal a pice from a very slow peer first. Otherwise we might wait too long
// to download early pieces.
// Then try get the next one in queue.
@ -1235,7 +1259,8 @@ impl PeerHandler {
None => {
debug!("no pieces to request");
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
// Half of default rw timeout not to race with it.
Duration::from_secs(5),
new_piece_notify
))
.await
@ -1277,7 +1302,7 @@ impl PeerHandler {
loop {
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
Duration::from_secs(5),
aframe!(self.requests_sem.acquire())
))
.await

View file

@ -28,7 +28,7 @@ impl Peer {
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx)));
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
counters.inc(&state.0);
Self {
state,
@ -142,7 +142,10 @@ impl PeerStateNoMut {
}
match self.take(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.set(
PeerState::Live(LivePeerState::new(peer_id, tx, true)),
counters,
);
}
PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(),
}
@ -159,7 +162,10 @@ impl PeerStateNoMut {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.set(
PeerState::Live(LivePeerState::new(peer_id, tx, false)),
counters,
);
self.get_live_mut()
} else {
None
@ -189,10 +195,10 @@ pub(crate) struct LivePeerState {
}
impl LivePeerState {
pub fn new(peer_id: Id20, tx: PeerTx) -> Self {
pub fn new(peer_id: Id20, tx: PeerTx, initial_interested: bool) -> Self {
LivePeerState {
peer_id,
peer_interested: false,
peer_interested: initial_interested,
bitfield: BF::default(),
inflight_requests: Default::default(),
tx,

View file

@ -77,6 +77,11 @@ impl PeerStates {
Some(p)
}
pub fn is_peer_interested(&self, handle: PeerHandle) -> bool {
self.with_live(handle, |live| live.peer_interested)
.unwrap_or(false)
}
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
@ -84,6 +89,7 @@ impl PeerStates {
prev
})
}
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Box<[u8]>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
live.bitfield = BF::from_boxed_slice(bitfield);