Update streaming if checks

This commit is contained in:
Igor Katson 2024-04-26 09:26:56 +01:00
parent 2b315a6524
commit 0a895c5b9c
3 changed files with 44 additions and 13 deletions

View file

@ -454,7 +454,7 @@ impl TorrentStateLive {
let state = self; let state = self;
loop { loop {
let addr = peer_queue_rx.recv().await.context("torrent closed")?; let addr = peer_queue_rx.recv().await.context("torrent closed")?;
if state.is_finished() { if state.is_finished_and_dont_need_peers() {
debug!("ignoring peer {} as we are finished", addr); debug!("ignoring peer {} as we are finished", addr);
state.peers.mark_peer_not_needed(addr); state.peers.mark_peer_not_needed(addr);
continue; continue;
@ -682,6 +682,16 @@ impl TorrentStateLive {
self.get_hns().map(|h| h.finished()).unwrap_or_default() self.get_hns().map(|h| h.finished()).unwrap_or_default()
} }
pub(crate) fn has_active_streams_unfinished_files(&self) -> bool {
self.streams
.streamed_file_ids()
.any(|file_id| !self.files[file_id].approx_is_finished())
}
pub(crate) fn is_finished_and_dont_need_peers(&self) -> bool {
self.is_finished() && !self.has_active_streams_unfinished_files()
}
fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> { fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> {
// if we have all the pieces of the file, reopen it read only // if we have all the pieces of the file, reopen it read only
for (idx, opened_file) in self for (idx, opened_file) in self
@ -703,9 +713,11 @@ impl TorrentStateLive {
info!("torrent finished downloading"); info!("torrent finished downloading");
self.finished_notify.notify_waiters(); self.finished_notify.notify_waiters();
// There is not poing being connected to peers that have all the torrent, when if !self.has_active_streams_unfinished_files() {
// we don't need anything from them, and they don't need anything from us. // There is not poing being connected to peers that have all the torrent, when
self.disconnect_all_peers_that_have_full_torrent(); // we don't need anything from them, and they don't need anything from us.
self.disconnect_all_peers_that_have_full_torrent();
}
} }
Ok(()) Ok(())
} }
@ -725,7 +737,7 @@ impl TorrentStateLive {
} }
} }
fn reconnect_all_not_needed_peers(&self) { pub(crate) fn reconnect_all_not_needed_peers(&self) {
for pe in self.peers.states.iter() { for pe in self.peers.states.iter() {
if let PeerState::NotNeeded = pe.value().state.get() { if let PeerState::NotNeeded = pe.value().state.get() {
if self.peer_queue_tx.send(*pe.key()).is_err() { if self.peer_queue_tx.send(*pe.key()).is_err() {
@ -902,7 +914,7 @@ impl PeerHandler {
self.counters.errors.fetch_add(1, Ordering::Relaxed); self.counters.errors.fetch_add(1, Ordering::Relaxed);
if self.state.is_finished() { if self.state.is_finished_and_dont_need_peers() {
trace!("torrent finished, not re-queueing"); trace!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats); pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return Ok(()); return Ok(());
@ -1155,7 +1167,7 @@ impl PeerHandler {
// TODO: this check needs to happen more often, we need to update our // TODO: this check needs to happen more often, we need to update our
// interested state with the other side, for now we send it only once. // interested state with the other side, for now we send it only once.
if self.state.is_finished() { if self.state.is_finished_and_dont_need_peers() {
self.tx self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?; .send(WriterRequest::Message(MessageOwned::NotInterested))?;
@ -1181,11 +1193,9 @@ impl PeerHandler {
loop { loop {
self.wait_for_unchoke().await; self.wait_for_unchoke().await;
if self.state.is_finished() { if self.state.is_finished_and_dont_need_peers() {
debug!("nothing left to download, looping forever until manage_peer quits"); debug!("nothing left to do, disconnecting peer");
loop { return Ok(());
tokio::time::sleep(Duration::from_secs(86400)).await;
}
} }
// Try steal a pice from a very slow peer first. Otherwise we might wait too long // Try steal a pice from a very slow peer first. Otherwise we might wait too long

View file

@ -26,6 +26,7 @@ const PER_STREAM_BUF_PART: u64 = 10;
const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024; const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024;
struct StreamState { struct StreamState {
file_id: usize,
current_piece: ValidPieceIndex, current_piece: ValidPieceIndex,
file_len: u64, file_len: u64,
waker: Option<Waker>, waker: Option<Waker>,
@ -120,6 +121,10 @@ impl TorrentStreams {
trace!(stream_id, "dropping stream"); trace!(stream_id, "dropping stream");
self.streams.remove(&stream_id); self.streams.remove(&stream_id);
} }
pub(crate) fn streamed_file_ids(&self) -> impl Iterator<Item = usize> + '_ {
self.streams.iter().map(|s| s.value().file_id)
}
} }
pub struct FileStream { pub struct FileStream {
@ -299,6 +304,18 @@ impl ManagedTorrent {
}) })
} }
fn maybe_reconnect_needed_peers_for_file(&self, file_id: usize) {
// If we have the full file, don't bother.
if let Ok(true) = self.with_opened_file(file_id, |f| f.approx_is_finished()) {
return;
}
self.with_state(|state| {
if let crate::ManagedTorrentState::Live(l) = &state {
l.reconnect_all_not_needed_peers();
}
})
}
pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<FileStream> { pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<FileStream> {
let (fd_len, fd_offset) = let (fd_len, fd_offset) =
self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?;
@ -314,14 +331,17 @@ impl ManagedTorrent {
file_torrent_abs_offset: fd_offset, file_torrent_abs_offset: fd_offset,
torrent: self, torrent: self,
}; };
s.torrent.maybe_reconnect_needed_peers_for_file(file_id);
streams.streams.insert( streams.streams.insert(
s.stream_id, s.stream_id,
StreamState { StreamState {
file_id,
current_piece: first_piece, current_piece: first_piece,
waker: None, waker: None,
file_len: s.file_len, file_len: s.file_len,
}, },
); );
Ok(s) Ok(s)
} }
} }

View file

@ -312,7 +312,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
}; };
let peer_stats = &live_stats.snapshot.peer_stats; let peer_stats = &live_stats.snapshot.peer_stats;
info!( info!(
"[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}}}", "[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}, known: {}}}",
idx, idx,
downloaded_pct, downloaded_pct,
SF::new(progress), SF::new(progress),
@ -324,6 +324,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
peer_stats.live, peer_stats.live,
peer_stats.queued + peer_stats.connecting, peer_stats.queued + peer_stats.connecting,
peer_stats.dead, peer_stats.dead,
peer_stats.seen,
); );
} }
}); });