From bc9e72df60e526e2b0b827054ac01ebb2ce34e8d Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 20 Aug 2024 20:42:24 +0100 Subject: [PATCH] Use actual BitV and factory everywhere --- crates/librqbit/src/bitv_factory.rs | 12 +++--- crates/librqbit/src/session.rs | 27 +++++++------ .../librqbit/src/session_persistence/json.rs | 21 ++-------- .../librqbit/src/session_persistence/mod.rs | 2 - .../src/session_persistence/postgres.rs | 5 +-- .../src/torrent_state/initializing.rs | 38 +++++++++++++------ crates/librqbit/src/torrent_state/mod.rs | 5 ++- 7 files changed, 57 insertions(+), 53 deletions(-) diff --git a/crates/librqbit/src/bitv_factory.rs b/crates/librqbit/src/bitv_factory.rs index 5b74617..c6dc41c 100644 --- a/crates/librqbit/src/bitv_factory.rs +++ b/crates/librqbit/src/bitv_factory.rs @@ -1,14 +1,12 @@ -use bitvec::{order::Lsb0, vec::BitVec}; - -use crate::{api::TorrentIdOrHash, bitv::BitV}; +use crate::{api::TorrentIdOrHash, bitv::BitV, type_aliases::BF}; #[async_trait::async_trait] -pub trait BitVFactory: Send { +pub trait BitVFactory: Send + Sync { async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>>; async fn store_initial_check( &self, id: TorrentIdOrHash, - b: BitVec, + b: BF, ) -> anyhow::Result>; } @@ -21,8 +19,8 @@ impl BitVFactory for NonPersistentBitVFactory { } async fn store_initial_check( &self, - id: TorrentIdOrHash, - b: BitVec, + _id: TorrentIdOrHash, + b: BF, ) -> anyhow::Result> { Ok(Box::new(b)) } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 07ee4c6..edc81bc 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -10,13 +10,12 @@ use std::{ use crate::{ api::TorrentIdOrHash, + bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, - session_persistence::{ - json::JsonSessionPersistenceStore, BoxSessionPersistenceStore, SessionPersistenceStore, - }, + session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, spawn_utils::BlockingSpawner, storage::{ filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, @@ -94,7 +93,8 @@ impl SessionDatabase { pub struct Session { peer_id: Id20, dht: Option, - persistence: Option>, + persistence: Option>, + bitv_factory: Arc, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, next_id: AtomicUsize, @@ -506,7 +506,7 @@ impl Session { async fn persistence_factory( opts: &SessionOptions, - ) -> anyhow::Result> { + ) -> anyhow::Result<(Option>, Arc)> { match &opts.persistence { Some(SessionPersistenceConfig::Json { folder }) => { let folder = match folder.as_ref() { @@ -514,23 +514,25 @@ impl Session { None => SessionPersistenceConfig::default_json_persistence_folder()?, }; - Ok(Some(Box::new( + let s = Arc::new( JsonSessionPersistenceStore::new(folder) .await .context("error initializing JsonSessionPersistenceStore")?, - ))) + ); + + Ok((Some(s.clone()), s)) }, #[cfg(feature = "postgres")] Some(SessionPersistenceConfig::Postgres { connection_string }) => { use crate::session_persistence::postgres::PostgresSessionStorage; - let p = PostgresSessionStorage::new(connection_string).await?; - Ok(Some(Box::new(p))) + let p = Arc::new(PostgresSessionStorage::new(connection_string).await?); + Ok((Some(p.clone()), p)) } - None => Ok(None), + None => Ok((None, Arc::new(NonPersistentBitVFactory {}))), } } - let persistence = persistence_factory(&opts) + let (persistence, bitv_factory) = persistence_factory(&opts) .await .context("error initializing session persistence store")?; @@ -570,6 +572,7 @@ impl Session { let session = Arc::new(Self { persistence, + bitv_factory, peer_id, dht, peer_opts, @@ -1129,6 +1132,7 @@ impl Session { opts.paused, self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), + self.bitv_factory.clone(), ) .context("error starting torrent")?; } @@ -1284,6 +1288,7 @@ impl Session { false, self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), + self.bitv_factory.clone(), )?; self.try_update_persistence_metadata(handle).await; Ok(()) diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index 9941373..52b4804 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -7,11 +7,11 @@ use crate::{ session::TorrentId, storage::filesystem::FilesystemStorageFactory, torrent_state::ManagedTorrentHandle, + type_aliases::BF, ManagedTorrentState, }; use anyhow::{bail, Context}; use async_trait::async_trait; -use bitvec::{order::Lsb0, vec::BitVec}; use futures::{stream::BoxStream, StreamExt}; use itertools::Itertools; use librqbit_core::Id20; @@ -71,20 +71,6 @@ impl JsonSessionPersistenceStore { }) } - async fn to_id(&self, id: TorrentIdOrHash) -> anyhow::Result { - match id { - TorrentIdOrHash::Id(id) => Ok(id), - TorrentIdOrHash::Hash(h) => self - .db_content - .read() - .await - .torrents - .iter() - .find_map(|(k, v)| if v.info_hash() == &h { Some(*k) } else { None }) - .context("not found"), - } - } - async fn to_hash(&self, id: TorrentIdOrHash) -> anyhow::Result { match id { TorrentIdOrHash::Id(id) => self @@ -208,7 +194,7 @@ impl BitVFactory for JsonSessionPersistenceStore { async fn store_initial_check( &self, id: TorrentIdOrHash, - b: BitVec, + b: BF, ) -> anyhow::Result> { let h = self.to_hash(id).await?; let filename = self.bitv_filename(&h); @@ -220,8 +206,7 @@ impl BitVFactory for JsonSessionPersistenceStore { .open(&filename) .await .with_context(|| format!("error opening {filename:?}"))?; - let b = b.into_vec(); - tokio::io::copy(&mut &b[..], &mut dst) + tokio::io::copy(&mut b.as_raw_slice(), &mut dst) .await .context("error writing bitslice to {filename:?}")?; tokio::fs::rename(tmp_filename, &filename).await?; diff --git a/crates/librqbit/src/session_persistence/mod.rs b/crates/librqbit/src/session_persistence/mod.rs index c499124..f1459f8 100644 --- a/crates/librqbit/src/session_persistence/mod.rs +++ b/crates/librqbit/src/session_persistence/mod.rs @@ -79,8 +79,6 @@ pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync + BitVFactory ) -> anyhow::Result>>; } -pub type BoxSessionPersistenceStore = Box; - fn serialize_info_hash(id: &Id20, serializer: S) -> Result where S: Serializer, diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 08dea4f..27bdebf 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -2,10 +2,9 @@ use std::path::PathBuf; use crate::{ api::TorrentIdOrHash, bitv::BitV, bitv_factory::BitVFactory, session::TorrentId, - torrent_state::ManagedTorrentHandle, + torrent_state::ManagedTorrentHandle, type_aliases::BF, }; use anyhow::Context; -use bitvec::{order::Lsb0, vec::BitVec}; use futures::{stream::BoxStream, StreamExt}; use librqbit_core::Id20; use sqlx::{Pool, Postgres}; @@ -183,7 +182,7 @@ impl BitVFactory for PostgresSessionStorage { async fn store_initial_check( &self, _: TorrentIdOrHash, - b: BitVec, + b: BF, ) -> anyhow::Result> { Ok(b.into_dyn()) } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 62b3622..9ea781a 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -10,7 +10,9 @@ use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; use crate::{ + api::TorrentIdOrHash, bitv::BitV, + bitv_factory::BitVFactory, chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::{FileStorage, BF}, @@ -63,17 +65,31 @@ impl TorrentStateInitializing { .load(std::sync::atomic::Ordering::Relaxed) } - pub async fn check(&self) -> anyhow::Result { - info!("Doing initial checksum validation, this might take a while..."); - let have_pieces = self.meta.spawner.spawn_block_in_place(|| { - FileOps::new( - &self.meta.info, - &self.files, - &self.meta.file_infos, - &self.meta.lengths, - ) - .initial_check(&self.checked_bytes) - })?; + pub async fn check( + &self, + bitv_factory: Arc, + ) -> anyhow::Result { + let id: TorrentIdOrHash = self.meta.info_hash.into(); + let have_pieces = bitv_factory + .load(id) + .await + .context("error loading have_pieces")?; + let have_pieces = match have_pieces { + Some(h) => h, + None => { + info!("Doing initial checksum validation, this might take a while..."); + let have_pieces = self.meta.spawner.spawn_block_in_place(|| { + FileOps::new( + &self.meta.info, + &self.files, + &self.meta.file_infos, + &self.meta.lengths, + ) + .initial_check(&self.checked_bytes) + })?; + bitv_factory.store_initial_check(id, have_pieces).await? + } + }; let selected_pieces = compute_selected_pieces( &self.meta.lengths, diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 68f4d04..2002924 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -34,6 +34,7 @@ use tracing::debug; use tracing::error_span; use tracing::warn; +use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; use crate::file_info::FileInfo; use crate::session::TorrentId; @@ -209,6 +210,7 @@ impl ManagedTorrent { start_paused: bool, live_cancellation_token: CancellationToken, init_semaphore: Arc, + bitv_factory: Arc, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -301,7 +303,7 @@ impl ManagedTorrent { .await .context("bug: concurrent init semaphore was closed")?; - match init.check().await { + match init.check(bitv_factory).await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -368,6 +370,7 @@ impl ManagedTorrent { start_paused, live_cancellation_token, init_semaphore, + bitv_factory, ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"),