This commit is contained in:
Igor Katson 2024-04-29 21:44:21 +01:00
parent 609f9d92ae
commit 1b49257019
13 changed files with 499 additions and 230 deletions

View file

@ -5,7 +5,10 @@ use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
use peer_binary_protocol::Piece;
use tracing::{debug, trace};
use crate::type_aliases::{FilePriorities, OpenedFiles, BF};
use crate::{
file_info::FileInfo,
type_aliases::{FileInfos, FilePriorities, BF},
};
pub struct ChunkTracker {
// This forms the basis of a "queue" to pull from.
@ -29,6 +32,9 @@ pub struct ChunkTracker {
// was called.
selected: BF,
// How many bytes do we have per each file.
per_file_bytes: Vec<u64>,
lengths: Lengths,
// Quick to retrieve stats, that MUST be in sync with the BFs
@ -128,6 +134,7 @@ impl ChunkTracker {
// Selected pieces are the ones the user has selected
selected_pieces: BF,
lengths: Lengths,
file_infos: &FileInfos,
) -> anyhow::Result<Self> {
let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces)
.context("error computing needed pieces")?;
@ -143,11 +150,44 @@ impl ChunkTracker {
lengths,
have: have_pieces,
hns: HaveNeededSelected::default(),
per_file_bytes: vec![0; file_infos.len()],
};
ct.recalculate_per_file_bytes(file_infos);
ct.hns = ct.calc_hns();
Ok(ct)
}
fn recalculate_per_file_bytes(&mut self, file_infos: &FileInfos) {
for (slot, fi) in self.per_file_bytes.iter_mut().zip(file_infos.iter()) {
*slot = fi
.piece_range
.clone()
.filter(|p| self.have[*p as usize])
.map(|id| {
self.lengths
.size_of_piece_in_file(id, fi.offset_in_torrent, fi.len)
})
.sum();
}
}
pub fn new_empty(lengths: Lengths, file_infos: &FileInfos) -> anyhow::Result<Self> {
let have = BF::from_boxed_slice(vec![0; lengths.piece_bitfield_bytes()].into_boxed_slice());
let selected = have.clone();
let chunk_status =
BF::from_boxed_slice(vec![0; lengths.chunk_bitfield_bytes()].into_boxed_slice());
let queued = have.clone();
Ok(Self {
queue_pieces: queued,
chunk_status,
have,
selected,
lengths,
per_file_bytes: vec![0; file_infos.len()],
hns: Default::default(),
})
}
pub fn get_lengths(&self) -> &Lengths {
&self.lengths
}
@ -182,12 +222,12 @@ impl ChunkTracker {
pub(crate) fn iter_queued_pieces<'a>(
&'a self,
file_priorities: &'a FilePriorities,
opened_files: &'a OpenedFiles,
opened_files: &'a FileInfos,
) -> impl Iterator<Item = ValidPieceIndex> + 'a {
file_priorities
.iter()
.filter_map(|p| opened_files.get(*p))
.filter(|f| !f.approx_is_finished())
// .filter(|f| !f.approx_is_finished())
.flat_map(|f| f.iter_piece_priorities())
.filter(|id| self.queue_pieces[*id])
.filter_map(|id| id.try_into().ok())
@ -226,7 +266,12 @@ impl ChunkTracker {
{
return;
}
debug!("remarking piece={} as broken", index);
self.mark_piece_broken(index)
}
pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) {
debug!("marking piece={} as broken", index);
self.have.set(index.get() as usize, false);
self.queue_pieces.set(index.get() as usize, true);
if let Some(s) = self.chunk_status.get_mut(self.lengths.chunk_range(index)) {
s.fill(false);
@ -372,6 +417,37 @@ impl ChunkTracker {
pub(crate) fn get_selected_pieces(&self) -> &BF {
&self.selected
}
pub fn is_file_finished(&self, file_info: &FileInfo) -> bool {
self.have
.get(file_info.piece_range_usize())
.map(|r| r.all())
.unwrap_or(true)
}
pub(crate) fn is_finished(&self) -> bool {
self.get_hns().finished()
}
pub fn per_file_have_bytes(&self) -> &[u64] {
&self.per_file_bytes
}
// Returns remaining bytes
pub fn update_file_have_on_piece_completed(
&mut self,
piece_id: ValidPieceIndex,
file_id: usize,
file_info: &FileInfo,
) -> u64 {
let diff_have = self.lengths.size_of_piece_in_file(
piece_id.get(),
file_info.offset_in_torrent,
file_info.len,
);
self.per_file_bytes[file_id] += diff_have;
file_info.len.saturating_sub(self.per_file_bytes[file_id])
}
}
#[cfg(test)]
@ -502,7 +578,13 @@ mod tests {
let initial_selected = BF::from_boxed_slice(vec![u8::MAX; bf_len].into_boxed_slice());
// Initially, we need all files and all pieces.
let mut ct = ChunkTracker::new(initial_have.clone(), initial_selected.clone(), l).unwrap();
let mut ct = ChunkTracker::new(
initial_have.clone(),
initial_selected.clone(),
l,
&Default::default(),
)
.unwrap();
// Select all file, no changes.
assert_eq!(

View file

@ -0,0 +1,49 @@
use std::path::PathBuf;
#[derive(Debug, Clone)]
pub struct FileInfo {
pub filename: PathBuf,
pub offset_in_torrent: u64,
pub piece_range: std::ops::Range<u32>,
pub len: u64,
}
// Iterate file pieces in the following order: first, last, everything else from start to end.
fn iter_piece_priorities(range: std::ops::Range<usize>) -> impl Iterator<Item = usize> {
// First and last of each file first, then the rest of pieces in that file.
let r = range;
use std::iter::once;
let first = once(r.start);
let last = once(r.start + r.len().overflowing_sub(1).0); // it's ok if it repeats, doesn't matter
let mid = r.clone().skip(1).take(r.len().overflowing_sub(2).0);
// The take(r.len()) is to not yield start/end pieces in case of 0 and 1 lengths.
first.chain(last).chain(mid).take(r.len())
}
impl FileInfo {
pub fn piece_range_usize(&self) -> std::ops::Range<usize> {
self.piece_range.start as usize..self.piece_range.end as usize
}
pub fn iter_piece_priorities(&self) -> impl Iterator<Item = usize> {
iter_piece_priorities(self.piece_range_usize())
}
}
#[cfg(test)]
mod tests {
use super::iter_piece_priorities;
#[test]
fn test_iter_piece_priorities() {
let it = |r: std::ops::Range<usize>| -> Vec<usize> { iter_piece_priorities(r).collect() };
assert_eq!(it(0..0), Vec::<usize>::new());
assert_eq!(it(0..1), vec![0]);
assert_eq!(it(0..2), vec![0, 1]);
assert_eq!(it(0..3), vec![0, 2, 1]);
assert_eq!(it(0..4), vec![0, 3, 1, 2]);
}
}

View file

@ -1,6 +1,4 @@
use std::{
fs::File,
io::{Read, Seek, SeekFrom, Write},
marker::PhantomData,
sync::atomic::{AtomicU64, Ordering},
};
@ -16,8 +14,9 @@ use sha1w::{ISha1, Sha1};
use tracing::{debug, trace, warn};
use crate::{
opened_file::OpenedFile,
type_aliases::{OpenedFiles, PeerHandle, BF},
file_info::FileInfo,
storage::TorrentStorage,
type_aliases::{FileInfos, PeerHandle, BF},
};
pub(crate) struct InitialCheckResults {
@ -44,7 +43,9 @@ pub(crate) struct InitialCheckResults {
}
pub fn update_hash_from_file<Sha1: ISha1>(
file: &mut File,
file_id: usize,
mut pos: u64,
files: &dyn TorrentStorage,
hash: &mut Sha1,
buf: &mut [u8],
mut bytes_to_read: usize,
@ -52,10 +53,12 @@ pub fn update_hash_from_file<Sha1: ISha1>(
let mut read = 0;
while bytes_to_read > 0 {
let chunk = std::cmp::min(buf.len(), bytes_to_read);
file.read_exact(&mut buf[..chunk])
files
.pread_exact(file_id, pos, &mut buf[..chunk])
.with_context(|| format!("failed reading chunk of size {chunk}, read so far {read}"))?;
bytes_to_read -= chunk;
read += chunk;
pos += chunk as u64;
hash.update(&buf[..chunk]);
}
Ok(())
@ -63,7 +66,8 @@ pub fn update_hash_from_file<Sha1: ISha1>(
pub(crate) struct FileOps<'a> {
torrent: &'a TorrentMetaV1Info<ByteBufOwned>,
files: &'a OpenedFiles,
files: &'a dyn TorrentStorage,
file_infos: &'a FileInfos,
lengths: &'a Lengths,
phantom_data: PhantomData<Sha1>,
}
@ -71,12 +75,14 @@ pub(crate) struct FileOps<'a> {
impl<'a> FileOps<'a> {
pub fn new(
torrent: &'a TorrentMetaV1Info<ByteBufOwned>,
files: &'a OpenedFiles,
files: &'a dyn TorrentStorage,
file_infos: &'a FileInfos,
lengths: &'a Lengths,
) -> Self {
Self {
torrent,
files,
file_infos,
lengths,
phantom_data: PhantomData,
}
@ -85,8 +91,6 @@ impl<'a> FileOps<'a> {
pub fn initial_check(
&self,
only_files: Option<&[usize]>,
opened_files: &OpenedFiles,
lengths: &Lengths,
progress: &AtomicU64,
) -> anyhow::Result<InitialCheckResults> {
let mut needed_pieces =
@ -102,20 +106,20 @@ impl<'a> FileOps<'a> {
#[derive(Debug)]
struct CurrentFile<'a> {
index: usize,
fd: &'a OpenedFile,
fi: &'a FileInfo,
full_file_required: bool,
processed_bytes: u64,
is_broken: bool,
}
impl<'a> CurrentFile<'a> {
fn remaining(&self) -> u64 {
self.fd.len - self.processed_bytes
self.fi.len - self.processed_bytes
}
fn mark_processed_bytes(&mut self, bytes: u64) {
self.processed_bytes += bytes
}
}
let mut file_iterator = self.files.iter().enumerate().map(|(idx, fd)| {
let mut file_iterator = self.file_infos.iter().enumerate().map(|(idx, fi)| {
let full_file_required = if let Some(only_files) = only_files {
only_files.contains(&idx)
} else {
@ -123,7 +127,7 @@ impl<'a> FileOps<'a> {
};
CurrentFile {
index: idx,
fd,
fi,
full_file_required,
processed_bytes: 0,
is_broken: false,
@ -172,19 +176,17 @@ impl<'a> FileOps<'a> {
continue;
}
let mut fd = current_file.fd.file.lock();
fd.seek(SeekFrom::Start(pos))
.context("bug? error seeking")?;
if let Err(err) = update_hash_from_file(
&mut fd,
current_file.index,
pos,
self.files,
&mut computed_hash,
&mut read_buffer,
to_read_in_file,
) {
debug!(
"error reading from file {} ({:?}) at {}: {:#}",
current_file.index, current_file.fd.filename, pos, &err
current_file.index, current_file.fi.filename, pos, &err
);
current_file.is_broken = true;
some_files_broken = true;
@ -216,10 +218,6 @@ impl<'a> FileOps<'a> {
piece_info.piece_index
);
have_bytes += piece_info.len as u64;
for file_id in piece_files.drain(..) {
opened_files[file_id]
.update_have_on_piece_completed(piece_info.piece_index.get(), lengths);
}
have_pieces.set(piece_info.piece_index.get() as usize, true);
} else if piece_selected {
trace!(
@ -265,9 +263,8 @@ impl<'a> FileOps<'a> {
}
let file_remaining_len = file_len - absolute_offset;
let to_read_in_file =
let to_read_in_file: usize =
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64).try_into()?;
let mut file_g = self.files[file_idx].file.lock();
trace!(
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
piece_index,
@ -276,18 +273,17 @@ impl<'a> FileOps<'a> {
absolute_offset,
&last_received_chunk
);
file_g
.seek(SeekFrom::Start(absolute_offset))
.with_context(|| {
format!("error seeking to {absolute_offset}, file id: {file_idx}")
})?;
update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context(
|| {
format!(
"error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")"
)
},
)?;
update_hash_from_file(
file_idx,
absolute_offset,
self.files,
&mut h,
&mut buf,
to_read_in_file,
)
.with_context(|| {
format!("error reading {to_read_in_file} bytes, file_id: {file_idx} (\"{name:?}\")")
})?;
piece_remaining_bytes -= to_read_in_file;
@ -335,7 +331,6 @@ impl<'a> FileOps<'a> {
let file_remaining_len = file_len - absolute_offset;
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64).try_into()?;
let mut file_g = self.files[file_idx].file.lock();
trace!(
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
chunk_info.piece_index,
@ -344,13 +339,8 @@ impl<'a> FileOps<'a> {
absolute_offset,
&chunk_info
);
file_g
.seek(SeekFrom::Start(absolute_offset))
.with_context(|| {
format!("error seeking to {absolute_offset}, file id: {file_idx}")
})?;
file_g
.read_exact(&mut buf[..to_read_in_file])
self.files
.pread_exact(file_idx, absolute_offset, &mut buf[..to_read_in_file])
.with_context(|| {
format!("error reading {file_idx} bytes, file_id: {to_read_in_file}")
})?;
@ -388,7 +378,6 @@ impl<'a> FileOps<'a> {
let remaining_len = file_len - absolute_offset;
let to_write = std::cmp::min(buf.len() as u64, remaining_len).try_into()?;
let mut file_g = self.files[file_idx].file.lock();
trace!(
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
chunk_info.piece_index,
@ -399,13 +388,8 @@ impl<'a> FileOps<'a> {
to_write,
absolute_offset
);
file_g
.seek(SeekFrom::Start(absolute_offset))
.with_context(|| {
format!("error seeking to {absolute_offset} in file {file_idx} (\"{name:?}\")")
})?;
file_g
.write_all(&buf[..to_write])
self.files
.pwrite_all(file_idx, absolute_offset, &buf[..to_write])
.with_context(|| format!("error writing to file {file_idx} (\"{name:?}\")"))?;
buf = &buf[to_write..];
if buf.is_empty() {

View file

@ -30,6 +30,7 @@ mod api_error;
mod chunk_tracker;
mod create_torrent_file;
mod dht_utils;
pub mod file_info;
mod file_ops;
pub mod http_api;
pub mod http_api_client;
@ -40,6 +41,7 @@ mod peer_info_reader;
mod read_buf;
mod session;
mod spawn_utils;
mod storage;
mod torrent_state;
pub mod tracing_subscriber_config_utils;
mod type_aliases;

View file

@ -1,21 +1,11 @@
use std::{
fs::File,
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
};
use std::fs::File;
use anyhow::Context;
use librqbit_core::lengths::Lengths;
use parking_lot::Mutex;
#[derive(Debug)]
pub(crate) struct OpenedFile {
pub file: Mutex<File>,
pub filename: PathBuf,
pub offset_in_torrent: u64,
pub have: AtomicU64,
pub piece_range: std::ops::Range<u32>,
pub len: u64,
}
pub(crate) fn dummy_file() -> anyhow::Result<std::fs::File> {
@ -30,36 +20,10 @@ pub(crate) fn dummy_file() -> anyhow::Result<std::fs::File> {
.with_context(|| format!("error opening {}", DEVNULL))
}
// Iterate file pieces in the following order: first, last, everything else from start to end.
fn iter_piece_priorities(range: std::ops::Range<usize>) -> impl Iterator<Item = usize> {
// First and last of each file first, then the rest of pieces in that file.
let r = range;
use std::iter::once;
let first = once(r.start);
let last = once(r.start + r.len().overflowing_sub(1).0); // it's ok if it repeats, doesn't matter
let mid = r.clone().skip(1).take(r.len().overflowing_sub(2).0);
// The take(r.len()) is to not yield start/end pieces in case of 0 and 1 lengths.
first.chain(last).chain(mid).take(r.len())
}
impl OpenedFile {
pub fn new(
f: File,
filename: PathBuf,
have: u64,
len: u64,
offset_in_torrent: u64,
piece_range: std::ops::Range<u32>,
) -> Self {
pub fn new(f: File) -> Self {
Self {
file: Mutex::new(f),
filename,
have: AtomicU64::new(have),
len,
offset_in_torrent,
piece_range,
}
}
@ -74,45 +38,6 @@ impl OpenedFile {
let f = self.take()?;
Ok(Self {
file: Mutex::new(f),
filename: self.filename.clone(),
offset_in_torrent: self.offset_in_torrent,
have: AtomicU64::new(self.have.load(Ordering::Relaxed)),
len: self.len,
piece_range: self.piece_range.clone(),
})
}
pub fn piece_range_usize(&self) -> std::ops::Range<usize> {
self.piece_range.start as usize..self.piece_range.end as usize
}
pub fn update_have_on_piece_completed(&self, piece_id: u32, lengths: &Lengths) -> u64 {
let size = lengths.size_of_piece_in_file(piece_id, self.offset_in_torrent, self.len);
self.have.fetch_add(size, Ordering::Relaxed);
size
}
pub fn approx_is_finished(&self) -> bool {
self.have.load(Ordering::Relaxed) == self.len
}
pub fn iter_piece_priorities(&self) -> impl Iterator<Item = usize> {
iter_piece_priorities(self.piece_range_usize())
}
}
#[cfg(test)]
mod tests {
use crate::opened_file::iter_piece_priorities;
#[test]
fn test_iter_piece_priorities() {
let it = |r: std::ops::Range<usize>| -> Vec<usize> { iter_piece_priorities(r).collect() };
assert_eq!(it(0..0), Vec::<usize>::new());
assert_eq!(it(0..1), vec![0]);
assert_eq!(it(0..2), vec![0, 1]);
assert_eq!(it(0..3), vec![0, 2, 1]);
assert_eq!(it(0..4), vec![0, 3, 1, 2]);
}
}

View file

@ -1092,10 +1092,9 @@ impl Session {
warn!(error=?e, "error deleting torrent cleanly");
}
(Ok(Some(paused)), true) => {
for file in paused.files.iter() {
drop(file.take()?);
if let Err(e) = std::fs::remove_file(&file.filename) {
warn!(?file.filename, error=?e, "could not delete file");
for (id, fi) in removed.info().file_infos.iter().enumerate() {
if let Err(e) = paused.files.remove_file(id, &fi.filename) {
warn!(?fi.filename, error=?e, "could not delete file");
}
}
}

View file

@ -0,0 +1,184 @@
use std::{
collections::HashMap,
io::{Read, Seek, SeekFrom, Write},
path::Path,
};
use anyhow::Context;
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
use parking_lot::RwLock;
use crate::{opened_file::OpenedFile, type_aliases::FileInfos};
pub trait TorrentStorage: Send + Sync {
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()>;
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()>;
fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()>;
fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()>;
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>>;
}
pub struct FilesystemStorage {
opened_files: Vec<OpenedFile>,
}
impl FilesystemStorage {
pub fn new(opened_files: Vec<OpenedFile>) -> Self {
Self { opened_files }
}
}
impl TorrentStorage for FilesystemStorage {
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
let mut g = self
.opened_files
.get(file_id)
.context("no such file")?
.file
.lock();
g.seek(SeekFrom::Start(offset))?;
Ok(g.read_exact(buf)?)
}
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
let mut g = self
.opened_files
.get(file_id)
.context("no such file")?
.file
.lock();
g.seek(SeekFrom::Start(offset))?;
Ok(g.write_all(buf)?)
}
fn remove_file(&self, _file_id: usize, filename: &Path) -> anyhow::Result<()> {
Ok(std::fs::remove_file(filename)?)
}
fn ensure_file_length(&self, file_id: usize, len: u64) -> anyhow::Result<()> {
Ok(self.opened_files[file_id].file.lock().set_len(len)?)
}
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
Ok(Box::new(Self::new(
self.opened_files
.iter()
.map(|f| f.take_clone())
.collect::<anyhow::Result<Vec<_>>>()?,
)))
}
}
impl TorrentStorage for Box<dyn TorrentStorage> {
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
(**self).pread_exact(file_id, offset, buf)
}
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
(**self).pwrite_all(file_id, offset, buf)
}
fn remove_file(&self, file_id: usize, filename: &Path) -> anyhow::Result<()> {
(**self).remove_file(file_id, filename)
}
fn ensure_file_length(&self, file_id: usize, length: u64) -> anyhow::Result<()> {
(**self).ensure_file_length(file_id, length)
}
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
(**self).take()
}
}
struct InMemoryPiece {
bytes: Box<[u8]>,
}
impl InMemoryPiece {
fn new(l: &Lengths) -> Self {
let v = vec![0; l.default_piece_length() as usize].into_boxed_slice();
Self { bytes: v }
}
}
pub struct InMemoryGarbageCollectingStorage {
lengths: Lengths,
file_infos: FileInfos,
map: RwLock<HashMap<ValidPieceIndex, InMemoryPiece>>,
// TODO: chunk tracker - rename to PieceTracker and extract chunks out of it (only keep pieces)
// this sucker here would track chunks, and the storage above too.
}
impl InMemoryGarbageCollectingStorage {
pub fn new(lengths: Lengths, file_infos: FileInfos) -> anyhow::Result<Self> {
// Max memory 128MiB. Make it tunable
let max_pieces = 128 * 1024 * 1024 / lengths.default_piece_length();
if max_pieces == 0 {
anyhow::bail!("pieces too large");
}
Ok(Self {
lengths,
file_infos,
map: RwLock::new(HashMap::new()),
})
}
}
impl TorrentStorage for InMemoryGarbageCollectingStorage {
fn pread_exact(&self, file_id: usize, offset: u64, buf: &mut [u8]) -> anyhow::Result<()> {
let fi = &self.file_infos[file_id];
let abs_offset = fi.offset_in_torrent + offset;
let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?;
let piece_offset: usize =
(abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
let g = self.map.read();
let inmp = g.get(&piece_id).context("piece expired")?;
buf.copy_from_slice(&inmp.bytes[piece_offset..(piece_offset + buf.len())]);
Ok(())
}
fn pwrite_all(&self, file_id: usize, offset: u64, buf: &[u8]) -> anyhow::Result<()> {
let fi = &self.file_infos[file_id];
let abs_offset = fi.offset_in_torrent + offset;
let piece_id: u32 = (abs_offset / self.lengths.default_piece_length() as u64).try_into()?;
let piece_offset: usize =
(abs_offset % self.lengths.default_piece_length() as u64).try_into()?;
let piece_id = self.lengths.validate_piece_index(piece_id).context("bug")?;
let mut g = self.map.write();
let inmp = g
.entry(piece_id)
.or_insert_with(|| InMemoryPiece::new(&self.lengths));
inmp.bytes[piece_offset..(piece_offset + buf.len())].copy_from_slice(buf);
Ok(())
}
fn remove_file(&self, _file_id: usize, _filename: &Path) -> anyhow::Result<()> {
Ok(())
}
fn ensure_file_length(&self, _file_id: usize, _length: u64) -> anyhow::Result<()> {
Ok(())
}
fn take(&self) -> anyhow::Result<Box<dyn TorrentStorage>> {
let map = {
let mut g = self.map.write();
let mut repl = HashMap::new();
std::mem::swap(&mut *g, &mut repl);
repl
};
Ok(Box::new(Self {
lengths: self.lengths,
map: RwLock::new(map),
file_infos: self.file_infos.clone(),
}))
}
}

View file

@ -10,8 +10,10 @@ use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, warn};
use crate::{
chunk_tracker::ChunkTracker, file_ops::FileOps, opened_file::OpenedFile,
type_aliases::OpenedFiles,
chunk_tracker::ChunkTracker,
file_ops::FileOps,
opened_file::OpenedFile,
storage::{FilesystemStorage, InMemoryGarbageCollectingStorage, TorrentStorage},
};
use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
@ -37,7 +39,23 @@ impl TorrentStateInitializing {
}
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
let mut files = OpenedFiles::new();
// Return in-memory store
let store =
InMemoryGarbageCollectingStorage::new(self.meta.lengths, self.meta.file_infos.clone())?;
let ct = ChunkTracker::new_empty(self.meta.lengths, &self.meta.file_infos)?;
Ok(TorrentStatePaused {
info: self.meta.clone(),
files: Box::new(store),
chunk_tracker: ct,
streams: Arc::new(Default::default()),
})
// self.check_disk().await
}
pub async fn check_disk(&self) -> anyhow::Result<TorrentStatePaused> {
let mut files = Vec::<OpenedFile>::new();
for file_details in self.meta.info.iter_file_details(&self.meta.lengths)? {
let mut full_path = self.meta.out_dir.clone();
let relative_path = file_details
@ -64,26 +82,21 @@ impl TorrentStateInitializing {
.with_context(|| format!("error creating {:?}", &full_path))?;
OpenOptions::new().read(true).write(true).open(&full_path)?
};
files.push(OpenedFile::new(
file,
full_path,
0,
file_details.len,
file_details.offset,
file_details.pieces,
));
files.push(OpenedFile::new(file));
}
let files: Box<dyn TorrentStorage> = Box::new(FilesystemStorage::new(files));
debug!("computed lengths: {:?}", &self.meta.lengths);
info!("Doing initial checksum validation, this might take a while...");
let initial_check_results = self.meta.spawner.spawn_block_in_place(|| {
FileOps::new(&self.meta.info, &files, &self.meta.lengths).initial_check(
self.only_files.as_deref(),
FileOps::new(
&self.meta.info,
&files,
&self.meta.file_infos,
&self.meta.lengths,
&self.checked_bytes,
)
.initial_check(self.only_files.as_deref(), &self.checked_bytes)
})?;
info!(
@ -95,7 +108,7 @@ impl TorrentStateInitializing {
// Ensure file lenghts are correct, and reopen read-only.
self.meta.spawner.spawn_block_in_place(|| {
for (idx, file) in files.iter().enumerate() {
for (idx, fi) in self.meta.file_infos.iter().enumerate() {
if self
.only_files
.as_ref()
@ -103,16 +116,16 @@ impl TorrentStateInitializing {
.unwrap_or(true)
{
let now = Instant::now();
if let Err(err) = file.file.lock().set_len(file.len) {
if let Err(err) = files.ensure_file_length(idx, fi.len) {
warn!(
"Error setting length for file {:?} to {}: {:#?}",
file.filename, file.len, err
fi.filename, fi.len, err
);
} else {
debug!(
"Set length for file {:?} to {} in {:?}",
file.filename,
SF::new(file.len),
fi.filename,
SF::new(fi.len),
now.elapsed()
);
}
@ -125,6 +138,7 @@ impl TorrentStateInitializing {
initial_check_results.have_pieces,
initial_check_results.selected_pieces,
self.meta.lengths,
&self.meta.file_infos,
)
.context("error creating chunk tracker")?;

View file

@ -86,8 +86,9 @@ use crate::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
session::CheckedIncomingConnection,
storage::TorrentStorage,
torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{FilePriorities, OpenedFiles, PeerHandle, BF},
type_aliases::{FilePriorities, FileStorage, PeerHandle, BF},
};
use self::{
@ -141,7 +142,7 @@ impl TorrentStateLocked {
.context("chunk tracker empty, torrent was paused")
}
fn get_chunks_mut(&mut self) -> anyhow::Result<&mut ChunkTracker> {
pub(crate) fn get_chunks_mut(&mut self) -> anyhow::Result<&mut ChunkTracker> {
self.chunks
.as_mut()
.context("chunk tracker empty, torrent was paused")
@ -159,7 +160,7 @@ pub struct TorrentStateLive {
meta: Arc<ManagedTorrentInfo>,
locked: RwLock<TorrentStateLocked>,
pub(crate) files: OpenedFiles,
pub(crate) files: FileStorage,
stats: AtomicStats,
lengths: Lengths,
@ -195,9 +196,15 @@ impl TorrentStateLive {
// TODO: make it configurable
let file_priorities = {
let mut pri = (0..paused.files.len()).collect::<Vec<usize>>();
let mut pri = (0..paused.info.file_infos.len()).collect::<Vec<usize>>();
// sort by filename, cause many torrents have random sort order.
pri.sort_unstable_by_key(|id| paused.files.get(*id).map(|op| op.filename.as_path()));
pri.sort_unstable_by_key(|id| {
paused
.info
.file_infos
.get(*id)
.map(|fi| fi.filename.as_path())
});
pri
};
@ -482,7 +489,12 @@ impl TorrentStateLive {
self.meta.peer_id
}
pub(crate) fn file_ops(&self) -> FileOps<'_> {
FileOps::new(&self.meta.info, &self.files, &self.lengths)
FileOps::new(
&self.meta.info,
&self.files,
&self.meta().file_infos,
&self.lengths,
)
}
pub(crate) fn lock_read(
@ -632,12 +644,6 @@ impl TorrentStateLive {
// It should be impossible to make a fatal error after pausing.
g.fatal_errors_tx.take();
let files = self
.files
.iter()
.map(|f| f.take_clone())
.collect::<anyhow::Result<Vec<_>>>()?;
let mut chunk_tracker = g
.chunks
.take()
@ -649,7 +655,7 @@ impl TorrentStateLive {
// g.chunks;
Ok(TorrentStatePaused {
info: self.meta.clone(),
files,
files: self.files.take()?,
chunk_tracker,
streams: self.streams.clone(),
})
@ -671,7 +677,7 @@ impl TorrentStateLive {
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let mut g = self.lock_write("update_only_files");
let ct = g.get_chunks_mut()?;
let hns = ct.update_only_files(self.files.iter().map(|f| f.len), only_files)?;
let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?;
if !hns.finished() {
self.reconnect_all_not_needed_peers();
}
@ -682,41 +688,49 @@ impl TorrentStateLive {
self.get_hns().map(|h| h.finished()).unwrap_or_default()
}
pub(crate) fn has_active_streams_unfinished_files(&self) -> bool {
fn has_active_streams_unfinished_files(&self, state: &TorrentStateLocked) -> bool {
let chunks = match state.get_chunks() {
Ok(c) => c,
Err(_) => return false,
};
self.streams
.streamed_file_ids()
.any(|file_id| !self.files[file_id].approx_is_finished())
.any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id]))
}
pub(crate) fn is_finished_and_dont_need_peers(&self) -> bool {
self.is_finished() && !self.has_active_streams_unfinished_files()
fn is_finished_and_dont_need_peers(&self) -> bool {
self.is_finished()
&& !self.has_active_streams_unfinished_files(
&self.lock_read("is_finished_and_dont_need_peers"),
)
}
fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> {
let mut g = self.lock_write("on_piece_completed");
let chunks = g.get_chunks_mut()?;
// if we have all the pieces of the file, reopen it read only
for (idx, opened_file) in self
.files
for (idx, file_info) in self
.meta()
.file_infos
.iter()
.enumerate()
.skip_while(|fd| !fd.1.piece_range.contains(&id.get()))
.take_while(|fd| fd.1.piece_range.contains(&id.get()))
.skip_while(|(_, fi)| !fi.piece_range.contains(&id.get()))
.take_while(|(_, fi)| fi.piece_range.contains(&id.get()))
{
let bytes = opened_file.update_have_on_piece_completed(id.get(), &self.lengths);
if bytes == 0 {
warn!(file_id=idx, piece_id=id.get(), "bug: update_have_on_piece_completed() returned 0, although this piece is present in the file");
}
let _remaining = chunks.update_file_have_on_piece_completed(id, idx, file_info);
}
self.streams
.wake_streams_on_piece_completed(id, &self.meta.lengths);
if self.is_finished() {
if self.lock_read("chunks").get_chunks()?.get_selected_pieces()[id.get_usize()] {
if chunks.is_finished() {
if chunks.get_selected_pieces()[id.get_usize()] {
info!("torrent finished downloading");
}
self.finished_notify.notify_waiters();
if !self.has_active_streams_unfinished_files() {
if !self.has_active_streams_unfinished_files(&g) {
// There is not poing being connected to peers that have all the torrent, when
// we don't need anything from them, and they don't need anything from us.
self.disconnect_all_peers_that_have_full_torrent();
@ -749,13 +763,6 @@ impl TorrentStateLive {
}
}
}
pub(crate) fn get_file_progress(&self) -> Vec<u64> {
self.files
.iter()
.map(|fd| fd.have.load(Ordering::Relaxed))
.collect()
}
}
struct PeerHandlerLocked {
@ -989,8 +996,8 @@ impl PeerHandler {
!chunk_tracker.is_piece_have(*pid)
&& !g.inflight_pieces.contains_key(pid)
});
let natural_order_pieces =
chunk_tracker.iter_queued_pieces(&g.file_priorities, &self.state.files);
let natural_order_pieces = chunk_tracker
.iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos);
for n in priority_streamed_pieces.chain(natural_order_pieces) {
if bf.get(n.get() as usize).map(|v| *v) == Some(true) {
n_opt = Some(n);

View file

@ -34,8 +34,10 @@ use tracing::error_span;
use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::spawn_utils::BlockingSpawner;
use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::FileInfos;
use crate::type_aliases::PeerStream;
use initializing::TorrentStateInitializing;
@ -98,6 +100,7 @@ pub struct ManagedTorrentInfo {
pub trackers: HashSet<String>,
pub peer_id: Id20,
pub lengths: Lengths,
pub file_infos: FileInfos,
pub span: tracing::Span,
pub(crate) options: ManagedTorrentOptions,
}
@ -370,11 +373,7 @@ impl ManagedTorrent {
resp.total_bytes = hns.total();
resp.progress_bytes = hns.progress();
resp.finished = hns.finished();
resp.file_progress = p
.files
.iter()
.map(|f| f.have.load(Ordering::Relaxed))
.collect();
resp.file_progress = p.chunk_tracker.per_file_have_bytes().to_owned();
}
ManagedTorrentState::Live(l) => {
resp.state = S::Live;
@ -384,7 +383,12 @@ impl ManagedTorrent {
resp.progress_bytes = hns.progress();
resp.finished = hns.finished();
resp.uploaded_bytes = l.get_uploaded_bytes();
resp.file_progress = l.get_file_progress();
resp.file_progress = l
.lock_read("file_progress")
.get_chunks()
.ok()
.map(|c| c.per_file_have_bytes().to_owned())
.unwrap_or_default();
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
@ -534,8 +538,21 @@ impl ManagedTorrentBuilder {
pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let file_infos = self
.info
.iter_file_details(&lengths)?
.map(|fd| {
Ok::<_, anyhow::Error>(FileInfo {
filename: self.output_folder.join(fd.filename.to_pathbuf()?),
offset_in_torrent: fd.offset,
piece_range: fd.pieces,
len: fd.len,
})
})
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
let info = Arc::new(ManagedTorrentInfo {
span,
file_infos,
info: self.info,
info_hash: self.info_hash,
out_dir: self.output_folder,

View file

@ -2,14 +2,14 @@ use std::{collections::HashSet, sync::Arc};
use crate::{
chunk_tracker::{ChunkTracker, HaveNeededSelected},
type_aliases::OpenedFiles,
type_aliases::FileStorage,
};
use super::{streaming::TorrentStreams, ManagedTorrentInfo};
pub struct TorrentStatePaused {
pub(crate) info: Arc<ManagedTorrentInfo>,
pub(crate) files: OpenedFiles,
pub(crate) files: FileStorage,
pub(crate) chunk_tracker: ChunkTracker,
pub(crate) streams: Arc<TorrentStreams>,
}

View file

@ -1,6 +1,6 @@
use std::{
collections::VecDeque,
io::{Read, Seek, SeekFrom},
io::SeekFrom,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -10,11 +10,12 @@ use std::{
use anyhow::Context;
use dashmap::DashMap;
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
use tokio::io::{AsyncRead, AsyncSeek};
use tracing::{debug, trace};
use crate::{opened_file::OpenedFile, type_aliases::OpenedFiles, ManagedTorrent};
use crate::{file_info::FileInfo, storage::TorrentStorage, ManagedTorrent};
use super::ManagedTorrentHandle;
@ -236,18 +237,22 @@ impl AsyncRead for FileStream {
"will write bytes"
);
poll_try_io!(poll_try_io!(self.torrent.with_opened_file(
poll_try_io!(poll_try_io!(self.torrent.with_storage_and_file(
self.file_id,
|fd| {
let mut g = fd.file.lock();
g.seek(SeekFrom::Start(self.position))?;
g.read_exact(buf)?;
|files, _fi| {
files.pread_exact(self.file_id, self.position, buf)?;
Ok::<_, anyhow::Error>(())
}
)));
self.as_mut().advance(bytes_to_read as u64);
tbuf.advance(bytes_to_read);
self.streams
.streams
.get_mut(&self.stream_id)
.unwrap()
.value_mut()
.position = self.position;
Poll::Ready(Ok(()))
}
@ -292,30 +297,25 @@ impl Drop for FileStream {
}
impl ManagedTorrent {
fn with_opened_files<F, R>(&self, f: F) -> anyhow::Result<R>
fn with_storage_and_file<F, R>(&self, file_id: usize, f: F) -> anyhow::Result<R>
where
F: FnOnce(&OpenedFiles) -> R,
F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R,
{
self.with_state(|s| {
let files = match s {
crate::ManagedTorrentState::Paused(p) => &p.files,
crate::ManagedTorrentState::Live(l) => &l.files,
s => anyhow::bail!("with_opened_file: invalid state {}", s.name()),
_ => anyhow::bail!("invalid state"),
};
Ok(f(files))
let fi = self
.info()
.file_infos
.get(file_id)
.context("invalid file")?;
Ok(f(files, fi))
})
}
fn with_opened_file<F, R>(&self, file_id: usize, f: F) -> anyhow::Result<R>
where
F: FnOnce(&OpenedFile) -> R,
{
self.with_opened_files(|opened_files| {
let fd = opened_files.get(file_id).context("invalid file id")?;
Ok(f(fd))
})?
}
fn streams(&self) -> anyhow::Result<Arc<TorrentStreams>> {
self.with_state(|s| match s {
crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()),
@ -326,7 +326,7 @@ impl ManagedTorrent {
fn maybe_reconnect_needed_peers_for_file(&self, file_id: usize) -> bool {
// If we have the full file, don't bother.
if let Ok(true) = self.with_opened_file(file_id, |f| f.approx_is_finished()) {
if self.is_file_finished(file_id) {
return false;
}
self.with_state(|state| {
@ -337,9 +337,14 @@ impl ManagedTorrent {
true
}
fn is_file_finished(&self, file_id: usize) -> bool {
self.with_chunk_tracker(|ct| ct.is_file_finished(&self.info.file_infos[file_id]))
.unwrap_or(false)
}
pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<FileStream> {
let (fd_len, fd_offset) =
self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?;
self.with_storage_and_file(file_id, |_fd, fi| (fi.len, fi.offset_in_torrent))?;
let streams = self.streams()?;
let s = FileStream {
stream_id: streams.next_id(),

View file

@ -2,11 +2,12 @@ use std::net::SocketAddr;
use futures::stream::BoxStream;
use crate::opened_file::OpenedFile;
use crate::{file_info::FileInfo, storage::TorrentStorage};
pub type BF = bitvec::boxed::BitBox<u8, bitvec::order::Msb0>;
pub type PeerHandle = SocketAddr;
pub type PeerStream = BoxStream<'static, SocketAddr>;
pub(crate) type OpenedFiles = Vec<OpenedFile>;
pub(crate) type FileInfos = Vec<FileInfo>;
pub(crate) type FileStorage = Box<dyn TorrentStorage>;
pub(crate) type FilePriorities = Vec<usize>;