BitVFactory going on

This commit is contained in:
Igor Katson 2024-08-20 16:51:34 +01:00
parent d7236f05a9
commit e771162fa7
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
6 changed files with 148 additions and 10 deletions

View file

@ -3,11 +3,13 @@ use std::fs::File;
use anyhow::Context;
use bitvec::{order::Lsb0, slice::BitSlice, vec::BitVec, view::AsBits, view::AsMutBits};
#[async_trait::async_trait]
pub trait BitV: Send {
fn as_slice(&self) -> &BitSlice<u8, Lsb0>;
fn as_slice_mut(&mut self) -> &mut BitSlice<u8, Lsb0>;
fn flush(&mut self) -> anyhow::Result<()>;
fn into_dyn(self) -> Box<dyn BitV>;
async fn flush(&mut self) -> anyhow::Result<()>;
}
pub struct MmapBitV {
@ -23,6 +25,7 @@ impl MmapBitV {
}
}
#[async_trait::async_trait]
impl BitV for BitVec<u8, Lsb0> {
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
self.as_bitslice()
@ -32,7 +35,7 @@ impl BitV for BitVec<u8, Lsb0> {
self.as_mut_bitslice()
}
fn flush(&mut self) -> anyhow::Result<()> {
async fn flush(&mut self) -> anyhow::Result<()> {
Ok(())
}
@ -41,6 +44,7 @@ impl BitV for BitVec<u8, Lsb0> {
}
}
#[async_trait::async_trait]
impl BitV for MmapBitV {
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
self.mmap.as_bits()
@ -50,7 +54,7 @@ impl BitV for MmapBitV {
self.mmap.as_mut_bits()
}
fn flush(&mut self) -> anyhow::Result<()> {
async fn flush(&mut self) -> anyhow::Result<()> {
Ok(self.mmap.flush()?)
}
@ -59,6 +63,7 @@ impl BitV for MmapBitV {
}
}
#[async_trait::async_trait]
impl BitV for Box<dyn BitV> {
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
(**self).as_slice()
@ -68,8 +73,8 @@ impl BitV for Box<dyn BitV> {
(**self).as_slice_mut()
}
fn flush(&mut self) -> anyhow::Result<()> {
(**self).flush()
async fn flush(&mut self) -> anyhow::Result<()> {
(**self).flush().await
}
fn into_dyn(self) -> Box<dyn BitV> {

View file

@ -0,0 +1,29 @@
use bitvec::{order::Lsb0, vec::BitVec};
use crate::{api::TorrentIdOrHash, bitv::BitV};
#[async_trait::async_trait]
pub trait BitVFactory: Send {
async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>>;
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
) -> anyhow::Result<Box<dyn BitV>>;
}
pub struct NonPersistentBitVFactory {}
#[async_trait::async_trait]
impl BitVFactory for NonPersistentBitVFactory {
async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>> {
Ok(None)
}
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
) -> anyhow::Result<Box<dyn BitV>> {
Ok(Box::new(b))
}
}

View file

@ -41,6 +41,7 @@ macro_rules! aframe {
pub mod api;
mod api_error;
mod bitv;
mod bitv_factory;
mod chunk_tracker;
mod create_torrent_file;
mod dht_utils;

View file

@ -1,11 +1,17 @@
use std::{any::TypeId, collections::HashMap, path::PathBuf};
use crate::{
session::TorrentId, storage::filesystem::FilesystemStorageFactory,
torrent_state::ManagedTorrentHandle, ManagedTorrentState,
api::TorrentIdOrHash,
bitv::{BitV, MmapBitV},
bitv_factory::BitVFactory,
session::TorrentId,
storage::filesystem::FilesystemStorageFactory,
torrent_state::ManagedTorrentHandle,
ManagedTorrentState,
};
use anyhow::{bail, Context};
use async_trait::async_trait;
use bitvec::{order::Lsb0, vec::BitVec};
use futures::{stream::BoxStream, StreamExt};
use itertools::Itertools;
use librqbit_core::Id20;
@ -65,6 +71,34 @@ impl JsonSessionPersistenceStore {
})
}
async fn to_id(&self, id: TorrentIdOrHash) -> anyhow::Result<TorrentId> {
match id {
TorrentIdOrHash::Id(id) => Ok(id),
TorrentIdOrHash::Hash(h) => self
.db_content
.read()
.await
.torrents
.iter()
.find_map(|(k, v)| if v.info_hash() == &h { Some(*k) } else { None })
.context("not found"),
}
}
async fn to_hash(&self, id: TorrentIdOrHash) -> anyhow::Result<Id20> {
match id {
TorrentIdOrHash::Id(id) => self
.db_content
.read()
.await
.torrents
.get(&id)
.map(|v| *v.info_hash())
.context("not found"),
TorrentIdOrHash::Hash(h) => Ok(h),
}
}
async fn flush(&self) -> anyhow::Result<()> {
let tmp_filename = format!("{}.tmp", self.db_filename.to_str().unwrap());
let mut tmp = tokio::fs::OpenOptions::new()
@ -97,6 +131,10 @@ impl JsonSessionPersistenceStore {
self.output_folder.join(format!("{:?}.torrent", info_hash))
}
fn bitv_filename(&self, info_hash: &Id20) -> PathBuf {
self.output_folder.join(format!("{:?}.bitv", info_hash))
}
async fn update_db(
&self,
id: TorrentId,
@ -152,6 +190,49 @@ impl JsonSessionPersistenceStore {
}
}
#[async_trait::async_trait]
impl BitVFactory for JsonSessionPersistenceStore {
async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>> {
let h = self.to_hash(id).await?;
let filename = self.bitv_filename(&h);
let f = match std::fs::OpenOptions::new().write(true).open(&filename) {
Ok(f) => f,
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => return Ok(None),
_ => return Err(e).with_context(|| format!("error opening {filename:?}")),
},
};
Ok(Some(MmapBitV::new(f)?.into_dyn()))
}
async fn store_initial_check(
&self,
id: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
) -> anyhow::Result<Box<dyn BitV>> {
let h = self.to_hash(id).await?;
let filename = self.bitv_filename(&h);
let tmp_filename = format!("{}.tmp", filename.to_str().context("bug")?);
let mut dst = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&filename)
.await
.with_context(|| format!("error opening {filename:?}"))?;
let b = b.into_vec();
tokio::io::copy(&mut &b[..], &mut dst)
.await
.context("error writing bitslice to {filename:?}")?;
tokio::fs::rename(tmp_filename, &filename).await?;
let f = std::fs::OpenOptions::new()
.write(true)
.open(&filename)
.with_context(|| format!("error opening {filename:?}"))?;
Ok(MmapBitV::new(f)?.into_dyn())
}
}
#[async_trait]
impl SessionPersistenceStore for JsonSessionPersistenceStore {
async fn next_id(&self) -> anyhow::Result<TorrentId> {

View file

@ -13,7 +13,8 @@ use librqbit_core::Id20;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::{
session::TorrentId, torrent_state::ManagedTorrentHandle, AddTorrent, AddTorrentOptions,
bitv_factory::BitVFactory, session::TorrentId, torrent_state::ManagedTorrentHandle, AddTorrent,
AddTorrentOptions,
};
#[derive(Serialize, Deserialize, Clone)]
@ -63,7 +64,7 @@ impl SerializedTorrent {
// TODO: make this info_hash first, ID-second.
#[async_trait]
pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync {
pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync + BitVFactory {
async fn next_id(&self) -> anyhow::Result<TorrentId>;
async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()>;
async fn delete(&self, id: TorrentId) -> anyhow::Result<()>;

View file

@ -1,10 +1,15 @@
use std::path::PathBuf;
use crate::{session::TorrentId, torrent_state::ManagedTorrentHandle};
use crate::{
api::TorrentIdOrHash, bitv::BitV, bitv_factory::BitVFactory, session::TorrentId,
torrent_state::ManagedTorrentHandle,
};
use anyhow::Context;
use bitvec::{order::Lsb0, vec::BitVec};
use futures::{stream::BoxStream, StreamExt};
use librqbit_core::Id20;
use sqlx::{Pool, Postgres};
use tracing::debug;
use super::{SerializedTorrent, SessionPersistenceStore};
@ -167,3 +172,19 @@ impl SessionPersistenceStore for PostgresSessionStorage {
Ok(futures::stream::iter(torrents).boxed())
}
}
#[async_trait::async_trait]
impl BitVFactory for PostgresSessionStorage {
async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>> {
debug!("BitVFactory not implemented for PostgresSessionStorage: fastresume not available");
Ok(None)
}
async fn store_initial_check(
&self,
_: TorrentIdOrHash,
b: BitVec<u8, Lsb0>,
) -> anyhow::Result<Box<dyn BitV>> {
Ok(b.into_dyn())
}
}