Patch in some changes from another branch

This commit is contained in:
Igor Katson 2024-04-29 17:26:02 +01:00
parent 9474a6d52e
commit 82f8b0932c
2 changed files with 93 additions and 80 deletions

View file

@ -707,7 +707,8 @@ impl TorrentStateLive {
}
}
self.streams.wake_streams_on_piece_completed(id);
self.streams
.wake_streams_on_piece_completed(id, &self.meta.lengths);
if self.is_finished() {
if self.lock_read("chunks").get_chunks()?.get_selected_pieces()[id.get_usize()] {

View file

@ -1,4 +1,5 @@
use std::{
collections::VecDeque,
io::{Read, Seek, SeekFrom},
sync::{
atomic::{AtomicUsize, Ordering},
@ -9,7 +10,6 @@ use std::{
use anyhow::Context;
use dashmap::DashMap;
use itertools::Itertools;
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
use tokio::io::{AsyncRead, AsyncSeek};
use tracing::{debug, trace};
@ -20,24 +20,64 @@ use super::ManagedTorrentHandle;
type StreamId = usize;
// Buffer either 1/10th of the file forward.
const PER_STREAM_BUF_PART: u64 = 10;
// Or 32 mb, whichever is larger
const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024;
// 32 mb lookahead by default.
const PER_STREAM_BUF_DEFAULT: u64 = 32 * 1024 * 1024;
struct StreamState {
file_id: usize,
current_piece: ValidPieceIndex,
file_len: u64,
file_abs_offset: u64,
position: u64,
waker: Option<Waker>,
}
impl StreamState {
fn current_piece(&self, lengths: &Lengths) -> Option<CurrentPiece> {
compute_current_piece(lengths, self.position, self.file_abs_offset)
}
fn queue<'a>(&self, lengths: &'a Lengths) -> impl Iterator<Item = ValidPieceIndex> + 'a {
let start = self.file_abs_offset + self.position;
let end = (start + PER_STREAM_BUF_DEFAULT).min(self.file_abs_offset + self.file_len);
let dpl = lengths.default_piece_length();
let start_id = (start / dpl as u64).try_into().unwrap();
let end_id = end.div_ceil(dpl as u64).try_into().unwrap();
(start_id..end_id).filter_map(|i| lengths.validate_piece_index(i))
}
}
#[derive(Default)]
pub(crate) struct TorrentStreams {
next_stream_id: AtomicUsize,
streams: DashMap<StreamId, StreamState>,
}
struct CurrentPiece {
id: ValidPieceIndex,
piece_remaining: u32,
}
fn compute_current_piece(
lengths: &Lengths,
file_pos: u64,
file_torrent_abs_offset: u64,
) -> Option<CurrentPiece> {
let dpl = lengths.default_piece_length();
let abs_pos = file_torrent_abs_offset + file_pos;
let piece_id = abs_pos / dpl as u64;
let piece_id: u32 = piece_id.try_into().ok()?;
let piece_id = lengths.validate_piece_index(piece_id)?;
let piece_len = lengths.piece_length(piece_id);
Some(CurrentPiece {
id: piece_id,
piece_remaining: (piece_len as u64 - (abs_pos % dpl as u64))
.try_into()
.ok()?,
})
}
impl TorrentStreams {
fn next_id(&self) -> usize {
self.next_stream_id.fetch_add(1, Ordering::Relaxed)
@ -50,61 +90,40 @@ impl TorrentStreams {
}
}
// Queue 1st, 2nd etc pieces from each stream in turn until they get 1/10th of the file .
pub(crate) fn iter_next_pieces(
&self,
lengths: &Lengths,
) -> impl Iterator<Item = ValidPieceIndex> {
let all = self
.streams
.iter()
.map(|s| {
let remaining = (s.value().file_len
+ lengths.piece_length(s.value().current_piece) as u64)
.div_ceil(PER_STREAM_BUF_PART)
.max(PER_STREAM_BUF_MIN);
(s.value().current_piece, remaining)
})
.map(Some)
.collect_vec();
struct It {
all: Vec<Option<(ValidPieceIndex, u64)>>,
lengths: Lengths,
// Interleave 1st, 2nd etc pieces from each active stream in turn until they get 1/10th of the file .
pub(crate) fn iter_next_pieces<'a>(
&'a self,
lengths: &'a Lengths,
) -> impl Iterator<Item = ValidPieceIndex> + 'a {
struct Interleave<I> {
all: VecDeque<I>,
}
impl Iterator for It {
impl<I: Iterator<Item = ValidPieceIndex>> Iterator for Interleave<I> {
type Item = ValidPieceIndex;
fn next(&mut self) -> Option<Self::Item> {
for item in self.all.iter_mut() {
if let Some((p, remaining)) = item {
let y = *p;
let pl = self.lengths.piece_length(y);
*remaining = remaining.saturating_sub(pl as u64);
if *remaining == 0 {
*item = None;
} else if let Some(next_p) = self.lengths.validate_piece_index(y.get() + 1)
{
*item = Some((next_p, *remaining))
} else {
*item = None;
}
return Some(y);
while let Some(mut it) = self.all.pop_front() {
if let Some(piece) = it.next() {
self.all.push_back(it);
return Some(piece);
}
}
None
}
}
It {
all,
lengths: *lengths,
}
let all = self.streams.iter().map(|s| s.queue(lengths)).collect();
Interleave { all }
}
pub(crate) fn wake_streams_on_piece_completed(&self, piece_id: ValidPieceIndex) {
pub(crate) fn wake_streams_on_piece_completed(
&self,
piece_id: ValidPieceIndex,
lengths: &Lengths,
) {
for mut w in self.streams.iter_mut() {
if w.value().current_piece == piece_id {
if w.value().current_piece(lengths).map(|p| p.id) == Some(piece_id) {
if let Some(waker) = w.value_mut().waker.take() {
trace!(
stream_id = *w.key(),
@ -117,9 +136,9 @@ impl TorrentStreams {
}
}
fn drop_stream(&self, stream_id: StreamId) {
fn drop_stream(&self, stream_id: StreamId) -> Option<StreamState> {
trace!(stream_id, "dropping stream");
self.streams.remove(&stream_id);
self.streams.remove(&stream_id).map(|s| s.1)
}
pub(crate) fn streamed_file_ids(&self) -> impl Iterator<Item = usize> + '_ {
@ -174,30 +193,17 @@ impl AsyncRead for FileStream {
return Poll::Ready(Ok(()));
}
// determine the piece that position is pointing to.
let lengths = &self.torrent.info().lengths;
let dpl = lengths.default_piece_length();
let abs_pos = self.file_torrent_abs_offset + self.position;
let piece_id = abs_pos / dpl as u64;
let piece_id: u32 = poll_try_io!(piece_id.try_into());
let piece_id = poll_try_io!(lengths
.validate_piece_index(piece_id)
.context("bug: invalid piece"));
let piece_len = lengths.piece_length(piece_id);
let piece_offset = abs_pos % dpl as u64;
let piece_remaining = piece_len as u64 - piece_offset;
// queue N pieces after this if not yet. The "if let" should never fail.
if let Some(mut s) = self.streams.streams.get_mut(&self.stream_id) {
s.value_mut().current_piece = piece_id;
}
let current = poll_try_io!(compute_current_piece(
&self.torrent.info().lengths,
self.position,
self.file_torrent_abs_offset
)
.context("invalid position"));
// if the piece is not there, register to wake when it is
// check if we have the piece for real
let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| {
let have = ct.get_have_pieces()[piece_id.get() as usize];
let have = ct.get_have_pieces()[current.id.get() as usize];
if !have {
self.streams
.register_waker(self.stream_id, cx.waker().clone());
@ -205,16 +211,15 @@ impl AsyncRead for FileStream {
have
}));
if !have {
trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %piece_id, "poll pending, not have");
trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have");
return Poll::Pending;
}
// actually stream the piece
let buf = tbuf.initialize_unfilled();
let file_remaining = self.file_len - self.position;
let bytes_to_read: usize = poll_try_io!((piece_len as u64)
.min(buf.len() as u64)
.min(piece_remaining)
let bytes_to_read: usize = poll_try_io!((buf.len() as u64)
.min(current.piece_remaining as u64)
.min(file_remaining)
.try_into());
@ -238,6 +243,12 @@ impl AsyncRead for FileStream {
self.as_mut().position += buf.len() as u64;
tbuf.advance(bytes_to_read);
self.streams
.streams
.get_mut(&self.stream_id)
.unwrap()
.value_mut()
.position = self.position;
Poll::Ready(Ok(()))
}
@ -276,7 +287,7 @@ impl AsyncSeek for FileStream {
impl Drop for FileStream {
fn drop(&mut self) {
self.streams.drop_stream(self.stream_id)
self.streams.drop_stream(self.stream_id);
}
}
@ -321,7 +332,6 @@ impl ManagedTorrent {
let (fd_len, fd_offset) =
self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?;
let streams = self.streams()?;
let first_piece = self.info().lengths.validate_piece_index(0).context("bug")?;
let s = FileStream {
stream_id: streams.next_id(),
streams: streams.clone(),
@ -333,15 +343,17 @@ impl ManagedTorrent {
torrent: self,
};
if s.torrent.maybe_reconnect_needed_peers_for_file(file_id) {
s.torrent.with_opened_file(file_id, |f| f.reopen(false))??;
s.torrent
.with_opened_file(file_id, |fd| fd.reopen(false))??;
}
streams.streams.insert(
s.stream_id,
StreamState {
file_id,
current_piece: first_piece,
position: 0,
waker: None,
file_len: s.file_len,
file_len: fd_len,
file_abs_offset: fd_offset,
},
);