diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index e9e0f2f..e70b618 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -5,7 +5,10 @@ use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex}; use peer_binary_protocol::Piece; use tracing::{debug, trace}; -use crate::type_aliases::{FilePriorities, OpenedFiles, BF}; +use crate::{ + file_info::FileInfo, + type_aliases::{FileInfos, FilePriorities, BF}, +}; pub struct ChunkTracker { // This forms the basis of a "queue" to pull from. @@ -29,6 +32,9 @@ pub struct ChunkTracker { // was called. selected: BF, + // How many bytes do we have per each file. + per_file_bytes: Vec, + lengths: Lengths, // Quick to retrieve stats, that MUST be in sync with the BFs @@ -128,6 +134,7 @@ impl ChunkTracker { // Selected pieces are the ones the user has selected selected_pieces: BF, lengths: Lengths, + file_infos: &FileInfos, ) -> anyhow::Result { let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces) .context("error computing needed pieces")?; @@ -143,11 +150,44 @@ impl ChunkTracker { lengths, have: have_pieces, hns: HaveNeededSelected::default(), + per_file_bytes: vec![0; file_infos.len()], }; + ct.recalculate_per_file_bytes(file_infos); ct.hns = ct.calc_hns(); Ok(ct) } + fn recalculate_per_file_bytes(&mut self, file_infos: &FileInfos) { + for (slot, fi) in self.per_file_bytes.iter_mut().zip(file_infos.iter()) { + *slot = fi + .piece_range + .clone() + .filter(|p| self.have[*p as usize]) + .map(|id| { + self.lengths + .size_of_piece_in_file(id, fi.offset_in_torrent, fi.len) + }) + .sum(); + } + } + + pub fn new_empty(lengths: Lengths, file_infos: &FileInfos) -> anyhow::Result { + let have = BF::from_boxed_slice(vec![0; lengths.piece_bitfield_bytes()].into_boxed_slice()); + let selected = have.clone(); + let chunk_status = + BF::from_boxed_slice(vec![0; lengths.chunk_bitfield_bytes()].into_boxed_slice()); + let queued = have.clone(); + Ok(Self { + queue_pieces: queued, + chunk_status, + have, + selected, + lengths, + per_file_bytes: vec![0; file_infos.len()], + hns: Default::default(), + }) + } + pub fn get_lengths(&self) -> &Lengths { &self.lengths } @@ -182,12 +222,12 @@ impl ChunkTracker { pub(crate) fn iter_queued_pieces<'a>( &'a self, file_priorities: &'a FilePriorities, - opened_files: &'a OpenedFiles, + opened_files: &'a FileInfos, ) -> impl Iterator + 'a { file_priorities .iter() .filter_map(|p| opened_files.get(*p)) - .filter(|f| !f.approx_is_finished()) + // .filter(|f| !f.approx_is_finished()) .flat_map(|f| f.iter_piece_priorities()) .filter(|id| self.queue_pieces[*id]) .filter_map(|id| id.try_into().ok()) @@ -226,7 +266,12 @@ impl ChunkTracker { { return; } - debug!("remarking piece={} as broken", index); + self.mark_piece_broken(index) + } + + pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) { + debug!("marking piece={} as broken", index); + self.have.set(index.get() as usize, false); self.queue_pieces.set(index.get() as usize, true); if let Some(s) = self.chunk_status.get_mut(self.lengths.chunk_range(index)) { s.fill(false); @@ -372,6 +417,37 @@ impl ChunkTracker { pub(crate) fn get_selected_pieces(&self) -> &BF { &self.selected } + + pub fn is_file_finished(&self, file_info: &FileInfo) -> bool { + self.have + .get(file_info.piece_range_usize()) + .map(|r| r.all()) + .unwrap_or(true) + } + + pub(crate) fn is_finished(&self) -> bool { + self.get_hns().finished() + } + + pub fn per_file_have_bytes(&self) -> &[u64] { + &self.per_file_bytes + } + + // Returns remaining bytes + pub fn update_file_have_on_piece_completed( + &mut self, + piece_id: ValidPieceIndex, + file_id: usize, + file_info: &FileInfo, + ) -> u64 { + let diff_have = self.lengths.size_of_piece_in_file( + piece_id.get(), + file_info.offset_in_torrent, + file_info.len, + ); + self.per_file_bytes[file_id] += diff_have; + file_info.len.saturating_sub(self.per_file_bytes[file_id]) + } } #[cfg(test)] @@ -502,7 +578,13 @@ mod tests { let initial_selected = BF::from_boxed_slice(vec![u8::MAX; bf_len].into_boxed_slice()); // Initially, we need all files and all pieces. - let mut ct = ChunkTracker::new(initial_have.clone(), initial_selected.clone(), l).unwrap(); + let mut ct = ChunkTracker::new( + initial_have.clone(), + initial_selected.clone(), + l, + &Default::default(), + ) + .unwrap(); // Select all file, no changes. assert_eq!( diff --git a/crates/librqbit/src/file_info.rs b/crates/librqbit/src/file_info.rs new file mode 100644 index 0000000..babc77f --- /dev/null +++ b/crates/librqbit/src/file_info.rs @@ -0,0 +1,49 @@ +use std::path::PathBuf; + +#[derive(Debug, Clone)] +pub struct FileInfo { + pub filename: PathBuf, + pub offset_in_torrent: u64, + pub piece_range: std::ops::Range, + pub len: u64, +} + +// Iterate file pieces in the following order: first, last, everything else from start to end. +fn iter_piece_priorities(range: std::ops::Range) -> impl Iterator { + // First and last of each file first, then the rest of pieces in that file. + let r = range; + use std::iter::once; + + let first = once(r.start); + let last = once(r.start + r.len().overflowing_sub(1).0); // it's ok if it repeats, doesn't matter + let mid = r.clone().skip(1).take(r.len().overflowing_sub(2).0); + + // The take(r.len()) is to not yield start/end pieces in case of 0 and 1 lengths. + first.chain(last).chain(mid).take(r.len()) +} + +impl FileInfo { + pub fn piece_range_usize(&self) -> std::ops::Range { + self.piece_range.start as usize..self.piece_range.end as usize + } + + pub fn iter_piece_priorities(&self) -> impl Iterator { + iter_piece_priorities(self.piece_range_usize()) + } +} + +#[cfg(test)] +mod tests { + use super::iter_piece_priorities; + + #[test] + fn test_iter_piece_priorities() { + let it = |r: std::ops::Range| -> Vec { iter_piece_priorities(r).collect() }; + assert_eq!(it(0..0), Vec::::new()); + + assert_eq!(it(0..1), vec![0]); + assert_eq!(it(0..2), vec![0, 1]); + assert_eq!(it(0..3), vec![0, 2, 1]); + assert_eq!(it(0..4), vec![0, 3, 1, 2]); + } +} diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index c186de3..3eae81e 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -1,6 +1,4 @@ use std::{ - fs::File, - io::{Read, Seek, SeekFrom, Write}, marker::PhantomData, sync::atomic::{AtomicU64, Ordering}, }; @@ -16,8 +14,9 @@ use sha1w::{ISha1, Sha1}; use tracing::{debug, trace, warn}; use crate::{ - opened_file::OpenedFile, - type_aliases::{OpenedFiles, PeerHandle, BF}, + file_info::FileInfo, + storage::TorrentStorage, + type_aliases::{FileInfos, PeerHandle, BF}, }; pub(crate) struct InitialCheckResults { @@ -44,7 +43,9 @@ pub(crate) struct InitialCheckResults { } pub fn update_hash_from_file( - file: &mut File, + file_id: usize, + mut pos: u64, + files: &dyn TorrentStorage, hash: &mut Sha1, buf: &mut [u8], mut bytes_to_read: usize, @@ -52,10 +53,12 @@ pub fn update_hash_from_file( let mut read = 0; while bytes_to_read > 0 { let chunk = std::cmp::min(buf.len(), bytes_to_read); - file.read_exact(&mut buf[..chunk]) + files + .pread_exact(file_id, pos, &mut buf[..chunk]) .with_context(|| format!("failed reading chunk of size {chunk}, read so far {read}"))?; bytes_to_read -= chunk; read += chunk; + pos += chunk as u64; hash.update(&buf[..chunk]); } Ok(()) @@ -63,7 +66,8 @@ pub fn update_hash_from_file( pub(crate) struct FileOps<'a> { torrent: &'a TorrentMetaV1Info, - files: &'a OpenedFiles, + files: &'a dyn TorrentStorage, + file_infos: &'a FileInfos, lengths: &'a Lengths, phantom_data: PhantomData, } @@ -71,12 +75,14 @@ pub(crate) struct FileOps<'a> { impl<'a> FileOps<'a> { pub fn new( torrent: &'a TorrentMetaV1Info, - files: &'a OpenedFiles, + files: &'a dyn TorrentStorage, + file_infos: &'a FileInfos, lengths: &'a Lengths, ) -> Self { Self { torrent, files, + file_infos, lengths, phantom_data: PhantomData, } @@ -85,8 +91,6 @@ impl<'a> FileOps<'a> { pub fn initial_check( &self, only_files: Option<&[usize]>, - opened_files: &OpenedFiles, - lengths: &Lengths, progress: &AtomicU64, ) -> anyhow::Result { let mut needed_pieces = @@ -102,20 +106,20 @@ impl<'a> FileOps<'a> { #[derive(Debug)] struct CurrentFile<'a> { index: usize, - fd: &'a OpenedFile, + fi: &'a FileInfo, full_file_required: bool, processed_bytes: u64, is_broken: bool, } impl<'a> CurrentFile<'a> { fn remaining(&self) -> u64 { - self.fd.len - self.processed_bytes + self.fi.len - self.processed_bytes } fn mark_processed_bytes(&mut self, bytes: u64) { self.processed_bytes += bytes } } - let mut file_iterator = self.files.iter().enumerate().map(|(idx, fd)| { + let mut file_iterator = self.file_infos.iter().enumerate().map(|(idx, fi)| { let full_file_required = if let Some(only_files) = only_files { only_files.contains(&idx) } else { @@ -123,7 +127,7 @@ impl<'a> FileOps<'a> { }; CurrentFile { index: idx, - fd, + fi, full_file_required, processed_bytes: 0, is_broken: false, @@ -172,19 +176,17 @@ impl<'a> FileOps<'a> { continue; } - let mut fd = current_file.fd.file.lock(); - - fd.seek(SeekFrom::Start(pos)) - .context("bug? error seeking")?; if let Err(err) = update_hash_from_file( - &mut fd, + current_file.index, + pos, + self.files, &mut computed_hash, &mut read_buffer, to_read_in_file, ) { debug!( "error reading from file {} ({:?}) at {}: {:#}", - current_file.index, current_file.fd.filename, pos, &err + current_file.index, current_file.fi.filename, pos, &err ); current_file.is_broken = true; some_files_broken = true; @@ -216,10 +218,6 @@ impl<'a> FileOps<'a> { piece_info.piece_index ); have_bytes += piece_info.len as u64; - for file_id in piece_files.drain(..) { - opened_files[file_id] - .update_have_on_piece_completed(piece_info.piece_index.get(), lengths); - } have_pieces.set(piece_info.piece_index.get() as usize, true); } else if piece_selected { trace!( @@ -265,9 +263,8 @@ impl<'a> FileOps<'a> { } let file_remaining_len = file_len - absolute_offset; - let to_read_in_file = + let to_read_in_file: usize = std::cmp::min(file_remaining_len, piece_remaining_bytes as u64).try_into()?; - let mut file_g = self.files[file_idx].file.lock(); trace!( "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", piece_index, @@ -276,18 +273,17 @@ impl<'a> FileOps<'a> { absolute_offset, &last_received_chunk ); - file_g - .seek(SeekFrom::Start(absolute_offset)) - .with_context(|| { - format!("error seeking to {absolute_offset}, file id: {file_idx}") - })?; - update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context( - || { - format!( - "error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")" - ) - }, - )?; + update_hash_from_file( + file_idx, + absolute_offset, + self.files, + &mut h, + &mut buf, + to_read_in_file, + ) + .with_context(|| { + format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")") + })?; piece_remaining_bytes -= to_read_in_file; @@ -335,7 +331,6 @@ impl<'a> FileOps<'a> { let file_remaining_len = file_len - absolute_offset; let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64).try_into()?; - let mut file_g = self.files[file_idx].file.lock(); trace!( "piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}", chunk_info.piece_index, @@ -344,13 +339,8 @@ impl<'a> FileOps<'a> { absolute_offset, &chunk_info ); - file_g - .seek(SeekFrom::Start(absolute_offset)) - .with_context(|| { - format!("error seeking to {absolute_offset}, file id: {file_idx}") - })?; - file_g - .read_exact(&mut buf[..to_read_in_file]) + self.files + .pread_exact(file_idx, absolute_offset, &mut buf[..to_read_in_file]) .with_context(|| { format!("error reading {file_idx} bytes, file_id: {to_read_in_file}") })?; @@ -388,7 +378,6 @@ impl<'a> FileOps<'a> { let remaining_len = file_len - absolute_offset; let to_write = std::cmp::min(buf.len() as u64, remaining_len).try_into()?; - let mut file_g = self.files[file_idx].file.lock(); trace!( "piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}", chunk_info.piece_index, @@ -399,13 +388,8 @@ impl<'a> FileOps<'a> { to_write, absolute_offset ); - file_g - .seek(SeekFrom::Start(absolute_offset)) - .with_context(|| { - format!("error seeking to {absolute_offset} in file {file_idx} (\"{name:?}\")") - })?; - file_g - .write_all(&buf[..to_write]) + self.files + .pwrite_all(file_idx, absolute_offset, &buf[..to_write]) .with_context(|| format!("error writing to file {file_idx} (\"{name:?}\")"))?; buf = &buf[to_write..]; if buf.is_empty() { diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 5345714..a53b826 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -30,6 +30,7 @@ mod api_error; mod chunk_tracker; mod create_torrent_file; mod dht_utils; +pub mod file_info; mod file_ops; pub mod http_api; pub mod http_api_client; @@ -40,6 +41,7 @@ mod peer_info_reader; mod read_buf; mod session; mod spawn_utils; +mod storage; mod torrent_state; pub mod tracing_subscriber_config_utils; mod type_aliases; diff --git a/crates/librqbit/src/opened_file.rs b/crates/librqbit/src/opened_file.rs index 205b7b7..b7a3cca 100644 --- a/crates/librqbit/src/opened_file.rs +++ b/crates/librqbit/src/opened_file.rs @@ -1,21 +1,11 @@ -use std::{ - fs::File, - path::PathBuf, - sync::atomic::{AtomicU64, Ordering}, -}; +use std::fs::File; use anyhow::Context; -use librqbit_core::lengths::Lengths; use parking_lot::Mutex; #[derive(Debug)] pub(crate) struct OpenedFile { pub file: Mutex, - pub filename: PathBuf, - pub offset_in_torrent: u64, - pub have: AtomicU64, - pub piece_range: std::ops::Range, - pub len: u64, } pub(crate) fn dummy_file() -> anyhow::Result { @@ -30,36 +20,10 @@ pub(crate) fn dummy_file() -> anyhow::Result { .with_context(|| format!("error opening {}", DEVNULL)) } -// Iterate file pieces in the following order: first, last, everything else from start to end. -fn iter_piece_priorities(range: std::ops::Range) -> impl Iterator { - // First and last of each file first, then the rest of pieces in that file. - let r = range; - use std::iter::once; - - let first = once(r.start); - let last = once(r.start + r.len().overflowing_sub(1).0); // it's ok if it repeats, doesn't matter - let mid = r.clone().skip(1).take(r.len().overflowing_sub(2).0); - - // The take(r.len()) is to not yield start/end pieces in case of 0 and 1 lengths. - first.chain(last).chain(mid).take(r.len()) -} - impl OpenedFile { - pub fn new( - f: File, - filename: PathBuf, - have: u64, - len: u64, - offset_in_torrent: u64, - piece_range: std::ops::Range, - ) -> Self { + pub fn new(f: File) -> Self { Self { file: Mutex::new(f), - filename, - have: AtomicU64::new(have), - len, - offset_in_torrent, - piece_range, } } @@ -74,45 +38,6 @@ impl OpenedFile { let f = self.take()?; Ok(Self { file: Mutex::new(f), - filename: self.filename.clone(), - offset_in_torrent: self.offset_in_torrent, - have: AtomicU64::new(self.have.load(Ordering::Relaxed)), - len: self.len, - piece_range: self.piece_range.clone(), }) } - - pub fn piece_range_usize(&self) -> std::ops::Range { - self.piece_range.start as usize..self.piece_range.end as usize - } - - pub fn update_have_on_piece_completed(&self, piece_id: u32, lengths: &Lengths) -> u64 { - let size = lengths.size_of_piece_in_file(piece_id, self.offset_in_torrent, self.len); - self.have.fetch_add(size, Ordering::Relaxed); - size - } - - pub fn approx_is_finished(&self) -> bool { - self.have.load(Ordering::Relaxed) == self.len - } - - pub fn iter_piece_priorities(&self) -> impl Iterator { - iter_piece_priorities(self.piece_range_usize()) - } -} - -#[cfg(test)] -mod tests { - use crate::opened_file::iter_piece_priorities; - - #[test] - fn test_iter_piece_priorities() { - let it = |r: std::ops::Range| -> Vec { iter_piece_priorities(r).collect() }; - assert_eq!(it(0..0), Vec::::new()); - - assert_eq!(it(0..1), vec![0]); - assert_eq!(it(0..2), vec![0, 1]); - assert_eq!(it(0..3), vec![0, 2, 1]); - assert_eq!(it(0..4), vec![0, 3, 1, 2]); - } } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 38c9bfc..997a6a0 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -1092,10 +1092,9 @@ impl Session { warn!(error=?e, "error deleting torrent cleanly"); } (Ok(Some(paused)), true) => { - for file in paused.files.iter() { - drop(file.take()?); - if let Err(e) = std::fs::remove_file(&file.filename) { - warn!(?file.filename, error=?e, "could not delete file"); + for (id, fi) in removed.info().file_infos.iter().enumerate() { + if let Err(e) = paused.files.remove_file(id, &fi.filename) { + warn!(?fi.filename, error=?e, "could not delete file"); } } } diff --git a/crates/librqbit/src/storage.rs b/crates/librqbit/src/storage.rs new file mode 100644 index 0000000..0593cb7 --- /dev/null +++ b/crates/librqbit/src/storage.rs @@ -0,0 +1,184 @@ +use std::{ + collections::HashMap, + io::{Read, Seek, SeekFrom, Write}, + path::Path, +}; + +use anyhow::Context; +use librqbit_core::lengths::{Lengths, ValidPieceIndex}; +use parking_lot::RwLock; + +use crate::{opened_file::OpenedFile, type_aliases::FileInfos}; + +pub trait TorrentStorage: Send + Sync { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>; + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()>; + + fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>; + + fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>; + + fn take(&self) -> anyhow::Result>; +} + +pub struct FilesystemStorage { + opened_files: Vec, +} + +impl FilesystemStorage { + pub fn new(opened_files: Vec) -> Self { + Self { opened_files } + } +} + +impl TorrentStorage for FilesystemStorage { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + let mut g = self + .opened_files + .get(file_id) + .context("no such file")? + .file + .lock(); + g.seek(SeekFrom::Start(offset))?; + Ok(g.read_exact(buf)?) + } + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + let mut g = self + .opened_files + .get(file_id) + .context("no such file")? + .file + .lock(); + g.seek(SeekFrom::Start(offset))?; + Ok(g.write_all(buf)?) + } + + fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> { + Ok(std::fs::remove_file(filename)?) + } + + fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> { + Ok(self.opened_files[file_id].file.lock().set_len(len)?) + } + + fn take(&self) -> anyhow::Result> { + Ok(Box::new(Self::new( + self.opened_files + .iter() + .map(|f| f.take_clone()) + .collect::>>()?, + ))) + } +} + +impl TorrentStorage for Box { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + (**self).pread_exact(file_id, offset, buf) + } + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + (**self).pwrite_all(file_id, offset, buf) + } + + fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()> { + (**self).remove_file(file_id, filename) + } + + fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> { + (**self).ensure_file_length(file_id, length) + } + + fn take(&self) -> anyhow::Result> { + (**self).take() + } +} + +struct InMemoryPiece { + bytes: Box<[u8]>, +} + +impl InMemoryPiece { + fn new(l: &Lengths) -> Self { + let v = vec![0; l.default_piece_length() as usize].into_boxed_slice(); + Self { bytes: v } + } +} + +pub struct InMemoryGarbageCollectingStorage { + lengths: Lengths, + file_infos: FileInfos, + map: RwLock>, + // TODO: chunk tracker - rename to PieceTracker and extract chunks out of it (only keep pieces) + // this sucker here would track chunks, and the storage above too. +} + +impl InMemoryGarbageCollectingStorage { + pub fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result { + // Max memory 128MiB. Make it tunable + let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length(); + if max_pieces == 0 { + anyhow::bail!("pieces too large"); + } + + Ok(Self { + lengths, + file_infos, + map: RwLock::new(HashMap::new()), + }) + } +} + +impl TorrentStorage for InMemoryGarbageCollectingStorage { + fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> { + let fi = &self.file_infos[file_id]; + let abs_offset = fi.offset_in_torrent + offset; + let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?; + let piece_offset: usize = + (abs_offset % self.lengths.default_piece_length() as u64).try_into()?; + let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; + + let g = self.map.read(); + let inmp = g.get(&piece_id).context("piece expired")?; + buf.copy_from_slice(&inmp.bytes[piece_offset..(piece_offset + buf.len())]); + Ok(()) + } + + fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> { + let fi = &self.file_infos[file_id]; + let abs_offset = fi.offset_in_torrent + offset; + let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?; + let piece_offset: usize = + (abs_offset % self.lengths.default_piece_length() as u64).try_into()?; + let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?; + let mut g = self.map.write(); + let inmp = g + .entry(piece_id) + .or_insert_with(|| InMemoryPiece::new(&self.lengths)); + inmp.bytes[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf); + Ok(()) + } + + fn remove_file(&self, _file_id: usize, _filename: &Path) -> anyhow::Result<()> { + Ok(()) + } + + fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> { + Ok(()) + } + + fn take(&self) -> anyhow::Result> { + let map = { + let mut g = self.map.write(); + let mut repl = HashMap::new(); + std::mem::swap(&mut *g, &mut repl); + repl + }; + Ok(Box::new(Self { + lengths: self.lengths, + map: RwLock::new(map), + file_infos: self.file_infos.clone(), + })) + } +} diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 453c196..746e8ce 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -10,8 +10,10 @@ use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; use crate::{ - chunk_tracker::ChunkTracker, file_ops::FileOps, opened_file::OpenedFile, - type_aliases::OpenedFiles, + chunk_tracker::ChunkTracker, + file_ops::FileOps, + opened_file::OpenedFile, + storage::{FilesystemStorage, InMemoryGarbageCollectingStorage, TorrentStorage}, }; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; @@ -37,7 +39,23 @@ impl TorrentStateInitializing { } pub async fn check(&self) -> anyhow::Result { - let mut files = OpenedFiles::new(); + // Return in-memory store + let store = + InMemoryGarbageCollectingStorage::new(self.meta.lengths, self.meta.file_infos.clone())?; + let ct = ChunkTracker::new_empty(self.meta.lengths, &self.meta.file_infos)?; + + Ok(TorrentStatePaused { + info: self.meta.clone(), + files: Box::new(store), + chunk_tracker: ct, + streams: Arc::new(Default::default()), + }) + + // self.check_disk().await + } + + pub async fn check_disk(&self) -> anyhow::Result { + let mut files = Vec::::new(); for file_details in self.meta.info.iter_file_details(&self.meta.lengths)? { let mut full_path = self.meta.out_dir.clone(); let relative_path = file_details @@ -64,26 +82,21 @@ impl TorrentStateInitializing { .with_context(|| format!("error creating {:?}", &full_path))?; OpenOptions::new().read(true).write(true).open(&full_path)? }; - files.push(OpenedFile::new( - file, - full_path, - 0, - file_details.len, - file_details.offset, - file_details.pieces, - )); + files.push(OpenedFile::new(file)); } + let files: Box = Box::new(FilesystemStorage::new(files)); debug!("computed lengths: {:?}", &self.meta.lengths); info!("Doing initial checksum validation, this might take a while..."); let initial_check_results = self.meta.spawner.spawn_block_in_place(|| { - FileOps::new(&self.meta.info, &files, &self.meta.lengths).initial_check( - self.only_files.as_deref(), + FileOps::new( + &self.meta.info, &files, + &self.meta.file_infos, &self.meta.lengths, - &self.checked_bytes, ) + .initial_check(self.only_files.as_deref(), &self.checked_bytes) })?; info!( @@ -95,7 +108,7 @@ impl TorrentStateInitializing { // Ensure file lenghts are correct, and reopen read-only. self.meta.spawner.spawn_block_in_place(|| { - for (idx, file) in files.iter().enumerate() { + for (idx, fi) in self.meta.file_infos.iter().enumerate() { if self .only_files .as_ref() @@ -103,16 +116,16 @@ impl TorrentStateInitializing { .unwrap_or(true) { let now = Instant::now(); - if let Err(err) = file.file.lock().set_len(file.len) { + if let Err(err) = files.ensure_file_length(idx, fi.len) { warn!( "Error setting length for file {:?} to {}: {:#?}", - file.filename, file.len, err + fi.filename, fi.len, err ); } else { debug!( "Set length for file {:?} to {} in {:?}", - file.filename, - SF::new(file.len), + fi.filename, + SF::new(fi.len), now.elapsed() ); } @@ -125,6 +138,7 @@ impl TorrentStateInitializing { initial_check_results.have_pieces, initial_check_results.selected_pieces, self.meta.lengths, + &self.meta.file_infos, ) .context("error creating chunk tracker")?; diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index c24f70f..fe61184 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -86,8 +86,9 @@ use crate::{ PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, }, session::CheckedIncomingConnection, + storage::TorrentStorage, torrent_state::{peer::Peer, utils::atomic_inc}, - type_aliases::{FilePriorities, OpenedFiles, PeerHandle, BF}, + type_aliases::{FilePriorities, FileStorage, PeerHandle, BF}, }; use self::{ @@ -141,7 +142,7 @@ impl TorrentStateLocked { .context("chunk tracker empty, torrent was paused") } - fn get_chunks_mut(&mut self) -> anyhow::Result<&mut ChunkTracker> { + pub(crate) fn get_chunks_mut(&mut self) -> anyhow::Result<&mut ChunkTracker> { self.chunks .as_mut() .context("chunk tracker empty, torrent was paused") @@ -159,7 +160,7 @@ pub struct TorrentStateLive { meta: Arc, locked: RwLock, - pub(crate) files: OpenedFiles, + pub(crate) files: FileStorage, stats: AtomicStats, lengths: Lengths, @@ -195,9 +196,15 @@ impl TorrentStateLive { // TODO: make it configurable let file_priorities = { - let mut pri = (0..paused.files.len()).collect::>(); + let mut pri = (0..paused.info.file_infos.len()).collect::>(); // sort by filename, cause many torrents have random sort order. - pri.sort_unstable_by_key(|id| paused.files.get(*id).map(|op| op.filename.as_path())); + pri.sort_unstable_by_key(|id| { + paused + .info + .file_infos + .get(*id) + .map(|fi| fi.filename.as_path()) + }); pri }; @@ -482,7 +489,12 @@ impl TorrentStateLive { self.meta.peer_id } pub(crate) fn file_ops(&self) -> FileOps<'_> { - FileOps::new(&self.meta.info, &self.files, &self.lengths) + FileOps::new( + &self.meta.info, + &self.files, + &self.meta().file_infos, + &self.lengths, + ) } pub(crate) fn lock_read( @@ -632,12 +644,6 @@ impl TorrentStateLive { // It should be impossible to make a fatal error after pausing. g.fatal_errors_tx.take(); - let files = self - .files - .iter() - .map(|f| f.take_clone()) - .collect::>>()?; - let mut chunk_tracker = g .chunks .take() @@ -649,7 +655,7 @@ impl TorrentStateLive { // g.chunks; Ok(TorrentStatePaused { info: self.meta.clone(), - files, + files: self.files.take()?, chunk_tracker, streams: self.streams.clone(), }) @@ -671,7 +677,7 @@ impl TorrentStateLive { pub(crate) fn update_only_files(&self, only_files: &HashSet) -> anyhow::Result<()> { let mut g = self.lock_write("update_only_files"); let ct = g.get_chunks_mut()?; - let hns = ct.update_only_files(self.files.iter().map(|f| f.len), only_files)?; + let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?; if !hns.finished() { self.reconnect_all_not_needed_peers(); } @@ -682,41 +688,49 @@ impl TorrentStateLive { self.get_hns().map(|h| h.finished()).unwrap_or_default() } - pub(crate) fn has_active_streams_unfinished_files(&self) -> bool { + fn has_active_streams_unfinished_files(&self, state: &TorrentStateLocked) -> bool { + let chunks = match state.get_chunks() { + Ok(c) => c, + Err(_) => return false, + }; self.streams .streamed_file_ids() - .any(|file_id| !self.files[file_id].approx_is_finished()) + .any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id])) } - pub(crate) fn is_finished_and_dont_need_peers(&self) -> bool { - self.is_finished() && !self.has_active_streams_unfinished_files() + fn is_finished_and_dont_need_peers(&self) -> bool { + self.is_finished() + && !self.has_active_streams_unfinished_files( + &self.lock_read("is_finished_and_dont_need_peers"), + ) } fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> { + let mut g = self.lock_write("on_piece_completed"); + let chunks = g.get_chunks_mut()?; + // if we have all the pieces of the file, reopen it read only - for (idx, opened_file) in self - .files + for (idx, file_info) in self + .meta() + .file_infos .iter() .enumerate() - .skip_while(|fd| !fd.1.piece_range.contains(&id.get())) - .take_while(|fd| fd.1.piece_range.contains(&id.get())) + .skip_while(|(_, fi)| !fi.piece_range.contains(&id.get())) + .take_while(|(_, fi)| fi.piece_range.contains(&id.get())) { - let bytes = opened_file.update_have_on_piece_completed(id.get(), &self.lengths); - if bytes == 0 { - warn!(file_id=idx, piece_id=id.get(), "bug: update_have_on_piece_completed() returned 0, although this piece is present in the file"); - } + let _remaining = chunks.update_file_have_on_piece_completed(id, idx, file_info); } self.streams .wake_streams_on_piece_completed(id, &self.meta.lengths); - if self.is_finished() { - if self.lock_read("chunks").get_chunks()?.get_selected_pieces()[id.get_usize()] { + if chunks.is_finished() { + if chunks.get_selected_pieces()[id.get_usize()] { info!("torrent finished downloading"); } self.finished_notify.notify_waiters(); - if !self.has_active_streams_unfinished_files() { + if !self.has_active_streams_unfinished_files(&g) { // There is not poing being connected to peers that have all the torrent, when // we don't need anything from them, and they don't need anything from us. self.disconnect_all_peers_that_have_full_torrent(); @@ -749,13 +763,6 @@ impl TorrentStateLive { } } } - - pub(crate) fn get_file_progress(&self) -> Vec { - self.files - .iter() - .map(|fd| fd.have.load(Ordering::Relaxed)) - .collect() - } } struct PeerHandlerLocked { @@ -989,8 +996,8 @@ impl PeerHandler { !chunk_tracker.is_piece_have(*pid) && !g.inflight_pieces.contains_key(pid) }); - let natural_order_pieces = - chunk_tracker.iter_queued_pieces(&g.file_priorities, &self.state.files); + let natural_order_pieces = chunk_tracker + .iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos); for n in priority_streamed_pieces.chain(natural_order_pieces) { if bf.get(n.get() as usize).map(|v| *v) == Some(true) { n_opt = Some(n); diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 95259fe..7340829 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -34,8 +34,10 @@ use tracing::error_span; use tracing::warn; use crate::chunk_tracker::ChunkTracker; +use crate::file_info::FileInfo; use crate::spawn_utils::BlockingSpawner; use crate::torrent_state::stats::LiveStats; +use crate::type_aliases::FileInfos; use crate::type_aliases::PeerStream; use initializing::TorrentStateInitializing; @@ -98,6 +100,7 @@ pub struct ManagedTorrentInfo { pub trackers: HashSet, pub peer_id: Id20, pub lengths: Lengths, + pub file_infos: FileInfos, pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, } @@ -370,11 +373,7 @@ impl ManagedTorrent { resp.total_bytes = hns.total(); resp.progress_bytes = hns.progress(); resp.finished = hns.finished(); - resp.file_progress = p - .files - .iter() - .map(|f| f.have.load(Ordering::Relaxed)) - .collect(); + resp.file_progress = p.chunk_tracker.per_file_have_bytes().to_owned(); } ManagedTorrentState::Live(l) => { resp.state = S::Live; @@ -384,7 +383,12 @@ impl ManagedTorrent { resp.progress_bytes = hns.progress(); resp.finished = hns.finished(); resp.uploaded_bytes = l.get_uploaded_bytes(); - resp.file_progress = l.get_file_progress(); + resp.file_progress = l + .lock_read("file_progress") + .get_chunks() + .ok() + .map(|c| c.per_file_have_bytes().to_owned()) + .unwrap_or_default(); resp.live = Some(live_stats); } ManagedTorrentState::Error(e) => { @@ -534,8 +538,21 @@ impl ManagedTorrentBuilder { pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result { let lengths = Lengths::from_torrent(&self.info)?; + let file_infos = self + .info + .iter_file_details(&lengths)? + .map(|fd| { + Ok::<_, anyhow::Error>(FileInfo { + filename: self.output_folder.join(fd.filename.to_pathbuf()?), + offset_in_torrent: fd.offset, + piece_range: fd.pieces, + len: fd.len, + }) + }) + .collect::>>()?; let info = Arc::new(ManagedTorrentInfo { span, + file_infos, info: self.info, info_hash: self.info_hash, out_dir: self.output_folder, diff --git a/crates/librqbit/src/torrent_state/paused.rs b/crates/librqbit/src/torrent_state/paused.rs index 848d89d..805b8d5 100644 --- a/crates/librqbit/src/torrent_state/paused.rs +++ b/crates/librqbit/src/torrent_state/paused.rs @@ -2,14 +2,14 @@ use std::{collections::HashSet, sync::Arc}; use crate::{ chunk_tracker::{ChunkTracker, HaveNeededSelected}, - type_aliases::OpenedFiles, + type_aliases::FileStorage, }; use super::{streaming::TorrentStreams, ManagedTorrentInfo}; pub struct TorrentStatePaused { pub(crate) info: Arc, - pub(crate) files: OpenedFiles, + pub(crate) files: FileStorage, pub(crate) chunk_tracker: ChunkTracker, pub(crate) streams: Arc, } diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 5a2b998..78f4ed1 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -1,6 +1,6 @@ use std::{ collections::VecDeque, - io::{Read, Seek, SeekFrom}, + io::SeekFrom, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -10,11 +10,12 @@ use std::{ use anyhow::Context; use dashmap::DashMap; + use librqbit_core::lengths::{Lengths, ValidPieceIndex}; use tokio::io::{AsyncRead, AsyncSeek}; use tracing::{debug, trace}; -use crate::{opened_file::OpenedFile, type_aliases::OpenedFiles, ManagedTorrent}; +use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent}; use super::ManagedTorrentHandle; @@ -236,18 +237,22 @@ impl AsyncRead for FileStream { "will write bytes" ); - poll_try_io!(poll_try_io!(self.torrent.with_opened_file( + poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file( self.file_id, - |fd| { - let mut g = fd.file.lock(); - g.seek(SeekFrom::Start(self.position))?; - g.read_exact(buf)?; + |files, _fi| { + files.pread_exact(self.file_id, self.position, buf)?; Ok::<_, anyhow::Error>(()) } ))); self.as_mut().advance(bytes_to_read as u64); tbuf.advance(bytes_to_read); + self.streams + .streams + .get_mut(&self.stream_id) + .unwrap() + .value_mut() + .position = self.position; Poll::Ready(Ok(())) } @@ -292,30 +297,25 @@ impl Drop for FileStream { } impl ManagedTorrent { - fn with_opened_files(&self, f: F) -> anyhow::Result + fn with_storage_and_file(&self, file_id: usize, f: F) -> anyhow::Result where - F: FnOnce(&OpenedFiles) -> R, + F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R, { self.with_state(|s| { let files = match s { crate::ManagedTorrentState::Paused(p) => &p.files, crate::ManagedTorrentState::Live(l) => &l.files, - s => anyhow::bail!("with_opened_file: invalid state {}", s.name()), + _ => anyhow::bail!("invalid state"), }; - Ok(f(files)) + let fi = self + .info() + .file_infos + .get(file_id) + .context("invalid file")?; + Ok(f(files, fi)) }) } - fn with_opened_file(&self, file_id: usize, f: F) -> anyhow::Result - where - F: FnOnce(&OpenedFile) -> R, - { - self.with_opened_files(|opened_files| { - let fd = opened_files.get(file_id).context("invalid file id")?; - Ok(f(fd)) - })? - } - fn streams(&self) -> anyhow::Result> { self.with_state(|s| match s { crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()), @@ -326,7 +326,7 @@ impl ManagedTorrent { fn maybe_reconnect_needed_peers_for_file(&self, file_id: usize) -> bool { // If we have the full file, don't bother. - if let Ok(true) = self.with_opened_file(file_id, |f| f.approx_is_finished()) { + if self.is_file_finished(file_id) { return false; } self.with_state(|state| { @@ -337,9 +337,14 @@ impl ManagedTorrent { true } + fn is_file_finished(&self, file_id: usize) -> bool { + self.with_chunk_tracker(|ct| ct.is_file_finished(&self.info.file_infos[file_id])) + .unwrap_or(false) + } + pub fn stream(self: Arc, file_id: usize) -> anyhow::Result { let (fd_len, fd_offset) = - self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; + self.with_storage_and_file(file_id, |_fd, fi| (fi.len, fi.offset_in_torrent))?; let streams = self.streams()?; let s = FileStream { stream_id: streams.next_id(), diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index 054373f..a458734 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -2,11 +2,12 @@ use std::net::SocketAddr; use futures::stream::BoxStream; -use crate::opened_file::OpenedFile; +use crate::{file_info::FileInfo, storage::TorrentStorage}; pub type BF = bitvec::boxed::BitBox; pub type PeerHandle = SocketAddr; pub type PeerStream = BoxStream<'static, SocketAddr>; -pub(crate) type OpenedFiles = Vec; +pub(crate) type FileInfos = Vec; +pub(crate) type FileStorage = Box; pub(crate) type FilePriorities = Vec;