diff --git a/crates/librqbit/src/peer_comms.rs b/crates/librqbit/src/peer_comms.rs index 7ab719b..9d29fa7 100644 --- a/crates/librqbit/src/peer_comms.rs +++ b/crates/librqbit/src/peer_comms.rs @@ -240,7 +240,9 @@ where (&mut out[PREAMBLE_LEN..PREAMBLE_LEN + block_len]).copy_from_slice(b.as_ref()); msg_len } - Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN, + Message::Choke | Message::Unchoke | Message::Interested | Message::NotInterested => { + PREAMBLE_LEN + } Message::Piece(p) => { // below code is wrong, need to serialize len_prefix let block_len = p.block.as_ref().len(); @@ -261,7 +263,6 @@ where byteorder::BE::write_u32(&mut out[PREAMBLE_LEN..], *v); msg_len } - Message::NotInterested => todo!(), } } pub fn deserialize<'a>( diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 417e7ad..872b8ce 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -747,25 +747,36 @@ impl TorrentManager { } } - fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) { + async fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> { if bitfield.len() != self.inner.lengths.piece_bitfield_bytes() as usize { - warn!( + anyhow::bail!( "dropping {} as its bitfield has unexpected size. Got {}, expected {}", handle, bitfield.len(), self.inner.lengths.piece_bitfield_bytes(), ); - self.inner.locked.write().peers.drop_peer(handle); - return; } self.inner .locked .write() .peers .update_bitfield_from_vec(handle, bitfield.0); + if !self.am_i_interested_in_peer(handle) { - // self.inner.locked.write().peers.drop_peer(handle); - return; + let tx = self + .inner + .locked + .read() + .peers + .clone_tx(handle) + .ok_or_else(|| anyhow::anyhow!("peer closed"))?; + tx.send(MessageOwned::Unchoke) + .await + .context("peer dropped")?; + tx.send(MessageOwned::NotInterested) + .await + .context("peer dropped")?; + return Ok(()); } // Additional spawn per peer, not good. @@ -773,6 +784,7 @@ impl TorrentManager { format!("peer_chunk_requester({})", handle), self.clone().task_peer_chunk_requester(handle), ); + Ok(()) } async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { @@ -1073,7 +1085,13 @@ impl TorrentManager { let mut buf = data.block.as_ref(); let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); - for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { + for (file_idx, (name, file_len)) in self + .inner + .torrent + .info + .iter_filenames_and_lengths() + .enumerate() + { if absolute_offset > file_len { absolute_offset -= file_len; continue; @@ -1084,17 +1102,26 @@ impl TorrentManager { let mut file_g = self.inner.files[file_idx].lock(); debug!( - "piece={}, chunk={}, handle={}, begin={}, file={}, writing {} bytes at {}", + "piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}", chunk_info.piece_index, - chunk_info.chunk_index, + chunk_info, who_sent, chunk_info.offset, file_idx, to_write, absolute_offset ); - file_g.seek(SeekFrom::Start(absolute_offset))?; - file_g.write_all(&buf[..to_write])?; + file_g + .seek(SeekFrom::Start(absolute_offset)) + .with_context(|| { + format!( + "error seeking to {} in file {} (\"{:?}\")", + absolute_offset, file_idx, name + ) + })?; + file_g + .write_all(&buf[..to_write]) + .with_context(|| format!("error writing to file {} (\"{:?}\")", file_idx, name))?; buf = &buf[to_write..]; if buf.is_empty() { break; @@ -1139,7 +1166,7 @@ impl TorrentManager { ); } - match g.chunks.mark_chunk_downloaded(&piece) { + let should_checksum = match g.chunks.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { debug!( "piece={} done by {}, will write and checksum", @@ -1147,6 +1174,7 @@ impl TorrentManager { ); // This will prevent others from stealing it. g.peers.inflight_pieces.remove(&chunk_info.piece_index); + true } Some(ChunkMarkingResult::PreviouslyCompleted) => { // TODO: we might need to send cancellations here. @@ -1156,7 +1184,7 @@ impl TorrentManager { ); return Ok(()); } - Some(ChunkMarkingResult::NotCompleted) => return Ok(()), + Some(ChunkMarkingResult::NotCompleted) => false, None => { anyhow::bail!( "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", @@ -1180,6 +1208,10 @@ impl TorrentManager { // should we really do? If we unmark it, it will get requested forever... this.write_chunk_blocking(handle, &piece, &chunk_info)?; + if !should_checksum { + return Ok(()); + } + let clone = this.clone(); match clone .check_piece_blocking(handle, chunk_info.piece_index, &chunk_info) @@ -1454,7 +1486,7 @@ impl TorrentManager { format!("error handling download request from {}", handle) })?; } - Message::Bitfield(b) => self.on_bitfield(handle, b), + Message::Bitfield(b) => self.on_bitfield(handle, b).await?, Message::Choke => self.on_i_am_choked(handle), Message::Unchoke => self.on_i_am_unchoked(handle), Message::Interested => {