Use actual BitV and factory everywhere

This commit is contained in:
Igor Katson 2024-08-20 20:42:24 +01:00
parent a55dfc6e0e
commit bc9e72df60
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
7 changed files with 57 additions and 53 deletions

View file

@ -1,14 +1,12 @@
use bitvec::{order::Lsb0, vec::BitVec};
use crate::{api::TorrentIdOrHash, bitv::BitV};
use crate::{api::TorrentIdOrHash, bitv::BitV, type_aliases::BF};
#[async_trait::async_trait]
pub trait BitVFactory: Send {
pub trait BitVFactory: Send + Sync {
async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>>;
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
b: BF,
) -> anyhow::Result<Box<dyn BitV>>;
}
@ -21,8 +19,8 @@ impl BitVFactory for NonPersistentBitVFactory {
}
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
_id: TorrentIdOrHash,
b: BF,
) -> anyhow::Result<Box<dyn BitV>> {
Ok(Box::new(b))
}

View file

@ -10,13 +10,12 @@ use std::{
use crate::{
api::TorrentIdOrHash,
bitv_factory::{BitVFactory, NonPersistentBitVFactory},
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
merge_streams::merge_streams,
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
session_persistence::{
json::JsonSessionPersistenceStore, BoxSessionPersistenceStore, SessionPersistenceStore,
},
session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore},
spawn_utils::BlockingSpawner,
storage::{
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
@ -94,7 +93,8 @@ impl SessionDatabase {
pub struct Session {
peer_id: Id20,
dht: Option<Dht>,
persistence: Option<Box<dyn SessionPersistenceStore>>,
persistence: Option<Arc<dyn SessionPersistenceStore>>,
bitv_factory: Arc<dyn BitVFactory>,
peer_opts: PeerConnectionOptions,
spawner: BlockingSpawner,
next_id: AtomicUsize,
@ -506,7 +506,7 @@ impl Session {
async fn persistence_factory(
opts: &SessionOptions,
) -> anyhow::Result<Option<BoxSessionPersistenceStore>> {
) -> anyhow::Result<(Option<Arc<dyn SessionPersistenceStore>>, Arc<dyn BitVFactory>)> {
match &opts.persistence {
Some(SessionPersistenceConfig::Json { folder }) => {
let folder = match folder.as_ref() {
@ -514,23 +514,25 @@ impl Session {
None => SessionPersistenceConfig::default_json_persistence_folder()?,
};
Ok(Some(Box::new(
let s = Arc::new(
JsonSessionPersistenceStore::new(folder)
.await
.context("error initializing JsonSessionPersistenceStore")?,
)))
);
Ok((Some(s.clone()), s))
},
#[cfg(feature = "postgres")]
Some(SessionPersistenceConfig::Postgres { connection_string }) => {
use crate::session_persistence::postgres::PostgresSessionStorage;
let p = PostgresSessionStorage::new(connection_string).await?;
Ok(Some(Box::new(p)))
let p = Arc::new(PostgresSessionStorage::new(connection_string).await?);
Ok((Some(p.clone()), p))
}
None => Ok(None),
None => Ok((None, Arc::new(NonPersistentBitVFactory {}))),
}
}
let persistence = persistence_factory(&opts)
let (persistence, bitv_factory) = persistence_factory(&opts)
.await
.context("error initializing session persistence store")?;
@ -570,6 +572,7 @@ impl Session {
let session = Arc::new(Self {
persistence,
bitv_factory,
peer_id,
dht,
peer_opts,
@ -1129,6 +1132,7 @@ impl Session {
opts.paused,
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(),
)
.context("error starting torrent")?;
}
@ -1284,6 +1288,7 @@ impl Session {
false,
self.cancellation_token.child_token(),
self.concurrent_initialize_semaphore.clone(),
self.bitv_factory.clone(),
)?;
self.try_update_persistence_metadata(handle).await;
Ok(())

View file

@ -7,11 +7,11 @@ use crate::{
session::TorrentId,
storage::filesystem::FilesystemStorageFactory,
torrent_state::ManagedTorrentHandle,
type_aliases::BF,
ManagedTorrentState,
};
use anyhow::{bail, Context};
use async_trait::async_trait;
use bitvec::{order::Lsb0, vec::BitVec};
use futures::{stream::BoxStream, StreamExt};
use itertools::Itertools;
use librqbit_core::Id20;
@ -71,20 +71,6 @@ impl JsonSessionPersistenceStore {
})
}
async fn to_id(&self, id: TorrentIdOrHash) -> anyhow::Result<TorrentId> {
match id {
TorrentIdOrHash::Id(id) => Ok(id),
TorrentIdOrHash::Hash(h) => self
.db_content
.read()
.await
.torrents
.iter()
.find_map(|(k, v)| if v.info_hash() == &h { Some(*k) } else { None })
.context("not found"),
}
}
async fn to_hash(&self, id: TorrentIdOrHash) -> anyhow::Result<Id20> {
match id {
TorrentIdOrHash::Id(id) => self
@ -208,7 +194,7 @@ impl BitVFactory for JsonSessionPersistenceStore {
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
b: BF,
) -> anyhow::Result<Box<dyn BitV>> {
let h = self.to_hash(id).await?;
let filename = self.bitv_filename(&h);
@ -220,8 +206,7 @@ impl BitVFactory for JsonSessionPersistenceStore {
.open(&filename)
.await
.with_context(|| format!("error opening {filename:?}"))?;
let b = b.into_vec();
tokio::io::copy(&mut &b[..], &mut dst)
tokio::io::copy(&mut b.as_raw_slice(), &mut dst)
.await
.context("error writing bitslice to {filename:?}")?;
tokio::fs::rename(tmp_filename, &filename).await?;

View file

@ -79,8 +79,6 @@ pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync + BitVFactory
) -> anyhow::Result<BoxStream<'_, anyhow::Result<(TorrentId, SerializedTorrent)>>>;
}
pub type BoxSessionPersistenceStore = Box<dyn SessionPersistenceStore>;
fn serialize_info_hash<S>(id: &Id20, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,

View file

@ -2,10 +2,9 @@ use std::path::PathBuf;
use crate::{
api::TorrentIdOrHash, bitv::BitV, bitv_factory::BitVFactory, session::TorrentId,
torrent_state::ManagedTorrentHandle,
torrent_state::ManagedTorrentHandle, type_aliases::BF,
};
use anyhow::Context;
use bitvec::{order::Lsb0, vec::BitVec};
use futures::{stream::BoxStream, StreamExt};
use librqbit_core::Id20;
use sqlx::{Pool, Postgres};
@ -183,7 +182,7 @@ impl BitVFactory for PostgresSessionStorage {
async fn store_initial_check(
&self,
_: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
b: BF,
) -> anyhow::Result<Box<dyn BitV>> {
Ok(b.into_dyn())
}

View file

@ -10,7 +10,9 @@ use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, warn};
use crate::{
api::TorrentIdOrHash,
bitv::BitV,
bitv_factory::BitVFactory,
chunk_tracker::ChunkTracker,
file_ops::FileOps,
type_aliases::{FileStorage, BF},
@ -63,17 +65,31 @@ impl TorrentStateInitializing {
.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
info!("Doing initial checksum validation, this might take a while...");
let have_pieces = self.meta.spawner.spawn_block_in_place(|| {
FileOps::new(
&self.meta.info,
&self.files,
&self.meta.file_infos,
&self.meta.lengths,
)
.initial_check(&self.checked_bytes)
})?;
pub async fn check(
&self,
bitv_factory: Arc<dyn BitVFactory>,
) -> anyhow::Result<TorrentStatePaused> {
let id: TorrentIdOrHash = self.meta.info_hash.into();
let have_pieces = bitv_factory
.load(id)
.await
.context("error loading have_pieces")?;
let have_pieces = match have_pieces {
Some(h) => h,
None => {
info!("Doing initial checksum validation, this might take a while...");
let have_pieces = self.meta.spawner.spawn_block_in_place(|| {
FileOps::new(
&self.meta.info,
&self.files,
&self.meta.file_infos,
&self.meta.lengths,
)
.initial_check(&self.checked_bytes)
})?;
bitv_factory.store_initial_check(id, have_pieces).await?
}
};
let selected_pieces = compute_selected_pieces(
&self.meta.lengths,

View file

@ -34,6 +34,7 @@ use tracing::debug;
use tracing::error_span;
use tracing::warn;
use crate::bitv_factory::BitVFactory;
use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::session::TorrentId;
@ -209,6 +210,7 @@ impl ManagedTorrent {
start_paused: bool,
live_cancellation_token: CancellationToken,
init_semaphore: Arc<tokio::sync::Semaphore>,
bitv_factory: Arc<dyn BitVFactory>,
) -> anyhow::Result<()> {
let mut g = self.locked.write();
@ -301,7 +303,7 @@ impl ManagedTorrent {
.await
.context("bug: concurrent init semaphore was closed")?;
match init.check().await {
match init.check(bitv_factory).await {
Ok(paused) => {
let mut g = t.locked.write();
if let ManagedTorrentState::Initializing(_) = &g.state {
@ -368,6 +370,7 @@ impl ManagedTorrent {
start_paused,
live_cancellation_token,
init_semaphore,
bitv_factory,
)
}
ManagedTorrentState::None => bail!("bug: torrent is in empty state"),