All storage factories are generic now

This commit is contained in:
Igor Katson 2024-05-01 22:14:34 +01:00
parent e4adfa569a
commit 5027d8ccd1
12 changed files with 88 additions and 52 deletions

View file

@ -1,6 +1,9 @@
use std::time::Duration;
use librqbit::{storage::mmap::MmapStorageFactory, SessionOptions};
use librqbit::{
storage::{mmap::MmapStorageFactory, StorageFactoryExt},
SessionOptions,
};
use tracing::info;
#[tokio::main]
@ -28,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
include_bytes!("../resources/ubuntu-21.04-live-server-amd64.iso.torrent").into(),
),
Some(librqbit::AddTorrentOptions {
storage_factory: Some(Box::new(MmapStorageFactory {})),
storage_factory: Some(MmapStorageFactory::default().boxed()),
paused: false,
..Default::default()
}),

View file

@ -16,7 +16,7 @@ use crate::{
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
storage::{filesystem::FilesystemStorageFactory, StorageFactory},
storage::{filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt},
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
@ -305,7 +305,7 @@ pub struct AddTorrentOptions {
/// This is used to restore the session from serialized state.
pub preferred_id: Option<usize>,
pub storage_factory: Option<Box<dyn StorageFactory>>,
pub storage_factory: Option<BoxStorageFactory>,
}
pub struct ListOnlyResponse {
@ -1004,7 +1004,7 @@ impl Session {
let storage_factory = opts
.storage_factory
.take()
.unwrap_or_else(|| Box::<FilesystemStorageFactory>::default());
.unwrap_or_else(|| FilesystemStorageFactory::default().boxed());
if opts.list_only {
return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse {

View file

@ -19,21 +19,21 @@ impl InMemoryPiece {
}
}
#[derive(Default)]
pub struct InMemoryExampleStorageFactory {}
impl StorageFactory for InMemoryExampleStorageFactory {
type Storage = InMemoryExampleStorage;
fn init_storage(
&self,
info: &crate::torrent_state::ManagedTorrentInfo,
) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(InMemoryExampleStorage::new(
info.lengths,
info.file_infos.clone(),
)?))
) -> anyhow::Result<InMemoryExampleStorage> {
InMemoryExampleStorage::new(info.lengths, info.file_infos.clone())
}
}
struct InMemoryExampleStorage {
pub struct InMemoryExampleStorage {
lengths: Lengths,
file_infos: FileInfos,
map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,

View file

@ -18,7 +18,9 @@ use super::{StorageFactory, TorrentStorage};
pub struct FilesystemStorageFactory {}
impl StorageFactory for FilesystemStorageFactory {
fn init_storage(&self, meta: &ManagedTorrentInfo) -> anyhow::Result<Box<dyn TorrentStorage>> {
type Storage = FilesystemStorage;
fn init_storage(&self, meta: &ManagedTorrentInfo) -> anyhow::Result<FilesystemStorage> {
let mut files = Vec::<OpenedFile>::new();
let output_folder = &meta.options.output_folder;
for file_details in meta.info.iter_file_details(&meta.lengths)? {
@ -54,10 +56,10 @@ impl StorageFactory for FilesystemStorageFactory {
};
files.push(OpenedFile::new(file));
}
Ok(Box::new(FilesystemStorage {
Ok(FilesystemStorage {
output_folder: output_folder.clone(),
opened_files: files,
}))
})
}
}

View file

@ -6,26 +6,26 @@ use crate::{FileInfos, ManagedTorrentInfo};
use super::{StorageFactory, TorrentStorage};
#[derive(Default)]
pub struct MmapStorageFactory {}
struct MmapStorage {
pub struct MmapStorage {
mmap: RwLock<MmapMut>,
file_infos: FileInfos,
}
impl StorageFactory for MmapStorageFactory {
fn init_storage(
&self,
info: &ManagedTorrentInfo,
) -> anyhow::Result<Box<dyn crate::storage::TorrentStorage>> {
Ok(Box::new(MmapStorage {
type Storage = MmapStorage;
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
Ok(MmapStorage {
mmap: RwLock::new(
MmapOptions::new()
.len(info.lengths.total_length().try_into()?)
.map_anon()?,
),
file_infos: info.file_infos.clone(),
}))
})
}
}

View file

@ -9,11 +9,40 @@ use std::{any::Any, path::Path};
use crate::torrent_state::ManagedTorrentInfo;
pub trait StorageFactory: Send + Sync + Any {
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Box<dyn TorrentStorage>>;
type Storage: TorrentStorage;
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage>;
}
pub type BoxStorageFactory = Box<dyn StorageFactory<Storage = Box<dyn TorrentStorage>>>;
pub trait StorageFactoryExt {
fn boxed(self) -> BoxStorageFactory;
}
impl<SF: StorageFactory> StorageFactoryExt for SF {
fn boxed(self) -> BoxStorageFactory {
struct BoxFactory<SF> {
sf: SF,
}
impl<SF: StorageFactory> StorageFactory for BoxFactory<SF> {
type Storage = Box<dyn TorrentStorage>;
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
let s = self.sf.init_storage(info)?;
Ok(Box::new(s))
}
}
Box::new(BoxFactory { sf: self })
}
}
impl<U: StorageFactory + ?Sized> StorageFactory for Box<U> {
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Box<dyn TorrentStorage>> {
type Storage = U::Storage;
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<U::Storage> {
(**self).init_storage(info)
}
}

View file

@ -17,17 +17,16 @@ impl<U: StorageFactory> SlowStorageFactory<U> {
}
impl<U: StorageFactory> StorageFactory for SlowStorageFactory<U> {
fn init_storage(
&self,
info: &crate::ManagedTorrentInfo,
) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(SlowStorage {
type Storage = SlowStorage<U::Storage>;
fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
Ok(SlowStorage {
underlying: self.underlying_factory.init_storage(info)?,
}))
})
}
}
struct SlowStorage<U> {
pub struct SlowStorage<U> {
underlying: U,
}

View file

@ -15,18 +15,17 @@ impl<U> TimingStorageFactory<U> {
}
impl<U: StorageFactory> StorageFactory for TimingStorageFactory<U> {
fn init_storage(
&self,
info: &crate::ManagedTorrentInfo,
) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(TimingStorage {
type Storage = TimingStorage<U::Storage>;
fn init_storage(&self, info: &crate::ManagedTorrentInfo) -> anyhow::Result<Self::Storage> {
Ok(TimingStorage {
name: self.name.clone(),
underlying: self.underlying_factory.init_storage(info)?,
}))
})
}
}
struct TimingStorage<U> {
pub struct TimingStorage<U> {
name: String,
underlying: U,
}

View file

@ -5,8 +5,9 @@ use tokio::{io::AsyncReadExt, time::timeout};
use tracing::info;
use crate::{
create_torrent, storage::example::InMemoryExampleStorageFactory, AddTorrent,
CreateTorrentOptions, Session,
create_torrent,
storage::{example::InMemoryExampleStorageFactory, StorageFactoryExt},
AddTorrent, CreateTorrentOptions, Session,
};
use super::test_util::create_default_random_dir_with_torrents;
@ -85,7 +86,7 @@ async fn e2e_stream() -> anyhow::Result<()> {
AddTorrent::from_bytes(torrent.as_bytes()?),
Some(crate::AddTorrentOptions {
paused: false,
storage_factory: Some(Box::new(InMemoryExampleStorageFactory {})),
storage_factory: Some(InMemoryExampleStorageFactory::default().boxed()),
initial_peers: Some(vec![peer]),
..Default::default()
}),

View file

@ -8,7 +8,11 @@ use anyhow::Context;
use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, warn};
use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps, storage::StorageFactory};
use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
storage::{BoxStorageFactory, StorageFactory},
};
use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
@ -34,7 +38,7 @@ impl TorrentStateInitializing {
pub async fn check(
&self,
storage_factory: &dyn StorageFactory,
storage_factory: &BoxStorageFactory,
) -> anyhow::Result<TorrentStatePaused> {
let files = storage_factory.init_storage(&self.meta)?;
info!("Doing initial checksum validation, this might take a while...");

View file

@ -36,7 +36,7 @@ use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::spawn_utils::BlockingSpawner;
use crate::storage::StorageFactory;
use crate::storage::BoxStorageFactory;
use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::FileInfos;
use crate::type_aliases::PeerStream;
@ -108,7 +108,7 @@ pub struct ManagedTorrentInfo {
pub struct ManagedTorrent {
pub info: Arc<ManagedTorrentInfo>,
pub(crate) storage_factory: Box<dyn StorageFactory>,
pub(crate) storage_factory: BoxStorageFactory,
state_change_notify: Notify,
locked: RwLock<ManagedTorrentLocked>,
@ -273,7 +273,7 @@ impl ManagedTorrent {
error_span!(parent: span.clone(), "initialize_and_start"),
token.clone(),
async move {
match init.check(&*t.storage_factory).await {
match init.check(&t.storage_factory).await {
Ok(paused) => {
let mut g = t.locked.write();
if let ManagedTorrentState::Initializing(_) = &g.state {
@ -504,7 +504,7 @@ pub(crate) struct ManagedTorrentBuilder {
peer_id: Option<Id20>,
spawner: Option<BlockingSpawner>,
allow_overwrite: bool,
storage_factory: Box<dyn StorageFactory>,
storage_factory: BoxStorageFactory,
}
impl ManagedTorrentBuilder {
@ -512,7 +512,7 @@ impl ManagedTorrentBuilder {
info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20,
output_folder: PathBuf,
storage_factory: Box<dyn StorageFactory>,
storage_factory: BoxStorageFactory,
) -> Self {
Self {
info,

View file

@ -8,8 +8,7 @@ use librqbit::{
http_api::{HttpApi, HttpApiOptions},
http_api_client, librqbit_spawn,
storage::{
filesystem::FilesystemStorageFactory, slow::SlowStorageFactory,
timing::TimingStorageFactory,
filesystem::FilesystemStorageFactory, timing::TimingStorageFactory, StorageFactoryExt,
},
tracing_subscriber_config_utils::{init_logging, InitLoggingOptions},
AddTorrent, AddTorrentOptions, AddTorrentResponse, Api, ListOnlyResponse,
@ -382,9 +381,9 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
initial_peers: download_opts.initial_peers.clone().map(|p| p.0),
disable_trackers: download_opts.disable_trackers,
storage_factory: Some({
let sf = Box::<FilesystemStorageFactory>::default();
// let sf = Box::new(SlowStorageFactory::new(sf));
Box::new(TimingStorageFactory::new("fs".to_owned(), sf))
let sf = FilesystemStorageFactory::default();
// let sf = SlowStorageFactory::new(sf);
TimingStorageFactory::new("fs".to_owned(), sf).boxed()
}),
..Default::default()
};