Defer writes command line flag

This commit is contained in:
Igor Katson 2024-05-01 15:23:11 +01:00
parent b8bbe6a87f
commit d25309e358
6 changed files with 166 additions and 90 deletions

View file

@ -37,7 +37,7 @@ pub trait PeerConnectionHandler {
pub enum WriterRequest {
Message(MessageOwned),
ReadChunkRequest(ChunkInfo),
Disconnect,
Disconnect(anyhow::Result<()>),
}
#[serde_as]
@ -270,7 +270,8 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let mut uploaded_add = None;
let len = match &req {
trace!("about to send: {:?}", &req);
let len = match req {
WriterRequest::Message(msg) => msg.serialize(&mut write_buf, &|| {
extended_handshake_ref
.read()
@ -307,14 +308,14 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
// this whole section is an optimization
write_buf.resize(PIECE_MESSAGE_DEFAULT_LEN, 0);
let preamble_len = serialize_piece_preamble(chunk, &mut write_buf);
let preamble_len = serialize_piece_preamble(&chunk, &mut write_buf);
let full_len = preamble_len + chunk.size as usize;
write_buf.resize(full_len, 0);
if !skip_reading_for_e2e_tests {
self.spawner
.spawn_block_in_place(|| {
self.handler
.read_chunk(chunk, &mut write_buf[preamble_len..])
.read_chunk(&chunk, &mut write_buf[preamble_len..])
})
.with_context(|| format!("error reading chunk {chunk:?}"))?;
}
@ -322,14 +323,12 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
uploaded_add = Some(chunk.size);
full_len
}
WriterRequest::Disconnect => {
WriterRequest::Disconnect(res) => {
trace!("disconnect requested, closing writer");
return Ok(());
return res;
}
};
trace!("sending: {:?}, length={}", &req, len);
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
.context("error writing the message to peer")?;

View file

@ -186,6 +186,8 @@ pub struct Session {
cancellation_token: CancellationToken,
default_defer_writes: bool,
// This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard,
}
@ -306,6 +308,10 @@ pub struct AddTorrentOptions {
pub preferred_id: Option<usize>,
pub storage_factory: Option<BoxStorageFactory>,
// If true, will write to disk in separate threads. The downside is additional allocations.
// May be useful if the disk is slow.
pub defer_writes: Option<bool>,
}
pub struct ListOnlyResponse {
@ -413,6 +419,10 @@ pub struct SessionOptions {
pub listen_port_range: Option<std::ops::Range<u16>>,
pub enable_upnp_port_forwarding: bool,
// If true, will write to disk in separate threads. The downside is additional allocations.
// May be useful if the disk is slow.
pub default_defer_writes: bool,
}
async fn create_tcp_listener(
@ -511,6 +521,7 @@ impl Session {
_cancellation_token_drop_guard: token.clone().drop_guard(),
cancellation_token: token,
tcp_listen_port,
default_defer_writes: opts.default_defer_writes,
});
if let Some(tcp_listener) = tcp_listener {
@ -1022,6 +1033,7 @@ impl Session {
.allow_overwrite(opts.overwrite)
.spawner(self.spawner)
.trackers(trackers)
.defer_writes(opts.defer_writes.unwrap_or(self.default_defer_writes))
.peer_id(self.peer_id);
if let Some(only_files) = only_files {

View file

@ -72,6 +72,7 @@ async fn test_e2e() {
peer_opts: None,
listen_port_range: Some(15100..17000),
enable_upnp_port_forwarding: false,
default_defer_writes: false,
},
)
.await

View file

@ -747,7 +747,7 @@ impl TorrentStateLive {
.take_live_no_counters()
.unwrap()
.tx
.send(WriterRequest::Disconnect);
.send(WriterRequest::Disconnect(Ok(())));
}
}
}
@ -1189,7 +1189,7 @@ impl PeerHandler {
.unwrap_or_default()
{
debug!("both peer and us have full torrent, disconnecting");
self.tx.send(WriterRequest::Disconnect)?;
self.tx.send(WriterRequest::Disconnect(Ok(())))?;
// Sleep a bit to ensure this gets written to the network by manage_peer
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
@ -1382,93 +1382,137 @@ impl PeerHandler {
// By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would
// have fallen off above in one of the defensive checks.
//
let work = {
fn write_to_disk(
state: &TorrentStateLive,
addr: PeerHandle,
counters: &AtomicPeerCounters,
piece: &Piece<impl AsRef<[u8]>>,
chunk_info: &ChunkInfo,
full_piece_download_time: Option<Duration>,
) -> anyhow::Result<()> {
let index = piece.index;
// Not being able to write to storage is a fatal error. You need to unpause the
// torrent to recover from it.
match state.file_ops().write_chunk(addr, piece, chunk_info) {
Ok(()) => {}
Err(e) => {
error!("FATAL: error writing chunk to disk: {:?}", e);
return state.on_fatal_error(e);
}
}
// Global chunk/byte counters.
state
.stats
.fetched_bytes
.fetch_add(piece.block.as_ref().len() as u64, Ordering::Relaxed);
let full_piece_download_time = match full_piece_download_time {
Some(t) => t,
None => return Ok(()),
};
match state
.file_ops()
.check_piece(addr, chunk_info.piece_index, chunk_info)
.with_context(|| format!("error checking piece={index}"))?
{
true => {
{
let mut g = state.lock_write("mark_piece_downloaded");
g.get_chunks_mut()?
.mark_piece_downloaded(chunk_info.piece_index);
}
// Global piece counters.
let piece_len = state.lengths.piece_length(chunk_info.piece_index) as u64;
state
.stats
.downloaded_and_checked_bytes
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(piece_len, Ordering::Release);
state
.stats
.downloaded_and_checked_pieces
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(1, Ordering::Release);
state
.stats
.have_bytes
.fetch_add(piece_len, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
// Per-peer piece counters.
counters.on_piece_completed(piece_len, full_piece_download_time);
state.peers.reset_peer_backoff(addr);
debug!("piece={} successfully downloaded and verified", index);
state.on_piece_completed(chunk_info.piece_index)?;
state.maybe_transmit_haves(chunk_info.piece_index);
}
false => {
warn!(
"checksum for piece={} did not validate. disconecting peer.",
index
);
state
.lock_write("mark_piece_broken")
.get_chunks_mut()?
.mark_piece_broken_if_not_have(chunk_info.piece_index);
anyhow::bail!("i am probably a bogus peer. dying.")
}
};
Ok(())
}
if self.state.meta().options.defer_writes {
let state = self.state.clone();
let addr = self.addr;
let counters = self.counters.clone();
let piece = piece.clone_to_owned();
move || {
let index = piece.index;
let tx = self.tx.clone();
// Not being able to write to storage is a fatal error. You need to unpause the
// torrent to recover from it.
match state.file_ops().write_chunk(addr, &piece, &chunk_info) {
Ok(()) => {}
Err(e) => {
error!("FATAL: error writing chunk to disk: {:?}", e);
return state.on_fatal_error(e);
}
let work = move || {
if let Err(e) = write_to_disk(
&state,
addr,
&counters,
&piece,
&chunk_info,
full_piece_download_time,
) {
let _ = tx.send(WriterRequest::Disconnect(Err(e)));
}
};
tokio::runtime::Handle::current().spawn_blocking(work);
} else {
self.state
.meta
.spawner
.spawn_block_in_place(|| {
write_to_disk(
&self.state,
self.addr,
&self.counters,
&piece,
&chunk_info,
full_piece_download_time,
)
})
.with_context(|| format!("error processing received chunk {chunk_info:?}"))?;
}
let full_piece_download_time = match full_piece_download_time {
Some(t) => t,
None => return Ok(()),
};
match state
.file_ops()
.check_piece(addr, chunk_info.piece_index, &chunk_info)
.with_context(|| format!("error checking piece={index}"))?
{
true => {
{
let mut g = state.lock_write("mark_piece_downloaded");
g.get_chunks_mut()?
.mark_piece_downloaded(chunk_info.piece_index);
}
// Global piece counters.
let piece_len = state.lengths.piece_length(chunk_info.piece_index) as u64;
state
.stats
.downloaded_and_checked_bytes
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(piece_len, Ordering::Release);
state
.stats
.downloaded_and_checked_pieces
// This counter is used to compute "is_finished", so using
// stronger ordering.
.fetch_add(1, Ordering::Release);
state
.stats
.have_bytes
.fetch_add(piece_len, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
// Per-peer piece counters.
counters.on_piece_completed(piece_len, full_piece_download_time);
state.peers.reset_peer_backoff(addr);
debug!("piece={} successfully downloaded and verified", index);
state.on_piece_completed(chunk_info.piece_index)?;
state.maybe_transmit_haves(chunk_info.piece_index);
}
false => {
warn!(
"checksum for piece={} did not validate. disconecting peer.",
index
);
state
.lock_write("mark_piece_broken")
.get_chunks_mut()?
.mark_piece_broken_if_not_have(chunk_info.piece_index);
anyhow::bail!("i am probably a bogus peer. dying.")
}
};
Ok::<_, anyhow::Error>(())
}
};
tokio::runtime::Handle::current().spawn_blocking(work);
Ok(())
}
}

View file

@ -92,6 +92,7 @@ pub(crate) struct ManagedTorrentOptions {
pub peer_read_write_timeout: Option<Duration>,
pub allow_overwrite: bool,
pub output_folder: PathBuf,
pub defer_writes: bool,
}
pub struct ManagedTorrentInfo {
@ -505,6 +506,7 @@ pub(crate) struct ManagedTorrentBuilder {
spawner: Option<BlockingSpawner>,
allow_overwrite: bool,
storage_factory: BoxStorageFactory,
defer_writes: bool,
}
impl ManagedTorrentBuilder {
@ -527,6 +529,7 @@ impl ManagedTorrentBuilder {
allow_overwrite: false,
output_folder,
storage_factory,
defer_writes: false,
}
}
@ -570,6 +573,11 @@ impl ManagedTorrentBuilder {
self
}
pub fn defer_writes(&mut self, value: bool) -> &mut Self {
self.defer_writes = value;
self
}
pub fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let file_infos = self
@ -600,6 +608,7 @@ impl ManagedTorrentBuilder {
peer_read_write_timeout: self.peer_read_write_timeout,
allow_overwrite: self.allow_overwrite,
output_folder: self.output_folder,
defer_writes: self.defer_writes,
},
});

View file

@ -97,6 +97,16 @@ struct Opts {
#[command(subcommand)]
subcommand: SubCommand,
/// How many blocking tokio threads to spawn to process disk reads/writes.
/// Might want to increase if the disk is slow.
#[arg(long = "max-blocking-threads", default_value = "16")]
max_blocking_threads: u16,
/// If set, will write to disk in background and not inline with peer.
/// Might be useful if the disk is slow.
#[arg(long = "defer-writes", default_value = "false")]
defer_writes: bool,
}
#[derive(Parser)]
@ -239,7 +249,7 @@ fn main() -> anyhow::Result<()> {
// note: we aren't using spawn_blocking() anymore, so this doesn't apply,
// however I'm still messing around, so in case we do, let's block the number of
// spawned threads.
.max_blocking_threads(8)
.max_blocking_threads(opts.max_blocking_threads as usize)
.build()?;
rt.block_on(async_main(opts))
@ -282,6 +292,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
None
},
enable_upnp_port_forwarding: !opts.disable_upnp,
default_defer_writes: opts.defer_writes,
};
let stats_printer = |session: Arc<Session>| async move {