Introduce more bugs :)

This commit is contained in:
Igor Katson 2021-06-27 10:25:36 +01:00
parent 5c092389f9
commit 3ab61a6108

View file

@ -115,7 +115,7 @@ struct LivePeerState {
struct PeerStates {
states: HashMap<PeerHandle, PeerState>,
seen_peers: HashSet<SocketAddr>,
requested_pieces: HashSet<ValidPieceIndex>,
inflight_pieces: HashSet<ValidPieceIndex>,
tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::Sender<MessageOwned>>>,
}
@ -800,7 +800,7 @@ impl TorrentManager {
.locked
.read()
.peers
.requested_pieces
.inflight_pieces
.iter()
.choose(&mut rng)
.copied()
@ -926,7 +926,7 @@ impl TorrentManager {
.get_live_mut(peer_handle)?
.requested_pieces
.insert(n);
g.peers.requested_pieces.insert(n);
g.peers.inflight_pieces.insert(n);
g.chunks.reserve_needed_piece(n);
Some(n)
}
@ -1075,21 +1075,27 @@ impl TorrentManager {
Ok(())
}
fn on_received_piece(&self, handle: PeerHandle, piece: Piece<ByteString>) -> Option<()> {
fn on_received_piece(
&self,
handle: PeerHandle,
piece: Piece<ByteString>,
) -> anyhow::Result<()> {
let chunk_info = match self.inner.lengths.chunk_info_from_received_piece(&piece) {
Some(i) => i,
None => {
warn!(
"peer {} sent us a piece that is invalid {:?}, dropping",
handle, &piece,
anyhow::bail!(
"peer {} sent us a piece that is invalid {:?}",
handle,
&piece,
);
self.drop_peer(handle);
return None;
}
};
let mut g = self.inner.locked.write();
let h = g.peers.get_live_mut(handle)?;
let h = match g.peers.get_live_mut(handle) {
Some(l) => l,
None => anyhow::bail!("peer is not live anymore"),
};
h.outstanding_requests.add_permits(1);
self.inner
@ -1097,18 +1103,38 @@ impl TorrentManager {
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
if !h.requested_pieces.contains(&chunk_info.piece_index) {
// TODO: this is wrong, we need to distinguish between these cases.
warn!(
"peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece,
anyhow::bail!(
"peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece,
);
// this prevents a deadlock.
drop(g);
self.drop_peer(handle);
return None;
}
match g.chunks.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => {
debug!(
"piece={} done by {}, will write and checksum",
piece.index, handle
);
// This will prevent others from stealing it.
g.peers.inflight_pieces.remove(&chunk_info.piece_index);
}
Some(ChunkMarkingResult::PreviouslyCompleted) => {
// TODO: we might need to send cancellations here.
debug!(
"piece={} was done by someone else {}, ignoring",
piece.index, handle
);
return Ok(());
}
Some(ChunkMarkingResult::NotCompleted) => return Ok(()),
None => {
anyhow::bail!(
"bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer",
handle,
piece
);
}
};
let this = self.clone();
spawn_blocking(
@ -1118,45 +1144,11 @@ impl TorrentManager {
),
move || {
let index = piece.index;
// TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what
// should we really do? If we unmark it, it will get requested forever...
this.write_chunk_blocking(handle, &piece, &chunk_info)?;
match this
.inner
.locked
.write()
.chunks
.mark_chunk_downloaded(&piece)
{
Some(ChunkMarkingResult::Completed) => {
debug!("piece={} done by {}, will checksum", piece.index, handle);
}
Some(ChunkMarkingResult::PreviouslyCompleted) => {
debug!(
"piece={} was done by someone else {}, ignoring",
piece.index, handle
);
return Ok(());
}
Some(ChunkMarkingResult::NotCompleted) => return Ok(()),
None => {
warn!(
"bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer",
handle, piece
);
this.drop_peer(handle);
return Ok(());
}
};
// Ignore responses about this piece from now on.
{
let mut g = this.inner.locked.write();
g.peers
.get_live_mut(handle)
.map(|l| l.requested_pieces.remove(&chunk_info.piece_index));
g.peers.requested_pieces.remove(&chunk_info.piece_index);
}
let clone = this.clone();
match clone
.check_piece_blocking(handle, chunk_info.piece_index, &chunk_info)
@ -1194,13 +1186,12 @@ impl TorrentManager {
.write()
.chunks
.mark_piece_needed(chunk_info.piece_index);
// this.drop_peer(handle);
}
};
Ok::<_, anyhow::Error>(())
},
);
Some(())
Ok(())
}
fn into_handle(self) -> TorrentManagerHandle {
TorrentManagerHandle { manager: self }
@ -1442,7 +1433,8 @@ impl TorrentManager {
)
}
Message::Piece(piece) => {
self.on_received_piece(handle, piece);
self.on_received_piece(handle, piece)
.context("error in on_received_piece()")?;
}
Message::KeepAlive => {
debug!("keepalive received from {}", handle);