Ability to change the list of files at any time, including through UI (#115)

* Now can update the list of files without pausing/unpausing

* Shrink a few functions

* Reopen write when updating files

* Todos

* opened_file abstraction

* iter_pieces_within iterator

* Simplify iter_pieces_within

* Simplify iter_pieces_within

* Add "iter_file_details"

* temporarily broken: readonly by default

* Live torrent - reopen files

* Reopen files after changing the list

* Now reopening files read only when they are completed

* Fix a bug in opened_file.rs

* update todos

* update help

* Reconnect all peers that are idling

* Add a couple fields to OpenedFile

* Add a couple fields to OpenedFile

* Small cleanups - use the new iterator where possible

* size_of_piece_in_file function

* Updating have

* Include file progress

* Almost nothing

* ugly progress bars

* bad UI, saving

* its not so bad

* Works now

* update progress bar a bit

* Reopen read-only on pause

* Zero bytes isnt too bad! Doesnt break anything

* fix per file progress bars

* progress bar not as ugly anymore?

* ui tweaks

* fix a react bug

* TODO.md update

* Fix js + TODOs

* Compute per-file progress on init

* Fix stats updating live

* Nothing

* Nothing

* cleanup ui a bit

* Nothing

* Final fixes

* Trying to fix rust 1.73

* Sorting filenames

* remove unnecessary indentation

* Remove unnecessary comment
This commit is contained in:
Igor Katson 2024-04-06 09:20:03 +01:00 committed by GitHub
parent d7380217f6
commit 5eb01ac226
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 865 additions and 512 deletions

View file

@ -33,9 +33,13 @@ pub struct ChunkTracker {
// What pieces to download first.
priority_piece_ids: Vec<usize>,
// Quick to retrieve stats, that MUST be in sync with the BFs
// above (have/selected).
hns: HaveNeededSelected,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)]
pub struct HaveNeededSelected {
// How many bytes we have downloaded and verified.
pub have_bytes: u64,
@ -146,7 +150,7 @@ impl ChunkTracker {
// E.g. if it's a video file, than the last piece often contains some index, or just
// players look into it, and it's better be there.
let priority_piece_ids = last_needed_piece_id.into_iter().collect();
Ok(Self {
let mut ct = Self {
chunk_status: compute_chunk_have_status(&lengths, &have_pieces)
.context("error computing chunk status")?,
queue_pieces: needed_pieces,
@ -154,7 +158,10 @@ impl ChunkTracker {
lengths,
have: have_pieces,
priority_piece_ids,
})
hns: HaveNeededSelected::default(),
};
ct.hns = ct.calc_hns();
Ok(ct)
}
pub fn get_lengths(&self) -> &Lengths {
@ -164,34 +171,31 @@ impl ChunkTracker {
pub fn get_have_pieces(&self) -> &BF {
&self.have
}
pub fn get_selected_pieces(&self) -> &BF {
&self.selected
}
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
self.queue_pieces.set(index.get() as usize, false)
}
pub fn calc_have_bytes(&self) -> u64 {
self.have
.iter_ones()
.filter_map(|piece_id| {
let piece_id = self.lengths.validate_piece_index(piece_id as u32)?;
Some(self.lengths.piece_length(piece_id) as u64)
})
.sum()
pub fn get_hns(&self) -> &HaveNeededSelected {
&self.hns
}
pub fn calc_needed_bytes(&self) -> u64 {
self.have
.iter()
.zip(self.selected.iter())
.enumerate()
.filter_map(|(piece_id, (have, selected))| {
if *selected && !*have {
let piece_id = self.lengths.validate_piece_index(piece_id as u32)?;
Some(self.lengths.piece_length(piece_id) as u64)
} else {
None
}
})
.sum()
fn calc_hns(&self) -> HaveNeededSelected {
let mut hns = HaveNeededSelected::default();
for piece in self.lengths.iter_piece_infos() {
let id = piece.piece_index.get() as usize;
let len = piece.len as u64;
let is_have = self.have[id];
let is_selected = self.selected[id];
let is_needed = is_selected && !is_have;
hns.have_bytes += len * (is_have as u64);
hns.selected_bytes += len * (is_selected as u64);
hns.needed_bytes += len * (is_needed as u64);
}
hns
}
pub fn iter_queued_pieces(&self) -> impl Iterator<Item = usize> + '_ {
@ -242,7 +246,15 @@ impl ChunkTracker {
}
pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) {
self.have.set(idx.get() as usize, true);
let id = idx.get() as usize;
if !self.have[id] {
self.have.set(id, true);
let len = self.lengths.piece_length(idx) as u64;
self.hns.have_bytes += len;
if self.selected[id] {
self.hns.needed_bytes -= len;
}
}
}
pub fn is_chunk_ready_to_upload(&self, chunk: &ChunkInfo) -> bool {
@ -252,6 +264,10 @@ impl ChunkTracker {
.unwrap_or(false)
}
pub fn get_remaining_bytes(&self) -> u64 {
self.hns.needed_bytes
}
// return true if the whole piece is marked downloaded
pub fn mark_chunk_downloaded<ByteBuf>(
&mut self,
@ -356,11 +372,13 @@ impl ChunkTracker {
}
}
Ok(HaveNeededSelected {
let res = HaveNeededSelected {
have_bytes,
needed_bytes,
selected_bytes,
})
};
self.hns = res;
Ok(res)
}
}

View file

@ -2,24 +2,23 @@ use std::{
fs::File,
io::{Read, Seek, SeekFrom, Write},
marker::PhantomData,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
sync::atomic::{AtomicU64, Ordering},
};
use anyhow::Context;
use buffers::ByteBufOwned;
use librqbit_core::{
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
torrent_metainfo::{FileIteratorName, TorrentMetaV1Info},
torrent_metainfo::TorrentMetaV1Info,
};
use parking_lot::Mutex;
use peer_binary_protocol::Piece;
use sha1w::{ISha1, Sha1};
use tracing::{debug, trace, warn};
use crate::type_aliases::{PeerHandle, BF};
use crate::{
opened_file::OpenedFile,
type_aliases::{OpenedFiles, PeerHandle, BF},
};
pub(crate) struct InitialCheckResults {
// A piece as flags based on these dimensions:
@ -64,7 +63,7 @@ pub fn update_hash_from_file<Sha1: ISha1>(
pub(crate) struct FileOps<'a> {
torrent: &'a TorrentMetaV1Info<ByteBufOwned>,
files: &'a [Arc<Mutex<File>>],
files: &'a OpenedFiles,
lengths: &'a Lengths,
phantom_data: PhantomData<Sha1>,
}
@ -72,7 +71,7 @@ pub(crate) struct FileOps<'a> {
impl<'a> FileOps<'a> {
pub fn new(
torrent: &'a TorrentMetaV1Info<ByteBufOwned>,
files: &'a [Arc<Mutex<File>>],
files: &'a OpenedFiles,
lengths: &'a Lengths,
) -> Self {
Self {
@ -86,6 +85,8 @@ impl<'a> FileOps<'a> {
pub fn initial_check(
&self,
only_files: Option<&[usize]>,
opened_files: &OpenedFiles,
lengths: &Lengths,
progress: &AtomicU64,
) -> anyhow::Result<InitialCheckResults> {
let mut needed_pieces =
@ -96,46 +97,38 @@ impl<'a> FileOps<'a> {
let mut have_bytes = 0u64;
let mut needed_bytes = 0u64;
let mut total_selected_bytes = 0u64;
let mut piece_files = Vec::<usize>::new();
#[derive(Debug)]
struct CurrentFile<'a> {
index: usize,
fd: &'a Arc<Mutex<File>>,
len: u64,
name: FileIteratorName<'a, ByteBufOwned>,
fd: &'a OpenedFile,
full_file_required: bool,
processed_bytes: u64,
is_broken: bool,
}
impl<'a> CurrentFile<'a> {
fn remaining(&self) -> u64 {
self.len - self.processed_bytes
self.fd.len - self.processed_bytes
}
fn mark_processed_bytes(&mut self, bytes: u64) {
self.processed_bytes += bytes
}
}
let mut file_iterator = self
.files
.iter()
.zip(self.torrent.iter_filenames_and_lengths()?)
.enumerate()
.map(|(idx, (fd, (name, len)))| {
let full_file_required = if let Some(only_files) = only_files {
only_files.contains(&idx)
} else {
true
};
CurrentFile {
index: idx,
fd,
len,
name,
full_file_required,
processed_bytes: 0,
is_broken: false,
}
});
let mut file_iterator = self.files.iter().enumerate().map(|(idx, fd)| {
let full_file_required = if let Some(only_files) = only_files {
only_files.contains(&idx)
} else {
true
};
CurrentFile {
index: idx,
fd,
full_file_required,
processed_bytes: 0,
is_broken: false,
}
});
let mut current_file = file_iterator
.next()
@ -144,6 +137,7 @@ impl<'a> FileOps<'a> {
let mut read_buffer = vec![0u8; 65536];
for piece_info in self.lengths.iter_piece_infos() {
piece_files.clear();
let mut computed_hash = Sha1::new();
let mut piece_remaining = piece_info.len as usize;
let mut some_files_broken = false;
@ -166,6 +160,8 @@ impl<'a> FileOps<'a> {
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
}
piece_files.push(current_file.index);
let pos = current_file.processed_bytes;
piece_remaining -= to_read_in_file;
current_file.mark_processed_bytes(to_read_in_file as u64);
@ -175,7 +171,7 @@ impl<'a> FileOps<'a> {
continue;
}
let mut fd = current_file.fd.lock();
let mut fd = current_file.fd.file.lock();
fd.seek(SeekFrom::Start(pos))
.context("bug? error seeking")?;
@ -187,7 +183,7 @@ impl<'a> FileOps<'a> {
) {
debug!(
"error reading from file {} ({:?}) at {}: {:#}",
current_file.index, current_file.name, pos, &err
current_file.index, current_file.fd.filename, pos, &err
);
current_file.is_broken = true;
some_files_broken = true;
@ -219,6 +215,10 @@ impl<'a> FileOps<'a> {
piece_info.piece_index
);
have_bytes += piece_info.len as u64;
for file_id in piece_files.drain(..) {
opened_files[file_id]
.update_have_on_piece_completed(piece_info.piece_index.get(), lengths);
}
have_pieces.set(piece_info.piece_index.get() as usize, true);
} else if piece_selected {
trace!(
@ -266,7 +266,7 @@ impl<'a> FileOps<'a> {
let to_read_in_file =
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize;
let mut file_g = self.files[file_idx].lock();
let mut file_g = self.files[file_idx].file.lock();
trace!(
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
piece_index,
@ -334,7 +334,7 @@ impl<'a> FileOps<'a> {
let file_remaining_len = file_len - absolute_offset;
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
let mut file_g = self.files[file_idx].lock();
let mut file_g = self.files[file_idx].file.lock();
trace!(
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
chunk_info.piece_index,
@ -387,7 +387,7 @@ impl<'a> FileOps<'a> {
let remaining_len = file_len - absolute_offset;
let to_write = std::cmp::min(buf.len(), remaining_len as usize);
let mut file_g = self.files[file_idx].lock();
let mut file_g = self.files[file_idx].file.lock();
trace!(
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
chunk_info.piece_index,

View file

@ -31,6 +31,7 @@ mod dht_utils;
mod file_ops;
pub mod http_api;
pub mod http_api_client;
mod opened_file;
mod peer_connection;
mod peer_info_reader;
mod read_buf;

View file

@ -0,0 +1,97 @@
use std::{
fs::File,
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
};
use anyhow::Context;
use librqbit_core::lengths::Lengths;
use parking_lot::Mutex;
use tracing::debug;
#[derive(Debug)]
pub(crate) struct OpenedFile {
pub file: Mutex<File>,
pub filename: PathBuf,
pub offset_in_torrent: u64,
pub have: AtomicU64,
pub piece_range: std::ops::Range<u32>,
pub len: u64,
}
pub(crate) fn dummy_file() -> anyhow::Result<std::fs::File> {
#[cfg(target_os = "windows")]
const DEVNULL: &str = "NUL";
#[cfg(not(target_os = "windows"))]
const DEVNULL: &str = "/dev/null";
std::fs::OpenOptions::new()
.read(true)
.open(DEVNULL)
.with_context(|| format!("error opening {}", DEVNULL))
}
impl OpenedFile {
pub fn new(
f: File,
filename: PathBuf,
have: u64,
len: u64,
offset_in_torrent: u64,
piece_range: std::ops::Range<u32>,
) -> Self {
Self {
file: Mutex::new(f),
filename,
have: AtomicU64::new(have),
len,
offset_in_torrent,
piece_range,
}
}
pub fn reopen(&self, read_only: bool) -> anyhow::Result<()> {
let log_suffix = if read_only { " read only" } else { "" };
let mut open_opts = std::fs::OpenOptions::new();
open_opts.read(true);
if !read_only {
open_opts.write(true).create(false);
}
let mut g = self.file.lock();
*g = open_opts
.open(&self.filename)
.with_context(|| format!("error re-opening {:?}{log_suffix}", self.filename))?;
debug!("reopened {:?}{log_suffix}", self.filename);
Ok(())
}
pub fn take(&self) -> anyhow::Result<File> {
let mut f = self.file.lock();
let dummy = dummy_file()?;
let f = std::mem::replace(&mut *f, dummy);
Ok(f)
}
pub fn take_clone(&self) -> anyhow::Result<Self> {
let f = self.take()?;
Ok(Self {
file: Mutex::new(f),
filename: self.filename.clone(),
offset_in_torrent: self.offset_in_torrent,
have: AtomicU64::new(self.have.load(Ordering::Relaxed)),
len: self.len,
piece_range: self.piece_range.clone(),
})
}
pub fn piece_range_usize(&self) -> std::ops::Range<usize> {
self.piece_range.start as usize..self.piece_range.end as usize
}
pub fn update_have_on_piece_completed(&self, piece_id: u32, lengths: &Lengths) -> u64 {
let size = lengths.size_of_piece_in_file(piece_id, self.offset_in_torrent, self.len);
self.have.fetch_add(size, Ordering::Relaxed);
size
}
}

View file

@ -1083,10 +1083,10 @@ impl Session {
warn!(error=?e, "error deleting torrent cleanly");
}
(Ok(Some(paused)), true) => {
drop(paused.files);
for file in paused.filenames {
if let Err(e) = std::fs::remove_file(&file) {
warn!(?file, error=?e, "could not delete file");
for file in paused.files.iter() {
drop(file.take()?);
if let Err(e) = std::fs::remove_file(&file.filename) {
warn!(?file.filename, error=?e, "could not delete file");
}
}
}
@ -1142,10 +1142,7 @@ impl Session {
handle: &ManagedTorrentHandle,
only_files: &HashSet<usize>,
) -> anyhow::Result<()> {
let need_to_unpause = handle.update_only_files(only_files)?;
if need_to_unpause {
self.unpause(handle)?;
}
handle.update_only_files(only_files)?;
Ok(())
}

View file

@ -163,6 +163,7 @@ async fn test_e2e() {
crate::AddTorrent::TorrentFileBytes(Cow::Owned(torrent_file_bytes.clone())),
Some(AddTorrentOptions {
initial_peers: Some(peers.clone()),
// only_files: Some(vec![0]),
overwrite: false,
..Default::default()
}),
@ -253,7 +254,7 @@ async fn test_e2e() {
.with_state(|s| match s {
crate::ManagedTorrentState::Initializing(_) => Ok(false),
crate::ManagedTorrentState::Paused(p) => {
assert_eq!(p.hns.needed_bytes, 0);
assert_eq!(p.chunk_tracker.get_hns().needed_bytes, 0);
Ok(true)
}
_ => bail!("bugged state"),

View file

@ -6,14 +6,12 @@ use std::{
use anyhow::Context;
use parking_lot::Mutex;
use size_format::SizeFormatterBinary as SF;
use tracing::{debug, info, warn};
use crate::{
chunk_tracker::{ChunkTracker, HaveNeededSelected},
file_ops::FileOps,
chunk_tracker::ChunkTracker, file_ops::FileOps, opened_file::OpenedFile,
type_aliases::OpenedFiles,
};
use super::{paused::TorrentStatePaused, ManagedTorrentInfo};
@ -43,48 +41,52 @@ impl TorrentStateInitializing {
}
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
let (files, filenames) = {
let mut files =
Vec::<Arc<Mutex<File>>>::with_capacity(self.meta.info.iter_file_lengths()?.count());
let mut filenames = Vec::new();
for (path_bits, _) in self.meta.info.iter_filenames_and_lengths()? {
let mut full_path = self.meta.out_dir.clone();
let relative_path = path_bits
.to_pathbuf()
.context("error converting file to path")?;
full_path.push(relative_path);
let mut files = OpenedFiles::new();
for file_details in self.meta.info.iter_file_details(&self.meta.lengths)? {
let mut full_path = self.meta.out_dir.clone();
let relative_path = file_details
.filename
.to_pathbuf()
.context("error converting file to path")?;
full_path.push(relative_path);
std::fs::create_dir_all(full_path.parent().unwrap())?;
let file = if self.meta.options.overwrite {
OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&full_path)
.with_context(|| {
format!("error opening {full_path:?} in read/write mode")
})?
} else {
// TODO: create_new does not seem to work with read(true), so calling this twice.
OpenOptions::new()
.create_new(true)
.write(true)
.open(&full_path)
.with_context(|| format!("error creating {:?}", &full_path))?;
OpenOptions::new().read(true).write(true).open(&full_path)?
};
filenames.push(full_path);
files.push(Arc::new(Mutex::new(file)))
}
(files, filenames)
};
std::fs::create_dir_all(full_path.parent().context("bug: no parent")?)?;
let file = if self.meta.options.overwrite {
OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&full_path)
.with_context(|| format!("error opening {full_path:?} in read/write mode"))?
} else {
// TODO: create_new does not seem to work with read(true), so calling this twice.
OpenOptions::new()
.create_new(true)
.write(true)
.open(&full_path)
.with_context(|| format!("error creating {:?}", &full_path))?;
OpenOptions::new().read(true).write(true).open(&full_path)?
};
files.push(OpenedFile::new(
file,
full_path,
0,
file_details.len,
file_details.offset,
file_details.pieces,
));
}
debug!("computed lengths: {:?}", &self.meta.lengths);
info!("Doing initial checksum validation, this might take a while...");
let initial_check_results = self.meta.spawner.spawn_block_in_place(|| {
FileOps::new(&self.meta.info, &files, &self.meta.lengths)
.initial_check(self.only_files.as_deref(), &self.checked_bytes)
FileOps::new(&self.meta.info, &files, &self.meta.lengths).initial_check(
self.only_files.as_deref(),
&files,
&self.meta.lengths,
&self.checked_bytes,
)
})?;
info!(
@ -94,36 +96,35 @@ impl TorrentStateInitializing {
SF::new(initial_check_results.selected_bytes)
);
// Ensure file lenghts are correct, and reopen read-only.
self.meta.spawner.spawn_block_in_place(|| {
for (idx, (file, (name, length))) in files
.iter()
.zip(self.meta.info.iter_filenames_and_lengths().unwrap())
.enumerate()
{
for (idx, file) in files.iter().enumerate() {
if self
.only_files
.as_ref()
.map(|v| !v.contains(&idx))
.unwrap_or(false)
.map(|v| v.contains(&idx))
.unwrap_or(true)
{
continue;
}
let now = Instant::now();
if let Err(err) = ensure_file_length(&file.lock(), length) {
warn!(
"Error setting length for file {:?} to {}: {:#?}",
name, length, err
);
} else {
debug!(
"Set length for file {:?} to {} in {:?}",
name,
SF::new(length),
now.elapsed()
);
let now = Instant::now();
if let Err(err) = ensure_file_length(&file.file.lock(), file.len) {
warn!(
"Error setting length for file {:?} to {}: {:#?}",
file.filename, file.len, err
);
} else {
debug!(
"Set length for file {:?} to {} in {:?}",
file.filename,
SF::new(file.len),
now.elapsed()
);
}
}
file.reopen(true)?;
}
});
Ok::<_, anyhow::Error>(())
})?;
let chunk_tracker = ChunkTracker::new(
initial_check_results.have_pieces,
@ -135,13 +136,7 @@ impl TorrentStateInitializing {
let paused = TorrentStatePaused {
info: self.meta.clone(),
files,
filenames,
chunk_tracker,
hns: HaveNeededSelected {
have_bytes: initial_check_results.have_bytes,
needed_bytes: initial_check_results.needed_bytes,
selected_bytes: initial_check_results.selected_bytes,
},
};
Ok(paused)
}

View file

@ -44,10 +44,8 @@ pub mod peers;
pub mod stats;
use std::{
collections::HashMap,
fs::File,
collections::{HashMap, HashSet},
net::SocketAddr,
path::PathBuf,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
@ -60,7 +58,6 @@ use backoff::backoff::Backoff;
use buffers::{ByteBuf, ByteBufOwned};
use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use librqbit_core::{
hash_id::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
@ -68,7 +65,7 @@ use librqbit_core::{
speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Info,
};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
};
@ -90,7 +87,7 @@ use crate::{
},
session::CheckedIncomingConnection,
torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{PeerHandle, BF},
type_aliases::{OpenedFiles, PeerHandle, BF},
};
use self::{
@ -116,18 +113,6 @@ struct InflightPiece {
started: Instant,
}
fn dummy_file() -> anyhow::Result<std::fs::File> {
#[cfg(target_os = "windows")]
const DEVNULL: &str = "NUL";
#[cfg(not(target_os = "windows"))]
const DEVNULL: &str = "/dev/null";
std::fs::OpenOptions::new()
.read(true)
.open(DEVNULL)
.with_context(|| format!("error opening {}", DEVNULL))
}
fn make_piece_bitfield(lengths: &Lengths) -> BF {
BF::from_boxed_slice(vec![0; lengths.piece_bitfield_bytes()].into_boxed_slice())
}
@ -170,11 +155,7 @@ pub struct TorrentStateLive {
meta: Arc<ManagedTorrentInfo>,
locked: RwLock<TorrentStateLocked>,
files: Vec<Arc<Mutex<File>>>,
filenames: Vec<PathBuf>,
initially_needed_bytes: u64,
total_selected_bytes: u64,
files: OpenedFiles,
stats: AtomicStats,
lengths: Lengths,
@ -192,22 +173,48 @@ pub struct TorrentStateLive {
cancellation_token: CancellationToken,
}
fn reopen_necessary_files_for_write(ct: &ChunkTracker, files: &OpenedFiles) -> anyhow::Result<()> {
// Reopen files that we don't have, but have selected in write-only mode.
for opened_file in files.iter() {
let prange = opened_file.piece_range_usize();
if prange.is_empty() {
continue;
}
let selected = ct
.get_selected_pieces()
.get(prange.clone())
.with_context(|| format!("bug: bad range get_selected_pieces(), {prange:?}"))?;
let have = ct
.get_have_pieces()
.get(prange.clone())
.with_context(|| format!("bug: bad range get_have_pieces(), {prange:?}"))?;
let need_write = selected
.iter()
.zip(have.iter())
.any(|(selected, have)| *selected && !*have);
if need_write {
opened_file.reopen(false)?;
}
}
Ok(())
}
impl TorrentStateLive {
pub(crate) fn new(
paused: TorrentStatePaused,
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
cancellation_token: CancellationToken,
) -> Arc<Self> {
) -> anyhow::Result<Arc<Self>> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let down_speed_estimator = SpeedEstimator::new(5);
let up_speed_estimator = SpeedEstimator::new(5);
let have_bytes = paused.hns.have_bytes;
let needed_bytes = paused.hns.needed_bytes;
let total_selected_bytes = paused.hns.selected_bytes;
let have_bytes = paused.chunk_tracker.get_hns().have_bytes;
let lengths = *paused.chunk_tracker.get_lengths();
reopen_necessary_files_for_write(&paused.chunk_tracker, &paused.files)?;
let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(),
peers: Default::default(),
@ -217,14 +224,11 @@ impl TorrentStateLive {
fatal_errors_tx: Some(fatal_errors_tx),
}),
files: paused.files,
filenames: paused.filenames,
stats: AtomicStats {
have_bytes: AtomicU64::new(have_bytes),
..Default::default()
},
initially_needed_bytes: needed_bytes,
lengths,
total_selected_bytes,
peer_semaphore: Arc::new(Semaphore::new(128)),
peer_queue_tx,
finished_notify: Notify::new(),
@ -246,9 +250,7 @@ impl TorrentStateLive {
let now = Instant::now();
let stats = state.stats_snapshot();
let fetched = stats.fetched_bytes;
let needed = state.initially_needed();
// TODO: this is too coarse.
let remaining = needed - stats.downloaded_and_checked_bytes;
let remaining = state.locked.read().get_chunks()?.get_remaining_bytes();
state
.down_speed_estimator
.add_snapshot(fetched, Some(remaining), now);
@ -265,7 +267,7 @@ impl TorrentStateLive {
error_span!(parent: state.meta.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx),
);
state
Ok(state)
}
pub(crate) fn spawn(
@ -494,9 +496,6 @@ impl TorrentStateLive {
pub(crate) fn file_ops(&self) -> FileOps<'_> {
FileOps::new(&self.meta.info, &self.files, &self.lengths)
}
pub fn initially_needed(&self) -> u64 {
self.initially_needed_bytes
}
pub(crate) fn lock_read(
&self,
@ -518,10 +517,6 @@ impl TorrentStateLive {
});
}
pub fn get_total_selected_bytes(&self) -> u64 {
self.total_selected_bytes
}
pub fn get_uploaded_bytes(&self) -> u64 {
self.stats.uploaded_bytes.load(Ordering::Relaxed)
}
@ -535,12 +530,11 @@ impl TorrentStateLive {
self.stats.have_bytes.load(Ordering::Relaxed)
}
pub fn is_finished(&self) -> bool {
self.get_left_to_download_bytes() == 0
}
pub fn get_left_to_download_bytes(&self) -> u64 {
self.initially_needed_bytes - self.get_downloaded_bytes()
pub fn get_hns(&self) -> Option<HaveNeededSelected> {
self.lock_read("get_hns")
.get_chunks()
.ok()
.map(|c| *c.get_hns())
}
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
@ -653,16 +647,11 @@ impl TorrentStateLive {
let files = self
.files
.iter()
.map(|f| {
let mut f = f.lock();
let dummy = dummy_file()?;
let f = std::mem::replace(&mut *f, dummy);
Ok::<_, anyhow::Error>(Arc::new(Mutex::new(f)))
})
.try_collect()?;
let filenames = self.filenames.clone();
.map(|f| f.take_clone())
.collect::<anyhow::Result<Vec<_>>>()?;
for file in files.iter() {
file.reopen(true)?;
}
let mut chunk_tracker = g
.chunks
.take()
@ -670,20 +659,12 @@ impl TorrentStateLive {
for piece_id in g.inflight_pieces.keys().copied() {
chunk_tracker.mark_piece_broken_if_not_have(piece_id);
}
let have_bytes = chunk_tracker.calc_have_bytes();
let needed_bytes = chunk_tracker.calc_needed_bytes();
// g.chunks;
Ok(TorrentStatePaused {
info: self.meta.clone(),
files,
filenames,
chunk_tracker,
hns: HaveNeededSelected {
have_bytes,
needed_bytes,
selected_bytes: self.total_selected_bytes,
},
})
}
@ -699,6 +680,92 @@ impl TorrentStateLive {
}
Err(res)
}
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let mut g = self.lock_write("update_only_files");
let ct = g.get_chunks_mut()?;
let hns = ct.update_only_files(self.files.iter().map(|f| f.len), only_files)?;
reopen_necessary_files_for_write(ct, &self.files)?;
if !hns.finished() {
self.reconnect_all_not_needed_peers();
}
Ok(())
}
pub(crate) fn is_finished(&self) -> bool {
self.get_hns().map(|h| h.finished()).unwrap_or_default()
}
fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> {
// if we have all the pieces of the file, reopen it read only
for (idx, opened_file) in self
.files
.iter()
.enumerate()
.skip_while(|fd| !fd.1.piece_range.contains(&id.get()))
.take_while(|fd| fd.1.piece_range.contains(&id.get()))
{
let bytes = opened_file.update_have_on_piece_completed(id.get(), &self.lengths);
if bytes == 0 {
warn!(file_id=idx, piece_id=id.get(), "bug: update_have_on_piece_completed() returned 0, although this piece is present in the file");
}
let have_all = self
.lock_read("on_piece_completed_reopen")
.get_chunks()?
.get_have_pieces()
.get(opened_file.piece_range_usize())
.with_context(|| {
format!("bug: invalid range {:?}", opened_file.piece_range_usize())
})?
.all();
if have_all {
opened_file.reopen(true)?;
}
}
if self.is_finished() {
info!("torrent finished downloading");
self.finished_notify.notify_waiters();
// There is not poing being connected to peers that have all the torrent, when
// we don't need anything from them, and they don't need anything from us.
self.disconnect_all_peers_that_have_full_torrent();
}
Ok(())
}
fn disconnect_all_peers_that_have_full_torrent(&self) {
for mut pe in self.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().state.get() {
if l.has_full_torrent(self.lengths.total_pieces() as usize) {
let prev = pe.value_mut().state.set_not_needed(&self.peers.stats);
let _ = prev
.take_live_no_counters()
.unwrap()
.tx
.send(WriterRequest::Disconnect);
}
}
}
}
fn reconnect_all_not_needed_peers(&self) {
for pe in self.peers.states.iter() {
if let PeerState::NotNeeded = pe.value().state.get() {
if self.peer_queue_tx.send(*pe.key()).is_err() {
return;
}
}
}
}
pub(crate) fn get_file_progress(&self) -> Vec<u64> {
self.files
.iter()
.map(|fd| fd.have.load(Ordering::Relaxed))
.collect()
}
}
struct PeerHandlerLocked {
@ -1202,27 +1269,6 @@ impl PeerHandler {
self.state.peers.mark_peer_interested(self.addr, true);
}
fn reopen_read_only(&self) -> anyhow::Result<()> {
// Lock exclusive just in case to ensure in-flight operations finish.??
let _guard = self.state.lock_write("reopen_read_only");
for (file, filename) in self.state.files.iter().zip(self.state.filenames.iter()) {
let mut g = file.lock();
// this should close the original file
// putting in a block just in case to guarantee drop.
{
*g = dummy_file()?;
}
*g = std::fs::OpenOptions::new()
.read(true)
.open(filename)
.with_context(|| format!("error re-opening {:?} readonly", filename))?;
debug!("reopened {:?} read-only", filename);
}
info!("reopened all torrent files in read-only mode");
Ok(())
}
fn on_i_am_unchoked(&self) {
trace!("we are unchoked");
self.locked.write().i_am_choked = false;
@ -1398,12 +1444,7 @@ impl PeerHandler {
debug!("piece={} successfully downloaded and verified", index);
if self.state.is_finished() {
info!("torrent finished downloading");
self.state.finished_notify.notify_waiters();
self.disconnect_all_peers_that_have_full_torrent();
self.reopen_read_only()?;
}
self.state.on_piece_completed(chunk_info.piece_index)?;
self.state.maybe_transmit_haves(chunk_info.piece_index);
}
@ -1424,19 +1465,4 @@ impl PeerHandler {
.with_context(|| format!("error processing received chunk {chunk_info:?}"))?;
Ok(())
}
fn disconnect_all_peers_that_have_full_torrent(&self) {
for mut pe in self.state.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().state.get() {
if l.has_full_torrent(self.state.lengths.total_pieces() as usize) {
let prev = pe.value_mut().state.set_not_needed(&self.state.peers.stats);
let _ = prev
.take_live_no_counters()
.unwrap()
.tx
.send(WriterRequest::Disconnect);
}
}
}
}
}

View file

@ -122,17 +122,18 @@ impl PeerStateNoMut {
}
}
pub fn queued_to_connecting(
pub fn idle_to_connecting(
&mut self,
counters: &AggregatePeerStatsAtomic,
) -> Option<(PeerRx, PeerTx)> {
if let PeerState::Queued = &self.0 {
let (tx, rx) = unbounded_channel();
let tx_2 = tx.clone();
self.set(PeerState::Connecting(tx), counters);
Some((rx, tx_2))
} else {
None
match &self.0 {
PeerState::Queued | PeerState::NotNeeded => {
let (tx, rx) = unbounded_channel();
let tx_2 = tx.clone();
self.set(PeerState::Connecting(tx), counters);
Some((rx, tx_2))
}
_ => None,
}
}

View file

@ -90,7 +90,7 @@ impl PeerStates {
let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state
.queued_to_connecting(&self.stats)
.idle_to_connecting(&self.stats)
.context("invalid peer state")
})
.context("peer not found in states")??;

View file

@ -268,7 +268,7 @@ impl ManagedTorrent {
let (tx, rx) = tokio::sync::oneshot::channel();
let live =
TorrentStateLive::new(paused, tx, live_cancellation_token);
TorrentStateLive::new(paused, tx, live_cancellation_token)?;
g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(&t, rx, token);
@ -289,7 +289,7 @@ impl ManagedTorrent {
ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused();
let (tx, rx) = tokio::sync::oneshot::channel();
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone());
let live = TorrentStateLive::new(paused, tx, live_cancellation_token.clone())?;
g.state = ManagedTorrentState::Live(live.clone());
spawn_fatal_errors_receiver(self, rx, live_cancellation_token);
spawn_peer_adder(&live, peer_rx);
@ -337,6 +337,7 @@ impl ManagedTorrent {
use stats::TorrentStatsState as S;
let mut resp = TorrentStats {
total_bytes: self.info().lengths.total_length(),
file_progress: Vec::new(),
state: S::Error,
error: None,
progress_bytes: 0,
@ -353,21 +354,25 @@ impl ManagedTorrent {
}
ManagedTorrentState::Paused(p) => {
resp.state = S::Paused;
resp.total_bytes = p.hns.total();
resp.progress_bytes = p.hns.progress();
resp.finished = p.hns.finished();
let hns = p.hns();
resp.total_bytes = hns.total();
resp.progress_bytes = hns.progress();
resp.finished = hns.finished();
resp.file_progress = p
.files
.iter()
.map(|f| f.have.load(Ordering::Relaxed))
.collect();
}
ManagedTorrentState::Live(l) => {
resp.state = S::Live;
let live_stats = LiveStats::from(l.as_ref());
let total = l.get_total_selected_bytes();
let remaining = l.get_left_to_download_bytes();
let progress = total - remaining;
resp.progress_bytes = progress;
resp.total_bytes = total;
resp.finished = remaining == 0;
let hns = l.get_hns().unwrap_or_default();
resp.total_bytes = hns.total();
resp.progress_bytes = hns.progress();
resp.finished = hns.finished();
resp.uploaded_bytes = l.get_uploaded_bytes();
resp.file_progress = l.get_file_progress();
resp.live = Some(live_stats);
}
ManagedTorrentState::Error(e) => {
@ -410,10 +415,7 @@ impl ManagedTorrent {
// Returns true if needed to unpause torrent.
// This is just implementation detail - it's easier to pause/unpause than to tinker with internals.
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<bool> {
if only_files.is_empty() {
anyhow::bail!("you need to select at least one file");
}
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let file_count = self.info().info.iter_file_lengths()?.count();
for f in only_files.iter().copied() {
if f >= file_count {
@ -426,25 +428,20 @@ impl ManagedTorrent {
// if paused, need to update chunk tracker
let mut g = self.locked.write();
let need_to_unpause = match &mut g.state {
match &mut g.state {
ManagedTorrentState::Initializing(_) => bail!("can't update initializing torrent"),
ManagedTorrentState::Error(_) => false,
ManagedTorrentState::None => false,
ManagedTorrentState::Error(_) => {}
ManagedTorrentState::None => {}
ManagedTorrentState::Paused(p) => {
p.update_only_files(only_files)?;
false
}
ManagedTorrentState::Live(l) => {
let mut p = l.pause()?;
let e = p.update_only_files(only_files);
g.state = ManagedTorrentState::Paused(p);
e?;
true
l.update_only_files(only_files)?;
}
};
g.only_files = Some(only_files.iter().copied().collect());
Ok(need_to_unpause)
Ok(())
}
}

View file

@ -1,25 +1,26 @@
use std::{collections::HashSet, fs::File, path::PathBuf, sync::Arc};
use std::{collections::HashSet, sync::Arc};
use parking_lot::Mutex;
use crate::chunk_tracker::{ChunkTracker, HaveNeededSelected};
use crate::{
chunk_tracker::{ChunkTracker, HaveNeededSelected},
type_aliases::OpenedFiles,
};
use super::ManagedTorrentInfo;
pub struct TorrentStatePaused {
pub(crate) info: Arc<ManagedTorrentInfo>,
pub(crate) files: Vec<Arc<Mutex<File>>>,
pub(crate) filenames: Vec<PathBuf>,
pub(crate) files: OpenedFiles,
pub(crate) chunk_tracker: ChunkTracker,
pub(crate) hns: HaveNeededSelected,
}
impl TorrentStatePaused {
pub(crate) fn update_only_files(&mut self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let hns = self
.chunk_tracker
self.chunk_tracker
.update_only_files(self.info.info.iter_file_lengths()?, only_files)?;
self.hns = hns;
Ok(())
}
pub(crate) fn hns(&self) -> &HaveNeededSelected {
self.chunk_tracker.get_hns()
}
}

View file

@ -69,6 +69,7 @@ impl std::fmt::Display for TorrentStatsState {
#[derive(Serialize, Debug)]
pub struct TorrentStats {
pub state: TorrentStatsState,
pub file_progress: Vec<u64>,
pub error: Option<String>,
pub progress_bytes: u64,
pub uploaded_bytes: u64,

View file

@ -2,7 +2,10 @@ use std::net::SocketAddr;
use futures::stream::BoxStream;
use crate::opened_file::OpenedFile;
pub type BF = bitvec::boxed::BitBox<u8, bitvec::order::Msb0>;
pub type PeerHandle = SocketAddr;
pub type PeerStream = BoxStream<'static, SocketAddr>;
pub(crate) type OpenedFiles = Vec<OpenedFile>;