--defer-writes-up-to

This commit is contained in:
Igor Katson 2024-05-03 14:55:31 +01:00
parent a3c4ca70fb
commit eafd274a0b
6 changed files with 55 additions and 62 deletions

View file

@ -20,7 +20,7 @@ use crate::{
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
type_aliases::PeerStream,
type_aliases::{DiskWorkQueueSender, PeerStream},
};
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
@ -34,6 +34,7 @@ use futures::{
};
use itertools::Itertools;
use librqbit_core::{
constants::CHUNK_SIZE,
directories::get_configuration_directory,
magnet::Magnet,
peer_id::generate_peer_id,
@ -188,7 +189,7 @@ pub struct Session {
cancellation_token: CancellationToken,
default_defer_writes: bool,
disk_write_tx: Option<DiskWorkQueueSender>,
default_storage_factory: Option<BoxStorageFactory>,
@ -424,9 +425,9 @@ pub struct SessionOptions {
pub listen_port_range: Option<std::ops::Range<u16>>,
pub enable_upnp_port_forwarding: bool,
// If true, will write to disk in separate threads. The downside is additional allocations.
// May be useful if the disk is slow.
pub default_defer_writes: bool,
// If you set this to something, all writes to disk will happen in background and be
// buffered in memory up to approximately the given number of megabytes.
pub defer_writes_up_to: Option<usize>,
pub default_storage_factory: Option<BoxStorageFactory>,
}
@ -516,6 +517,16 @@ impl Session {
};
let spawner = BlockingSpawner::default();
let (disk_write_tx, disk_write_rx) = opts
.defer_writes_up_to
.map(|mb| {
const DISK_WRITE_APPROX_WORK_ITEM_SIZE: usize = CHUNK_SIZE as usize + 300;
let count = mb * 1024 * 1024 / DISK_WRITE_APPROX_WORK_ITEM_SIZE;
let (tx, rx) = tokio::sync::mpsc::channel(count);
(Some(tx), Some(rx))
})
.unwrap_or_default();
let session = Arc::new(Self {
persistence_filename,
peer_id,
@ -527,10 +538,20 @@ impl Session {
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
default_defer_writes: opts.default_defer_writes,
disk_write_tx,
default_storage_factory: opts.default_storage_factory,
});
if let Some(mut disk_write_rx) = disk_write_rx {
session.spawn(error_span!("disk_writer"), async move {
while let Some(work) = disk_write_rx.recv().await {
trace!(disk_write_rx_queue_len = disk_write_rx.len());
spawner.spawn_block_in_place(work);
}
Ok(())
});
}
if let Some(tcp_listener) = tcp_listener {
session.spawn(
error_span!("tcp_listen", port = tcp_listen_port),
@ -1041,9 +1062,12 @@ impl Session {
.allow_overwrite(opts.overwrite)
.spawner(self.spawner)
.trackers(trackers)
.defer_writes(opts.defer_writes.unwrap_or(self.default_defer_writes))
.peer_id(self.peer_id);
if let Some(d) = self.disk_write_tx.clone() {
builder.disk_writer(d);
}
if let Some(only_files) = only_files {
builder.only_files(only_files);
}

View file

@ -72,8 +72,8 @@ async fn test_e2e() {
peer_opts: None,
listen_port_range: Some(15100..17000),
enable_upnp_port_forwarding: false,
default_defer_writes: false,
default_storage_factory: None,
defer_writes_up_to: None,
},
)
.await

View file

@ -59,7 +59,6 @@ use buffers::{ByteBuf, ByteBufOwned};
use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{
constants::CHUNK_SIZE,
hash_id::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
spawn_utils::spawn_with_cancel,
@ -88,7 +87,7 @@ use crate::{
},
session::CheckedIncomingConnection,
torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{FilePriorities, FileStorage, PeerHandle, BF},
type_aliases::{DiskWorkQueueSender, FilePriorities, FileStorage, PeerHandle, BF},
};
use self::{
@ -156,10 +155,6 @@ pub struct TorrentStateOptions {
pub peer_read_write_timeout: Option<Duration>,
}
struct DiskWriteWorkItem {
work: Box<dyn FnOnce() + Send + Sync>,
}
pub struct TorrentStateLive {
peers: PeerStates,
meta: Arc<ManagedTorrentInfo>,
@ -184,8 +179,6 @@ pub struct TorrentStateLive {
up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken,
disk_work_tx: tokio::sync::mpsc::Sender<DiskWriteWorkItem>,
pub(crate) streams: Arc<TorrentStreams>,
}
@ -217,15 +210,7 @@ impl TorrentStateLive {
pri
};
let defer_writes = paused.info.options.defer_writes;
// 8MB per torrent of disk buffering.
let (disk_work_tx, mut disk_work_rx) = tokio::sync::mpsc::channel(if defer_writes {
const APPROX_WORK_ITEM_SIZE: usize = CHUNK_SIZE as usize + 300;
8 * 1024 * 1024 / APPROX_WORK_ITEM_SIZE
} else {
1
});
let defer_writes = paused.info.options.disk_write_queue.is_some();
let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(),
@ -257,24 +242,8 @@ impl TorrentStateLive {
} else {
vec![]
},
disk_work_tx,
});
if defer_writes {
state.spawn(
error_span!(parent: state.meta.span.clone(), "disk_writer"),
{
let spawner = state.meta.spawner;
async move {
while let Some(work_item) = disk_work_rx.recv().await {
spawner.spawn_block_in_place(work_item.work);
}
Ok(())
}
},
);
}
state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
{
@ -324,8 +293,8 @@ impl TorrentStateLive {
&self.up_speed_estimator
}
fn defer_writes(&self) -> bool {
self.meta.options.defer_writes
fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> {
self.meta.options.disk_write_queue.as_ref()
}
pub(crate) fn add_incoming_peer(
@ -1546,7 +1515,7 @@ impl PeerHandler {
Ok(())
}
if self.state.defer_writes() {
if let Some(dtx) = self.state.disk_work_tx() {
// TODO: shove all this into one thing to .clone() once rather than 5 times.
let state = self.state.clone();
let addr = self.addr;
@ -1555,7 +1524,6 @@ impl PeerHandler {
let tx = self.tx.clone();
let span = tracing::error_span!("deferred_write");
let work = move || {
span.in_scope(|| {
if let Err(e) = write_to_disk(&state, addr, &counters, &piece, &chunk_info) {
@ -1563,12 +1531,7 @@ impl PeerHandler {
}
})
};
self.state
.disk_work_tx
.send(DiskWriteWorkItem {
work: Box::new(work),
})
.await?;
dtx.send(Box::new(work)).await?;
} else {
self.state
.meta

View file

@ -38,6 +38,7 @@ use crate::file_info::FileInfo;
use crate::spawn_utils::BlockingSpawner;
use crate::storage::BoxStorageFactory;
use crate::torrent_state::stats::LiveStats;
use crate::type_aliases::DiskWorkQueueSender;
use crate::type_aliases::FileInfos;
use crate::type_aliases::PeerStream;
@ -92,7 +93,7 @@ pub(crate) struct ManagedTorrentOptions {
pub peer_read_write_timeout: Option<Duration>,
pub allow_overwrite: bool,
pub output_folder: PathBuf,
pub defer_writes: bool,
pub disk_write_queue: Option<DiskWorkQueueSender>,
}
pub struct ManagedTorrentInfo {
@ -506,7 +507,7 @@ pub(crate) struct ManagedTorrentBuilder {
spawner: Option<BlockingSpawner>,
allow_overwrite: bool,
storage_factory: BoxStorageFactory,
defer_writes: bool,
disk_writer: Option<DiskWorkQueueSender>,
}
impl ManagedTorrentBuilder {
@ -529,7 +530,7 @@ impl ManagedTorrentBuilder {
allow_overwrite: false,
output_folder,
storage_factory,
defer_writes: false,
disk_writer: None,
}
}
@ -573,8 +574,8 @@ impl ManagedTorrentBuilder {
self
}
pub fn defer_writes(&mut self, value: bool) -> &mut Self {
self.defer_writes = value;
pub fn disk_writer(&mut self, value: DiskWorkQueueSender) -> &mut Self {
self.disk_writer = Some(value);
self
}
@ -608,7 +609,7 @@ impl ManagedTorrentBuilder {
peer_read_write_timeout: self.peer_read_write_timeout,
allow_overwrite: self.allow_overwrite,
output_folder: self.output_folder,
defer_writes: self.defer_writes,
disk_write_queue: self.disk_writer,
},
});

View file

@ -11,3 +11,6 @@ pub type PeerStream = BoxStream<'static, SocketAddr>;
pub type FileInfos = Vec<FileInfo>;
pub(crate) type FileStorage = Box<dyn TorrentStorage>;
pub(crate) type FilePriorities = Vec<usize>;
pub(crate) type DiskWorkQueueItem = Box<dyn FnOnce() + Send + Sync>;
pub(crate) type DiskWorkQueueSender = tokio::sync::mpsc::Sender<DiskWorkQueueItem>;

View file

@ -104,10 +104,12 @@ struct Opts {
#[arg(long = "max-blocking-threads", default_value = "8")]
max_blocking_threads: u16,
/// If set, will write to disk in background and not inline with peer.
/// Useful if the disk is slow or its latency is very unstable (e.g. HDD or old SSD).
#[arg(long = "defer-writes", default_value = "false")]
defer_writes: bool,
// If you set this to something, all writes to disk will happen in background and be
// buffered in memory up to approximately the given number of megabytes.
//
// Might be useful for slow disks.
#[arg(long = "defer-writes-up-to")]
defer_writes_up_to: Option<usize>,
/// Use mmap (file-backed) for storage. Any advantages are questionable and unproven.
/// If you use it, you know what you are doing.
@ -298,7 +300,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
None
},
enable_upnp_port_forwarding: !opts.disable_upnp,
default_defer_writes: opts.defer_writes,
defer_writes_up_to: opts.defer_writes_up_to,
default_storage_factory: Some({
fn wrap<S: StorageFactory + Clone>(s: S) -> impl StorageFactory {
#[cfg(feature = "debug_slow_disk")]