From a800048b7e35d4ccb706d5e4e8e866610b1a624a Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 10 Jul 2023 11:24:30 +0100 Subject: [PATCH] Added --exit-on-download option, only works without the server so far --- crates/librqbit/src/chunk_tracker.rs | 2 +- crates/librqbit/src/torrent_manager.rs | 5 ++--- crates/librqbit/src/torrent_state.rs | 16 +++++++++++++++- crates/rqbit/src/main.rs | 21 +++++++++++++++++++-- 4 files changed, 37 insertions(+), 7 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index c068e6a..47c306d 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -127,7 +127,7 @@ impl ChunkTracker { } pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) { - self.have.set(idx.get() as usize, true) + self.have.set(idx.get() as usize, true); } pub fn is_chunk_downloaded(&self, chunk: &ChunkInfo) -> bool { diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 4273110..12dc1bf 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -140,9 +140,8 @@ impl TorrentManagerHandle { todo!() } pub async fn wait_until_completed(&self) -> anyhow::Result<()> { - loop { - tokio::time::sleep(Duration::from_secs(60)).await; - } + self.manager.state.wait_until_completed().await; + Ok(()) } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index ea8b7c1..2ee5b45 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -28,7 +28,7 @@ use sha1w::Sha1; use tokio::{ sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Semaphore, + Notify, Semaphore, }, time::timeout, }; @@ -242,6 +242,8 @@ pub struct TorrentState { peer_semaphore: Semaphore, peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver)>, + + finished_notify: Notify, } impl TorrentState { @@ -280,6 +282,7 @@ impl TorrentState { peer_semaphore: Semaphore::new(128), peer_queue_tx, + finished_notify: Notify::new(), }); spawn("peer adder", { let state = state.clone(); @@ -582,6 +585,13 @@ impl TorrentState { total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), } } + + pub async fn wait_until_completed(&self) { + if self.get_left_to_download() == 0 { + return; + } + self.finished_notify.notified().await; + } } #[derive(Clone)] @@ -1020,6 +1030,10 @@ impl PeerHandler { index, handle ); + if self.state.get_left_to_download() == 0 { + self.state.finished_notify.notify_waiters(); + } + self.state.maybe_transmit_haves(chunk_info.piece_index); } false => { diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 0a0522a..f43b3d1 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -128,6 +128,10 @@ struct DownloadOpts { /// Set if you are ok to write on top of existing files #[clap(long)] overwrite: bool, + + /// Exit the program once the torrents complete download. + #[clap(short = 'e', long)] + exit_on_finish: bool, } // server start @@ -346,6 +350,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> let mut added = false; + let mut handles = Vec::new(); + for path in &download_opts.torrent_path { let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await { Ok(v) => match v { @@ -389,13 +395,24 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> }; http_api.add_torrent_handle(handle.clone()); + handles.push(handle); } if download_opts.list { Ok(()) } else if added { - loop { - tokio::time::sleep(Duration::from_secs(60)).await; + if download_opts.exit_on_finish { + let results = futures::future::join_all( + handles.iter().map(|h| h.wait_until_completed()), + ); + results.await; + info!("All downloads completed, exiting"); + Ok(()) + } else { + // Sleep forever. + loop { + tokio::time::sleep(Duration::from_secs(60)).await; + } } } else { anyhow::bail!("no torrents were added")