diff --git a/crates/librqbit/src/buffers.rs b/crates/librqbit/src/buffers.rs index 1ffa1a4..2027738 100644 --- a/crates/librqbit/src/buffers.rs +++ b/crates/librqbit/src/buffers.rs @@ -110,3 +110,9 @@ impl<'a> From<&'a [u8]> for ByteString { Self(b.into()) } } + +impl From> for ByteString { + fn from(b: Vec) -> Self { + Self(b) + } +} diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 24e47a7..d13fc47 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -55,7 +55,7 @@ impl ChunkTracker { // return true if the whole piece is marked downloaded pub fn mark_chunk_downloaded(&mut self, piece: &Piece) -> Option { - let chunk_info = self.lengths.chunk_info_from_received_piece_data(piece)?; + let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?; self.chunk_status .set(chunk_info.absolute_index as usize, true); let chunk_range = self.lengths.chunk_range(chunk_info.piece_index); diff --git a/crates/librqbit/src/lengths.rs b/crates/librqbit/src/lengths.rs index c0e9f69..4b0feb8 100644 --- a/crates/librqbit/src/lengths.rs +++ b/crates/librqbit/src/lengths.rs @@ -141,18 +141,19 @@ impl Lengths { }) } - pub fn chunk_info_from_received_piece_data( + pub fn chunk_info_from_received_data( &self, - piece: &Piece, + piece_index: ValidPieceIndex, + begin: u32, + chunk_size: u32, ) -> Option { - let piece_index = self.validate_piece_index(piece.index)?; - let index = piece.begin / self.chunk_length; - let chunk_size = self.chunk_size(piece_index, index)?; + let index = begin / self.chunk_length; + let expected_chunk_size = self.chunk_size(piece_index, index)?; let offset = self.chunk_offset_in_piece(piece_index, index)?; - if offset != piece.begin { + if offset != begin { return None; } - if chunk_size as usize != piece.block.len() { + if expected_chunk_size != chunk_size { return None; } let absolute_index = self.chunks_per_piece * piece_index.get() + index; @@ -164,6 +165,14 @@ impl Lengths { absolute_index, }) } + + pub fn chunk_info_from_received_piece(&self, piece: &Piece) -> Option { + self.chunk_info_from_received_data( + self.validate_piece_index(piece.index)?, + piece.begin, + piece.block.len() as u32, + ) + } pub const fn chunk_range(&self, index: ValidPieceIndex) -> std::ops::Range { let start = index.0 * self.chunks_per_piece; let end = start + self.chunks_per_piece(index); diff --git a/crates/librqbit/src/peer_comms.rs b/crates/librqbit/src/peer_comms.rs index bde0409..51ce816 100644 --- a/crates/librqbit/src/peer_comms.rs +++ b/crates/librqbit/src/peer_comms.rs @@ -57,6 +57,17 @@ impl Piece where ByteBuf: AsRef<[u8]>, { + pub fn from_vec(index: u32, begin: u32, block: Vec) -> Piece + where + ByteBuf: From>, + { + Piece { + index, + begin, + block: ByteBuf::from(block), + } + } + pub fn serialize(&self, buf: &mut [u8]) -> usize { byteorder::BigEndian::write_u32(&mut buf[0..4], self.index); byteorder::BigEndian::write_u32(&mut buf[4..8], self.begin); diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 517ef27..1490813 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -224,32 +224,35 @@ fn spawn( name: N, fut: impl std::future::Future> + Send + 'static, ) { - debug!("starting task \"{}\"", name); + debug!("starting task \"{}\"", &name); tokio::spawn(async move { match fut.await { Ok(_) => { - debug!("task \"{}\" finished", name); + debug!("task \"{}\" finished", &name); } Err(e) => { - error!("error in task \"{}\": {:#}", name, e) + error!("error in task \"{}\": {:#}", &name, e) } } }); } -fn spawn_blocking( +async fn spawn_blocking( name: N, - f: impl FnOnce() -> anyhow::Result<()> + Send + 'static, -) { + f: impl FnOnce() -> anyhow::Result + Send + 'static, +) -> anyhow::Result { debug!("starting blocking task \"{}\"", name); tokio::task::spawn_blocking(move || match f() { - Ok(_) => { + Ok(v) => { debug!("blocking task \"{}\" finished", name); + Ok(v) } Err(e) => { - error!("error in blocking task \"{}\": {:#}", name, e) + error!("error in blocking task \"{}\": {:#}", name, &e); + Err(e) } - }); + }) + .await? } fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result { @@ -406,12 +409,7 @@ impl TorrentManager { }; match message { - Message::Request(request) => { - warn!( - "{}: received {:?} , but download requests not implemented", - peer_handle, request - ) - } + Message::Request(request) => self.on_download_request(peer_handle, request), Message::Bitfield(b) => self.on_bitfield(peer_handle, b), Message::Choke => self.on_i_am_choked(peer_handle), Message::Unchoke => self.on_i_am_unchoked(peer_handle), @@ -434,6 +432,118 @@ impl TorrentManager { } } } + fn on_download_request(&self, peer_handle: PeerHandle, request: Request) { + let piece_index = match self.inner.lengths.validate_piece_index(request.index) { + Some(p) => p, + None => { + warn!( + "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", + peer_handle, request + ); + return; + } + }; + let chunk_info = match self.inner.lengths.chunk_info_from_received_data( + piece_index, + request.begin, + request.length, + ) { + Some(d) => d, + None => { + warn!( + "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", + peer_handle, request + ); + return; + } + }; + let this = self.clone(); + + let task_name = format!( + "download_request(peer={}, chunk_info={:?})", + peer_handle, &chunk_info + ); + spawn(task_name, async move { + let clone = this.clone(); + let chunk = spawn_blocking( + format!( + "read_chunk_blocking(peer={}, chunk_info={:?}", + peer_handle, &chunk_info + ), + move || clone.read_chunk_blocking(peer_handle, chunk_info), + ) + .await?; + let tx = this + .inner + .locked + .read() + .peers + .clone_tx(peer_handle) + .ok_or_else(|| { + anyhow::anyhow!( + "peer {} died, dropping chunk that it requested", + peer_handle + ) + })?; + let message = Message::Piece(Piece::from_vec( + chunk_info.piece_index.get(), + chunk_info.offset, + chunk, + )); + Ok::<_, anyhow::Error>(tx.send(message).await?) + }); + } + fn read_chunk_blocking( + self, + who_sent: PeerHandle, + chunk_info: ChunkInfo, + ) -> anyhow::Result> { + let mut h = sha1::Sha1::new(); + let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); + let mut result_buf = vec![0u8; chunk_info.size as usize]; + let mut buf = &mut result_buf[..]; + + for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { + if absolute_offset > file_len { + absolute_offset -= file_len; + continue; + } + let file_remaining_len = file_len - absolute_offset; + let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize; + + let mut file_g = self.inner.files[file_idx].lock(); + debug!( + "piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}", + chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info + ); + file_g + .seek(std::io::SeekFrom::Start(absolute_offset)) + .with_context(|| { + format!( + "error seeking to {}, file id: {}", + absolute_offset, file_idx + ) + })?; + file_g + .read_exact(&mut buf[..to_read_in_file]) + .with_context(|| { + format!( + "error reading {} bytes, file_id: {}", + file_idx, to_read_in_file + ) + })?; + + buf = &mut buf[to_read_in_file..]; + + if buf.is_empty() { + break; + } + + absolute_offset = 0; + } + + return Ok(result_buf); + } fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { self.get_next_needed_piece(handle).is_some() } @@ -756,11 +866,7 @@ impl TorrentManager { } fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> Option<()> { - let chunk_info = match self - .inner - .lengths - .chunk_info_from_received_piece_data(&piece) - { + let chunk_info = match self.inner.lengths.chunk_info_from_received_piece(&piece) { Some(i) => i, None => { warn!(