From 0a895c5b9c929b5d4db59e955797e0e1f0816e9f Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 26 Apr 2024 09:26:56 +0100 Subject: [PATCH] Update streaming if checks --- crates/librqbit/src/torrent_state/live/mod.rs | 34 ++++++++++++------- .../librqbit/src/torrent_state/streaming.rs | 20 +++++++++++ crates/rqbit/src/main.rs | 3 +- 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 6c69c58..35f2f2d 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -454,7 +454,7 @@ impl TorrentStateLive { let state = self; loop { let addr = peer_queue_rx.recv().await.context("torrent closed")?; - if state.is_finished() { + if state.is_finished_and_dont_need_peers() { debug!("ignoring peer {} as we are finished", addr); state.peers.mark_peer_not_needed(addr); continue; @@ -682,6 +682,16 @@ impl TorrentStateLive { self.get_hns().map(|h| h.finished()).unwrap_or_default() } + pub(crate) fn has_active_streams_unfinished_files(&self) -> bool { + self.streams + .streamed_file_ids() + .any(|file_id| !self.files[file_id].approx_is_finished()) + } + + pub(crate) fn is_finished_and_dont_need_peers(&self) -> bool { + self.is_finished() && !self.has_active_streams_unfinished_files() + } + fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> { // if we have all the pieces of the file, reopen it read only for (idx, opened_file) in self @@ -703,9 +713,11 @@ impl TorrentStateLive { info!("torrent finished downloading"); self.finished_notify.notify_waiters(); - // There is not poing being connected to peers that have all the torrent, when - // we don't need anything from them, and they don't need anything from us. - self.disconnect_all_peers_that_have_full_torrent(); + if !self.has_active_streams_unfinished_files() { + // There is not poing being connected to peers that have all the torrent, when + // we don't need anything from them, and they don't need anything from us. + self.disconnect_all_peers_that_have_full_torrent(); + } } Ok(()) } @@ -725,7 +737,7 @@ impl TorrentStateLive { } } - fn reconnect_all_not_needed_peers(&self) { + pub(crate) fn reconnect_all_not_needed_peers(&self) { for pe in self.peers.states.iter() { if let PeerState::NotNeeded = pe.value().state.get() { if self.peer_queue_tx.send(*pe.key()).is_err() { @@ -902,7 +914,7 @@ impl PeerHandler { self.counters.errors.fetch_add(1, Ordering::Relaxed); - if self.state.is_finished() { + if self.state.is_finished_and_dont_need_peers() { trace!("torrent finished, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); @@ -1155,7 +1167,7 @@ impl PeerHandler { // TODO: this check needs to happen more often, we need to update our // interested state with the other side, for now we send it only once. - if self.state.is_finished() { + if self.state.is_finished_and_dont_need_peers() { self.tx .send(WriterRequest::Message(MessageOwned::NotInterested))?; @@ -1181,11 +1193,9 @@ impl PeerHandler { loop { self.wait_for_unchoke().await; - if self.state.is_finished() { - debug!("nothing left to download, looping forever until manage_peer quits"); - loop { - tokio::time::sleep(Duration::from_secs(86400)).await; - } + if self.state.is_finished_and_dont_need_peers() { + debug!("nothing left to do, disconnecting peer"); + return Ok(()); } // Try steal a pice from a very slow peer first. Otherwise we might wait too long diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 1814d1a..839fab0 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -26,6 +26,7 @@ const PER_STREAM_BUF_PART: u64 = 10; const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024; struct StreamState { + file_id: usize, current_piece: ValidPieceIndex, file_len: u64, waker: Option, @@ -120,6 +121,10 @@ impl TorrentStreams { trace!(stream_id, "dropping stream"); self.streams.remove(&stream_id); } + + pub(crate) fn streamed_file_ids(&self) -> impl Iterator + '_ { + self.streams.iter().map(|s| s.value().file_id) + } } pub struct FileStream { @@ -299,6 +304,18 @@ impl ManagedTorrent { }) } + fn maybe_reconnect_needed_peers_for_file(&self, file_id: usize) { + // If we have the full file, don't bother. + if let Ok(true) = self.with_opened_file(file_id, |f| f.approx_is_finished()) { + return; + } + self.with_state(|state| { + if let crate::ManagedTorrentState::Live(l) = &state { + l.reconnect_all_not_needed_peers(); + } + }) + } + pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { let (fd_len, fd_offset) = self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; @@ -314,14 +331,17 @@ impl ManagedTorrent { file_torrent_abs_offset: fd_offset, torrent: self, }; + s.torrent.maybe_reconnect_needed_peers_for_file(file_id); streams.streams.insert( s.stream_id, StreamState { + file_id, current_piece: first_piece, waker: None, file_len: s.file_len, }, ); + Ok(s) } } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 75879e6..e0a8bd2 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -312,7 +312,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { }; let peer_stats = &live_stats.snapshot.peer_stats; info!( - "[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}}}", + "[{}]: {:.2}% ({:.2} / {:.2}), ↓{:.2} MiB/s, ↑{:.2} MiB/s ({:.2}){}, {{live: {}, queued: {}, dead: {}, known: {}}}", idx, downloaded_pct, SF::new(progress), @@ -324,6 +324,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { peer_stats.live, peer_stats.queued + peer_stats.connecting, peer_stats.dead, + peer_stats.seen, ); } });