feat: add on_piece_completed method on TorrentStorage

This commit is contained in:
LIAUD Corentin 2024-08-27 22:00:14 +02:00
parent 67f984ac6f
commit 35d57ae8a2
No known key found for this signature in database
GPG key ID: 48D3490FB1A44E6A
10 changed files with 100 additions and 18 deletions

View file

@ -57,6 +57,10 @@ impl TorrentStorage for CustomStorage {
fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> { fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> {
anyhow::bail!("not implemented") anyhow::bail!("not implemented")
} }
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
anyhow::bail!("not implemented")
}
} }
#[tokio::main] #[tokio::main]

View file

@ -215,6 +215,8 @@ impl<'a> FileOps<'a> {
format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")") format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")")
})?; })?;
self.files.on_piece_completed(file_idx, absolute_offset)?;
piece_remaining_bytes -= to_read_in_file; piece_remaining_bytes -= to_read_in_file;
if piece_remaining_bytes == 0 { if piece_remaining_bytes == 0 {

View file

@ -14,7 +14,10 @@ use librqbit_core::{
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
use peer_binary_protocol::{ use peer_binary_protocol::{
extended::{handshake::{ExtendedHandshake, YourIP}, ExtendedMessage}, extended::{
handshake::{ExtendedHandshake, YourIP},
ExtendedMessage,
},
serialize_piece_preamble, Handshake, Message, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, serialize_piece_preamble, Handshake, Message, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -8,28 +8,50 @@ use crate::type_aliases::FileInfos;
use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage}; use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage};
struct InMemoryPiece { pub struct InMemoryPiece {
bytes: Box<[u8]>, pub content: Box<[u8]>,
pub has_been_validated: bool,
} }
impl InMemoryPiece { impl InMemoryPiece {
fn new(l: &Lengths) -> Self { pub fn new(l: &Lengths) -> Self {
let v = vec![0; l.default_piece_length() as usize].into_boxed_slice(); let v = vec![0; l.default_piece_length() as usize].into_boxed_slice();
Self { bytes: v } Self {
content: v,
has_been_validated: false,
}
}
pub fn can_be_discard(&self, upper_bound_offset: usize) -> bool {
self.has_been_validated && upper_bound_offset >= self.content.len()
} }
} }
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct InMemoryExampleStorageFactory {} pub struct InMemoryExampleStorageFactory {
max_ram_size_per_torrent: usize,
}
impl InMemoryExampleStorageFactory {
pub fn new(max_ram_size_per_torrent: usize) -> Self {
Self {
max_ram_size_per_torrent,
}
}
}
impl StorageFactory for InMemoryExampleStorageFactory { impl StorageFactory for InMemoryExampleStorageFactory {
type Storage = InMemoryExampleStorage; type Storage = InMemoryExampleStorage;
fn create( fn create(
&self, &self,
info: &crate::torrent_state::ManagedTorrentInfo, info: &crate::torrent_state::ManagedTorrentShared,
) -> anyhow::Result<InMemoryExampleStorage> { ) -> anyhow::Result<InMemoryExampleStorage> {
InMemoryExampleStorage::new(info.lengths, info.file_infos.clone()) InMemoryExampleStorage::new(
info.lengths,
info.file_infos.clone(),
self.max_ram_size_per_torrent,
)
} }
fn clone_box(&self) -> crate::storage::BoxStorageFactory { fn clone_box(&self) -> crate::storage::BoxStorageFactory {
@ -41,10 +63,15 @@ pub struct InMemoryExampleStorage {
lengths: Lengths, lengths: Lengths,
file_infos: FileInfos, file_infos: FileInfos,
map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>, map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,
max_ram_size_per_torrent: usize,
} }
impl InMemoryExampleStorage { impl InMemoryExampleStorage {
fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result<Self> { fn new(
lengths: Lengths,
file_infos: FileInfos,
max_ram_size_per_torrent: usize,
) -> anyhow::Result<Self> {
// Max memory 128MiB. Make it tunable // Max memory 128MiB. Make it tunable
let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length(); let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length();
if max_pieces == 0 { if max_pieces == 0 {
@ -55,6 +82,7 @@ impl InMemoryExampleStorage {
lengths, lengths,
file_infos, file_infos,
map: RwLock::new(HashMap::new()), map: RwLock::new(HashMap::new()),
max_ram_size_per_torrent,
}) })
} }
} }
@ -68,9 +96,16 @@ impl TorrentStorage for InMemoryExampleStorage {
(abs_offset % self.lengths.default_piece_length() as u64).try_into()?; (abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
let g = self.map.read(); let mut g = self.map.write();
// Get and remove this data from buffer to free space
let inmp = g.get(&piece_id).context("piece expired")?; let inmp = g.get(&piece_id).context("piece expired")?;
buf.copy_from_slice(&inmp.bytes[piece_offset..(piece_offset + buf.len())]); let upper_bound_offset = piece_offset + buf.len();
buf.copy_from_slice(&inmp.content[piece_offset..upper_bound_offset]);
if inmp.can_be_discard(upper_bound_offset) {
let _ = g.remove(&piece_id);
}
Ok(()) Ok(())
} }
@ -81,11 +116,12 @@ impl TorrentStorage for InMemoryExampleStorage {
let piece_offset: usize = let piece_offset: usize =
(abs_offset % self.lengths.default_piece_length() as u64).try_into()?; (abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
let mut g = self.map.write(); let mut g = self.map.write();
let inmp = g let inmp = g
.entry(piece_id) .entry(piece_id)
.or_insert_with(|| InMemoryPiece::new(&self.lengths)); .or_insert_with(|| InMemoryPiece::new(&self.lengths));
inmp.bytes[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf); inmp.content[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf);
Ok(()) Ok(())
} }
@ -108,14 +144,29 @@ impl TorrentStorage for InMemoryExampleStorage {
lengths: self.lengths, lengths: self.lengths,
map: RwLock::new(map), map: RwLock::new(map),
file_infos: self.file_infos.clone(), file_infos: self.file_infos.clone(),
max_ram_size_per_torrent: self.max_ram_size_per_torrent,
})) }))
} }
fn init(&mut self, _meta: &crate::ManagedTorrentInfo) -> anyhow::Result<()> { fn init(&mut self, _meta: &crate::ManagedTorrentShared) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> { fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> {
let fi = &self.file_infos[file_id];
let abs_offset = fi.offset_in_torrent + offset;
let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?;
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
let mut g = self.map.write();
let inmp = g.get_mut(&piece_id).context("piece does not exist")?;
inmp.has_been_validated = true;
Ok(())
}
} }

View file

@ -4,7 +4,7 @@ use parking_lot::RwLock;
use crate::{ use crate::{
storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
FileInfos, ManagedTorrentInfo, FileInfos, ManagedTorrentShared,
}; };
#[derive(Default, Clone)] #[derive(Default, Clone)]
@ -18,7 +18,7 @@ pub struct MmapStorage {
impl StorageFactory for MmapStorageFactory { impl StorageFactory for MmapStorageFactory {
type Storage = MmapStorage; type Storage = MmapStorage;
fn create(&self, info: &ManagedTorrentInfo) -> anyhow::Result<Self::Storage> { fn create(&self, info: &ManagedTorrentShared) -> anyhow::Result<Self::Storage> {
Ok(MmapStorage { Ok(MmapStorage {
mmap: RwLock::new( mmap: RwLock::new(
MmapOptions::new() MmapOptions::new()
@ -63,11 +63,15 @@ impl TorrentStorage for MmapStorage {
anyhow::bail!("not implemented") anyhow::bail!("not implemented")
} }
fn init(&mut self, _meta: &ManagedTorrentInfo) -> anyhow::Result<()> { fn init(&mut self, _meta: &ManagedTorrentShared) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> { fn remove_directory_if_empty(&self, _path: &std::path::Path) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
Ok(())
}
} }

View file

@ -188,4 +188,9 @@ impl TorrentStorage for FilesystemStorage {
self.opened_files = files; self.opened_files = files;
Ok(()) Ok(())
} }
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
// No job has to be done for this storage
Ok(())
}
} }

View file

@ -112,4 +112,9 @@ impl TorrentStorage for MmapFilesystemStorage {
self.opened_mmaps = mmaps; self.opened_mmaps = mmaps;
Ok(()) Ok(())
} }
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
// No job has to be done for this storage
Ok(())
}
} }

View file

@ -97,6 +97,9 @@ pub trait TorrentStorage: Send + Sync {
/// Replace the current storage with a dummy, and return a new one that should be used instead. /// Replace the current storage with a dummy, and return a new one that should be used instead.
/// This is used to make the underlying object useless when e.g. pausing the torrent. /// This is used to make the underlying object useless when e.g. pausing the torrent.
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>>; fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>>;
/// Callback called every time a piece has completed and has been validated.
fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()>;
} }
impl<U: TorrentStorage + ?Sized> TorrentStorage for Box<U> { impl<U: TorrentStorage + ?Sized> TorrentStorage for Box<U> {
@ -127,4 +130,8 @@ impl<U: TorrentStorage + ?Sized> TorrentStorage for Box<U> {
fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> { fn init(&mut self, meta: &ManagedTorrentShared) -> anyhow::Result<()> {
(**self).init(meta) (**self).init(meta)
} }
fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> {
(**self).on_piece_completed(file_id, offset)
}
} }

View file

@ -5,7 +5,7 @@ use std::{
time::Duration, time::Duration,
}; };
use anyhow::{bail, Context}; use anyhow::bail;
use librqbit_core::Id20; use librqbit_core::Id20;
use parking_lot::RwLock; use parking_lot::RwLock;
use rand::{thread_rng, Rng, RngCore, SeedableRng}; use rand::{thread_rng, Rng, RngCore, SeedableRng};
@ -97,6 +97,7 @@ impl TestPeerMetadata {
#[cfg(feature = "http-api")] #[cfg(feature = "http-api")]
async fn debug_server() -> anyhow::Result<()> { async fn debug_server() -> anyhow::Result<()> {
use anyhow::Context;
use axum::{response::IntoResponse, routing::get, Router}; use axum::{response::IntoResponse, routing::get, Router};
async fn backtraces() -> impl IntoResponse { async fn backtraces() -> impl IntoResponse {
#[cfg(feature = "async-bt")] #[cfg(feature = "async-bt")]

View file

@ -99,7 +99,7 @@ impl Serialize for YourIP {
IpAddr::V6(ipv6) => { IpAddr::V6(ipv6) => {
let buf = ipv6.octets(); let buf = ipv6.octets();
serializer.serialize_bytes(&buf) serializer.serialize_bytes(&buf)
}, }
} }
} }
} }