From e771162fa738eb67f7b96a7198d17b0ae5830d04 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 20 Aug 2024 16:51:34 +0100 Subject: [PATCH] BitVFactory going on --- crates/librqbit/src/bitv.rs | 15 ++-- crates/librqbit/src/bitv_factory.rs | 29 +++++++ crates/librqbit/src/lib.rs | 1 + .../librqbit/src/session_persistence/json.rs | 85 ++++++++++++++++++- .../librqbit/src/session_persistence/mod.rs | 5 +- .../src/session_persistence/postgres.rs | 23 ++++- 6 files changed, 148 insertions(+), 10 deletions(-) create mode 100644 crates/librqbit/src/bitv_factory.rs diff --git a/crates/librqbit/src/bitv.rs b/crates/librqbit/src/bitv.rs index 47c9cc8..7d2ff2b 100644 --- a/crates/librqbit/src/bitv.rs +++ b/crates/librqbit/src/bitv.rs @@ -3,11 +3,13 @@ use std::fs::File; use anyhow::Context; use bitvec::{order::Lsb0, slice::BitSlice, vec::BitVec, view::AsBits, view::AsMutBits}; +#[async_trait::async_trait] pub trait BitV: Send { fn as_slice(&self) -> &BitSlice; fn as_slice_mut(&mut self) -> &mut BitSlice; - fn flush(&mut self) -> anyhow::Result<()>; fn into_dyn(self) -> Box; + + async fn flush(&mut self) -> anyhow::Result<()>; } pub struct MmapBitV { @@ -23,6 +25,7 @@ impl MmapBitV { } } +#[async_trait::async_trait] impl BitV for BitVec { fn as_slice(&self) -> &BitSlice { self.as_bitslice() @@ -32,7 +35,7 @@ impl BitV for BitVec { self.as_mut_bitslice() } - fn flush(&mut self) -> anyhow::Result<()> { + async fn flush(&mut self) -> anyhow::Result<()> { Ok(()) } @@ -41,6 +44,7 @@ impl BitV for BitVec { } } +#[async_trait::async_trait] impl BitV for MmapBitV { fn as_slice(&self) -> &BitSlice { self.mmap.as_bits() @@ -50,7 +54,7 @@ impl BitV for MmapBitV { self.mmap.as_mut_bits() } - fn flush(&mut self) -> anyhow::Result<()> { + async fn flush(&mut self) -> anyhow::Result<()> { Ok(self.mmap.flush()?) } @@ -59,6 +63,7 @@ impl BitV for MmapBitV { } } +#[async_trait::async_trait] impl BitV for Box { fn as_slice(&self) -> &BitSlice { (**self).as_slice() @@ -68,8 +73,8 @@ impl BitV for Box { (**self).as_slice_mut() } - fn flush(&mut self) -> anyhow::Result<()> { - (**self).flush() + async fn flush(&mut self) -> anyhow::Result<()> { + (**self).flush().await } fn into_dyn(self) -> Box { diff --git a/crates/librqbit/src/bitv_factory.rs b/crates/librqbit/src/bitv_factory.rs new file mode 100644 index 0000000..5b74617 --- /dev/null +++ b/crates/librqbit/src/bitv_factory.rs @@ -0,0 +1,29 @@ +use bitvec::{order::Lsb0, vec::BitVec}; + +use crate::{api::TorrentIdOrHash, bitv::BitV}; + +#[async_trait::async_trait] +pub trait BitVFactory: Send { + async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>>; + async fn store_initial_check( + &self, + id: TorrentIdOrHash, + b: BitVec, + ) -> anyhow::Result>; +} + +pub struct NonPersistentBitVFactory {} + +#[async_trait::async_trait] +impl BitVFactory for NonPersistentBitVFactory { + async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result>> { + Ok(None) + } + async fn store_initial_check( + &self, + id: TorrentIdOrHash, + b: BitVec, + ) -> anyhow::Result> { + Ok(Box::new(b)) + } +} diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index b23e2dd..b6028c7 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -41,6 +41,7 @@ macro_rules! aframe { pub mod api; mod api_error; mod bitv; +mod bitv_factory; mod chunk_tracker; mod create_torrent_file; mod dht_utils; diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index c681a7e..9941373 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -1,11 +1,17 @@ use std::{any::TypeId, collections::HashMap, path::PathBuf}; use crate::{ - session::TorrentId, storage::filesystem::FilesystemStorageFactory, - torrent_state::ManagedTorrentHandle, ManagedTorrentState, + api::TorrentIdOrHash, + bitv::{BitV, MmapBitV}, + bitv_factory::BitVFactory, + session::TorrentId, + storage::filesystem::FilesystemStorageFactory, + torrent_state::ManagedTorrentHandle, + ManagedTorrentState, }; use anyhow::{bail, Context}; use async_trait::async_trait; +use bitvec::{order::Lsb0, vec::BitVec}; use futures::{stream::BoxStream, StreamExt}; use itertools::Itertools; use librqbit_core::Id20; @@ -65,6 +71,34 @@ impl JsonSessionPersistenceStore { }) } + async fn to_id(&self, id: TorrentIdOrHash) -> anyhow::Result { + match id { + TorrentIdOrHash::Id(id) => Ok(id), + TorrentIdOrHash::Hash(h) => self + .db_content + .read() + .await + .torrents + .iter() + .find_map(|(k, v)| if v.info_hash() == &h { Some(*k) } else { None }) + .context("not found"), + } + } + + async fn to_hash(&self, id: TorrentIdOrHash) -> anyhow::Result { + match id { + TorrentIdOrHash::Id(id) => self + .db_content + .read() + .await + .torrents + .get(&id) + .map(|v| *v.info_hash()) + .context("not found"), + TorrentIdOrHash::Hash(h) => Ok(h), + } + } + async fn flush(&self) -> anyhow::Result<()> { let tmp_filename = format!("{}.tmp", self.db_filename.to_str().unwrap()); let mut tmp = tokio::fs::OpenOptions::new() @@ -97,6 +131,10 @@ impl JsonSessionPersistenceStore { self.output_folder.join(format!("{:?}.torrent", info_hash)) } + fn bitv_filename(&self, info_hash: &Id20) -> PathBuf { + self.output_folder.join(format!("{:?}.bitv", info_hash)) + } + async fn update_db( &self, id: TorrentId, @@ -152,6 +190,49 @@ impl JsonSessionPersistenceStore { } } +#[async_trait::async_trait] +impl BitVFactory for JsonSessionPersistenceStore { + async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>> { + let h = self.to_hash(id).await?; + let filename = self.bitv_filename(&h); + let f = match std::fs::OpenOptions::new().write(true).open(&filename) { + Ok(f) => f, + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => return Ok(None), + _ => return Err(e).with_context(|| format!("error opening {filename:?}")), + }, + }; + Ok(Some(MmapBitV::new(f)?.into_dyn())) + } + + async fn store_initial_check( + &self, + id: TorrentIdOrHash, + b: BitVec, + ) -> anyhow::Result> { + let h = self.to_hash(id).await?; + let filename = self.bitv_filename(&h); + let tmp_filename = format!("{}.tmp", filename.to_str().context("bug")?); + let mut dst = tokio::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&filename) + .await + .with_context(|| format!("error opening {filename:?}"))?; + let b = b.into_vec(); + tokio::io::copy(&mut &b[..], &mut dst) + .await + .context("error writing bitslice to {filename:?}")?; + tokio::fs::rename(tmp_filename, &filename).await?; + let f = std::fs::OpenOptions::new() + .write(true) + .open(&filename) + .with_context(|| format!("error opening {filename:?}"))?; + Ok(MmapBitV::new(f)?.into_dyn()) + } +} + #[async_trait] impl SessionPersistenceStore for JsonSessionPersistenceStore { async fn next_id(&self) -> anyhow::Result { diff --git a/crates/librqbit/src/session_persistence/mod.rs b/crates/librqbit/src/session_persistence/mod.rs index 3de287d..c499124 100644 --- a/crates/librqbit/src/session_persistence/mod.rs +++ b/crates/librqbit/src/session_persistence/mod.rs @@ -13,7 +13,8 @@ use librqbit_core::Id20; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{ - session::TorrentId, torrent_state::ManagedTorrentHandle, AddTorrent, AddTorrentOptions, + bitv_factory::BitVFactory, session::TorrentId, torrent_state::ManagedTorrentHandle, AddTorrent, + AddTorrentOptions, }; #[derive(Serialize, Deserialize, Clone)] @@ -63,7 +64,7 @@ impl SerializedTorrent { // TODO: make this info_hash first, ID-second. #[async_trait] -pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync { +pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync + BitVFactory { async fn next_id(&self) -> anyhow::Result; async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()>; async fn delete(&self, id: TorrentId) -> anyhow::Result<()>; diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index bd1d45b..08dea4f 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -1,10 +1,15 @@ use std::path::PathBuf; -use crate::{session::TorrentId, torrent_state::ManagedTorrentHandle}; +use crate::{ + api::TorrentIdOrHash, bitv::BitV, bitv_factory::BitVFactory, session::TorrentId, + torrent_state::ManagedTorrentHandle, +}; use anyhow::Context; +use bitvec::{order::Lsb0, vec::BitVec}; use futures::{stream::BoxStream, StreamExt}; use librqbit_core::Id20; use sqlx::{Pool, Postgres}; +use tracing::debug; use super::{SerializedTorrent, SessionPersistenceStore}; @@ -167,3 +172,19 @@ impl SessionPersistenceStore for PostgresSessionStorage { Ok(futures::stream::iter(torrents).boxed()) } } + +#[async_trait::async_trait] +impl BitVFactory for PostgresSessionStorage { + async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result>> { + debug!("BitVFactory not implemented for PostgresSessionStorage: fastresume not available"); + Ok(None) + } + + async fn store_initial_check( + &self, + _: TorrentIdOrHash, + b: BitVec, + ) -> anyhow::Result> { + Ok(b.into_dyn()) + } +}