Further e2e fixes for reliability

This commit is contained in:
Igor Katson 2024-08-19 16:02:11 +01:00
parent 6ed84ffcb3
commit 3dcf43f448
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 61 additions and 36 deletions

View file

@ -486,7 +486,7 @@ impl TorrentStateLive {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.context("torrent closed")?;
if state.is_finished_and_dont_need_peers() {
if state.is_finished_and_no_active_streams() {
debug!("ignoring peer {} as we are finished", addr);
state.peers.mark_peer_not_needed(addr);
continue;
@ -656,6 +656,7 @@ impl TorrentStateLive {
Ok(())
}
// If we have all selected pieces but not necessarily all pieces.
pub(crate) fn is_finished(&self) -> bool {
self.get_hns().map(|h| h.finished()).unwrap_or_default()
}
@ -670,7 +671,9 @@ impl TorrentStateLive {
.any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id]))
}
fn is_finished_and_dont_need_peers(&self) -> bool {
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
// them being selected, we aren't fully "finished".
fn is_finished_and_no_active_streams(&self) -> bool {
self.is_finished()
&& !self.has_active_streams_unfinished_files(
&self.lock_read("is_finished_and_dont_need_peers"),
@ -931,7 +934,7 @@ impl PeerHandler {
self.counters.errors.fetch_add(1, Ordering::Relaxed);
if self.state.is_finished_and_dont_need_peers() {
if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(());
@ -953,7 +956,9 @@ impl PeerHandler {
duration = format!("{dur:?}")
),
async move {
debug!("waiting to reconnect again");
tokio::time::sleep(dur).await;
debug!("finished waiting");
self.state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
@ -1184,43 +1189,50 @@ impl PeerHandler {
.await;
}
// The job of this is to request chunks and also to keep peer alive.
// The moment this ends, the peer is disconnected.
async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> {
let handle = self.addr;
self.wait_for_bitfield().await;
// TODO: this check needs to happen more often, we need to update our
// interested state with the other side, for now we send it only once.
if self.state.is_finished_and_dont_need_peers() {
self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self
.state
.peers
.with_live(self.addr, |l| {
l.has_full_torrent(self.state.lengths.total_pieces() as usize)
})
.unwrap_or_default()
{
debug!("both peer and us have full torrent, disconnecting");
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// Sleep a bit to ensure this gets written to the network by manage_peer
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
let mut update_interest = {
let mut current = false;
move |h: &PeerHandler, new_value: bool| -> anyhow::Result<()> {
if new_value != current {
h.tx.send(if new_value {
WriterRequest::Message(MessageOwned::Interested)
} else {
WriterRequest::Message(MessageOwned::NotInterested)
})?;
current = new_value;
}
Ok(())
}
} else {
self.tx
.send(WriterRequest::Message(MessageOwned::Interested))?;
}
};
loop {
aframe!(self.wait_for_unchoke()).await;
if self.state.is_finished_and_dont_need_peers() {
debug!("nothing left to do, disconnecting peer");
return Ok(());
// If we have full torrent, we don't need to request more pieces.
// However we might still need to seed them to the peer.
if self.state.is_finished_and_no_active_streams() {
update_interest(self, false)?;
if !self.state.peers.is_peer_interested(self.addr) {
debug!("nothing left to do, neither of us is interested, disconnecting peer");
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// wait until the receiver gets the message so that it doesn't finish with an error.
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
} else {
// TODO: wait for a notification of interest, e.g. update of selected files or new streams or change
// in peer interest.
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
}
update_interest(self, true)?;
// Try steal a pice from a very slow peer first. Otherwise we might wait too long
// to download early pieces.
// Then try get the next one in queue.
@ -1235,7 +1247,8 @@ impl PeerHandler {
None => {
debug!("no pieces to request");
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
// Half of default rw timeout not to race with it.
Duration::from_secs(5),
new_piece_notify
))
.await
@ -1277,7 +1290,7 @@ impl PeerHandler {
loop {
match aframe!(tokio::time::timeout(
Duration::from_secs(10),
Duration::from_secs(5),
aframe!(self.requests_sem.acquire())
))
.await

View file

@ -28,7 +28,7 @@ impl Peer {
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx)));
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx, true)));
counters.inc(&state.0);
Self {
state,
@ -142,7 +142,10 @@ impl PeerStateNoMut {
}
match self.take(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.set(
PeerState::Live(LivePeerState::new(peer_id, tx, true)),
counters,
);
}
PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(),
}
@ -159,7 +162,10 @@ impl PeerStateNoMut {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
self.set(PeerState::Live(LivePeerState::new(peer_id, tx)), counters);
self.set(
PeerState::Live(LivePeerState::new(peer_id, tx, false)),
counters,
);
self.get_live_mut()
} else {
None
@ -189,10 +195,10 @@ pub(crate) struct LivePeerState {
}
impl LivePeerState {
pub fn new(peer_id: Id20, tx: PeerTx) -> Self {
pub fn new(peer_id: Id20, tx: PeerTx, initial_interested: bool) -> Self {
LivePeerState {
peer_id,
peer_interested: false,
peer_interested: initial_interested,
bitfield: BF::default(),
inflight_requests: Default::default(),
tx,

View file

@ -77,6 +77,11 @@ impl PeerStates {
Some(p)
}
pub fn is_peer_interested(&self, handle: PeerHandle) -> bool {
self.with_live(handle, |live| live.peer_interested)
.unwrap_or(false)
}
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, "mark_peer_interested", |live| {
let prev = live.peer_interested;
@ -84,6 +89,7 @@ impl PeerStates {
prev
})
}
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Box<[u8]>) -> Option<()> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
live.bitfield = BF::from_boxed_slice(bitfield);