diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index 8f3ca95..4d051d9 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -283,7 +283,7 @@ impl<'a> FileOps<'a> { piece_remaining_bytes -= to_read_in_file; if piece_remaining_bytes == 0 { - return Ok(true); + break; } absolute_offset = 0; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 7994933..e5d7096 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -272,8 +272,12 @@ impl PeerConnection { .and_then(|e| e.ut_metadata()) })?, WriterRequest::ReadChunkRequest(chunk) => { + #[allow(unused_mut)] + let mut skip_reading_for_e2e_tests = false; + #[cfg(test)] { + use tracing::warn; // This is poor-mans fault injection for running e2e tests. use crate::tests::test_util::TestPeerMetadata; let tpm = TestPeerMetadata::from_peer_id(self.peer_id); @@ -286,6 +290,12 @@ impl PeerConnection { * (tpm.max_random_sleep_ms as f64)) as u64; tokio::time::sleep(Duration::from_millis(sleep_ms)).await; + + if rand::thread_rng().gen_bool(tpm.bad_data_probability()) { + warn!("will NOT actually read the data to simulate a malicious peer that sends garbage"); + write_buf.fill(0); + skip_reading_for_e2e_tests = true; + } } // this whole section is an optimization @@ -293,12 +303,14 @@ impl PeerConnection { let preamble_len = serialize_piece_preamble(chunk, &mut write_buf); let full_len = preamble_len + chunk.size as usize; write_buf.resize(full_len, 0); - self.spawner - .spawn_block_in_place(|| { - self.handler - .read_chunk(chunk, &mut write_buf[preamble_len..]) - }) - .with_context(|| format!("error reading chunk {chunk:?}"))?; + if !skip_reading_for_e2e_tests { + self.spawner + .spawn_block_in_place(|| { + self.handler + .read_chunk(chunk, &mut write_buf[preamble_len..]) + }) + .with_context(|| format!("error reading chunk {chunk:?}"))?; + } uploaded_add = Some(chunk.size); full_len diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index f7bfd59..3920709 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -105,6 +105,7 @@ async fn test_e2e() { } Ok(true) } + crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"), _ => bail!("broken state"), }) .unwrap(); diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index aac8af0..dcf4555 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -63,4 +63,11 @@ impl TestPeerMetadata { } 0f64 } + + pub fn bad_data_probability(&self) -> f64 { + if self.server_id % 2 == 0 { + return 0.05f64; + } + 0f64 + } } diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index cfe550b..46edfc4 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1396,11 +1396,15 @@ impl PeerHandler { self.state.maybe_transmit_haves(chunk_info.piece_index); } false => { - warn!("checksum for piece={} did not validate", index,); + warn!( + "checksum for piece={} did not validate. disconecting peer.", + index + ); self.state .lock_write("mark_piece_broken") .get_chunks_mut()? .mark_piece_broken_if_not_have(chunk_info.piece_index); + anyhow::bail!("i am probably a bogus peer. dying.") } }; Ok::<_, anyhow::Error>(()) diff --git a/crates/librqbit_core/src/lengths.rs b/crates/librqbit_core/src/lengths.rs index 63a083c..7b7fac0 100644 --- a/crates/librqbit_core/src/lengths.rs +++ b/crates/librqbit_core/src/lengths.rs @@ -25,9 +25,15 @@ pub struct PieceInfo { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct ChunkInfo { pub piece_index: ValidPieceIndex, + + // Index of chunk within the piece. pub chunk_index: u32, + + // Absolute chunk index if the first chunk of the first piece was 0. pub absolute_index: u32, pub size: u32, + + // Offset of chunk in bytes within the piece. pub offset: u32, }