Add file priorities.

This commit is contained in:
Igor Katson 2024-04-13 16:05:02 +03:00
parent c291e42bb5
commit e03600c463
4 changed files with 57 additions and 28 deletions

View file

@ -5,7 +5,7 @@ use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
use peer_binary_protocol::Piece;
use tracing::{debug, trace};
use crate::type_aliases::BF;
use crate::type_aliases::{FilePriorities, OpenedFiles, BF};
pub struct ChunkTracker {
// This forms the basis of a "queue" to pull from.
@ -31,9 +31,6 @@ pub struct ChunkTracker {
lengths: Lengths,
// What pieces to download first.
priority_piece_ids: Vec<usize>,
// Quick to retrieve stats, that MUST be in sync with the BFs
// above (have/selected).
hns: HaveNeededSelected,
@ -138,18 +135,6 @@ impl ChunkTracker {
// TODO: ideally this needs to be a list based on needed files, e.g.
// last needed piece for each file. But let's keep simple for now.
// TODO: bitvec is bugged, the short version panics.
// let last_needed_piece_id = needed_pieces.iter_ones().next_back();
let last_needed_piece_id = needed_pieces
.iter()
.enumerate()
.filter_map(|(id, b)| if *b { Some(id) } else { None })
.last();
// The last pieces first. Often important information is stored in the last piece.
// E.g. if it's a video file, than the last piece often contains some index, or just
// players look into it, and it's better be there.
let priority_piece_ids = last_needed_piece_id.into_iter().collect();
let mut ct = Self {
chunk_status: compute_chunk_have_status(&lengths, &have_pieces)
.context("error computing chunk status")?,
@ -157,7 +142,6 @@ impl ChunkTracker {
selected: selected_pieces,
lengths,
have: have_pieces,
priority_piece_ids,
hns: HaveNeededSelected::default(),
};
ct.hns = ct.calc_hns();
@ -198,16 +182,16 @@ impl ChunkTracker {
hns
}
pub fn iter_queued_pieces(&self) -> impl Iterator<Item = usize> + '_ {
self.priority_piece_ids
pub fn iter_queued_pieces<'a>(
&'a self,
file_priorities: &'a FilePriorities,
opened_files: &'a OpenedFiles,
) -> impl Iterator<Item = usize> + 'a {
file_priorities
.iter()
.copied()
.filter(move |piece_id| self.queue_pieces[*piece_id])
.chain(
self.queue_pieces
.iter_ones()
.filter(move |id| !self.priority_piece_ids.contains(id)),
)
.filter_map(|p| opened_files.get(*p))
.flat_map(|f| f.iter_piece_priorities())
.filter(|id| self.queue_pieces[*id])
}
// None if wrong chunk

View file

@ -31,6 +31,20 @@ pub(crate) fn dummy_file() -> anyhow::Result<std::fs::File> {
.with_context(|| format!("error opening {}", DEVNULL))
}
// Iterate file pieces in the following order: first, last, everything else from start to end.
fn iter_piece_priorities(range: std::ops::Range<usize>) -> impl Iterator<Item = usize> {
// First and last of each file first, then the rest of pieces in that file.
let r = range;
use std::iter::once;
let first = once(r.start);
let last = once(r.start + r.len().overflowing_sub(1).0); // it's ok if it repeats, doesn't matter
let mid = r.clone().skip(1).take(r.len().overflowing_sub(2).0);
// The take(r.len()) is to not yield start/end pieces in case of 0 and 1 lengths.
first.chain(last).chain(mid).take(r.len())
}
impl OpenedFile {
pub fn new(
f: File,
@ -94,4 +108,24 @@ impl OpenedFile {
self.have.fetch_add(size, Ordering::Relaxed);
size
}
pub fn iter_piece_priorities(&self) -> impl Iterator<Item = usize> {
iter_piece_priorities(self.piece_range_usize())
}
}
#[cfg(test)]
mod tests {
use crate::opened_file::iter_piece_priorities;
#[test]
fn test_iter_piece_priorities() {
let it = |r: std::ops::Range<usize>| -> Vec<usize> { iter_piece_priorities(r).collect() };
assert_eq!(it(0..0), Vec::<usize>::new());
assert_eq!(it(0..1), vec![0]);
assert_eq!(it(0..2), vec![0, 1]);
assert_eq!(it(0..3), vec![0, 2, 1]);
assert_eq!(it(0..4), vec![0, 3, 1, 2]);
}
}

View file

@ -87,7 +87,7 @@ use crate::{
},
session::CheckedIncomingConnection,
torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{OpenedFiles, PeerHandle, BF},
type_aliases::{FilePriorities, OpenedFiles, PeerHandle, BF},
};
use self::{
@ -122,6 +122,9 @@ pub(crate) struct TorrentStateLocked {
// If this is None, the torrent was paused, and this live state is useless, and needs to be dropped.
pub(crate) chunks: Option<ChunkTracker>,
// The sorted file list in which order to download them.
file_priorities: FilePriorities,
// At a moment in time, we are expecting a piece from only one peer.
// inflight_pieces stores this information.
inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
@ -215,12 +218,16 @@ impl TorrentStateLive {
reopen_necessary_files_for_write(&paused.chunk_tracker, &paused.files)?;
// TODO: make it configurable
let file_priorities = (0..paused.files.len()).collect();
let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(),
peers: Default::default(),
locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker),
inflight_pieces: Default::default(),
file_priorities,
fatal_errors_tx: Some(fatal_errors_tx),
}),
files: paused.files,
@ -989,7 +996,10 @@ impl PeerHandler {
let n = {
let mut n_opt = None;
let bf = &live.bitfield;
for n in g.get_chunks()?.iter_queued_pieces() {
for n in g
.get_chunks()?
.iter_queued_pieces(&g.file_priorities, &self.state.files)
{
if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;

View file

@ -9,3 +9,4 @@ pub type BF = bitvec::boxed::BitBox<u8, bitvec::order::Msb0>;
pub type PeerHandle = SocketAddr;
pub type PeerStream = BoxStream<'static, SocketAddr>;
pub(crate) type OpenedFiles = Vec<OpenedFile>;
pub(crate) type FilePriorities = Vec<usize>;