Something broke computation of initial checks

This commit is contained in:
Igor Katson 2021-06-27 11:01:41 +01:00
parent 3ab61a6108
commit efaa96a9b5
2 changed files with 59 additions and 19 deletions

View file

@ -74,8 +74,28 @@ impl ChunkTracker {
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
self.needed_pieces.set(index.get() as usize, false)
}
pub fn mark_piece_needed(&mut self, index: ValidPieceIndex) -> bool {
info!("remarking piece={} as needed", index);
// None if wrong chunk
// true if did something
// false if didn't do anything
pub fn mark_chunk_request_cancelled(
&mut self,
index: ValidPieceIndex,
chunk: u32,
) -> Option<bool> {
if *self.have.get(index.get() as usize)? {
return Some(false);
}
// This will trigger the requesters to re-check each chunk in this piece.
let chunk_range = self.lengths.chunk_range(index);
if !self.chunk_status.get(chunk_range)?.all() {
self.needed_pieces.set(index.get() as usize, true);
}
Some(true)
}
pub fn mark_piece_broken(&mut self, index: ValidPieceIndex) -> bool {
info!("remarking piece={} as broken", index);
self.needed_pieces.set(index.get() as usize, true);
self.chunk_status
.get_mut(self.lengths.chunk_range(index))

View file

@ -97,6 +97,21 @@ enum PeerState {
type BF = bitvec::vec::BitVec<bitvec::order::Msb0, u8>;
#[derive(Debug, Hash, PartialEq, Eq)]
struct InflightRequest {
piece: ValidPieceIndex,
chunk: u32,
}
impl From<&ChunkInfo> for InflightRequest {
fn from(c: &ChunkInfo) -> Self {
Self {
piece: c.piece_index,
chunk: c.chunk_index,
}
}
}
struct LivePeerState {
#[allow(unused)]
peer_id: [u8; 20],
@ -108,7 +123,7 @@ struct LivePeerState {
outstanding_requests: Arc<Semaphore>,
have_notify: Arc<Notify>,
bitfield: Option<BF>,
requested_pieces: HashSet<ValidPieceIndex>,
inflight_requests: HashSet<InflightRequest>,
}
#[derive(Default)]
@ -161,6 +176,10 @@ impl PeerStates {
}
None
}
fn try_get_live_mut(&mut self, handle: PeerHandle) -> anyhow::Result<&mut LivePeerState> {
self.get_live_mut(handle)
.ok_or_else(|| anyhow::anyhow!("peer dropped"))
}
fn add(
&mut self,
addr: SocketAddr,
@ -862,15 +881,24 @@ impl TorrentManager {
None => return Ok(()),
};
for chunk in self.inner.lengths.iter_chunk_infos(next) {
if is_stolen && self.inner.locked.read().chunks.is_chunk_downloaded(&chunk) {
if self.inner.locked.read().chunks.is_chunk_downloaded(&chunk) {
continue;
}
self.inner
.locked
.write()
.peers
.try_get_live_mut(handle)?
.inflight_requests
.insert(InflightRequest::from(&chunk));
let request = Request {
index: next.get(),
begin: chunk.offset,
length: chunk.size,
};
sem.acquire().await?.forget();
tx.send(MessageOwned::Request(request))
.await
.context("peer dropped")?;
@ -921,11 +949,6 @@ impl TorrentManager {
self.inner.lengths.validate_piece_index(n_opt? as u32)?
};
g.peers
.get_live_mut(peer_handle)?
.requested_pieces
.insert(n);
g.peers.inflight_pieces.insert(n);
g.chunks.reserve_needed_piece(n);
Some(n)
@ -1092,19 +1115,16 @@ impl TorrentManager {
};
let mut g = self.inner.locked.write();
let h = match g.peers.get_live_mut(handle) {
Some(l) => l,
None => anyhow::bail!("peer is not live anymore"),
};
let h = g.peers.try_get_live_mut(handle)?;
h.outstanding_requests.add_permits(1);
self.inner
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
if !h.requested_pieces.contains(&chunk_info.piece_index) {
if !h.inflight_requests.remove(&(&chunk_info).into()) {
anyhow::bail!(
"peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece,
"peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.inflight_requests, &piece,
);
}
@ -1185,7 +1205,7 @@ impl TorrentManager {
.locked
.write()
.chunks
.mark_piece_needed(chunk_info.piece_index);
.mark_piece_broken(chunk_info.piece_index);
}
};
Ok::<_, anyhow::Error>(())
@ -1270,7 +1290,7 @@ impl TorrentManager {
bitfield: None,
have_notify: Arc::new(Notify::new()),
outstanding_requests: Arc::new(Semaphore::new(0)),
requested_pieces: Default::default(),
inflight_requests: Default::default(),
});
}
_ => {
@ -1467,8 +1487,8 @@ impl TorrentManager {
match peer {
PeerState::Connecting(_) => {}
PeerState::Live(l) => {
for piece in l.requested_pieces {
g.chunks.mark_piece_needed(piece);
for req in l.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
}
}