From d77d96bd48192182b89c5170ac0329677ce91214 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Thu, 15 Aug 2024 11:20:20 +0100 Subject: [PATCH] Now saving torrent updates properly to the new db --- crates/librqbit/src/api.rs | 13 +++-- crates/librqbit/src/http_api.rs | 5 +- crates/librqbit/src/session.rs | 39 ++++++++++---- .../librqbit/src/session_persistence/json.rs | 53 ++++++++++++------- .../librqbit/src/session_persistence/mod.rs | 6 +++ crates/librqbit/src/torrent_state/mod.rs | 13 ++++- 6 files changed, 92 insertions(+), 37 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 9ed239f..b23b895 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -96,19 +96,21 @@ impl Api { .per_peer_stats_snapshot(filter)) } - pub fn api_torrent_action_pause(&self, idx: TorrentId) -> Result { + pub async fn api_torrent_action_pause(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; - handle - .pause() + self.session() + .pause(&handle) + .await .context("error pausing torrent") .with_error_status_code(StatusCode::BAD_REQUEST)?; Ok(Default::default()) } - pub fn api_torrent_action_start(&self, idx: TorrentId) -> Result { + pub async fn api_torrent_action_start(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; self.session .unpause(&handle) + .await .context("error unpausing torrent") .with_error_status_code(StatusCode::BAD_REQUEST)?; Ok(Default::default()) @@ -130,7 +132,7 @@ impl Api { Ok(Default::default()) } - pub fn api_torrent_action_update_only_files( + pub async fn api_torrent_action_update_only_files( &self, idx: TorrentId, only_files: &HashSet, @@ -138,6 +140,7 @@ impl Api { let handle = self.mgr_handle(idx)?; self.session .update_only_files(&handle, only_files) + .await .context("error updating only_files")?; Ok(Default::default()) } diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 699ce5d..a300d24 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -368,14 +368,14 @@ impl HttpApi { State(state): State, Path(idx): Path, ) -> Result { - state.api_torrent_action_pause(idx).map(axum::Json) + state.api_torrent_action_pause(idx).await.map(axum::Json) } async fn torrent_action_start( State(state): State, Path(idx): Path, ) -> Result { - state.api_torrent_action_start(idx).map(axum::Json) + state.api_torrent_action_start(idx).await.map(axum::Json) } async fn torrent_action_forget( @@ -404,6 +404,7 @@ impl HttpApi { ) -> Result { state .api_torrent_action_update_only_files(idx, &req.only_files.into_iter().collect()) + .await .map(axum::Json) } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 9751e50..b57b407 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -993,7 +993,17 @@ impl Session { })); } + let id = if let Some(id) = opts.preferred_id { + id + } else if let Some(p) = self.persistence.as_ref() { + p.next_id().await? + } else { + self.next_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + }; + let mut builder = ManagedTorrentBuilder::new( + id, info, info_hash, torrent_bytes, @@ -1029,15 +1039,6 @@ impl Session { builder.peer_read_write_timeout(t); } - let id = if let Some(id) = opts.preferred_id { - id - } else if let Some(p) = self.persistence.as_ref() { - p.next_id().await? - } else { - self.next_id - .fetch_add(1, std::sync::atomic::Ordering::Relaxed) - }; - let managed_torrent = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find_map(|(eid, t)| { @@ -1175,7 +1176,21 @@ impl Session { Ok(merge_two_optional_streams(dht_rx, peer_rx)) } - pub fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) { + if let Some(p) = self.persistence.as_ref() { + if let Err(e) = p.update_metadata(handle.id(), handle).await { + warn!(storage=?p, error=?e, "error updating metadata") + } + } + } + + pub async fn pause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { + handle.pause()?; + self.try_update_persistence_metadata(handle).await; + Ok(()) + } + + pub async fn unpause(self: &Arc, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { let peer_rx = self.make_peer_rx( handle.info_hash(), handle.info().trackers.clone().into_iter().collect(), @@ -1183,15 +1198,17 @@ impl Session { handle.info().options.force_tracker_interval, )?; handle.start(peer_rx, false, self.cancellation_token.child_token())?; + self.try_update_persistence_metadata(handle).await; Ok(()) } - pub fn update_only_files( + pub async fn update_only_files( self: &Arc, handle: &ManagedTorrentHandle, only_files: &HashSet, ) -> anyhow::Result<()> { handle.update_only_files(only_files)?; + self.try_update_persistence_metadata(handle).await; Ok(()) } diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index 880c9bb..c8438dd 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -92,24 +92,13 @@ impl JsonSessionPersistenceStore { fn torrent_bytes_filename(&self, info_hash: &Id20) -> PathBuf { self.output_folder.join(format!("{:?}.torrent", info_hash)) } -} -#[async_trait] -impl SessionPersistenceStore for JsonSessionPersistenceStore { - async fn next_id(&self) -> anyhow::Result { - Ok(self - .db_content - .read() - .await - .torrents - .keys() - .copied() - .max() - .map(|max| max + 1) - .unwrap_or(0)) - } - - async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { + async fn update_db( + &self, + id: TorrentId, + torrent: &ManagedTorrentHandle, + write_torrent_file: bool, + ) -> anyhow::Result<()> { if !torrent .storage_factory .is_type_id(TypeId::of::()) @@ -132,7 +121,7 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore { output_folder: torrent.info().options.output_folder.clone(), }; - if !torrent.info().torrent_bytes.is_empty() { + if write_torrent_file && !torrent.info().torrent_bytes.is_empty() { let torrent_bytes_file = self.torrent_bytes_filename(&torrent.info_hash()); match tokio::fs::OpenOptions::new() .create(true) @@ -157,6 +146,22 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore { Ok(()) } +} + +#[async_trait] +impl SessionPersistenceStore for JsonSessionPersistenceStore { + async fn next_id(&self) -> anyhow::Result { + Ok(self + .db_content + .read() + .await + .torrents + .keys() + .copied() + .max() + .map(|max| max + 1) + .unwrap_or(0)) + } async fn delete(&self, id: TorrentId) -> anyhow::Result<()> { if let Some(t) = self.db_content.write().await.torrents.remove(&id) { @@ -211,4 +216,16 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore { .then(move |id| async move { self.get(id).await.map(move |st| (id, st)) }) .boxed()) } + + async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { + self.update_db(id, torrent, true).await + } + + async fn update_metadata( + &self, + id: TorrentId, + torrent: &ManagedTorrentHandle, + ) -> anyhow::Result<()> { + self.update_db(id, torrent, false).await + } } diff --git a/crates/librqbit/src/session_persistence/mod.rs b/crates/librqbit/src/session_persistence/mod.rs index c3ee7a2..c574d3c 100644 --- a/crates/librqbit/src/session_persistence/mod.rs +++ b/crates/librqbit/src/session_persistence/mod.rs @@ -59,12 +59,18 @@ impl SerializedTorrent { } } +// TODO: make this info_hash first, ID-second. #[async_trait] pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync { async fn next_id(&self) -> anyhow::Result; async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()>; async fn delete(&self, id: TorrentId) -> anyhow::Result<()>; async fn get(&self, id: TorrentId) -> anyhow::Result; + async fn update_metadata( + &self, + id: TorrentId, + torrent: &ManagedTorrentHandle, + ) -> anyhow::Result<()>; async fn stream_all( &self, ) -> anyhow::Result>>; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 7537dbc..09e7389 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -36,6 +36,7 @@ use tracing::warn; use crate::chunk_tracker::ChunkTracker; use crate::file_info::FileInfo; +use crate::session::TorrentId; use crate::spawn_utils::BlockingSpawner; use crate::storage::BoxStorageFactory; use crate::stream_connect::StreamConnector; @@ -114,6 +115,8 @@ pub struct ManagedTorrentInfo { } pub struct ManagedTorrent { + pub id: TorrentId, + // TODO: merge ManagedTorrent and ManagedTorrentInfo pub info: Arc, pub(crate) storage_factory: BoxStorageFactory, @@ -122,6 +125,10 @@ pub struct ManagedTorrent { } impl ManagedTorrent { + pub fn id(&self) -> TorrentId { + self.id + } + pub fn info(&self) -> &ManagedTorrentInfo { &self.info } @@ -344,7 +351,7 @@ impl ManagedTorrent { } /// Pause the torrent if it's live. - pub fn pause(&self) -> anyhow::Result<()> { + pub(crate) fn pause(&self) -> anyhow::Result<()> { let mut g = self.locked.write(); match &g.state { ManagedTorrentState::Live(live) => { @@ -501,6 +508,7 @@ impl ManagedTorrent { } pub(crate) struct ManagedTorrentBuilder { + id: TorrentId, info: TorrentMetaV1Info, output_folder: PathBuf, info_hash: Id20, @@ -521,6 +529,7 @@ pub(crate) struct ManagedTorrentBuilder { impl ManagedTorrentBuilder { pub fn new( + id: usize, info: TorrentMetaV1Info, info_hash: Id20, torrent_bytes: Bytes, @@ -529,6 +538,7 @@ impl ManagedTorrentBuilder { storage_factory: BoxStorageFactory, ) -> Self { Self { + id, info, info_hash, torrent_bytes, @@ -641,6 +651,7 @@ impl ManagedTorrentBuilder { self.storage_factory.create_and_init(&info)?, )); Ok(Arc::new(ManagedTorrent { + id: self.id, locked: RwLock::new(ManagedTorrentLocked { state: ManagedTorrentState::Initializing(initializing), only_files: self.only_files,