fix: fixes according to PR comments
- Reset previous implementation of InMemoryExampleStorage - Implement default (empty) behaviour of on_piece_completed in trait itself - Now passing a ValidPieceIndex in on_piece_completed
This commit is contained in:
parent
35d57ae8a2
commit
9183df0ebd
6 changed files with 19 additions and 79 deletions
|
|
@ -57,10 +57,6 @@ impl TorrentStorage for CustomStorage {
|
|||
fn init(&mut self, _meta: &librqbit::ManagedTorrentShared) -> anyhow::Result<()> {
|
||||
anyhow::bail!("not implemented")
|
||||
}
|
||||
|
||||
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
|
||||
anyhow::bail!("not implemented")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
|
|
|||
|
|
@ -215,7 +215,7 @@ impl<'a> FileOps<'a> {
|
|||
format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")")
|
||||
})?;
|
||||
|
||||
self.files.on_piece_completed(file_idx, absolute_offset)?;
|
||||
self.files.on_piece_completed(piece_index)?;
|
||||
|
||||
piece_remaining_bytes -= to_read_in_file;
|
||||
|
||||
|
|
|
|||
|
|
@ -8,37 +8,19 @@ use crate::type_aliases::FileInfos;
|
|||
|
||||
use crate::storage::{StorageFactory, StorageFactoryExt, TorrentStorage};
|
||||
|
||||
pub struct InMemoryPiece {
|
||||
pub content: Box<[u8]>,
|
||||
pub has_been_validated: bool,
|
||||
struct InMemoryPiece {
|
||||
bytes: Box<[u8]>,
|
||||
}
|
||||
|
||||
impl InMemoryPiece {
|
||||
pub fn new(l: &Lengths) -> Self {
|
||||
fn new(l: &Lengths) -> Self {
|
||||
let v = vec![0; l.default_piece_length() as usize].into_boxed_slice();
|
||||
Self {
|
||||
content: v,
|
||||
has_been_validated: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn can_be_discard(&self, upper_bound_offset: usize) -> bool {
|
||||
self.has_been_validated && upper_bound_offset >= self.content.len()
|
||||
Self { bytes: v }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct InMemoryExampleStorageFactory {
|
||||
max_ram_size_per_torrent: usize,
|
||||
}
|
||||
|
||||
impl InMemoryExampleStorageFactory {
|
||||
pub fn new(max_ram_size_per_torrent: usize) -> Self {
|
||||
Self {
|
||||
max_ram_size_per_torrent,
|
||||
}
|
||||
}
|
||||
}
|
||||
pub struct InMemoryExampleStorageFactory {}
|
||||
|
||||
impl StorageFactory for InMemoryExampleStorageFactory {
|
||||
type Storage = InMemoryExampleStorage;
|
||||
|
|
@ -47,11 +29,7 @@ impl StorageFactory for InMemoryExampleStorageFactory {
|
|||
&self,
|
||||
info: &crate::torrent_state::ManagedTorrentShared,
|
||||
) -> anyhow::Result<InMemoryExampleStorage> {
|
||||
InMemoryExampleStorage::new(
|
||||
info.lengths,
|
||||
info.file_infos.clone(),
|
||||
self.max_ram_size_per_torrent,
|
||||
)
|
||||
InMemoryExampleStorage::new(info.lengths, info.file_infos.clone())
|
||||
}
|
||||
|
||||
fn clone_box(&self) -> crate::storage::BoxStorageFactory {
|
||||
|
|
@ -63,15 +41,10 @@ pub struct InMemoryExampleStorage {
|
|||
lengths: Lengths,
|
||||
file_infos: FileInfos,
|
||||
map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,
|
||||
max_ram_size_per_torrent: usize,
|
||||
}
|
||||
|
||||
impl InMemoryExampleStorage {
|
||||
fn new(
|
||||
lengths: Lengths,
|
||||
file_infos: FileInfos,
|
||||
max_ram_size_per_torrent: usize,
|
||||
) -> anyhow::Result<Self> {
|
||||
fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result<Self> {
|
||||
// Max memory 128MiB. Make it tunable
|
||||
let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length();
|
||||
if max_pieces == 0 {
|
||||
|
|
@ -82,7 +55,6 @@ impl InMemoryExampleStorage {
|
|||
lengths,
|
||||
file_infos,
|
||||
map: RwLock::new(HashMap::new()),
|
||||
max_ram_size_per_torrent,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -96,16 +68,9 @@ impl TorrentStorage for InMemoryExampleStorage {
|
|||
(abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
|
||||
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
|
||||
|
||||
let mut g = self.map.write();
|
||||
// Get and remove this data from buffer to free space
|
||||
let g = self.map.read();
|
||||
let inmp = g.get(&piece_id).context("piece expired")?;
|
||||
let upper_bound_offset = piece_offset + buf.len();
|
||||
buf.copy_from_slice(&inmp.content[piece_offset..upper_bound_offset]);
|
||||
|
||||
if inmp.can_be_discard(upper_bound_offset) {
|
||||
let _ = g.remove(&piece_id);
|
||||
}
|
||||
|
||||
buf.copy_from_slice(&inmp.bytes[piece_offset..(piece_offset + buf.len())]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -116,12 +81,11 @@ impl TorrentStorage for InMemoryExampleStorage {
|
|||
let piece_offset: usize =
|
||||
(abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
|
||||
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
|
||||
|
||||
let mut g = self.map.write();
|
||||
let inmp = g
|
||||
.entry(piece_id)
|
||||
.or_insert_with(|| InMemoryPiece::new(&self.lengths));
|
||||
inmp.content[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf);
|
||||
inmp.bytes[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -144,7 +108,6 @@ impl TorrentStorage for InMemoryExampleStorage {
|
|||
lengths: self.lengths,
|
||||
map: RwLock::new(map),
|
||||
file_infos: self.file_infos.clone(),
|
||||
max_ram_size_per_torrent: self.max_ram_size_per_torrent,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
@ -155,18 +118,4 @@ impl TorrentStorage for InMemoryExampleStorage {
|
|||
fn remove_directory_if_empty(&self, _path: &Path) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> {
|
||||
let fi = &self.file_infos[file_id];
|
||||
let abs_offset = fi.offset_in_torrent + offset;
|
||||
let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?;
|
||||
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
|
||||
|
||||
let mut g = self.map.write();
|
||||
let inmp = g.get_mut(&piece_id).context("piece does not exist")?;
|
||||
|
||||
inmp.has_been_validated = true;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -188,9 +188,4 @@ impl TorrentStorage for FilesystemStorage {
|
|||
self.opened_files = files;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
|
||||
// No job has to be done for this storage
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -112,9 +112,4 @@ impl TorrentStorage for MmapFilesystemStorage {
|
|||
self.opened_mmaps = mmaps;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_piece_completed(&self, _file_id: usize, _offset: u64) -> anyhow::Result<()> {
|
||||
// No job has to be done for this storage
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ use std::{
|
|||
path::Path,
|
||||
};
|
||||
|
||||
use librqbit_core::lengths::ValidPieceIndex;
|
||||
|
||||
use crate::torrent_state::ManagedTorrentShared;
|
||||
|
||||
pub trait StorageFactory: Send + Sync + Any {
|
||||
|
|
@ -99,7 +101,10 @@ pub trait TorrentStorage: Send + Sync {
|
|||
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>>;
|
||||
|
||||
/// Callback called every time a piece has completed and has been validated.
|
||||
fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()>;
|
||||
/// Default implementation does nothing, but can be override in trait implementations.
|
||||
fn on_piece_completed(&self, _piece_index: ValidPieceIndex) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<U: TorrentStorage + ?Sized> TorrentStorage for Box<U> {
|
||||
|
|
@ -131,7 +136,7 @@ impl<U: TorrentStorage + ?Sized> TorrentStorage for Box<U> {
|
|||
(**self).init(meta)
|
||||
}
|
||||
|
||||
fn on_piece_completed(&self, file_id: usize, offset: u64) -> anyhow::Result<()> {
|
||||
(**self).on_piece_completed(file_id, offset)
|
||||
fn on_piece_completed(&self, piece_id: ValidPieceIndex) -> anyhow::Result<()> {
|
||||
(**self).on_piece_completed(piece_id)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue