Added --exit-on-download option, only works without the server so far

This commit is contained in:
Igor Katson 2023-07-10 11:24:30 +01:00
parent af30affb37
commit a800048b7e
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 37 additions and 7 deletions

View file

@ -127,7 +127,7 @@ impl ChunkTracker {
} }
pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) { pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) {
self.have.set(idx.get() as usize, true) self.have.set(idx.get() as usize, true);
} }
pub fn is_chunk_downloaded(&self, chunk: &ChunkInfo) -> bool { pub fn is_chunk_downloaded(&self, chunk: &ChunkInfo) -> bool {

View file

@ -140,9 +140,8 @@ impl TorrentManagerHandle {
todo!() todo!()
} }
pub async fn wait_until_completed(&self) -> anyhow::Result<()> { pub async fn wait_until_completed(&self) -> anyhow::Result<()> {
loop { self.manager.state.wait_until_completed().await;
tokio::time::sleep(Duration::from_secs(60)).await; Ok(())
}
} }
} }

View file

@ -28,7 +28,7 @@ use sha1w::Sha1;
use tokio::{ use tokio::{
sync::{ sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Semaphore, Notify, Semaphore,
}, },
time::timeout, time::timeout,
}; };
@ -242,6 +242,8 @@ pub struct TorrentState {
peer_semaphore: Semaphore, peer_semaphore: Semaphore,
peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver<WriterRequest>)>, peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver<WriterRequest>)>,
finished_notify: Notify,
} }
impl TorrentState { impl TorrentState {
@ -280,6 +282,7 @@ impl TorrentState {
peer_semaphore: Semaphore::new(128), peer_semaphore: Semaphore::new(128),
peer_queue_tx, peer_queue_tx,
finished_notify: Notify::new(),
}); });
spawn("peer adder", { spawn("peer adder", {
let state = state.clone(); let state = state.clone();
@ -582,6 +585,13 @@ impl TorrentState {
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed), total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
} }
} }
pub async fn wait_until_completed(&self) {
if self.get_left_to_download() == 0 {
return;
}
self.finished_notify.notified().await;
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -1020,6 +1030,10 @@ impl PeerHandler {
index, handle index, handle
); );
if self.state.get_left_to_download() == 0 {
self.state.finished_notify.notify_waiters();
}
self.state.maybe_transmit_haves(chunk_info.piece_index); self.state.maybe_transmit_haves(chunk_info.piece_index);
} }
false => { false => {

View file

@ -128,6 +128,10 @@ struct DownloadOpts {
/// Set if you are ok to write on top of existing files /// Set if you are ok to write on top of existing files
#[clap(long)] #[clap(long)]
overwrite: bool, overwrite: bool,
/// Exit the program once the torrents complete download.
#[clap(short = 'e', long)]
exit_on_finish: bool,
} }
// server start // server start
@ -346,6 +350,8 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
let mut added = false; let mut added = false;
let mut handles = Vec::new();
for path in &download_opts.torrent_path { for path in &download_opts.torrent_path {
let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await { let handle = match session.add_torrent(path, Some(torrent_opts.clone())).await {
Ok(v) => match v { Ok(v) => match v {
@ -389,13 +395,24 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
}; };
http_api.add_torrent_handle(handle.clone()); http_api.add_torrent_handle(handle.clone());
handles.push(handle);
} }
if download_opts.list { if download_opts.list {
Ok(()) Ok(())
} else if added { } else if added {
loop { if download_opts.exit_on_finish {
tokio::time::sleep(Duration::from_secs(60)).await; let results = futures::future::join_all(
handles.iter().map(|h| h.wait_until_completed()),
);
results.await;
info!("All downloads completed, exiting");
Ok(())
} else {
// Sleep forever.
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
}
} }
} else { } else {
anyhow::bail!("no torrents were added") anyhow::bail!("no torrents were added")