rqbit/crates/librqbit/src/torrent_state/streaming.rs

391 lines
11 KiB
Rust
Raw Normal View History

2024-04-24 18:53:15 +01:00
use std::{
collections::VecDeque,
2024-04-29 21:44:21 +01:00
io::SeekFrom,
2024-04-24 18:53:15 +01:00
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Poll, Waker},
};
use anyhow::Context;
use dashmap::DashMap;
2024-04-29 21:44:21 +01:00
2024-05-02 20:59:09 +01:00
use librqbit_core::lengths::{CurrentPiece, Lengths, ValidPieceIndex};
2024-07-27 08:17:04 +02:00
use tokio::io::{AsyncRead, AsyncSeek};
2024-04-24 20:56:58 +01:00
use tracing::{debug, trace};
2024-04-24 18:53:15 +01:00
2024-07-27 08:17:04 +02:00
use crate::{
file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent,
};
2024-04-24 18:53:15 +01:00
2024-12-05 22:57:34 +00:00
use super::{ManagedTorrentHandle, TorrentMetadata};
2024-04-24 18:53:15 +01:00
/// Key under which a stream is registered in `TorrentStreams::streams`.
type StreamId = usize;
/// Per-stream lookahead window in bytes: 32 MiB by default.
/// `StreamState::queue` uses it to bound how far past the current position pieces are queued.
const PER_STREAM_BUF_DEFAULT: u64 = 32 * 1024 * 1024;
2024-04-24 22:21:41 +01:00
/// Bookkeeping kept in the stream registry for one active `FileStream`.
struct StreamState {
    /// Index of the file (within the torrent) that is being streamed.
    file_id: usize,
    /// Length of the streamed file in bytes.
    file_len: u64,
    /// Byte offset of the file's first byte within the whole torrent.
    file_abs_offset: u64,
    /// Current read position, in bytes from the start of the file.
    position: u64,
    /// Waker to notify once the piece the stream is waiting on is completed.
    /// `None` when the stream is not currently parked.
    waker: Option<Waker>,
}
impl StreamState {
    /// Returns the piece that contains the stream's current position, as computed by
    /// `Lengths::compute_current_piece`, or `None` if that computation yields nothing.
    fn current_piece(&self, lengths: &Lengths) -> Option<CurrentPiece> {
        lengths.compute_current_piece(self.position, self.file_abs_offset)
    }

    /// Yields the piece indices this stream wants next, in ascending order.
    ///
    /// The window starts at the current position and extends `PER_STREAM_BUF_DEFAULT` bytes
    /// ahead, clamped to the end of the file. Both bounds are torrent-absolute byte offsets.
    fn queue<'a>(&self, lengths: &'a Lengths) -> impl Iterator<Item = ValidPieceIndex> + 'a {
        let start = self.file_abs_offset + self.position;
        let end = (start + PER_STREAM_BUF_DEFAULT).min(self.file_abs_offset + self.file_len);
        let dpl = lengths.default_piece_length();
        // Round the start down and the end up so partially covered pieces are included.
        let start_id = (start / dpl as u64).try_into().unwrap();
        let end_id = end.div_ceil(dpl as u64).try_into().unwrap();
        // Indices rejected by `validate_piece_index` are silently skipped.
        (start_id..end_id).filter_map(|i| lengths.validate_piece_index(i))
    }
}
2024-04-24 18:53:15 +01:00
#[derive(Default)]
pub(crate) struct TorrentStreams {
next_stream_id: AtomicUsize,
2024-04-24 22:21:41 +01:00
streams: DashMap<StreamId, StreamState>,
2024-04-24 18:53:15 +01:00
}
impl TorrentStreams {
fn next_id(&self) -> usize {
self.next_stream_id.fetch_add(1, Ordering::Relaxed)
}
2024-04-24 22:21:41 +01:00
fn register_waker(&self, stream_id: StreamId, waker: Waker) {
if let Some(mut s) = self.streams.get_mut(&stream_id) {
let vm = s.value_mut();
vm.waker = Some(waker);
}
2024-04-24 18:53:15 +01:00
}
// Interleave 1st, 2nd etc pieces from each active stream in turn until they get 1/10th of the file .
pub(crate) fn iter_next_pieces<'a>(
&'a self,
lengths: &'a Lengths,
) -> impl Iterator<Item = ValidPieceIndex> + 'a {
struct Interleave<I> {
all: VecDeque<I>,
2024-04-24 22:21:41 +01:00
}
impl<I: Iterator<Item = ValidPieceIndex>> Iterator for Interleave<I> {
2024-04-24 22:21:41 +01:00
type Item = ValidPieceIndex;
fn next(&mut self) -> Option<Self::Item> {
while let Some(mut it) = self.all.pop_front() {
if let Some(piece) = it.next() {
self.all.push_back(it);
return Some(piece);
2024-04-24 22:21:41 +01:00
}
}
None
2024-04-24 18:53:15 +01:00
}
}
2024-04-29 18:26:36 +01:00
let mut all: Vec<_> = self.streams.iter().map(|s| s.queue(lengths)).collect();
// Shuffle to decrease determinism and make queueing fairer.
use rand::seq::SliceRandom;
2025-06-05 11:38:50 +01:00
all.shuffle(&mut rand::rng());
2024-04-29 18:26:36 +01:00
Interleave { all: all.into() }
2024-04-24 22:21:41 +01:00
}
pub(crate) fn wake_streams_on_piece_completed(
&self,
piece_id: ValidPieceIndex,
lengths: &Lengths,
) {
2024-04-24 22:21:41 +01:00
for mut w in self.streams.iter_mut() {
if w.value().current_piece(lengths).map(|p| p.id) == Some(piece_id) {
2024-04-24 22:21:41 +01:00
if let Some(waker) = w.value_mut().waker.take() {
trace!(
stream_id = *w.key(),
piece_id = piece_id.get(),
"waking stream"
);
waker.wake();
}
}
2024-04-24 18:53:15 +01:00
}
}
fn drop_stream(&self, stream_id: StreamId) -> Option<StreamState> {
2024-04-24 20:56:58 +01:00
trace!(stream_id, "dropping stream");
self.streams.remove(&stream_id).map(|s| s.1)
2024-04-24 18:53:15 +01:00
}
2024-04-26 09:26:56 +01:00
pub(crate) fn streamed_file_ids(&self) -> impl Iterator<Item = usize> + '_ {
self.streams.iter().map(|s| s.value().file_id)
}
2024-04-24 18:53:15 +01:00
}
2024-04-24 20:56:58 +01:00
pub struct FileStream {
2024-04-24 18:53:15 +01:00
torrent: ManagedTorrentHandle,
2024-12-05 22:57:34 +00:00
metadata: Arc<TorrentMetadata>,
2024-04-24 18:58:30 +01:00
streams: Arc<TorrentStreams>,
2024-04-24 18:53:15 +01:00
stream_id: usize,
file_id: usize,
position: u64,
// file params
file_len: u64,
file_torrent_abs_offset: u64,
2024-07-27 08:17:04 +02:00
spawner: BlockingSpawner,
2024-04-24 18:53:15 +01:00
}
/// Turns the error side of a `Result` into a `std::io::Error` of kind `Other`,
/// keeping the original error as its payload. The `Ok` side passes through untouched.
macro_rules! map_io_err {
    ($result:expr) => {
        $result.map_err(|source| ::std::io::Error::new(::std::io::ErrorKind::Other, source))
    };
}
/// `?`-like helper for functions returning `Poll<std::io::Result<_>>`.
///
/// Evaluates to the `Ok` value of the given `Result`. On `Err`, the error is converted with
/// `map_io_err!`, logged at debug level, and returned from the enclosing function as
/// `Poll::Ready(Err(..))`.
macro_rules! poll_try_io {
    ($e:expr) => {{
        let e = map_io_err!($e);
        match e {
            Ok(r) => r,
            Err(e) => {
                debug!("stream error {e:#}");
                return Poll::Ready(Err(e));
            }
        }
    }};
}
impl AsyncRead for FileStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
2024-04-24 20:56:58 +01:00
tbuf: &mut tokio::io::ReadBuf<'_>,
2024-04-24 18:53:15 +01:00
) -> Poll<std::io::Result<()>> {
// if the file is over, return 0
if self.position == self.file_len {
2024-04-24 20:56:58 +01:00
trace!(
stream_id = self.stream_id,
file_id = self.file_id,
"stream completed, EOF"
);
2024-04-24 18:53:15 +01:00
return Poll::Ready(Ok(()));
}
2024-05-02 20:59:09 +01:00
let current = poll_try_io!(self
2024-12-05 22:57:34 +00:00
.metadata
2024-05-02 20:59:09 +01:00
.lengths
.compute_current_piece(self.position, self.file_torrent_abs_offset)
.context("invalid position"));
2024-04-24 18:53:15 +01:00
// if the piece is not there, register to wake when it is
// check if we have the piece for real
let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| {
2024-08-20 17:15:37 +01:00
let have = ct.get_have_pieces().as_slice()[current.id.get() as usize];
2024-04-24 18:53:15 +01:00
if !have {
2024-04-24 18:58:30 +01:00
self.streams
2024-04-24 22:21:41 +01:00
.register_waker(self.stream_id, cx.waker().clone());
2024-04-24 18:53:15 +01:00
}
have
}));
if !have {
trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %current.id, "poll pending, not have");
2024-04-24 18:53:15 +01:00
return Poll::Pending;
}
// actually stream the piece
2024-04-24 20:56:58 +01:00
let buf = tbuf.initialize_unfilled();
2024-04-24 18:53:15 +01:00
let file_remaining = self.file_len - self.position;
let bytes_to_read: usize = poll_try_io!((buf.len() as u64)
.min(current.piece_remaining as u64)
2024-04-24 18:53:15 +01:00
.min(file_remaining)
.try_into());
let buf = &mut buf[..bytes_to_read];
2024-04-24 20:56:58 +01:00
trace!(
buflen = buf.len(),
stream_id = self.stream_id,
file_id = self.file_id,
"will write bytes"
);
2024-04-24 18:53:15 +01:00
2024-07-27 08:17:04 +02:00
poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| {
2024-12-05 22:57:34 +00:00
self.torrent.with_storage_and_file(
self.file_id,
|files, _fi| {
files.pread_exact(self.file_id, self.position, buf)?;
Ok::<_, anyhow::Error>(())
2024-12-05 22:57:34 +00:00
},
&self.metadata,
)
})));
2024-04-24 18:53:15 +01:00
2024-04-29 18:26:36 +01:00
self.as_mut().advance(bytes_to_read as u64);
2024-04-24 20:56:58 +01:00
tbuf.advance(bytes_to_read);
2024-04-24 18:53:15 +01:00
Poll::Ready(Ok(()))
}
}
impl AsyncSeek for FileStream {
fn start_seek(
mut self: std::pin::Pin<&mut Self>,
position: std::io::SeekFrom,
) -> std::io::Result<()> {
let end_i64 = map_io_err!(TryInto::<i64>::try_into(self.file_len))?;
let new_pos: i64 = match position {
2024-04-24 22:21:41 +01:00
SeekFrom::Start(s) => map_io_err!(s.try_into())?,
2024-04-24 18:53:15 +01:00
SeekFrom::End(e) => map_io_err!(TryInto::<i64>::try_into(self.file_len))? + e,
SeekFrom::Current(o) => map_io_err!(TryInto::<i64>::try_into(self.position))? + o,
};
if new_pos < 0 || new_pos > end_i64 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
anyhow::anyhow!("invalid seek"),
));
}
2024-04-29 18:26:36 +01:00
self.as_mut().set_position(map_io_err!(new_pos.try_into())?);
trace!(stream_id = self.stream_id, position = self.position, "seek");
2024-04-24 18:53:15 +01:00
Ok(())
}
fn poll_complete(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<u64>> {
Poll::Ready(Ok(self.position))
}
}
impl Drop for FileStream {
fn drop(&mut self) {
self.streams.drop_stream(self.stream_id);
2024-04-24 18:53:15 +01:00
}
}
impl ManagedTorrent {
2024-12-05 22:57:34 +00:00
fn with_storage_and_file<F, R>(
&self,
file_id: usize,
f: F,
metadata: &TorrentMetadata,
) -> anyhow::Result<R>
2024-04-24 18:53:15 +01:00
where
2024-04-29 21:44:21 +01:00
F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R,
2024-04-24 18:53:15 +01:00
{
self.with_state(|s| {
let files = match s {
2024-04-30 09:11:23 +01:00
crate::ManagedTorrentState::Paused(p) => &*p.files,
crate::ManagedTorrentState::Live(l) => &*l.files,
2024-04-30 09:48:19 +01:00
s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()),
2024-04-24 18:53:15 +01:00
};
2024-12-05 22:57:34 +00:00
let fi = metadata.file_infos.get(file_id).context("invalid file")?;
2024-04-29 21:44:21 +01:00
Ok(f(files, fi))
2024-04-24 18:53:15 +01:00
})
}
2024-04-24 18:58:30 +01:00
fn streams(&self) -> anyhow::Result<Arc<TorrentStreams>> {
self.with_state(|s| match s {
crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()),
crate::ManagedTorrentState::Live(l) => Ok(l.streams.clone()),
2024-04-29 17:40:44 +01:00
s => anyhow::bail!("streams: invalid state {}", s.name()),
2024-04-24 18:58:30 +01:00
})
}
2024-04-26 09:42:44 +01:00
fn maybe_reconnect_needed_peers_for_file(&self, file_id: usize) -> bool {
2024-04-26 09:26:56 +01:00
// If we have the full file, don't bother.
2024-04-29 21:44:21 +01:00
if self.is_file_finished(file_id) {
2024-04-26 09:42:44 +01:00
return false;
2024-04-26 09:26:56 +01:00
}
self.with_state(|state| {
if let crate::ManagedTorrentState::Live(l) = &state {
l.reconnect_all_not_needed_peers();
}
2024-04-26 09:42:44 +01:00
});
true
2024-04-26 09:26:56 +01:00
}
2024-04-29 21:44:21 +01:00
fn is_file_finished(&self, file_id: usize) -> bool {
2024-12-05 22:57:34 +00:00
let metadata = self.metadata.load();
let metadata = match metadata.as_ref() {
Some(r) => r,
None => return false,
};
2024-04-30 09:48:19 +01:00
// TODO: would be nice to remove locking
2024-12-05 22:57:34 +00:00
self.with_chunk_tracker(|ct| ct.is_file_finished(&metadata.file_infos[file_id]))
2024-04-29 21:44:21 +01:00
.unwrap_or(false)
}
2024-04-24 20:56:58 +01:00
pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<FileStream> {
2024-12-05 22:57:34 +00:00
let metadata = self
.metadata
.load_full()
.context("torrent metadata is not resolved")?;
let (fd_len, fd_offset) = self.with_storage_and_file(
file_id,
|_fd, fi| (fi.len, fi.offset_in_torrent),
&metadata,
)?;
2024-04-24 18:58:30 +01:00
let streams = self.streams()?;
2024-04-24 22:21:41 +01:00
let s = FileStream {
2024-04-24 18:58:30 +01:00
stream_id: streams.next_id(),
2024-04-24 22:21:41 +01:00
streams: streams.clone(),
2024-04-24 18:53:15 +01:00
file_id,
position: 0,
file_len: fd_len,
file_torrent_abs_offset: fd_offset,
torrent: self,
2024-07-27 08:17:04 +02:00
spawner: BlockingSpawner::default(),
2024-12-05 22:57:34 +00:00
metadata,
2024-04-24 22:21:41 +01:00
};
2024-04-29 20:51:34 +01:00
s.torrent.maybe_reconnect_needed_peers_for_file(file_id);
2024-04-24 22:21:41 +01:00
streams.streams.insert(
s.stream_id,
StreamState {
2024-04-26 09:26:56 +01:00
file_id,
position: 0,
2024-04-24 22:21:41 +01:00
waker: None,
file_len: fd_len,
file_abs_offset: fd_offset,
2024-04-24 22:21:41 +01:00
},
);
2024-04-26 09:26:56 +01:00
2024-04-29 18:26:36 +01:00
debug!(stream_id = s.stream_id, file_id, "started stream");
2024-04-24 22:21:41 +01:00
Ok(s)
2024-04-24 18:53:15 +01:00
}
}
2024-04-24 20:56:58 +01:00
impl FileStream {
pub fn position(&self) -> u64 {
self.position
}
2024-04-29 18:26:36 +01:00
fn advance(&mut self, diff: u64) {
self.set_position(self.position + diff)
}
fn set_position(&mut self, new_pos: u64) {
self.position = new_pos;
self.streams
.streams
.get_mut(&self.stream_id)
.unwrap()
.value_mut()
.position = new_pos;
}
2024-04-24 20:56:58 +01:00
pub fn len(&self) -> u64 {
self.file_len
}
}