From fab43a8d230ec3582b42f636dea68767ce0299de Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 28 Jun 2021 16:09:20 +0100 Subject: [PATCH] Uploading chunks now reads straight into write buffer --- crates/librqbit/src/file_ops.rs | 395 ++++++++++++++++++++ crates/librqbit/src/peer_binary_protocol.rs | 27 +- crates/librqbit/src/peer_connection.rs | 79 ++-- crates/librqbit/src/torrent_manager.rs | 4 +- crates/librqbit/src/torrent_state.rs | 15 +- 5 files changed, 470 insertions(+), 50 deletions(-) create mode 100644 crates/librqbit/src/file_ops.rs diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs new file mode 100644 index 0000000..ca22fd3 --- /dev/null +++ b/crates/librqbit/src/file_ops.rs @@ -0,0 +1,395 @@ +use std::{ + fs::File, + io::{Read, Seek, SeekFrom, Write}, + sync::Arc, +}; + +use anyhow::Context; +use log::{debug, trace, warn}; +use parking_lot::Mutex; + +use crate::{ + buffers::ByteString, + lengths::{ChunkInfo, Lengths, ValidPieceIndex}, + peer_binary_protocol::Piece, + torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned}, + type_aliases::{PeerHandle, BF}, +}; + +pub struct InitialCheckResults { + pub needed_pieces: BF, + pub have_pieces: BF, + pub have_bytes: u64, + pub needed_bytes: u64, +} + +pub fn update_hash_from_file( + file: &mut File, + hash: &mut sha1::Sha1, + buf: &mut [u8], + mut bytes_to_read: usize, +) -> anyhow::Result<()> { + let mut read = 0; + while bytes_to_read > 0 { + let chunk = std::cmp::min(buf.len(), bytes_to_read); + file.read_exact(&mut buf[..chunk]).with_context(|| { + format!( + "failed reading chunk of size {}, read so far {}", + chunk, read + ) + })?; + bytes_to_read -= chunk; + read += chunk; + hash.update(&buf[..chunk]); + } + Ok(()) +} + +pub struct FileOps<'a> { + torrent: &'a TorrentMetaV1Owned, + files: &'a [Arc>], + lengths: &'a Lengths, +} + +impl<'a> FileOps<'a> { + pub fn new( + torrent: &'a TorrentMetaV1Owned, + files: &'a [Arc>], + lengths: &'a Lengths, + ) -> Self { + Self { + torrent, + files, + lengths, + } + } + + pub fn initial_check( + &self, + only_files: Option<&[usize]>, + ) -> anyhow::Result { + let mut needed_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]); + let mut have_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]); + + let mut have_bytes = 0u64; + let mut needed_bytes = 0u64; + + struct CurrentFile<'a> { + index: usize, + fd: &'a Arc>, + len: u64, + name: FileIteratorName<'a, ByteString>, + full_file_required: bool, + processed_bytes: u64, + is_broken: bool, + } + impl<'a> CurrentFile<'a> { + fn remaining(&self) -> u64 { + self.len - self.processed_bytes + } + fn mark_processed_bytes(&mut self, bytes: u64) { + self.processed_bytes += bytes as u64 + } + } + let mut file_iterator = self + .files + .iter() + .zip(self.torrent.info.iter_filenames_and_lengths()) + .enumerate() + .map(|(idx, (fd, (name, len)))| { + let full_file_required = if let Some(only_files) = only_files { + only_files.contains(&idx) + } else { + true + }; + CurrentFile { + index: idx, + fd, + len, + name, + full_file_required, + processed_bytes: 0, + is_broken: false, + } + }); + + let mut current_file = file_iterator + .next() + .ok_or_else(|| anyhow::anyhow!("empty input file list"))?; + + let mut read_buffer = vec![0u8; 65536]; + + for piece_info in self.lengths.iter_piece_infos() { + let mut computed_hash = sha1::Sha1::new(); + let mut piece_remaining = piece_info.len as usize; + let mut some_files_broken = false; + let mut at_least_one_file_required = current_file.full_file_required; + + while piece_remaining > 0 { + let mut to_read_in_file = + std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; + while to_read_in_file == 0 { + current_file = file_iterator + .next() + .ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?; + + at_least_one_file_required |= current_file.full_file_required; + + to_read_in_file = + std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize; + } + + let pos = current_file.processed_bytes; + piece_remaining -= to_read_in_file; + current_file.mark_processed_bytes(to_read_in_file as u64); + + if current_file.is_broken { + // no need to read. + continue; + } + + let mut fd = current_file.fd.lock(); + + fd.seek(SeekFrom::Start(pos)).unwrap(); + if let Err(err) = update_hash_from_file( + &mut fd, + &mut computed_hash, + &mut read_buffer, + to_read_in_file, + ) { + debug!( + "error reading from file {} ({:?}) at {}: {:#}", + current_file.index, current_file.name, pos, &err + ); + current_file.is_broken = true; + some_files_broken = true; + } + } + + if at_least_one_file_required && some_files_broken { + trace!( + "piece {} had errors, marking as needed", + piece_info.piece_index + ); + + needed_bytes += piece_info.len as u64; + needed_pieces.set(piece_info.piece_index.get() as usize, true); + continue; + } + + if self + .torrent + .info + .compare_hash(piece_info.piece_index.get(), &computed_hash) + .unwrap() + { + trace!( + "piece {} is fine, not marking as needed", + piece_info.piece_index + ); + have_bytes += piece_info.len as u64; + have_pieces.set(piece_info.piece_index.get() as usize, true); + } else { + if at_least_one_file_required { + trace!( + "piece {} hash does not match, marking as needed", + piece_info.piece_index + ); + needed_bytes += piece_info.len as u64; + needed_pieces.set(piece_info.piece_index.get() as usize, true); + } else { + trace!( + "piece {} hash does not match, but it is not required by any of the requested files, ignoring", + piece_info.piece_index + ); + } + } + } + + Ok(InitialCheckResults { + needed_pieces, + have_pieces, + have_bytes, + needed_bytes, + }) + } + + pub fn check_piece( + &self, + who_sent: PeerHandle, + piece_index: ValidPieceIndex, + last_received_chunk: &ChunkInfo, + ) -> anyhow::Result { + let mut h = sha1::Sha1::new(); + let piece_length = self.lengths.piece_length(piece_index); + let mut absolute_offset = self.lengths.piece_offset(piece_index); + let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)]; + + let mut piece_remaining_bytes = piece_length as usize; + + for (file_idx, (name, file_len)) in + self.torrent.info.iter_filenames_and_lengths().enumerate() + { + if absolute_offset > file_len { + absolute_offset -= file_len; + continue; + } + let file_remaining_len = file_len - absolute_offset; + + let to_read_in_file = + std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize; + let mut file_g = self.files[file_idx].lock(); + debug!( + "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", + piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk + ); + file_g + .seek(SeekFrom::Start(absolute_offset)) + .with_context(|| { + format!( + "error seeking to {}, file id: {}", + absolute_offset, file_idx + ) + })?; + update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context( + || { + format!( + "error reading {} bytes, file_id: {} (\"{:?}\")", + to_read_in_file, file_idx, name + ) + }, + )?; + + piece_remaining_bytes -= to_read_in_file; + + if piece_remaining_bytes == 0 { + return Ok(true); + } + + absolute_offset = 0; + } + + match self.torrent.info.compare_hash(piece_index.get(), &h) { + Some(true) => { + debug!("piece={} hash matches", piece_index); + Ok(true) + } + Some(false) => { + warn!("the piece={} hash does not match", piece_index); + Ok(false) + } + None => { + // this is probably a bug? + warn!("compare_hash() did not find the piece"); + anyhow::bail!("compare_hash() did not find the piece"); + } + } + } + + pub fn read_chunk( + &self, + who_sent: PeerHandle, + chunk_info: &ChunkInfo, + result_buf: &mut [u8], + ) -> anyhow::Result<()> { + if result_buf.len() < chunk_info.size as usize { + anyhow::bail!("read_chunk(): not enough capacity in the provided buffer") + } + let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info); + let mut buf = &mut result_buf[..]; + + for (file_idx, file_len) in self.torrent.info.iter_file_lengths().enumerate() { + if absolute_offset > file_len { + absolute_offset -= file_len; + continue; + } + let file_remaining_len = file_len - absolute_offset; + let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize; + + let mut file_g = self.files[file_idx].lock(); + debug!( + "piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}", + chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info + ); + file_g + .seek(SeekFrom::Start(absolute_offset)) + .with_context(|| { + format!( + "error seeking to {}, file id: {}", + absolute_offset, file_idx + ) + })?; + file_g + .read_exact(&mut buf[..to_read_in_file]) + .with_context(|| { + format!( + "error reading {} bytes, file_id: {}", + file_idx, to_read_in_file + ) + })?; + + buf = &mut buf[to_read_in_file..]; + + if buf.is_empty() { + break; + } + + absolute_offset = 0; + } + + return Ok(()); + } + + pub fn write_chunk( + &self, + who_sent: PeerHandle, + data: &Piece, + chunk_info: &ChunkInfo, + ) -> anyhow::Result<()> { + let mut buf = data.block.as_ref(); + let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info); + + for (file_idx, (name, file_len)) in + self.torrent.info.iter_filenames_and_lengths().enumerate() + { + if absolute_offset > file_len { + absolute_offset -= file_len; + continue; + } + + let remaining_len = file_len - absolute_offset; + let to_write = std::cmp::min(buf.len(), remaining_len as usize); + + let mut file_g = self.files[file_idx].lock(); + debug!( + "piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}", + chunk_info.piece_index, + chunk_info, + who_sent, + chunk_info.offset, + file_idx, + to_write, + absolute_offset + ); + file_g + .seek(SeekFrom::Start(absolute_offset)) + .with_context(|| { + format!( + "error seeking to {} in file {} (\"{:?}\")", + absolute_offset, file_idx, name + ) + })?; + file_g + .write_all(&buf[..to_write]) + .with_context(|| format!("error writing to file {} (\"{:?}\")", file_idx, name))?; + buf = &buf[to_write..]; + if buf.is_empty() { + break; + } + + absolute_offset = 0; + } + + Ok(()) + } +} diff --git a/crates/librqbit/src/peer_binary_protocol.rs b/crates/librqbit/src/peer_binary_protocol.rs index 872322f..a6755ff 100644 --- a/crates/librqbit/src/peer_binary_protocol.rs +++ b/crates/librqbit/src/peer_binary_protocol.rs @@ -1,10 +1,11 @@ use bincode::Options; -use byteorder::ByteOrder; +use byteorder::{ByteOrder, BE}; use serde::{Deserialize, Serialize}; use crate::{ buffers::{ByteBuf, ByteString}, clone_to_owned::CloneToOwned, + lengths::ChunkInfo, }; const PREAMBLE_LEN: usize = 5; @@ -18,6 +19,7 @@ const LEN_PREFIX_UNCHOKE: u32 = 1; const LEN_PREFIX_INTERESTED: u32 = 1; const LEN_PREFIX_NOT_INTERESTED: u32 = 1; const LEN_PREFIX_HAVE: u32 = 5; +const LEN_PREFIX_PIECE: u32 = 9; const LEN_PREFIX_REQUEST: u32 = 13; const MSGID_CHOKE: u8 = 0; @@ -46,6 +48,17 @@ pub enum MessageDeserializeError { }, } +pub fn serialize_piece_preamble(chunk: &ChunkInfo, mut buf: &mut [u8]) -> usize { + BE::write_u32(&mut buf[0..4], LEN_PREFIX_PIECE + chunk.size); + buf[4] = MSGID_PIECE; + + buf = &mut buf[PREAMBLE_LEN..]; + BE::write_u32(&mut buf[0..4], chunk.piece_index.get()); + BE::write_u32(&mut buf[4..8], chunk.offset); + + PREAMBLE_LEN + 8 +} + #[derive(Debug)] pub struct Piece { pub index: u32, @@ -209,7 +222,10 @@ where Message::Unchoke => (LEN_PREFIX_UNCHOKE, MSGID_UNCHOKE), Message::Interested => (LEN_PREFIX_INTERESTED, MSGID_INTERESTED), Message::NotInterested => (LEN_PREFIX_NOT_INTERESTED, MSGID_NOT_INTERESTED), - Message::Piece(p) => (9 + p.block.as_ref().len() as u32, MSGID_PIECE), + Message::Piece(p) => ( + LEN_PREFIX_PIECE + p.block.as_ref().len() as u32, + MSGID_PIECE, + ), Message::KeepAlive => (LEN_PREFIX_KEEPALIVE, 0), Message::Have(_) => (LEN_PREFIX_HAVE, MSGID_HAVE), } @@ -260,7 +276,7 @@ where Message::Have(v) => { let msg_len = PREAMBLE_LEN + 4; out.resize(msg_len, 0); - byteorder::BE::write_u32(&mut out[PREAMBLE_LEN..], *v); + BE::write_u32(&mut out[PREAMBLE_LEN..], *v); msg_len } } @@ -332,10 +348,7 @@ where MSGID_HAVE => { let expected_len = 4; match rest.get(..expected_len as usize) { - Some(h) => Ok(( - Message::Have(byteorder::BE::read_u32(&h)), - PREAMBLE_LEN + expected_len, - )), + Some(h) => Ok((Message::Have(BE::read_u32(&h)), PREAMBLE_LEN + expected_len)), None => { let missing = expected_len - rest.len(); Err(MessageDeserializeError::NotEnoughData(missing, "have")) diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 0430d57..1d869da 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -12,8 +12,10 @@ use crate::{ buffers::{ByteBuf, ByteString}, chunk_tracker::ChunkMarkingResult, clone_to_owned::CloneToOwned, + lengths::ChunkInfo, peer_binary_protocol::{ - Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request, + serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError, + MessageOwned, Piece, Request, }, peer_id::try_decode_peer_id, spawn_utils::{spawn, spawn_blocking}, @@ -21,6 +23,12 @@ use crate::{ type_aliases::PeerHandle, }; +#[derive(Debug)] +pub enum WriterRequest { + Message(MessageOwned), + ReadChunkRequest(ChunkInfo), +} + #[derive(Clone)] pub struct PeerConnection { state: Arc, @@ -38,7 +46,7 @@ impl PeerConnection { addr: SocketAddr, handle: PeerHandle, // outgoing_chan_tx: tokio::sync::mpsc::Sender, - mut outgoing_chan: tokio::sync::mpsc::Receiver, + mut outgoing_chan: tokio::sync::mpsc::Receiver, ) -> anyhow::Result<()> { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -101,21 +109,40 @@ impl PeerConnection { } loop { - let msg = match timeout(keep_alive_interval, outgoing_chan.recv()).await { + let req = match timeout(keep_alive_interval, outgoing_chan.recv()).await { Ok(Some(msg)) => msg, Ok(None) => { anyhow::bail!("closing writer, channel closed") } - Err(_) => MessageOwned::KeepAlive, + Err(_) => WriterRequest::Message(MessageOwned::KeepAlive), }; - let uploaded_add = match &msg { - Message::Piece(p) => Some(p.block.len()), + let uploaded_add = match &req { + WriterRequest::Message(Message::Piece(p)) => Some(p.block.len()), _ => None, }; - let len = msg.serialize(&mut buf); - debug!("sending to {}: {:?}, length={}", handle, &msg, len); + let len = match &req { + WriterRequest::Message(msg) => msg.serialize(&mut buf), + WriterRequest::ReadChunkRequest(chunk) => { + // this whole section is an optimization + + let preamble_len = serialize_piece_preamble(&chunk, &mut buf); + let full_len = preamble_len + chunk.size as usize; + buf.resize(full_len, 0); + tokio::task::block_in_place(|| { + this.state.file_ops().read_chunk( + handle, + &chunk, + &mut buf[preamble_len..], + ) + }) + .with_context(|| format!("error reading chunk {:?}", chunk))?; + full_len + } + }; + + debug!("sending to {}: {:?}, length={}", handle, &req, len); write_half .write_all(&buf[..len]) @@ -243,22 +270,6 @@ impl PeerConnection { } }; - let state = self.state.clone(); - let chunk = spawn_blocking( - format!( - "read_chunk_blocking(peer={}, chunk_info={:?}", - peer_handle, &chunk_info - ), - move || { - let mut buf = Vec::new(); - state - .file_ops() - .read_chunk(peer_handle, chunk_info, &mut buf)?; - Ok(buf) - }, - ) - .await??; - let tx = self .state .locked @@ -275,13 +286,9 @@ impl PeerConnection { // TODO: this is not super efficient as it does copying multiple times. // Theoretically, this could be done in the sending code, so that it reads straight into // the send buffer. - let message = Message::Piece(Piece::from_data( - chunk_info.piece_index.get(), - chunk_info.offset, - chunk, - )); - info!("sending to {}: {:?}", peer_handle, &message); - Ok::<_, anyhow::Error>(tx.send(message).await?) + let request = WriterRequest::ReadChunkRequest(chunk_info); + info!("sending to {}: {:?}", peer_handle, &request); + Ok::<_, anyhow::Error>(tx.send(request).await?) } fn on_have(&self, handle: PeerHandle, have: u32) { @@ -321,10 +328,10 @@ impl PeerConnection { .peers .clone_tx(handle) .ok_or_else(|| anyhow::anyhow!("peer closed"))?; - tx.send(MessageOwned::Unchoke) + tx.send(WriterRequest::Message(MessageOwned::Unchoke)) .await .context("peer dropped")?; - tx.send(MessageOwned::NotInterested) + tx.send(WriterRequest::Message(MessageOwned::NotInterested)) .await .context("peer dropped")?; return Ok(()); @@ -343,10 +350,10 @@ impl PeerConnection { Some(tx) => tx, None => return Ok(()), }; - tx.send(MessageOwned::Unchoke) + tx.send(WriterRequest::Message(MessageOwned::Unchoke)) .await .context("peer dropped")?; - tx.send(MessageOwned::Interested) + tx.send(WriterRequest::Message(MessageOwned::Interested)) .await .context("peer dropped")?; @@ -445,7 +452,7 @@ impl PeerConnection { }; sem.acquire().await?.forget(); - tx.send(MessageOwned::Request(request)) + tx.send(WriterRequest::Message(MessageOwned::Request(request))) .await .context("peer dropped")?; } diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 1b8bf6f..36bcaf8 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -22,7 +22,7 @@ use crate::{ file_ops::FileOps, lengths::Lengths, peer_binary_protocol::MessageOwned, - peer_connection::PeerConnection, + peer_connection::{PeerConnection, WriterRequest}, spawn_utils::spawn, torrent_metainfo::TorrentMetaV1Owned, torrent_state::{AtomicStats, TorrentState, TorrentStateLocked}, @@ -304,7 +304,7 @@ impl TorrentManager { } fn add_peer(&self, addr: SocketAddr) { - let (out_tx, out_rx) = tokio::sync::mpsc::channel::(1); + let (out_tx, out_rx) = tokio::sync::mpsc::channel::(1); let handle = match self .inner .locked diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 8dd7f41..c8d338b 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -18,6 +18,7 @@ use crate::{ file_ops::FileOps, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_binary_protocol::{Handshake, Message, MessageOwned}, + peer_connection::WriterRequest, peer_state::{LivePeerState, PeerState}, torrent_metainfo::TorrentMetaV1Owned, type_aliases::{PeerHandle, BF}, @@ -43,7 +44,7 @@ pub struct PeerStates { states: HashMap, seen_peers: HashSet, inflight_pieces: HashSet, - tx: HashMap>>, + tx: HashMap>>, } #[derive(Debug, Default)] @@ -67,7 +68,7 @@ impl PeerStates { pub fn add_if_not_seen( &mut self, addr: SocketAddr, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> Option { if self.seen_peers.contains(&addr) { return None; @@ -95,7 +96,7 @@ impl PeerStates { pub fn add( &mut self, addr: SocketAddr, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> Option { let handle = addr; if self.states.contains_key(&addr) { @@ -127,7 +128,7 @@ impl PeerStates { live.bitfield = Some(bitfield); Some(prev) } - pub fn clone_tx(&self, handle: PeerHandle) -> Option>> { + pub fn clone_tx(&self, handle: PeerHandle) -> Option>> { Some(self.tx.get(&handle)?.clone()) } pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> bool { @@ -277,7 +278,11 @@ impl TorrentState { { unordered.push(async move { if let Some(tx) = weak.upgrade() { - if tx.send(Message::Have(index)).await.is_err() { + if tx + .send(WriterRequest::Message(Message::Have(index))) + .await + .is_err() + { // whatever } }