From 0c4844f5345d3f1f94c809469f0cd01f58330e40 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 24 Nov 2023 14:19:39 +0000 Subject: [PATCH] 1/n Add pause/start actions --- crates/dht/src/dht.rs | 3 ++- crates/librqbit/src/dht_utils.rs | 2 +- crates/librqbit/src/http_api.rs | 42 ++++++++++++++++++++++++++++---- crates/librqbit/src/session.rs | 15 ++++++++---- 4 files changed, 50 insertions(+), 12 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 3808ecd..bcbc576 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -690,7 +690,8 @@ impl Dht { }); Ok(Dht { state }) } - pub async fn get_peers( + + pub fn get_peers( &self, info_hash: Id20, ) -> anyhow::Result + Unpin> { diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index 2c9f3fc..9e7d60f 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -107,7 +107,7 @@ mod tests { let info_hash = Id20::from_str("cf3ea75e2ebbd30e0da6e6e215e2226bf35f2e33").unwrap(); let dht = Dht::new().await.unwrap(); - let peer_rx = dht.get_peers(info_hash).await.unwrap(); + let peer_rx = dht.get_peers(info_hash).unwrap(); let peer_id = generate_peer_id(); match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx, None).await { ReadMetainfoResult::Found { info, .. } => dbg!(info), diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index bd94f2b..0f56493 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -2,7 +2,7 @@ use anyhow::Context; use axum::body::Bytes; use axum::extract::{Path, Query, State}; use axum::response::IntoResponse; -use axum::routing::get; +use axum::routing::{get, post}; use buffers::ByteString; use dht::{Dht, DhtStats}; use http::StatusCode; @@ -116,6 +116,20 @@ impl HttpApi { state.api_peer_stats(idx, filter).map(axum::Json) } + async fn torrent_action_pause( + State(state): State, + Path(idx): Path, + ) -> Result { + state.api_torrent_action_pause(idx) + } + + async fn torrent_action_start( + State(state): State, + Path(idx): Path, + ) -> Result { + state.api_torrent_action_start(idx) + } + #[allow(unused_mut)] let mut app = Router::new() .route("/", get(api_root)) @@ -125,7 +139,9 @@ impl HttpApi { .route("/torrents/:id", get(torrent_details)) .route("/torrents/:id/haves", get(torrent_haves)) .route("/torrents/:id/stats", get(torrent_stats)) - .route("/torrents/:id/peer_stats", get(peer_stats)); + .route("/torrents/:id/peer_stats", get(peer_stats)) + .route("/torrents/:id/pause", post(torrent_action_pause)) + .route("/torrents/:id/start", post(torrent_action_start)); #[cfg(feature = "webui")] { @@ -362,14 +378,14 @@ impl ApiInternal { TorrentListResponse { torrents: items } } - fn api_torrent_details(&self, idx: usize) -> Result { + fn api_torrent_details(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; let info_hash = handle.info().info_hash; let only_files = handle.only_files(); make_torrent_details(&info_hash, &handle.info().info, only_files.as_deref()) } - fn api_peer_stats(&self, idx: usize, filter: PeerStatsFilter) -> Result { + fn api_peer_stats(&self, idx: TorrentId, filter: PeerStatsFilter) -> Result { let handle = self.mgr_handle(idx)?; Ok(handle .live() @@ -377,6 +393,22 @@ impl ApiInternal { .per_peer_stats_snapshot(filter)) } + fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<()> { + let handle = self.mgr_handle(idx)?; + handle + .pause() + .context("error pausing torrent") + .with_error_status_code(StatusCode::BAD_REQUEST) + } + + fn api_torrent_action_start(&self, idx: TorrentId) -> Result<()> { + let handle = self.mgr_handle(idx)?; + self.session + .unpause(&handle) + .context("error unpausing torrent") + .with_error_status_code(StatusCode::BAD_REQUEST) + } + pub async fn api_add_torrent( &self, add: AddTorrent<'_>, @@ -436,7 +468,7 @@ impl ApiInternal { Ok(dht.with_routing_table(|r| r.clone())) } - fn api_stats(&self, idx: usize) -> Result { + fn api_stats(&self, idx: TorrentId) -> Result { let mgr = self.mgr_handle(idx)?; let live = mgr.live().context("not live")?; let snapshot = live.stats_snapshot(); diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index c5e5703..fe7c0ec 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -224,8 +224,7 @@ impl Session { .dht .as_ref() .context("magnet links without DHT are not supported")? - .get_peers(info_hash) - .await?; + .get_peers(info_hash)?; let trackers = trackers .into_iter() @@ -274,7 +273,7 @@ impl Session { let dht_rx = match self.dht.as_ref() { Some(dht) => { debug!("reading peers for {:?} from DHT", torrent.info_hash); - Some(dht.get_peers(torrent.info_hash).await?) + Some(dht.get_peers(torrent.info_hash)?) } None => None, }; @@ -430,7 +429,13 @@ impl Session { self.locked.read().torrents.get(id).cloned() } - pub fn restart(&self, id: usize) -> anyhow::Result<()> { - todo!() + pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + let peer_rx = self + .dht + .as_ref() + .map(|dht| dht.get_peers(handle.info_hash())) + .transpose()?; + handle.start(Default::default(), peer_rx); + return Ok(()); } }