Tweak everything for APIs to support everything while not changing too much

This commit is contained in:
Igor Katson 2024-04-30 22:47:23 +01:00
parent ebd4731c09
commit fd30ad9cbf
6 changed files with 70 additions and 110 deletions

View file

@ -203,7 +203,12 @@ impl Api {
id: Some(id),
details,
seen_peers: None,
output_folder: "".to_owned(),
output_folder: handle
.info()
.options
.output_folder
.to_string_lossy()
.into_owned(),
}
}
};

View file

@ -1,4 +1,5 @@
use std::{
any::TypeId,
borrow::Cow,
collections::{HashMap, HashSet},
io::{BufReader, BufWriter, Read},
@ -15,7 +16,7 @@ use crate::{
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
storage::StorageFactory,
storage::{filesystem::FilesystemStorageFactory, StorageFactory},
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
@ -96,11 +97,12 @@ impl SessionDatabase {
torrents: self
.torrents
.iter()
.filter_map(|(id, torrent)| {
// This will skip serializing torrents that don't have an output folder.
// This is for backwards compat not to change serialization format.
let output_folder = torrent.storage_factory.output_folder()?;
Some((
// We don't support serializing / deserializing of other storage types.
.filter(|(_, torrent)| {
torrent.storage_factory.type_id() == TypeId::of::<FilesystemStorageFactory>()
})
.map(|(id, torrent)| {
(
*id,
SerializedTorrent {
trackers: torrent
@ -114,9 +116,9 @@ impl SessionDatabase {
only_files: torrent.only_files().clone(),
is_paused: torrent
.with_state(|s| matches!(s, ManagedTorrentState::Paused(_))),
output_folder: output_folder.to_owned(),
output_folder: torrent.info().options.output_folder.clone(),
},
))
)
})
.collect(),
}
@ -435,10 +437,12 @@ pub(crate) struct CheckedIncomingConnection {
}
impl Session {
/// Create a new session. The passed in folder will be used as a default unless overriden per torrent.
/// Create a new session with default options.
/// The passed in folder will be used as a default unless overriden per torrent.
/// It will run a DHT server/client, a TCP listener and .
#[inline(never)]
pub fn new(output_folder: PathBuf) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
Self::new_with_opts(output_folder, SessionOptions::default())
pub fn new(default_output_folder: PathBuf) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
Self::new_with_opts(default_output_folder, SessionOptions::default())
}
pub fn default_persistence_filename() -> anyhow::Result<PathBuf> {
@ -453,7 +457,7 @@ impl Session {
/// Create a new session with options.
#[inline(never)]
pub fn new_with_opts(
output_folder: PathBuf,
default_output_folder: PathBuf,
mut opts: SessionOptions,
) -> BoxFuture<'static, anyhow::Result<Arc<Self>>> {
async move {
@ -502,7 +506,7 @@ impl Session {
dht,
peer_opts,
spawner,
output_folder,
output_folder: default_output_folder,
db: RwLock::new(Default::default()),
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
@ -991,10 +995,17 @@ impl Session {
.unwrap_or_default(),
),
(Some(o), None) => PathBuf::from(o),
(Some(_), Some(_)) => bail!("you can't provide both output_folder and sub_folder"),
(Some(_), Some(_)) => {
bail!("you can't provide both output_folder and sub_folder")
}
(None, Some(s)) => self.output_folder.join(s),
};
let storage_factory = opts
.storage_factory
.take()
.unwrap_or_else(|| Box::<FilesystemStorageFactory>::default());
if opts.list_only {
return Ok(AddTorrentResponse::ListOnly(ListOnlyResponse {
info_hash,
@ -1005,9 +1016,10 @@ impl Session {
}));
}
let mut builder = ManagedTorrentBuilder::new(info, info_hash, output_folder.clone());
let mut builder =
ManagedTorrentBuilder::new(info, info_hash, output_folder, storage_factory);
builder
.overwrite(opts.overwrite)
.allow_overwrite(opts.overwrite)
.spawner(self.spawner)
.trackers(trackers)
.peer_id(self.peer_id);
@ -1029,10 +1041,6 @@ impl Session {
builder.peer_read_write_timeout(t);
}
if let Some(storage_factory) = opts.storage_factory.take() {
builder.storage_factory(storage_factory);
}
let (managed_torrent, id) = {
let mut g = self.db.write();
if let Some((id, handle)) = g.torrents.iter().find(|(_, t)| t.info_hash() == info_hash)

View file

@ -37,8 +37,6 @@ struct InMemoryExampleStorage {
lengths: Lengths,
file_infos: FileInfos,
map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,
// TODO: chunk tracker - rename to PieceTracker and extract chunks out of it (only keep pieces)
// this sucker here would track chunks, and the storage above too.
}
impl InMemoryExampleStorage {

View file

@ -14,16 +14,15 @@ use self::opened_file::OpenedFile;
use super::{StorageFactory, TorrentStorage};
pub struct FilesystemStorageFactory {
pub output_folder: PathBuf,
pub allow_overwrite: bool,
}
#[derive(Default)]
pub struct FilesystemStorageFactory {}
impl StorageFactory for FilesystemStorageFactory {
fn init_storage(&self, meta: &ManagedTorrentInfo) -> anyhow::Result<Box<dyn TorrentStorage>> {
let mut files = Vec::<OpenedFile>::new();
let output_folder = &meta.options.output_folder;
for file_details in meta.info.iter_file_details(&meta.lengths)? {
let mut full_path = self.output_folder.clone();
let mut full_path = output_folder.clone();
let relative_path = file_details
.filename
.to_pathbuf()
@ -31,7 +30,7 @@ impl StorageFactory for FilesystemStorageFactory {
full_path.push(relative_path);
std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?;
let file = if self.allow_overwrite {
let file = if meta.options.allow_overwrite {
OpenOptions::new()
.create(true)
.truncate(false)
@ -45,20 +44,21 @@ impl StorageFactory for FilesystemStorageFactory {
.create_new(true)
.write(true)
.open(&full_path)
.with_context(|| format!("error creating {:?}", &full_path))?;
.with_context(|| {
format!(
"error creating a new file (because allow_overwrite = false) {:?}",
&full_path
)
})?;
OpenOptions::new().read(true).write(true).open(&full_path)?
};
files.push(OpenedFile::new(file));
}
Ok(Box::new(FilesystemStorage {
output_folder: self.output_folder.clone(),
output_folder: output_folder.clone(),
opened_files: files,
}))
}
fn output_folder(&self) -> Option<&Path> {
Some(&self.output_folder)
}
}
pub struct FilesystemStorage {

View file

@ -1,16 +1,12 @@
pub mod example;
pub mod filesystem;
use std::path::Path;
use std::{any::Any, path::Path};
use crate::torrent_state::ManagedTorrentInfo;
pub trait StorageFactory: Send + Sync {
pub trait StorageFactory: Send + Sync + Any {
fn init_storage(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Box<dyn TorrentStorage>>;
fn output_folder(&self) -> Option<&Path> {
None
}
}
pub trait TorrentStorage: Send + Sync {
@ -23,8 +19,4 @@ pub trait TorrentStorage: Send + Sync {
fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>;
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>>;
fn output_folder(&self) -> Option<&Path> {
None
}
}

View file

@ -6,7 +6,6 @@ mod streaming;
pub mod utils;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -37,7 +36,6 @@ use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::spawn_utils::BlockingSpawner;
use crate::storage::filesystem::FilesystemStorageFactory;
use crate::storage::StorageFactory;
use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::FileInfos;
@ -92,6 +90,8 @@ pub(crate) struct ManagedTorrentOptions {
pub force_tracker_interval: Option<Duration>,
pub peer_connect_timeout: Option<Duration>,
pub peer_read_write_timeout: Option<Duration>,
pub allow_overwrite: bool,
pub output_folder: PathBuf,
}
pub struct ManagedTorrentInfo {
@ -492,32 +492,9 @@ impl ManagedTorrent {
}
}
enum ManagedTorrentBuilderStorage {
Filesystem {
overwrite: bool,
output_folder: PathBuf,
},
Custom(Box<dyn StorageFactory>),
}
impl ManagedTorrentBuilderStorage {
fn build(self) -> anyhow::Result<Box<dyn StorageFactory>> {
let s = match self {
ManagedTorrentBuilderStorage::Filesystem {
overwrite,
output_folder,
} => Box::new(FilesystemStorageFactory {
output_folder,
allow_overwrite: overwrite,
}),
ManagedTorrentBuilderStorage::Custom(s) => s,
};
Ok(s)
}
}
pub struct ManagedTorrentBuilder {
pub(crate) struct ManagedTorrentBuilder {
info: TorrentMetaV1Info<ByteBufOwned>,
output_folder: PathBuf,
info_hash: Id20,
force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>,
@ -526,15 +503,16 @@ pub struct ManagedTorrentBuilder {
trackers: Vec<String>,
peer_id: Option<Id20>,
spawner: Option<BlockingSpawner>,
deferred_build_errors: Vec<String>,
storage: Option<ManagedTorrentBuilderStorage>,
allow_overwrite: bool,
storage_factory: Box<dyn StorageFactory>,
}
impl ManagedTorrentBuilder {
pub fn new<P: AsRef<Path>>(
pub fn new(
info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20,
output_folder: P,
output_folder: PathBuf,
storage_factory: Box<dyn StorageFactory>,
) -> Self {
Self {
info,
@ -544,14 +522,11 @@ impl ManagedTorrentBuilder {
peer_connect_timeout: None,
peer_read_write_timeout: None,
only_files: None,
deferred_build_errors: Default::default(),
trackers: Default::default(),
peer_id: None,
// default is filesystem to keep the old API unchanged for now
storage: Some(ManagedTorrentBuilderStorage::Filesystem {
overwrite: false,
output_folder: output_folder.as_ref().to_owned(),
}),
allow_overwrite: false,
output_folder,
storage_factory,
}
}
@ -565,29 +540,12 @@ impl ManagedTorrentBuilder {
self
}
pub fn overwrite(&mut self, new_overwrite: bool) -> &mut Self {
match self.storage.as_mut() {
Some(ManagedTorrentBuilderStorage::Filesystem { overwrite, .. }) => {
*overwrite = new_overwrite
}
_ => self
.deferred_build_errors
.push("overwrite() called when storage factory was not filesystem".to_owned()),
}
self
}
pub fn storage_factory(&mut self, factory: Box<dyn StorageFactory>) -> &mut Self {
self.storage = Some(ManagedTorrentBuilderStorage::Custom(factory));
self
}
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
self.force_tracker_interval = Some(force_tracker_interval);
self
}
pub(crate) fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self {
pub fn spawner(&mut self, spawner: BlockingSpawner) -> &mut Self {
self.spawner = Some(spawner);
self
}
@ -597,6 +555,11 @@ impl ManagedTorrentBuilder {
self
}
pub fn allow_overwrite(&mut self, value: bool) -> &mut Self {
self.allow_overwrite = value;
self
}
pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
self.peer_connect_timeout = Some(timeout);
self
@ -607,10 +570,7 @@ impl ManagedTorrentBuilder {
self
}
pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
if !self.deferred_build_errors.is_empty() {
anyhow::bail!("Errors: {}", self.deferred_build_errors.join(";"))
}
pub fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let file_infos = self
.info
@ -625,11 +585,6 @@ impl ManagedTorrentBuilder {
})
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
let storage_factory = self
.storage
.context("by the time build() is called you must set storage factory")?
.build()?;
let info = Arc::new(ManagedTorrentInfo {
span,
file_infos,
@ -643,6 +598,8 @@ impl ManagedTorrentBuilder {
force_tracker_interval: self.force_tracker_interval,
peer_connect_timeout: self.peer_connect_timeout,
peer_read_write_timeout: self.peer_read_write_timeout,
allow_overwrite: self.allow_overwrite,
output_folder: self.output_folder,
},
});
@ -656,7 +613,7 @@ impl ManagedTorrentBuilder {
only_files: self.only_files,
}),
state_change_notify: Notify::new(),
storage_factory,
storage_factory: self.storage_factory,
info,
}))
}