Works fine now finally

This commit is contained in:
Igor Katson 2024-04-24 22:21:41 +01:00
parent 7e180c05b3
commit f0788f2c4a
4 changed files with 121 additions and 35 deletions

View file

@ -194,6 +194,14 @@ impl ChunkTracker {
.filter_map(|id| self.lengths.validate_piece_index(id))
}
pub(crate) fn is_piece_queued(&self, id: ValidPieceIndex) -> bool {
self.queue_pieces[id.get() as usize]
}
pub(crate) fn is_piece_have(&self, id: ValidPieceIndex) -> bool {
self.have[id.get() as usize]
}
// None if wrong chunk
// true if did something
// false if didn't do anything

View file

@ -163,9 +163,6 @@ impl HttpApi {
headers: http::HeaderMap,
) -> Result<impl IntoResponse> {
let mut stream = state.api_stream(idx, file_id)?;
dbg!(&headers);
let mut status = StatusCode::OK;
let mut output_headers = HeaderMap::new();
output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));

View file

@ -965,10 +965,18 @@ impl PeerHandler {
let n = {
let mut n_opt = None;
let bf = &live.bitfield;
for n in g
.get_chunks()?
.iter_queued_pieces(&g.file_priorities, &self.state.files)
{
let chunk_tracker = g.get_chunks()?;
let priority_streamed_pieces = self
.state
.streams
.iter_next_pieces(&self.state.lengths)
.filter(|pid| {
!chunk_tracker.is_piece_have(*pid)
&& chunk_tracker.is_piece_queued(*pid)
});
let natural_order_pieces =
chunk_tracker.iter_queued_pieces(&g.file_priorities, &self.state.files);
for n in priority_streamed_pieces.chain(natural_order_pieces) {
if bf.get(n.get() as usize).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;

View file

@ -9,7 +9,8 @@ use std::{
use anyhow::Context;
use dashmap::DashMap;
use librqbit_core::lengths::ValidPieceIndex;
use itertools::Itertools;
use librqbit_core::lengths::{Lengths, ValidPieceIndex};
use tokio::io::{AsyncRead, AsyncSeek};
use tracing::{debug, trace};
@ -19,10 +20,21 @@ use super::ManagedTorrentHandle;
type StreamId = usize;
// Buffer either 1/10th of the file forward.
const PER_STREAM_BUF_PART: u64 = 10;
// Or 32 mb, whichever is larger
const PER_STREAM_BUF_MIN: u64 = 32 * 1024 * 1024;
struct StreamState {
current_piece: ValidPieceIndex,
file_len: u64,
waker: Option<Waker>,
}
#[derive(Default)]
pub(crate) struct TorrentStreams {
next_stream_id: AtomicUsize,
wakers_by_stream: DashMap<StreamId, (ValidPieceIndex, Waker)>,
streams: DashMap<StreamId, StreamState>,
}
impl TorrentStreams {
@ -30,31 +42,83 @@ impl TorrentStreams {
self.next_stream_id.fetch_add(1, Ordering::Relaxed)
}
fn register_waker(&self, stream_id: StreamId, piece_id: ValidPieceIndex, waker: Waker) {
self.wakers_by_stream.insert(stream_id, (piece_id, waker));
fn register_waker(&self, stream_id: StreamId, waker: Waker) {
if let Some(mut s) = self.streams.get_mut(&stream_id) {
let vm = s.value_mut();
vm.waker = Some(waker);
}
}
// Queue 1st, 2nd etc pieces from each stream in turn until they get 1/10th of the file .
pub(crate) fn iter_next_pieces(
&self,
lengths: &Lengths,
) -> impl Iterator<Item = ValidPieceIndex> {
let all = self
.streams
.iter()
.map(|s| {
let remaining = (s.value().file_len
+ lengths.piece_length(s.value().current_piece) as u64)
.div_ceil(PER_STREAM_BUF_PART)
.max(PER_STREAM_BUF_MIN);
(s.value().current_piece, remaining)
})
.map(Some)
.collect_vec();
struct It {
all: Vec<Option<(ValidPieceIndex, u64)>>,
lengths: Lengths,
}
impl Iterator for It {
type Item = ValidPieceIndex;
fn next(&mut self) -> Option<Self::Item> {
for item in self.all.iter_mut() {
if let Some((p, remaining)) = item {
let y = *p;
let pl = self.lengths.piece_length(y);
*remaining = remaining.saturating_sub(pl as u64);
if *remaining == 0 {
*item = None;
} else if let Some(next_p) = self.lengths.validate_piece_index(y.get() + 1)
{
*item = Some((next_p, *remaining))
} else {
*item = None;
}
return Some(y);
}
}
None
}
}
It {
all,
lengths: *lengths,
}
}
pub(crate) fn wake_streams_on_piece_completed(&self, piece_id: ValidPieceIndex) {
let mut woken = Vec::new();
for w in self.wakers_by_stream.iter() {
if w.value().0 == piece_id {
trace!(
stream_id = *w.key(),
piece_id = piece_id.get(),
"waking stream"
);
w.value().1.wake_by_ref();
woken.push(*w.key());
for mut w in self.streams.iter_mut() {
if w.value().current_piece == piece_id {
if let Some(waker) = w.value_mut().waker.take() {
trace!(
stream_id = *w.key(),
piece_id = piece_id.get(),
"waking stream"
);
waker.wake();
}
}
}
for w in woken {
self.wakers_by_stream.remove(&w);
}
}
fn drop_stream(&self, stream_id: StreamId) {
trace!(stream_id, "dropping stream");
self.wakers_by_stream.remove(&stream_id);
self.streams.remove(&stream_id);
}
}
@ -120,8 +184,10 @@ impl AsyncRead for FileStream {
let piece_offset = abs_pos % dpl as u64;
let piece_remaining = piece_len as u64 - piece_offset;
// queue N pieces after this if not yet
// TODO
// queue N pieces after this if not yet. The "if let" should never fail.
if let Some(mut s) = self.streams.streams.get_mut(&self.stream_id) {
s.value_mut().current_piece = piece_id;
}
// if the piece is not there, register to wake when it is
// check if we have the piece for real
@ -129,7 +195,7 @@ impl AsyncRead for FileStream {
let have = ct.get_have_pieces()[piece_id.get() as usize];
if !have {
self.streams
.register_waker(self.stream_id, piece_id, cx.waker().clone());
.register_waker(self.stream_id, cx.waker().clone());
}
have
}));
@ -179,10 +245,7 @@ impl AsyncSeek for FileStream {
) -> std::io::Result<()> {
let end_i64 = map_io_err!(TryInto::<i64>::try_into(self.file_len))?;
let new_pos: i64 = match position {
SeekFrom::Start(s) => {
self.as_mut().position = s;
return Ok(());
}
SeekFrom::Start(s) => map_io_err!(s.try_into())?,
SeekFrom::End(e) => map_io_err!(TryInto::<i64>::try_into(self.file_len))? + e,
SeekFrom::Current(o) => map_io_err!(TryInto::<i64>::try_into(self.position))? + o,
};
@ -240,16 +303,26 @@ impl ManagedTorrent {
let (fd_len, fd_offset) =
self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?;
let streams = self.streams()?;
Ok(FileStream {
let first_piece = self.info().lengths.validate_piece_index(0).context("bug")?;
let s = FileStream {
stream_id: streams.next_id(),
streams,
streams: streams.clone(),
file_id,
position: 0,
file_len: fd_len,
file_torrent_abs_offset: fd_offset,
torrent: self,
})
};
streams.streams.insert(
s.stream_id,
StreamState {
current_piece: first_piece,
waker: None,
file_len: s.file_len,
},
);
Ok(s)
}
}