From 42bbf84ea50d3cfa0412be63fc247d17ca0c7671 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 30 Apr 2024 08:55:00 +0100 Subject: [PATCH] Fixing up initialization to allow passing in custom storages --- crates/librqbit/src/api.rs | 5 +- crates/librqbit/src/file_info.rs | 2 +- crates/librqbit/src/file_ops.rs | 2 +- crates/librqbit/src/session.rs | 4 +- crates/librqbit/src/storage.rs | 67 +++++++++++++++--- .../src/torrent_state/initializing.rs | 67 +++--------------- crates/librqbit/src/torrent_state/live/mod.rs | 2 +- crates/librqbit/src/torrent_state/mod.rs | 69 +++++++++++++++---- 8 files changed, 128 insertions(+), 90 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 436ccc5..79ea3ff 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -173,10 +173,9 @@ impl Api { { AddTorrentResponse::AlreadyManaged(id, managed) => { return Err(anyhow::anyhow!( - "{:?} is already managed, id={}, downloaded to {:?}", + "{:?} is already managed, id={}", managed.info_hash(), id, - &managed.info().out_dir )) .with_error_status_code(StatusCode::CONFLICT); } @@ -203,8 +202,8 @@ impl Api { ApiAddTorrentResponse { id: Some(id), details, - output_folder: handle.info().out_dir.to_string_lossy().into_owned(), seen_peers: None, + output_folder: "".to_owned(), } } }; diff --git a/crates/librqbit/src/file_info.rs b/crates/librqbit/src/file_info.rs index babc77f..0d122bc 100644 --- a/crates/librqbit/src/file_info.rs +++ b/crates/librqbit/src/file_info.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; #[derive(Debug, Clone)] pub struct FileInfo { - pub filename: PathBuf, + pub relative_filename: PathBuf, pub offset_in_torrent: u64, pub piece_range: std::ops::Range, pub len: u64, diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 3eae81e..e8a200d 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -186,7 +186,7 @@ impl<'a> FileOps<'a> { ) { debug!( "error reading from file {} ({:?}) at {}: {:#}", - current_file.index, current_file.fi.filename, pos, &err + current_file.index, current_file.fi.relative_filename, pos, &err ); current_file.is_broken = true; some_files_broken = true; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 997a6a0..25ffad8 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1093,8 +1093,8 @@ impl Session { } (Ok(Some(paused)), true) => { for (id, fi) in removed.info().file_infos.iter().enumerate() { - if let Err(e) = paused.files.remove_file(id, &fi.filename) { - warn!(?fi.filename, error=?e, "could not delete file"); + if let Err(e) = paused.files.remove_file(id, &fi.relative_filename) { + warn!(?fi.relative_filename, error=?e, "could not delete file"); } } } diff --git a/crates/librqbit/src/storage.rs b/crates/librqbit/src/storage.rs index 0593cb7..5c3c1de 100644 --- a/crates/librqbit/src/storage.rs +++ b/crates/librqbit/src/storage.rs @@ -1,14 +1,19 @@ use std::{ collections::HashMap, + fs::OpenOptions, io::{Read, Seek, SeekFrom, Write}, - path::Path, + path::{Path, PathBuf}, }; use anyhow::Context; use librqbit_core::lengths::{Lengths, ValidPieceIndex}; use parking_lot::RwLock; -use crate::{opened_file::OpenedFile, type_aliases::FileInfos}; +use crate::{opened_file::OpenedFile, torrent_state::ManagedTorrentInfo, type_aliases::FileInfos}; + +pub trait StorageFactory: Send + Sync { + fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result>; +} pub trait TorrentStorage: Send + Sync { fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; @@ -22,16 +27,54 @@ pub trait TorrentStorage: Send + Sync { fn take(&self) -> anyhow::Result>; } -pub struct FilesystemStorage { - opened_files: Vec, +pub struct FilesystemStorageFactory { + pub output_folder: PathBuf, + pub allow_overwrite: bool, } -impl FilesystemStorage { - pub fn new(opened_files: Vec) -> Self { - Self { opened_files } +impl StorageFactory for FilesystemStorageFactory { + fn init_storage(&self, meta: &ManagedTorrentInfo) -> anyhow::Result> { + let mut files = Vec::::new(); + for file_details in meta.info.iter_file_details(&meta.lengths)? { + let mut full_path = self.output_folder.clone(); + let relative_path = file_details + .filename + .to_pathbuf() + .context("error converting file to path")?; + full_path.push(relative_path); + + std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; + let file = if self.allow_overwrite { + OpenOptions::new() + .create(true) + .truncate(false) + .read(true) + .write(true) + .open(&full_path) + .with_context(|| format!("error opening {full_path:?} in read/write mode"))? + } else { + // create_new does not seem to work with read(true), so calling this twice. + OpenOptions::new() + .create_new(true) + .write(true) + .open(&full_path) + .with_context(|| format!("error creating {:?}", &full_path))?; + OpenOptions::new().read(true).write(true).open(&full_path)? + }; + files.push(OpenedFile::new(file)); + } + Ok(Box::new(FilesystemStorage { + output_folder: self.output_folder.clone(), + opened_files: files, + })) } } +pub struct FilesystemStorage { + output_folder: PathBuf, + opened_files: Vec, +} + impl TorrentStorage for FilesystemStorage { fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { let mut g = self @@ -56,7 +99,7 @@ impl TorrentStorage for FilesystemStorage { } fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> { - Ok(std::fs::remove_file(filename)?) + Ok(std::fs::remove_file(self.output_folder.join(filename))?) } fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { @@ -64,12 +107,14 @@ impl TorrentStorage for FilesystemStorage { } fn take(&self) -> anyhow::Result> { - Ok(Box::new(Self::new( - self.opened_files + Ok(Box::new(Self { + opened_files: self + .opened_files .iter() .map(|f| f.take_clone()) .collect::>>()?, - ))) + output_folder: self.output_folder.clone(), + })) } } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 746e8ce..c26a432 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -1,5 +1,4 @@ use std::{ - fs::OpenOptions, sync::{atomic::AtomicU64, Arc}, time::Instant, }; @@ -9,12 +8,7 @@ use anyhow::Context; use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; -use crate::{ - chunk_tracker::ChunkTracker, - file_ops::FileOps, - opened_file::OpenedFile, - storage::{FilesystemStorage, InMemoryGarbageCollectingStorage, TorrentStorage}, -}; +use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps, storage::StorageFactory}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; @@ -38,56 +32,11 @@ impl TorrentStateInitializing { .load(std::sync::atomic::Ordering::Relaxed) } - pub async fn check(&self) -> anyhow::Result { - // Return in-memory store - let store = - InMemoryGarbageCollectingStorage::new(self.meta.lengths, self.meta.file_infos.clone())?; - let ct = ChunkTracker::new_empty(self.meta.lengths, &self.meta.file_infos)?; - - Ok(TorrentStatePaused { - info: self.meta.clone(), - files: Box::new(store), - chunk_tracker: ct, - streams: Arc::new(Default::default()), - }) - - // self.check_disk().await - } - - pub async fn check_disk(&self) -> anyhow::Result { - let mut files = Vec::::new(); - for file_details in self.meta.info.iter_file_details(&self.meta.lengths)? { - let mut full_path = self.meta.out_dir.clone(); - let relative_path = file_details - .filename - .to_pathbuf() - .context("error converting file to path")?; - full_path.push(relative_path); - - std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?; - let file = if self.meta.options.overwrite { - OpenOptions::new() - .create(true) - .truncate(false) - .read(true) - .write(true) - .open(&full_path) - .with_context(|| format!("error opening {full_path:?} in read/write mode"))? - } else { - // TODO: create_new does not seem to work with read(true), so calling this twice. - OpenOptions::new() - .create_new(true) - .write(true) - .open(&full_path) - .with_context(|| format!("error creating {:?}", &full_path))?; - OpenOptions::new().read(true).write(true).open(&full_path)? - }; - files.push(OpenedFile::new(file)); - } - let files: Box = Box::new(FilesystemStorage::new(files)); - - debug!("computed lengths: {:?}", &self.meta.lengths); - + pub async fn check( + &self, + storage_factory: &dyn StorageFactory, + ) -> anyhow::Result { + let files = storage_factory.init_storage(&self.meta)?; info!("Doing initial checksum validation, this might take a while..."); let initial_check_results = self.meta.spawner.spawn_block_in_place(|| { FileOps::new( @@ -119,12 +68,12 @@ impl TorrentStateInitializing { if let Err(err) = files.ensure_file_length(idx, fi.len) { warn!( "Error setting length for file {:?} to {}: {:#?}", - fi.filename, fi.len, err + fi.relative_filename, fi.len, err ); } else { debug!( "Set length for file {:?} to {} in {:?}", - fi.filename, + fi.relative_filename, SF::new(fi.len), now.elapsed() ); diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index fe61184..1001451 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -203,7 +203,7 @@ impl TorrentStateLive { .info .file_infos .get(*id) - .map(|fi| fi.filename.as_path()) + .map(|fi| fi.relative_filename.as_path()) }); pri }; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 7340829..2990b73 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -36,6 +36,8 @@ use tracing::warn; use crate::chunk_tracker::ChunkTracker; use crate::file_info::FileInfo; use crate::spawn_utils::BlockingSpawner; +use crate::storage::FilesystemStorageFactory; +use crate::storage::StorageFactory; use crate::torrent_state::stats::LiveStats; use crate::type_aliases::FileInfos; use crate::type_aliases::PeerStream; @@ -89,13 +91,11 @@ pub(crate) struct ManagedTorrentOptions { pub force_tracker_interval: Option, pub peer_connect_timeout: Option, pub peer_read_write_timeout: Option, - pub overwrite: bool, } pub struct ManagedTorrentInfo { pub info: TorrentMetaV1Info, pub info_hash: Id20, - pub out_dir: PathBuf, pub(crate) spawner: BlockingSpawner, pub trackers: HashSet, pub peer_id: Id20, @@ -107,6 +107,7 @@ pub struct ManagedTorrentInfo { pub struct ManagedTorrent { pub info: Arc, + storage_factory: Box, locked: RwLock, } @@ -267,7 +268,7 @@ impl ManagedTorrent { error_span!(parent: span.clone(), "initialize_and_start"), token.clone(), async move { - match init.check().await { + match init.check(&*self.storage_factory).await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -461,18 +462,42 @@ impl ManagedTorrent { } } +enum ManagedTorrentBuilderStorage { + Filesystem { + overwrite: bool, + output_folder: PathBuf, + }, + Custom(Box), +} + +impl ManagedTorrentBuilderStorage { + fn build(self) -> anyhow::Result> { + let s = match self { + ManagedTorrentBuilderStorage::Filesystem { + overwrite, + output_folder, + } => Box::new(FilesystemStorageFactory { + output_folder, + allow_overwrite: overwrite, + }), + ManagedTorrentBuilderStorage::Custom(s) => s, + }; + Ok(s) + } +} + pub struct ManagedTorrentBuilder { info: TorrentMetaV1Info, info_hash: Id20, - output_folder: PathBuf, force_tracker_interval: Option, peer_connect_timeout: Option, peer_read_write_timeout: Option, only_files: Option>, trackers: Vec, peer_id: Option, - overwrite: bool, spawner: Option, + deferred_build_errors: Vec, + storage: Option, } impl ManagedTorrentBuilder { @@ -484,15 +509,19 @@ impl ManagedTorrentBuilder { Self { info, info_hash, - output_folder: output_folder.as_ref().into(), spawner: None, force_tracker_interval: None, peer_connect_timeout: None, peer_read_write_timeout: None, only_files: None, + deferred_build_errors: Default::default(), trackers: Default::default(), peer_id: None, - overwrite: false, + // default is filesystem to keep the old API unchanged for now + storage: Some(ManagedTorrentBuilderStorage::Filesystem { + overwrite: false, + output_folder: output_folder.as_ref().to_owned(), + }), } } @@ -506,8 +535,15 @@ impl ManagedTorrentBuilder { self } - pub fn overwrite(&mut self, overwrite: bool) -> &mut Self { - self.overwrite = overwrite; + pub fn overwrite(&mut self, new_overwrite: bool) -> &mut Self { + match self.storage.as_mut() { + Some(ManagedTorrentBuilderStorage::Filesystem { overwrite, .. }) => { + *overwrite = new_overwrite + } + _ => self + .deferred_build_errors + .push("overwrite() called when storage factory was not filesystem".to_owned()), + } self } @@ -537,25 +573,33 @@ impl ManagedTorrentBuilder { } pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result { + if !self.deferred_build_errors.is_empty() { + anyhow::bail!("Errors: {}", self.deferred_build_errors.join(";")) + } let lengths = Lengths::from_torrent(&self.info)?; let file_infos = self .info .iter_file_details(&lengths)? .map(|fd| { Ok::<_, anyhow::Error>(FileInfo { - filename: self.output_folder.join(fd.filename.to_pathbuf()?), + relative_filename: fd.filename.to_pathbuf()?, offset_in_torrent: fd.offset, piece_range: fd.pieces, len: fd.len, }) }) .collect::>>()?; + + let storage_factory = self + .storage + .context("by the time build() is called you must set storage factory")? + .build()?; + let info = Arc::new(ManagedTorrentInfo { span, file_infos, info: self.info, info_hash: self.info_hash, - out_dir: self.output_folder, trackers: self.trackers.into_iter().collect(), spawner: self.spawner.unwrap_or_default(), peer_id: self.peer_id.unwrap_or_else(generate_peer_id), @@ -564,9 +608,9 @@ impl ManagedTorrentBuilder { force_tracker_interval: self.force_tracker_interval, peer_connect_timeout: self.peer_connect_timeout, peer_read_write_timeout: self.peer_read_write_timeout, - overwrite: self.overwrite, }, }); + let initializing = Arc::new(TorrentStateInitializing::new( info.clone(), self.only_files.clone(), @@ -576,6 +620,7 @@ impl ManagedTorrentBuilder { state: ManagedTorrentState::Initializing(initializing), only_files: self.only_files, }), + storage_factory, info, })) }