Downloading chunks no longer copies them; they are now written straight to disk. (The data is still read into a buffer first, but there is no longer an allocation for every chunk.)

This commit is contained in:
Igor Katson 2021-06-28 16:37:15 +01:00
parent fab43a8d23
commit 14b62b45c5
7 changed files with 96 additions and 109 deletions

View file

@ -1,7 +1,6 @@
use log::{debug, info};
use crate::{
buffers::ByteString,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
peer_binary_protocol::Piece,
type_aliases::BF,
@ -118,10 +117,13 @@ impl ChunkTracker {
}
// return true if the whole piece is marked downloaded
pub fn mark_chunk_downloaded(
pub fn mark_chunk_downloaded<ByteBuf>(
&mut self,
piece: &Piece<ByteString>,
) -> Option<ChunkMarkingResult> {
piece: &Piece<ByteBuf>,
) -> Option<ChunkMarkingResult>
where
ByteBuf: AsRef<[u8]>,
{
let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?;
let chunk_range = self.lengths.chunk_range(chunk_info.piece_index);
let chunk_range = self.chunk_status.get_mut(chunk_range).unwrap();

View file

@ -340,12 +340,15 @@ impl<'a> FileOps<'a> {
return Ok(());
}
pub fn write_chunk(
pub fn write_chunk<ByteBuf>(
&self,
who_sent: PeerHandle,
data: &Piece<ByteString>,
data: &Piece<ByteBuf>,
chunk_info: &ChunkInfo,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
where
ByteBuf: AsRef<[u8]>,
{
let mut buf = data.block.as_ref();
let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info);

View file

@ -189,11 +189,17 @@ impl Lengths {
})
}
pub fn chunk_info_from_received_piece(&self, piece: &Piece<ByteString>) -> Option<ChunkInfo> {
pub fn chunk_info_from_received_piece<ByteBuf>(
&self,
piece: &Piece<ByteBuf>,
) -> Option<ChunkInfo>
where
ByteBuf: AsRef<[u8]>,
{
self.chunk_info_from_received_data(
self.validate_piece_index(piece.index)?,
piece.begin,
piece.block.len() as u32,
piece.block.as_ref().len() as u32,
)
}
pub const fn chunk_range(&self, index: ValidPieceIndex) -> std::ops::Range<usize> {

View file

@ -18,7 +18,7 @@ use crate::{
MessageOwned, Piece, Request,
},
peer_id::try_decode_peer_id,
spawn_utils::{spawn, spawn_blocking},
spawn_utils::spawn,
torrent_state::{InflightRequest, TorrentState},
type_aliases::PeerHandle,
};
@ -117,10 +117,7 @@ impl PeerConnection {
Err(_) => WriterRequest::Message(MessageOwned::KeepAlive),
};
let uploaded_add = match &req {
WriterRequest::Message(Message::Piece(p)) => Some(p.block.len()),
_ => None,
};
let mut uploaded_add = None;
let len = match &req {
WriterRequest::Message(msg) => msg.serialize(&mut buf),
@ -138,6 +135,7 @@ impl PeerConnection {
)
})
.with_context(|| format!("error reading chunk {:?}", chunk))?;
uploaded_add = Some(chunk.size);
full_len
}
};
@ -164,15 +162,10 @@ impl PeerConnection {
let reader = async move {
loop {
let message = loop {
let (message, size) = loop {
match MessageBorrowed::deserialize(&read_buf[..read_so_far]) {
Ok((msg, size)) => {
let msg = msg.clone_to_owned();
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;
break msg;
break (msg, size);
}
Err(MessageDeserializeError::NotEnoughData(d, _)) => {
if read_buf.len() < read_so_far + d {
@ -206,7 +199,7 @@ impl PeerConnection {
format!("error handling download request from {}", handle)
})?;
}
Message::Bitfield(b) => self.on_bitfield(handle, b).await?,
Message::Bitfield(b) => self.on_bitfield(handle, b.clone_to_owned()).await?,
Message::Choke => self.on_i_am_choked(handle),
Message::Unchoke => self.on_i_am_unchoked(handle),
Message::Interested => {
@ -227,6 +220,11 @@ impl PeerConnection {
info!("received \"not interested\", but we don't care yet")
}
}
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;
}
// For type inference.
@ -471,11 +469,7 @@ impl PeerConnection {
live.requests_sem.add_permits(16);
}
fn on_received_piece(
&self,
handle: PeerHandle,
piece: Piece<ByteString>,
) -> anyhow::Result<()> {
fn on_received_piece(&self, handle: PeerHandle, piece: Piece<ByteBuf>) -> anyhow::Result<()> {
let chunk_info = match self.state.lengths.chunk_info_from_received_piece(&piece) {
Some(i) => i,
None => {
@ -533,73 +527,73 @@ impl PeerConnection {
}
};
let this = self.clone();
// to prevent deadlocks.
drop(g);
spawn_blocking(
format!(
"write_and_check(piece={}, peer={}, block={:?})",
piece.index, handle, &piece
),
move || {
let index = piece.index;
tokio::task::block_in_place(move || {
let index = piece.index;
// TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what
// should we really do? If we unmark it, it will get requested forever...
this.state
.file_ops()
.write_chunk(handle, &piece, &chunk_info)?;
// TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what
// should we really do? If we unmark it, it will get requested forever...
//
// So let's just unwrap and abort.
self.state
.file_ops()
.write_chunk(handle, &piece, &chunk_info)
.expect("expected to be able to write to disk");
if !should_checksum {
return Ok(());
if !should_checksum {
return Ok(());
}
match self
.state
.file_ops()
.check_piece(handle, chunk_info.piece_index, &chunk_info)
.with_context(|| format!("error checking piece={}", index))?
{
true => {
let piece_len = self.state.lengths.piece_length(chunk_info.piece_index) as u64;
self.state
.stats
.downloaded_and_checked
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.have
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.locked
.write()
.chunks
.mark_piece_downloaded(chunk_info.piece_index);
debug!(
"piece={} successfully downloaded and verified from {}",
index, handle
);
let state_clone = self.state.clone();
let index = piece.index;
spawn("transmit haves", async move {
state_clone.task_transmit_haves(index).await
});
}
match this
.state
.file_ops()
.check_piece(handle, chunk_info.piece_index, &chunk_info)
.with_context(|| format!("error checking piece={}", index))?
{
true => {
let piece_len =
this.state.lengths.piece_length(chunk_info.piece_index) as u64;
this.state
.stats
.downloaded_and_checked
.fetch_add(piece_len, Ordering::Relaxed);
this.state
.stats
.have
.fetch_add(piece_len, Ordering::Relaxed);
this.state
.locked
.write()
.chunks
.mark_piece_downloaded(chunk_info.piece_index);
debug!(
"piece={} successfully downloaded and verified from {}",
index, handle
);
let state_clone = this.state.clone();
spawn("transmit haves", async move {
state_clone.task_transmit_haves(piece.index).await
});
}
false => {
warn!(
"checksum for piece={} did not validate, came from {}",
index, handle
);
this.state
.locked
.write()
.chunks
.mark_piece_broken(chunk_info.piece_index);
}
};
Ok::<_, anyhow::Error>(())
},
);
false => {
warn!(
"checksum for piece={} did not validate, came from {}",
index, handle
);
self.state
.locked
.write()
.chunks
.mark_piece_broken(chunk_info.piece_index);
}
};
Ok::<_, anyhow::Error>(())
})
.with_context(|| format!("error processing received chunk {:?}", chunk_info))?;
Ok(())
}
}

View file

@ -18,20 +18,3 @@ pub fn spawn<N: Display + 'static + Send>(
}
});
}
/// Runs the closure `f` through `tokio::task::spawn_blocking`, logging its
/// lifecycle under the given `name`.
///
/// A `debug!` message is emitted before the closure is handed to tokio and
/// again when `f` returns `Ok`. If `f` returns `Err`, the error is reported
/// with `error!` (formatted with `{:#}`) and then returned unchanged.
///
/// Returns tokio's `JoinHandle`, whose output carries the `anyhow::Result<T>`
/// produced by `f`.
pub fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
name: N,
f: impl FnOnce() -> anyhow::Result<T> + Send + 'static,
) -> tokio::task::JoinHandle<anyhow::Result<T>> {
// Logged by the caller, before the closure is submitted to tokio.
debug!("starting blocking task \"{}\"", name);
// `name` is moved into the closure so both outcomes can be logged from inside the task.
tokio::task::spawn_blocking(move || match f() {
Ok(v) => {
debug!("blocking task \"{}\" finished", name);
Ok(v)
}
Err(e) => {
// The error is logged here and still propagated to whoever awaits the handle.
error!("error in blocking task \"{}\": {:#}", name, &e);
Err(e)
}
})
}

View file

@ -21,7 +21,6 @@ use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
lengths::Lengths,
peer_binary_protocol::MessageOwned,
peer_connection::{PeerConnection, WriterRequest},
spawn_utils::spawn,
torrent_metainfo::TorrentMetaV1Owned,

View file

@ -17,7 +17,7 @@ use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
peer_binary_protocol::{Handshake, Message, MessageOwned},
peer_binary_protocol::{Handshake, Message},
peer_connection::WriterRequest,
peer_state::{LivePeerState, PeerState},
torrent_metainfo::TorrentMetaV1Owned,