From 14b62b45c5118a484d8071a7e9c4f7f54d86baf6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 28 Jun 2021 16:37:15 +0100 Subject: [PATCH] Downloading chunks now does not copy the chunks, but writes them straight to disk. (Still reads into a buffer first though, but does not allocate on every chunk) --- crates/librqbit/src/chunk_tracker.rs | 10 +- crates/librqbit/src/file_ops.rs | 9 +- crates/librqbit/src/lengths.rs | 10 +- crates/librqbit/src/peer_connection.rs | 156 ++++++++++++------------- crates/librqbit/src/spawn_utils.rs | 17 --- crates/librqbit/src/torrent_manager.rs | 1 - crates/librqbit/src/torrent_state.rs | 2 +- 7 files changed, 96 insertions(+), 109 deletions(-) diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 58bcbc3..021409e 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -1,7 +1,6 @@ use log::{debug, info}; use crate::{ - buffers::ByteString, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_binary_protocol::Piece, type_aliases::BF, @@ -118,10 +117,13 @@ impl ChunkTracker { } // return true if the whole piece is marked downloaded - pub fn mark_chunk_downloaded( + pub fn mark_chunk_downloaded( &mut self, - piece: &Piece, - ) -> Option { + piece: &Piece, + ) -> Option + where + ByteBuf: AsRef<[u8]>, + { let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?; let chunk_range = self.lengths.chunk_range(chunk_info.piece_index); let chunk_range = self.chunk_status.get_mut(chunk_range).unwrap(); diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index ca22fd3..b407686 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -340,12 +340,15 @@ impl<'a> FileOps<'a> { return Ok(()); } - pub fn write_chunk( + pub fn write_chunk( &self, who_sent: PeerHandle, - data: &Piece, + data: &Piece, chunk_info: &ChunkInfo, - ) -> anyhow::Result<()> { + ) -> anyhow::Result<()> + where + ByteBuf: AsRef<[u8]>, + { let mut buf = data.block.as_ref(); let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info); diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs index c2362eb..6b5a041 100644 --- a/crates/librqbit/src/lengths.rs +++ b/crates/librqbit/src/lengths.rs @@ -189,11 +189,17 @@ impl Lengths { }) } - pub fn chunk_info_from_received_piece(&self, piece: &Piece) -> Option { + pub fn chunk_info_from_received_piece( + &self, + piece: &Piece, + ) -> Option + where + ByteBuf: AsRef<[u8]>, + { self.chunk_info_from_received_data( self.validate_piece_index(piece.index)?, piece.begin, - piece.block.len() as u32, + piece.block.as_ref().len() as u32, ) } pub const fn chunk_range(&self, index: ValidPieceIndex) -> std::ops::Range { diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 1d869da..dffc770 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -18,7 +18,7 @@ use crate::{ MessageOwned, Piece, Request, }, peer_id::try_decode_peer_id, - spawn_utils::{spawn, spawn_blocking}, + spawn_utils::spawn, torrent_state::{InflightRequest, TorrentState}, type_aliases::PeerHandle, }; @@ -117,10 +117,7 @@ impl PeerConnection { Err(_) => WriterRequest::Message(MessageOwned::KeepAlive), }; - let uploaded_add = match &req { - WriterRequest::Message(Message::Piece(p)) => Some(p.block.len()), - _ => None, - }; + let mut uploaded_add = None; let len = match &req { WriterRequest::Message(msg) => msg.serialize(&mut buf), @@ -138,6 +135,7 @@ impl PeerConnection { ) }) .with_context(|| format!("error reading chunk {:?}", chunk))?; + uploaded_add = Some(chunk.size); full_len } }; @@ -164,15 +162,10 @@ impl PeerConnection { let reader = async move { loop { - let message = loop { + let (message, size) = loop { match MessageBorrowed::deserialize(&read_buf[..read_so_far]) { Ok((msg, size)) => { - let msg = msg.clone_to_owned(); - if read_so_far > size { - read_buf.copy_within(size..read_so_far, 0); - } - read_so_far -= size; - break msg; + break (msg, size); } Err(MessageDeserializeError::NotEnoughData(d, _)) => { if read_buf.len() < read_so_far + d { @@ -206,7 +199,7 @@ impl PeerConnection { format!("error handling download request from {}", handle) })?; } - Message::Bitfield(b) => self.on_bitfield(handle, b).await?, + Message::Bitfield(b) => self.on_bitfield(handle, b.clone_to_owned()).await?, Message::Choke => self.on_i_am_choked(handle), Message::Unchoke => self.on_i_am_unchoked(handle), Message::Interested => { @@ -227,6 +220,11 @@ impl PeerConnection { info!("received \"not interested\", but we don't care yet") } } + + if read_so_far > size { + read_buf.copy_within(size..read_so_far, 0); + } + read_so_far -= size; } // For type inference. @@ -471,11 +469,7 @@ impl PeerConnection { live.requests_sem.add_permits(16); } - fn on_received_piece( - &self, - handle: PeerHandle, - piece: Piece, - ) -> anyhow::Result<()> { + fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> anyhow::Result<()> { let chunk_info = match self.state.lengths.chunk_info_from_received_piece(&piece) { Some(i) => i, None => { @@ -533,73 +527,73 @@ impl PeerConnection { } }; - let this = self.clone(); + // to prevent deadlocks. + drop(g); - spawn_blocking( - format!( - "write_and_check(piece={}, peer={}, block={:?})", - piece.index, handle, &piece - ), - move || { - let index = piece.index; + tokio::task::block_in_place(move || { + let index = piece.index; - // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what - // should we really do? If we unmark it, it will get requested forever... - this.state - .file_ops() - .write_chunk(handle, &piece, &chunk_info)?; + // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what + // should we really do? If we unmark it, it will get requested forever... + // + // So let's just unwrap and abort. + self.state + .file_ops() + .write_chunk(handle, &piece, &chunk_info) + .expect("expected to be able to write to disk"); - if !should_checksum { - return Ok(()); + if !should_checksum { + return Ok(()); + } + + match self + .state + .file_ops() + .check_piece(handle, chunk_info.piece_index, &chunk_info) + .with_context(|| format!("error checking piece={}", index))? + { + true => { + let piece_len = self.state.lengths.piece_length(chunk_info.piece_index) as u64; + self.state + .stats + .downloaded_and_checked + .fetch_add(piece_len, Ordering::Relaxed); + self.state + .stats + .have + .fetch_add(piece_len, Ordering::Relaxed); + self.state + .locked + .write() + .chunks + .mark_piece_downloaded(chunk_info.piece_index); + + debug!( + "piece={} successfully downloaded and verified from {}", + index, handle + ); + + let state_clone = self.state.clone(); + let index = piece.index; + spawn("transmit haves", async move { + state_clone.task_transmit_haves(index).await + }); } - - match this - .state - .file_ops() - .check_piece(handle, chunk_info.piece_index, &chunk_info) - .with_context(|| format!("error checking piece={}", index))? - { - true => { - let piece_len = - this.state.lengths.piece_length(chunk_info.piece_index) as u64; - this.state - .stats - .downloaded_and_checked - .fetch_add(piece_len, Ordering::Relaxed); - this.state - .stats - .have - .fetch_add(piece_len, Ordering::Relaxed); - this.state - .locked - .write() - .chunks - .mark_piece_downloaded(chunk_info.piece_index); - - debug!( - "piece={} successfully downloaded and verified from {}", - index, handle - ); - let state_clone = this.state.clone(); - spawn("transmit haves", async move { - state_clone.task_transmit_haves(piece.index).await - }); - } - false => { - warn!( - "checksum for piece={} did not validate, came from {}", - index, handle - ); - this.state - .locked - .write() - .chunks - .mark_piece_broken(chunk_info.piece_index); - } - }; - Ok::<_, anyhow::Error>(()) - }, - ); + false => { + warn!( + "checksum for piece={} did not validate, came from {}", + index, handle + ); + self.state + .locked + .write() + .chunks + .mark_piece_broken(chunk_info.piece_index); + } + }; + Ok::<_, anyhow::Error>(()) + }) + .with_context(|| format!("error processing received chunk {:?}", chunk_info))?; Ok(()) } } diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index 0435f37..35a717e 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -18,20 +18,3 @@ pub fn spawn( } }); } - -pub fn spawn_blocking( - name: N, - f: impl FnOnce() -> anyhow::Result + Send + 'static, -) -> tokio::task::JoinHandle> { - debug!("starting blocking task \"{}\"", name); - tokio::task::spawn_blocking(move || match f() { - Ok(v) => { - debug!("blocking task \"{}\" finished", name); - Ok(v) - } - Err(e) => { - error!("error in blocking task \"{}\": {:#}", name, &e); - Err(e) - } - }) -} diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 36bcaf8..5648aeb 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -21,7 +21,6 @@ use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, lengths::Lengths, - peer_binary_protocol::MessageOwned, peer_connection::{PeerConnection, WriterRequest}, spawn_utils::spawn, torrent_metainfo::TorrentMetaV1Owned, diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index c8d338b..faa25f0 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -17,7 +17,7 @@ use crate::{ chunk_tracker::ChunkTracker, file_ops::FileOps, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, - peer_binary_protocol::{Handshake, Message, MessageOwned}, + peer_binary_protocol::{Handshake, Message}, peer_connection::WriterRequest, peer_state::{LivePeerState, PeerState}, torrent_metainfo::TorrentMetaV1Owned,