From 6c3dfbc52f06f784b35b6e8f20a86a2965c47de6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 30 Apr 2024 09:28:39 +0100 Subject: [PATCH] Add storage example --- crates/librqbit/examples/storage.rs | 70 +++++++++++++++++++ crates/librqbit/src/chunk_tracker.rs | 17 ----- crates/librqbit/src/lib.rs | 6 +- crates/librqbit/src/session.rs | 16 +++-- crates/librqbit/src/storage/example.rs | 20 +++++- crates/librqbit/src/torrent_state/live/mod.rs | 1 - crates/rqbit/src/main.rs | 9 +-- 7 files changed, 103 insertions(+), 36 deletions(-) create mode 100644 crates/librqbit/examples/storage.rs diff --git a/crates/librqbit/examples/storage.rs b/crates/librqbit/examples/storage.rs new file mode 100644 index 0000000..9343331 --- /dev/null +++ b/crates/librqbit/examples/storage.rs @@ -0,0 +1,70 @@ +use librqbit::{ + storage::{StorageFactory, TorrentStorage}, + ManagedTorrentInfo, SessionOptions, +}; + +struct DummyStorage {} + +impl StorageFactory for DummyStorage { + fn init_storage( + &self, + info: &ManagedTorrentInfo, + ) -> anyhow::Result> { + Ok(Box::new(DummyStorage {})) + } +} + +impl TorrentStorage for DummyStorage { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + anyhow::bail!("pread_exact") + } + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + anyhow::bail!("pwrite_all") + } + + fn remove_file(&self, file_id: usize, filename: &std::path::Path) -> anyhow::Result<()> { + anyhow::bail!("remove_file") + } + + fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { + anyhow::bail!("ensure_file_length") + } + + fn take(&self) -> anyhow::Result> { + Ok(Box::new(Self {})) + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Output logs to console. + match std::env::var("RUST_LOG") { + Ok(_) => {} + Err(_) => std::env::set_var("RUST_LOG", "info"), + } + tracing_subscriber::fmt::init(); + let s = librqbit::Session::new_with_opts( + "/does-not-matter".into(), + SessionOptions { + disable_dht: true, + persistence: false, + listen_port_range: None, + enable_upnp_port_forwarding: false, + ..Default::default() + }, + ) + .await?; + let handle = s + .add_torrent( + librqbit::AddTorrent::TorrentFileBytes( + include_bytes!("../resources/ubuntu-21.04-live-server-amd64.iso.torrent").into(), + ), + Some(librqbit::AddTorrentOptions { + storage_factory: Some(Box::new(DummyStorage {})), + ..Default::default() + }), + ) + .await?; + Ok(()) +} diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index e70b618..891fd43 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -171,23 +171,6 @@ impl ChunkTracker { } } - pub fn new_empty(lengths: Lengths, file_infos: &FileInfos) -> anyhow::Result { - let have = BF::from_boxed_slice(vec![0; lengths.piece_bitfield_bytes()].into_boxed_slice()); - let selected = have.clone(); - let chunk_status = - BF::from_boxed_slice(vec![0; lengths.chunk_bitfield_bytes()].into_boxed_slice()); - let queued = have.clone(); - Ok(Self { - queue_pieces: queued, - chunk_status, - have, - selected, - lengths, - per_file_bytes: vec![0; file_infos.len()], - hns: Default::default(), - }) - } - pub fn get_lengths(&self) -> &Lengths { &self.lengths } diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index a53b826..e7a9fd2 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -41,7 +41,7 @@ mod peer_info_reader; mod read_buf; mod session; mod spawn_utils; -mod storage; +pub mod storage; mod torrent_state; pub mod tracing_subscriber_config_utils; mod type_aliases; @@ -56,7 +56,9 @@ pub use session::{ SUPPORTED_SCHEMES, }; pub use spawn_utils::spawn as librqbit_spawn; -pub use torrent_state::{ManagedTorrent, ManagedTorrentState, TorrentStats, TorrentStatsState}; +pub use torrent_state::{ + ManagedTorrent, ManagedTorrentInfo, ManagedTorrentState, TorrentStats, TorrentStatsState, +}; pub use buffers::*; pub use clone_to_owned::CloneToOwned; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index a999b5a..0ef89f3 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -15,6 +15,7 @@ use crate::{ peer_connection::PeerConnectionOptions, read_buf::ReadBuf, spawn_utils::BlockingSpawner, + storage::StorageFactory, torrent_state::{ ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, }, @@ -43,7 +44,6 @@ use librqbit_core::{ use parking_lot::RwLock; use peer_binary_protocol::Handshake; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use serde_with::serde_as; use tokio::net::{TcpListener, TcpStream}; use tokio_stream::StreamExt; use tokio_util::sync::{CancellationToken, DropGuard}; @@ -270,9 +270,7 @@ fn merge_two_optional_streams( } /// Options for adding new torrents to the session. -#[serde_as] -#[derive(Default, Clone, Serialize, Deserialize)] -#[serde(default)] +#[derive(Default)] pub struct AddTorrentOptions { /// Start in paused state. pub paused: bool, @@ -295,7 +293,6 @@ pub struct AddTorrentOptions { pub peer_opts: Option, /// Force a refresh interval for polling trackers. - #[serde_as(as = "Option")] pub force_tracker_interval: Option, pub disable_trackers: bool, @@ -304,8 +301,9 @@ pub struct AddTorrentOptions { pub initial_peers: Option>, /// This is used to restore the session from serialized state. - #[serde(skip)] pub preferred_id: Option, + + pub storage_factory: Option>, } pub struct ListOnlyResponse { @@ -976,7 +974,7 @@ impl Session { trackers: Vec, peer_rx: Option, initial_peers: Vec, - opts: AddTorrentOptions, + mut opts: AddTorrentOptions, ) -> anyhow::Result { debug!("Torrent info: {:#?}", &info); @@ -1031,6 +1029,10 @@ impl Session { builder.peer_read_write_timeout(t); } + if let Some(storage_factory) = opts.storage_factory.take() { + builder.storage_factory(storage_factory); + } + let (managed_torrent, id) = { let mut g = self.db.write(); if let Some((id, handle)) = g.torrents.iter().find(|(_, t)| t.info_hash() == info_hash) diff --git a/crates/librqbit/src/storage/example.rs b/crates/librqbit/src/storage/example.rs index ad353b1..5755e56 100644 --- a/crates/librqbit/src/storage/example.rs +++ b/crates/librqbit/src/storage/example.rs @@ -6,7 +6,7 @@ use parking_lot::RwLock; use crate::type_aliases::FileInfos; -use super::TorrentStorage; +use super::{StorageFactory, TorrentStorage}; struct InMemoryPiece { bytes: Box<[u8]>, @@ -19,7 +19,21 @@ impl InMemoryPiece { } } -pub struct InMemoryExampleStorage { +pub struct InMemoryExampleStorageFactory {} + +impl StorageFactory for InMemoryExampleStorageFactory { + fn init_storage( + &self, + info: &crate::torrent_state::ManagedTorrentInfo, + ) -> anyhow::Result> { + Ok(Box::new(InMemoryExampleStorage::new( + info.lengths, + info.file_infos.clone(), + )?)) + } +} + +struct InMemoryExampleStorage { lengths: Lengths, file_infos: FileInfos, map: RwLock>, @@ -28,7 +42,7 @@ pub struct InMemoryExampleStorage { } impl InMemoryExampleStorage { - pub fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result { + fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result { // Max memory 128MiB. Make it tunable let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length(); if max_pieces == 0 { diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index d7e4d9b..d3ae147 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -86,7 +86,6 @@ use crate::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, session::CheckedIncomingConnection, - storage::TorrentStorage, torrent_state::{peer::Peer, utils::atomic_inc}, type_aliases::{FilePriorities, FileStorage, PeerHandle, BF}, }; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index e97114a..3ca442b 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -367,7 +367,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { } let http_api_url = format!("http://{}", opts.http_api_listen_addr); let client = http_api_client::HttpApiClient::new(&http_api_url)?; - let torrent_opts = AddTorrentOptions { + let torrent_opts = || AddTorrentOptions { only_files_regex: download_opts.only_files_matching_regex.clone(), overwrite: download_opts.overwrite, list_only: download_opts.list, @@ -393,7 +393,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { match client .add_torrent( AddTorrent::from_cli_argument(torrent_url)?, - Some(torrent_opts.clone()), + Some(torrent_opts()), ) .await { @@ -452,10 +452,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { for path in &download_opts.torrent_path { let handle = match session - .add_torrent( - AddTorrent::from_cli_argument(path)?, - Some(torrent_opts.clone()), - ) + .add_torrent(AddTorrent::from_cli_argument(path)?, Some(torrent_opts())) .await { Ok(v) => match v {