Add storage example

This commit is contained in:
Igor Katson 2024-04-30 09:28:39 +01:00
parent 3e37b4698f
commit 6c3dfbc52f
7 changed files with 103 additions and 36 deletions

View file

@ -0,0 +1,70 @@
use librqbit::{
storage::{StorageFactory, TorrentStorage},
ManagedTorrentInfo, SessionOptions,
};
struct DummyStorage {}
impl StorageFactory for DummyStorage {
fn init_storage(
&self,
info: &ManagedTorrentInfo,
) -> anyhow::Result<Box<dyn librqbit::storage::TorrentStorage>> {
Ok(Box::new(DummyStorage {}))
}
}
impl TorrentStorage for DummyStorage {
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
anyhow::bail!("pread_exact")
}
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
anyhow::bail!("pwrite_all")
}
fn remove_file(&self, file_id: usize, filename: &std::path::Path) -> anyhow::Result<()> {
anyhow::bail!("remove_file")
}
fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> {
anyhow::bail!("ensure_file_length")
}
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(Self {}))
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Output logs to console.
match std::env::var("RUST_LOG") {
Ok(_) => {}
Err(_) => std::env::set_var("RUST_LOG", "info"),
}
tracing_subscriber::fmt::init();
let s = librqbit::Session::new_with_opts(
"/does-not-matter".into(),
SessionOptions {
disable_dht: true,
persistence: false,
listen_port_range: None,
enable_upnp_port_forwarding: false,
..Default::default()
},
)
.await?;
let handle = s
.add_torrent(
librqbit::AddTorrent::TorrentFileBytes(
include_bytes!("../resources/ubuntu-21.04-live-server-amd64.iso.torrent").into(),
),
Some(librqbit::AddTorrentOptions {
storage_factory: Some(Box::new(DummyStorage {})),
..Default::default()
}),
)
.await?;
Ok(())
}

View file

@ -171,23 +171,6 @@ impl ChunkTracker {
}
}
pub fn new_empty(lengths: Lengths, file_infos: &FileInfos) -> anyhow::Result<Self> {
let have = BF::from_boxed_slice(vec![0; lengths.piece_bitfield_bytes()].into_boxed_slice());
let selected = have.clone();
let chunk_status =
BF::from_boxed_slice(vec![0; lengths.chunk_bitfield_bytes()].into_boxed_slice());
let queued = have.clone();
Ok(Self {
queue_pieces: queued,
chunk_status,
have,
selected,
lengths,
per_file_bytes: vec![0; file_infos.len()],
hns: Default::default(),
})
}
pub fn get_lengths(&self) -> &Lengths {
&self.lengths
}

View file

@ -41,7 +41,7 @@ mod peer_info_reader;
mod read_buf;
mod session;
mod spawn_utils;
mod storage;
pub mod storage;
mod torrent_state;
pub mod tracing_subscriber_config_utils;
mod type_aliases;
@ -56,7 +56,9 @@ pub use session::{
SUPPORTED_SCHEMES,
};
pub use spawn_utils::spawn as librqbit_spawn;
pub use torrent_state::{ManagedTorrent, ManagedTorrentState, TorrentStats, TorrentStatsState};
pub use torrent_state::{
ManagedTorrent, ManagedTorrentInfo, ManagedTorrentState, TorrentStats, TorrentStatsState,
};
pub use buffers::*;
pub use clone_to_owned::CloneToOwned;

View file

@ -15,6 +15,7 @@ use crate::{
peer_connection::PeerConnectionOptions,
read_buf::ReadBuf,
spawn_utils::BlockingSpawner,
storage::StorageFactory,
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
@ -43,7 +44,6 @@ use librqbit_core::{
use parking_lot::RwLock;
use peer_binary_protocol::Handshake;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use tokio::net::{TcpListener, TcpStream};
use tokio_stream::StreamExt;
use tokio_util::sync::{CancellationToken, DropGuard};
@ -270,9 +270,7 @@ fn merge_two_optional_streams<T>(
}
/// Options for adding new torrents to the session.
#[serde_as]
#[derive(Default, Clone, Serialize, Deserialize)]
#[serde(default)]
#[derive(Default)]
pub struct AddTorrentOptions {
/// Start in paused state.
pub paused: bool,
@ -295,7 +293,6 @@ pub struct AddTorrentOptions {
pub peer_opts: Option<PeerConnectionOptions>,
/// Force a refresh interval for polling trackers.
#[serde_as(as = "Option<serde_with::DurationSeconds>")]
pub force_tracker_interval: Option<Duration>,
pub disable_trackers: bool,
@ -304,8 +301,9 @@ pub struct AddTorrentOptions {
pub initial_peers: Option<Vec<SocketAddr>>,
/// This is used to restore the session from serialized state.
#[serde(skip)]
pub preferred_id: Option<usize>,
pub storage_factory: Option<Box<dyn StorageFactory>>,
}
pub struct ListOnlyResponse {
@ -976,7 +974,7 @@ impl Session {
trackers: Vec<String>,
peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>,
opts: AddTorrentOptions,
mut opts: AddTorrentOptions,
) -> anyhow::Result<AddTorrentResponse> {
debug!("Torrent info: {:#?}", &info);
@ -1031,6 +1029,10 @@ impl Session {
builder.peer_read_write_timeout(t);
}
if let Some(storage_factory) = opts.storage_factory.take() {
builder.storage_factory(storage_factory);
}
let (managed_torrent, id) = {
let mut g = self.db.write();
if let Some((id, handle)) = g.torrents.iter().find(|(_, t)| t.info_hash() == info_hash)

View file

@ -6,7 +6,7 @@ use parking_lot::RwLock;
use crate::type_aliases::FileInfos;
use super::TorrentStorage;
use super::{StorageFactory, TorrentStorage};
struct InMemoryPiece {
bytes: Box<[u8]>,
@ -19,7 +19,21 @@ impl InMemoryPiece {
}
}
pub struct InMemoryExampleStorage {
pub struct InMemoryExampleStorageFactory {}
impl StorageFactory for InMemoryExampleStorageFactory {
fn init_storage(
&self,
info: &crate::torrent_state::ManagedTorrentInfo,
) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(InMemoryExampleStorage::new(
info.lengths,
info.file_infos.clone(),
)?))
}
}
struct InMemoryExampleStorage {
lengths: Lengths,
file_infos: FileInfos,
map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,
@ -28,7 +42,7 @@ pub struct InMemoryExampleStorage {
}
impl InMemoryExampleStorage {
pub fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result<Self> {
fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result<Self> {
// Max memory 128MiB. Make it tunable
let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length();
if max_pieces == 0 {

View file

@ -86,7 +86,6 @@ use crate::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
session::CheckedIncomingConnection,
storage::TorrentStorage,
torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{FilePriorities, FileStorage, PeerHandle, BF},
};

View file

@ -367,7 +367,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
}
let http_api_url = format!("http://{}", opts.http_api_listen_addr);
let client = http_api_client::HttpApiClient::new(&http_api_url)?;
let torrent_opts = AddTorrentOptions {
let torrent_opts = || AddTorrentOptions {
only_files_regex: download_opts.only_files_matching_regex.clone(),
overwrite: download_opts.overwrite,
list_only: download_opts.list,
@ -393,7 +393,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
match client
.add_torrent(
AddTorrent::from_cli_argument(torrent_url)?,
Some(torrent_opts.clone()),
Some(torrent_opts()),
)
.await
{
@ -452,10 +452,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
for path in &download_opts.torrent_path {
let handle = match session
.add_torrent(
AddTorrent::from_cli_argument(path)?,
Some(torrent_opts.clone()),
)
.add_torrent(AddTorrent::from_cli_argument(path)?, Some(torrent_opts()))
.await
{
Ok(v) => match v {