Fix the bugs

This commit is contained in:
Igor Katson 2024-04-24 20:56:58 +01:00
parent eadb8872d9
commit 7e180c05b3
4 changed files with 74 additions and 12 deletions

View file

@ -7,10 +7,7 @@ use futures::Stream;
use http::StatusCode; use http::StatusCode;
use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::{ use tokio::sync::mpsc::UnboundedSender;
io::{AsyncRead, AsyncSeek},
sync::mpsc::UnboundedSender,
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tracing::warn; use tracing::warn;
@ -21,7 +18,7 @@ use crate::{
}, },
torrent_state::{ torrent_state::{
peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot}, peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
ManagedTorrentHandle, FileStream, ManagedTorrentHandle,
}, },
tracing_subscriber_config_utils::LineBroadcast, tracing_subscriber_config_utils::LineBroadcast,
}; };
@ -243,7 +240,7 @@ impl Api {
Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?)
} }
pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result<impl AsyncRead + AsyncSeek> { pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result<FileStream> {
let mgr = self.mgr_handle(idx)?; let mgr = self.mgr_handle(idx)?;
Ok(mgr.stream(file_id)?) Ok(mgr.stream(file_id)?)
} }

View file

@ -5,6 +5,7 @@ use axum::response::IntoResponse;
use axum::routing::{get, post}; use axum::routing::{get, post};
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt}; use futures::{FutureExt, TryStreamExt};
use http::{HeaderMap, HeaderValue, StatusCode};
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -162,6 +163,13 @@ impl HttpApi {
headers: http::HeaderMap, headers: http::HeaderMap,
) -> Result<impl IntoResponse> { ) -> Result<impl IntoResponse> {
let mut stream = state.api_stream(idx, file_id)?; let mut stream = state.api_stream(idx, file_id)?;
dbg!(&headers);
let mut status = StatusCode::OK;
let mut output_headers = HeaderMap::new();
output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));
if let Some(range) = headers.get(http::header::RANGE) { if let Some(range) = headers.get(http::header::RANGE) {
let offset: Option<u64> = range let offset: Option<u64> = range
.to_str() .to_str()
@ -170,15 +178,38 @@ impl HttpApi {
.and_then(|s| s.strip_suffix('-')) .and_then(|s| s.strip_suffix('-'))
.and_then(|s| s.parse().ok()); .and_then(|s| s.parse().ok());
if let Some(offset) = offset { if let Some(offset) = offset {
status = StatusCode::PARTIAL_CONTENT;
info!(offset, "range request offset");
stream stream
.seek(SeekFrom::Start(offset)) .seek(SeekFrom::Start(offset))
.await .await
.context("error seeking")?; .context("error seeking")?;
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len() - stream.position()))
.context("bug")?,
);
output_headers.insert(
http::header::CONTENT_RANGE,
HeaderValue::from_str(&format!(
"bytes {}-{}/{}",
stream.position(),
stream.len().saturating_sub(1),
stream.len()
))
.context("bug")?,
);
} else {
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?,
);
} }
} }
let s = tokio_util::io::ReaderStream::new(stream); let s = tokio_util::io::ReaderStream::new(stream);
Ok(axum::body::Body::from_stream(s)) Ok((status, (output_headers, axum::body::Body::from_stream(s))))
} }
async fn torrent_action_pause( async fn torrent_action_pause(

View file

@ -42,6 +42,7 @@ use initializing::TorrentStateInitializing;
use self::paused::TorrentStatePaused; use self::paused::TorrentStatePaused;
pub use self::stats::{TorrentStats, TorrentStatsState}; pub use self::stats::{TorrentStats, TorrentStatsState};
pub use self::streaming::FileStream;
pub enum ManagedTorrentState { pub enum ManagedTorrentState {
Initializing(Arc<TorrentStateInitializing>), Initializing(Arc<TorrentStateInitializing>),

View file

@ -11,6 +11,7 @@ use anyhow::Context;
use dashmap::DashMap; use dashmap::DashMap;
use librqbit_core::lengths::ValidPieceIndex; use librqbit_core::lengths::ValidPieceIndex;
use tokio::io::{AsyncRead, AsyncSeek}; use tokio::io::{AsyncRead, AsyncSeek};
use tracing::{debug, trace};
use crate::{opened_file::OpenedFile, ManagedTorrent}; use crate::{opened_file::OpenedFile, ManagedTorrent};
@ -37,6 +38,11 @@ impl TorrentStreams {
let mut woken = Vec::new(); let mut woken = Vec::new();
for w in self.wakers_by_stream.iter() { for w in self.wakers_by_stream.iter() {
if w.value().0 == piece_id { if w.value().0 == piece_id {
trace!(
stream_id = *w.key(),
piece_id = piece_id.get(),
"waking stream"
);
w.value().1.wake_by_ref(); w.value().1.wake_by_ref();
woken.push(*w.key()); woken.push(*w.key());
} }
@ -47,11 +53,12 @@ impl TorrentStreams {
} }
fn drop_stream(&self, stream_id: StreamId) { fn drop_stream(&self, stream_id: StreamId) {
trace!(stream_id, "dropping stream");
self.wakers_by_stream.remove(&stream_id); self.wakers_by_stream.remove(&stream_id);
} }
} }
struct FileStream { pub struct FileStream {
torrent: ManagedTorrentHandle, torrent: ManagedTorrentHandle,
streams: Arc<TorrentStreams>, streams: Arc<TorrentStreams>,
stream_id: usize, stream_id: usize,
@ -74,7 +81,10 @@ macro_rules! poll_try_io {
let e = map_io_err!($e); let e = map_io_err!($e);
match e { match e {
Ok(r) => r, Ok(r) => r,
Err(e) => return Poll::Ready(Err(e)), Err(e) => {
debug!("stream error {e:?}");
return Poll::Ready(Err(e));
}
} }
}}; }};
} }
@ -83,10 +93,15 @@ impl AsyncRead for FileStream {
fn poll_read( fn poll_read(
mut self: std::pin::Pin<&mut Self>, mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>, cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>, tbuf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> { ) -> Poll<std::io::Result<()>> {
// if the file is over, return 0 // if the file is over, return 0
if self.position == self.file_len { if self.position == self.file_len {
trace!(
stream_id = self.stream_id,
file_id = self.file_id,
"stream completed, EOF"
);
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
@ -119,11 +134,12 @@ impl AsyncRead for FileStream {
have have
})); }));
if !have { if !have {
trace!(stream_id = self.stream_id, file_id = self.file_id, piece_id = %piece_id, "poll pending, not have");
return Poll::Pending; return Poll::Pending;
} }
// actually stream the piece // actually stream the piece
let buf = buf.initialize_unfilled(); let buf = tbuf.initialize_unfilled();
let file_remaining = self.file_len - self.position; let file_remaining = self.file_len - self.position;
let bytes_to_read: usize = poll_try_io!((piece_len as u64) let bytes_to_read: usize = poll_try_io!((piece_len as u64)
.min(buf.len() as u64) .min(buf.len() as u64)
@ -132,6 +148,12 @@ impl AsyncRead for FileStream {
.try_into()); .try_into());
let buf = &mut buf[..bytes_to_read]; let buf = &mut buf[..bytes_to_read];
trace!(
buflen = buf.len(),
stream_id = self.stream_id,
file_id = self.file_id,
"will write bytes"
);
poll_try_io!(poll_try_io!(self.torrent.with_opened_file( poll_try_io!(poll_try_io!(self.torrent.with_opened_file(
self.file_id, self.file_id,
@ -144,6 +166,7 @@ impl AsyncRead for FileStream {
))); )));
self.as_mut().position += buf.len() as u64; self.as_mut().position += buf.len() as u64;
tbuf.advance(bytes_to_read);
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
@ -213,7 +236,7 @@ impl ManagedTorrent {
}) })
} }
pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<impl AsyncRead + AsyncSeek> { pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<FileStream> {
let (fd_len, fd_offset) = let (fd_len, fd_offset) =
self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?; self.with_opened_file(file_id, |fd| (fd.len, fd.offset_in_torrent))?;
let streams = self.streams()?; let streams = self.streams()?;
@ -229,3 +252,13 @@ impl ManagedTorrent {
}) })
} }
} }
impl FileStream {
pub fn position(&self) -> u64 {
self.position
}
pub fn len(&self) -> u64 {
self.file_len
}
}