Disk write queue

This commit is contained in:
Igor Katson 2024-05-03 12:47:57 +01:00
parent e658b960e3
commit 1744f0c101
5 changed files with 43 additions and 6 deletions

1
Cargo.lock generated
View file

@ -1293,6 +1293,7 @@ version = "5.6.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
"async-trait",
"axum 0.7.5", "axum 0.7.5",
"backoff", "backoff",
"base64 0.21.7", "base64 0.21.7",

View file

@ -76,6 +76,7 @@ memmap2 = { version = "0.9.4" }
rand_distr = { version = "0.4.3", optional = true } rand_distr = { version = "0.4.3", optional = true }
lru = { version = "0.12.3", optional = true } lru = { version = "0.12.3", optional = true }
async-trait = "0.1.80"
[dev-dependencies] [dev-dependencies]
futures = { version = "0.3" } futures = { version = "0.3" }

View file

@ -19,6 +19,7 @@ use tracing::trace;
use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner}; use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner};
#[async_trait::async_trait]
pub trait PeerConnectionHandler { pub trait PeerConnectionHandler {
fn on_connected(&self, _connection_time: Duration) {} fn on_connected(&self, _connection_time: Duration) {}
fn get_have_bytes(&self) -> u64; fn get_have_bytes(&self) -> u64;
@ -28,7 +29,7 @@ pub trait PeerConnectionHandler {
&self, &self,
extended_handshake: &ExtendedHandshake<ByteBuf>, extended_handshake: &ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()>; ) -> anyhow::Result<()>;
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()>; async fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()>;
fn on_uploaded_bytes(&self, bytes: u32); fn on_uploaded_bytes(&self, bytes: u32);
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>; fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
} }
@ -360,6 +361,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
} else { } else {
self.handler self.handler
.on_received_message(message) .on_received_message(message)
.await
.context("error in handler.on_received_message()")?; .context("error in handler.on_received_message()")?;
} }
} }

View file

@ -141,6 +141,7 @@ struct Handler {
locked: RwLock<Option<HandlerLocked>>, locked: RwLock<Option<HandlerLocked>>,
} }
#[async_trait::async_trait]
impl PeerConnectionHandler for Handler { impl PeerConnectionHandler for Handler {
fn get_have_bytes(&self) -> u64 { fn get_have_bytes(&self) -> u64 {
0 0
@ -157,7 +158,7 @@ impl PeerConnectionHandler for Handler {
Ok(()) Ok(())
} }
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> { async fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
trace!("{}: received message: {:?}", self.addr, msg); trace!("{}: received message: {:?}", self.addr, msg);
if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data { if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data {

View file

@ -59,6 +59,7 @@ use buffers::{ByteBuf, ByteBufOwned};
use clone_to_owned::CloneToOwned; use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{ use librqbit_core::{
constants::CHUNK_SIZE,
hash_id::Id20, hash_id::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex}, lengths::{ChunkInfo, Lengths, ValidPieceIndex},
spawn_utils::spawn_with_cancel, spawn_utils::spawn_with_cancel,
@ -155,6 +156,10 @@ pub struct TorrentStateOptions {
pub peer_read_write_timeout: Option<Duration>, pub peer_read_write_timeout: Option<Duration>,
} }
struct DiskWriteWorkItem {
work: Box<dyn FnOnce() + Send + Sync>,
}
pub struct TorrentStateLive { pub struct TorrentStateLive {
peers: PeerStates, peers: PeerStates,
meta: Arc<ManagedTorrentInfo>, meta: Arc<ManagedTorrentInfo>,
@ -179,6 +184,8 @@ pub struct TorrentStateLive {
up_speed_estimator: SpeedEstimator, up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
disk_work_tx: tokio::sync::mpsc::Sender<DiskWriteWorkItem>,
pub(crate) streams: Arc<TorrentStreams>, pub(crate) streams: Arc<TorrentStreams>,
} }
@ -210,6 +217,10 @@ impl TorrentStateLive {
pri pri
}; };
// 8MB per torrent of disk buffering.
let (disk_work_tx, mut disk_work_rx) =
tokio::sync::mpsc::channel(8 * 1024 * 1024 / CHUNK_SIZE as usize);
let state = Arc::new(TorrentStateLive { let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(), meta: paused.info.clone(),
peers: Default::default(), peers: Default::default(),
@ -236,8 +247,19 @@ impl TorrentStateLive {
per_piece_locks: (0..lengths.total_pieces()) per_piece_locks: (0..lengths.total_pieces())
.map(|_| RwLock::new(())) .map(|_| RwLock::new(()))
.collect(), .collect(),
disk_work_tx,
}); });
state.spawn(
error_span!(parent: state.meta.span.clone(), "disk_writer"),
async move {
while let Some(work_item) = disk_work_rx.recv().await {
tokio::task::spawn_blocking(work_item.work);
}
Ok(())
},
);
state.spawn( state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
{ {
@ -802,6 +824,7 @@ struct PeerHandler {
tx: PeerTx, tx: PeerTx,
} }
#[async_trait::async_trait]
impl<'a> PeerConnectionHandler for &'a PeerHandler { impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_connected(&self, connection_time: Duration) { fn on_connected(&self, connection_time: Duration) {
self.counters self.counters
@ -812,7 +835,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
.total_time_connecting_ms .total_time_connecting_ms
.fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed); .fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed);
} }
fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
async fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
match message { match message {
Message::Request(request) => { Message::Request(request) => {
self.on_download_request(request) self.on_download_request(request)
@ -824,7 +848,10 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
Message::Choke => self.on_i_am_choked(), Message::Choke => self.on_i_am_choked(),
Message::Unchoke => self.on_i_am_unchoked(), Message::Unchoke => self.on_i_am_unchoked(),
Message::Interested => self.on_peer_interested(), Message::Interested => self.on_peer_interested(),
Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?, Message::Piece(piece) => self
.on_received_piece(piece)
.await
.context("on_received_piece")?,
Message::KeepAlive => { Message::KeepAlive => {
trace!("keepalive received"); trace!("keepalive received");
} }
@ -1302,7 +1329,7 @@ impl PeerHandler {
self.requests_sem.add_permits(128); self.requests_sem.add_permits(128);
} }
fn on_received_piece(&self, piece: Piece<ByteBuf>) -> anyhow::Result<()> { async fn on_received_piece(&self, piece: Piece<ByteBuf<'_>>) -> anyhow::Result<()> {
let piece_index = self let piece_index = self
.state .state
.lengths .lengths
@ -1510,7 +1537,12 @@ impl PeerHandler {
} }
}) })
}; };
tokio::task::spawn_blocking(work); self.state
.disk_work_tx
.send(DiskWriteWorkItem {
work: Box::new(work),
})
.await?;
} else { } else {
self.state self.state
.meta .meta