Merge pull request #106 from ikatson/resilience-to-bogus-peers

Fix resilience to bogus peers
This commit is contained in:
Igor Katson 2024-03-29 20:46:37 +00:00 committed by GitHub
commit bb0f3c36ec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 38 additions and 8 deletions

View file

@ -283,7 +283,7 @@ impl<'a> FileOps<'a> {
piece_remaining_bytes -= to_read_in_file; piece_remaining_bytes -= to_read_in_file;
if piece_remaining_bytes == 0 { if piece_remaining_bytes == 0 {
return Ok(true); break;
} }
absolute_offset = 0; absolute_offset = 0;

View file

@ -272,8 +272,12 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
.and_then(|e| e.ut_metadata()) .and_then(|e| e.ut_metadata())
})?, })?,
WriterRequest::ReadChunkRequest(chunk) => { WriterRequest::ReadChunkRequest(chunk) => {
#[allow(unused_mut)]
let mut skip_reading_for_e2e_tests = false;
#[cfg(test)] #[cfg(test)]
{ {
use tracing::warn;
// This is poor-mans fault injection for running e2e tests. // This is poor-mans fault injection for running e2e tests.
use crate::tests::test_util::TestPeerMetadata; use crate::tests::test_util::TestPeerMetadata;
let tpm = TestPeerMetadata::from_peer_id(self.peer_id); let tpm = TestPeerMetadata::from_peer_id(self.peer_id);
@ -286,6 +290,12 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
* (tpm.max_random_sleep_ms as f64)) * (tpm.max_random_sleep_ms as f64))
as u64; as u64;
tokio::time::sleep(Duration::from_millis(sleep_ms)).await; tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
if rand::thread_rng().gen_bool(tpm.bad_data_probability()) {
warn!("will NOT actually read the data to simulate a malicious peer that sends garbage");
write_buf.fill(0);
skip_reading_for_e2e_tests = true;
}
} }
// this whole section is an optimization // this whole section is an optimization
@ -293,12 +303,14 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let preamble_len = serialize_piece_preamble(chunk, &mut write_buf); let preamble_len = serialize_piece_preamble(chunk, &mut write_buf);
let full_len = preamble_len + chunk.size as usize; let full_len = preamble_len + chunk.size as usize;
write_buf.resize(full_len, 0); write_buf.resize(full_len, 0);
self.spawner if !skip_reading_for_e2e_tests {
.spawn_block_in_place(|| { self.spawner
self.handler .spawn_block_in_place(|| {
.read_chunk(chunk, &mut write_buf[preamble_len..]) self.handler
}) .read_chunk(chunk, &mut write_buf[preamble_len..])
.with_context(|| format!("error reading chunk {chunk:?}"))?; })
.with_context(|| format!("error reading chunk {chunk:?}"))?;
}
uploaded_add = Some(chunk.size); uploaded_add = Some(chunk.size);
full_len full_len

View file

@ -105,6 +105,7 @@ async fn test_e2e() {
} }
Ok(true) Ok(true)
} }
crate::ManagedTorrentState::Error(e) => bail!("error: {e:?}"),
_ => bail!("broken state"), _ => bail!("broken state"),
}) })
.unwrap(); .unwrap();

View file

@ -63,4 +63,11 @@ impl TestPeerMetadata {
} }
0f64 0f64
} }
pub fn bad_data_probability(&self) -> f64 {
if self.server_id % 2 == 0 {
return 0.05f64;
}
0f64
}
} }

View file

@ -1396,11 +1396,15 @@ impl PeerHandler {
self.state.maybe_transmit_haves(chunk_info.piece_index); self.state.maybe_transmit_haves(chunk_info.piece_index);
} }
false => { false => {
warn!("checksum for piece={} did not validate", index,); warn!(
"checksum for piece={} did not validate. disconecting peer.",
index
);
self.state self.state
.lock_write("mark_piece_broken") .lock_write("mark_piece_broken")
.get_chunks_mut()? .get_chunks_mut()?
.mark_piece_broken_if_not_have(chunk_info.piece_index); .mark_piece_broken_if_not_have(chunk_info.piece_index);
anyhow::bail!("i am probably a bogus peer. dying.")
} }
}; };
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())

View file

@ -25,9 +25,15 @@ pub struct PieceInfo {
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChunkInfo { pub struct ChunkInfo {
pub piece_index: ValidPieceIndex, pub piece_index: ValidPieceIndex,
// Index of chunk within the piece.
pub chunk_index: u32, pub chunk_index: u32,
// Absolute chunk index if the first chunk of the first piece was 0.
pub absolute_index: u32, pub absolute_index: u32,
pub size: u32, pub size: u32,
// Offset of chunk in bytes within the piece.
pub offset: u32, pub offset: u32,
} }