Preventively fixed other race conditions
This commit is contained in:
parent
34ee9d9bd9
commit
2695a8ec26
3 changed files with 73 additions and 53 deletions
|
|
@ -8,8 +8,9 @@ pub struct ChunkTracker {
|
|||
// This forms the basis of a "queue" to pull from.
|
||||
// It's set to 1 if we need a piece, but the moment we start requesting a peer,
|
||||
// it's set to 0.
|
||||
|
||||
// Better to rename into piece_queue or smth, and maybe use some other form of a queue.
|
||||
//
|
||||
// Initially this is the opposite of "have", until we start making requests.
|
||||
// An in-flight request is not in "needed", and not in "have".
|
||||
needed_pieces: BF,
|
||||
|
||||
// This has a bit set per each chunk (block) that we have written to the output file.
|
||||
|
|
@ -21,6 +22,7 @@ pub struct ChunkTracker {
|
|||
|
||||
lengths: Lengths,
|
||||
|
||||
// What pieces to download first.
|
||||
priority_piece_ids: Vec<usize>,
|
||||
}
|
||||
|
||||
|
|
@ -168,17 +170,6 @@ impl ChunkTracker {
|
|||
piece.index, chunk_info, chunk_range,
|
||||
);
|
||||
|
||||
// TODO: remove me, it's for debugging
|
||||
// {
|
||||
// use std::io::Write;
|
||||
// let mut f = std::fs::OpenOptions::new()
|
||||
// .write(true)
|
||||
// .create(true)
|
||||
// .open("/tmp/chunks")
|
||||
// .unwrap();
|
||||
// write!(f, "{:?}", &self.have).unwrap();
|
||||
// }
|
||||
|
||||
if chunk_range.all() {
|
||||
return Some(ChunkMarkingResult::Completed);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -224,7 +224,7 @@ pub struct LivePeerState {
|
|||
pub i_am_choked: bool,
|
||||
pub peer_interested: bool,
|
||||
|
||||
// This is used to limit the number of requests we send to a peer at a time.
|
||||
// This is used to limit the number of chunk requests we send to a peer at a time.
|
||||
pub requests_sem: Arc<Semaphore>,
|
||||
|
||||
// This is used to unpause processes after we were choked.
|
||||
|
|
|
|||
|
|
@ -170,9 +170,8 @@ impl PeerStates {
|
|||
}
|
||||
pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec<u8>) -> Option<()> {
|
||||
self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
|
||||
let bitfield = BF::from_vec(bitfield);
|
||||
live.previously_requested_pieces = BF::with_capacity(bitfield.len());
|
||||
live.bitfield = bitfield;
|
||||
live.previously_requested_pieces = BF::from_vec(vec![0; bitfield.len()]);
|
||||
live.bitfield = BF::from_vec(bitfield);
|
||||
})
|
||||
}
|
||||
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> {
|
||||
|
|
@ -205,14 +204,12 @@ impl PeerStates {
|
|||
}
|
||||
|
||||
pub struct TorrentStateLocked {
|
||||
// What chunks we have and need.
|
||||
pub chunks: ChunkTracker,
|
||||
pub inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
|
||||
}
|
||||
|
||||
impl TorrentStateLocked {
|
||||
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> {
|
||||
self.inflight_pieces.remove(&piece)
|
||||
}
|
||||
// At a moment in time, we are expecting a piece from only one peer.
|
||||
// inflight_pieces stores this information.
|
||||
pub inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
|
|
@ -623,15 +620,22 @@ impl TorrentState {
|
|||
None
|
||||
}
|
||||
|
||||
// TODO: need to throttle this or make it smarter as it may loop and steal pieces forever from each other.
|
||||
// NOTE: this doesn't actually "steal" it, but only returns an id we might steal.
|
||||
fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
|
||||
let mut rng = rand::thread_rng();
|
||||
use rand::seq::IteratorRandom;
|
||||
|
||||
self.peers
|
||||
.with_live(handle, |live| {
|
||||
let g = self.lock_read("try_steal_piece");
|
||||
g.inflight_pieces
|
||||
.keys()
|
||||
.filter(|p| {
|
||||
live.previously_requested_pieces
|
||||
.get(p.get() as usize)
|
||||
.map(|r| *r)
|
||||
== Some(false)
|
||||
})
|
||||
.filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p))
|
||||
.choose(&mut rng)
|
||||
.copied()
|
||||
|
|
@ -1102,45 +1106,46 @@ impl PeerHandler {
|
|||
},
|
||||
};
|
||||
|
||||
let (tx, sem) = match self
|
||||
.state
|
||||
.peers
|
||||
.with_live(handle, |l| (l.tx.clone(), l.requests_sem.clone()))
|
||||
{
|
||||
Some((tx, sem)) => (tx, sem),
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
for chunk in self.state.lengths.iter_chunk_infos(next) {
|
||||
if self
|
||||
.state
|
||||
.lock_read("is_chunk_downloaded")
|
||||
.chunks
|
||||
.is_chunk_downloaded(&chunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let (tx, sem) =
|
||||
match self
|
||||
.state
|
||||
.peers
|
||||
.with_live_mut(handle, "inflight_requests.insert", |l| {
|
||||
l.inflight_requests.insert(InflightRequest::from(&chunk))
|
||||
.with_live_mut(handle, "peer_setup_for_piece_request", |l| {
|
||||
l.previously_requested_pieces.set(next.get() as usize, true);
|
||||
(l.tx.clone(), l.requests_sem.clone())
|
||||
}) {
|
||||
Some(true) => {}
|
||||
Some(false) => {
|
||||
warn!("probably a bug, we already requested {:?}", chunk);
|
||||
continue;
|
||||
}
|
||||
Some(res) => res,
|
||||
None => return Ok(()),
|
||||
}
|
||||
};
|
||||
|
||||
for chunk in self.state.lengths.iter_chunk_infos(next) {
|
||||
let request = Request {
|
||||
index: next.get(),
|
||||
begin: chunk.offset,
|
||||
length: chunk.size,
|
||||
};
|
||||
|
||||
match self
|
||||
.state
|
||||
.peers
|
||||
.with_live_mut(handle, "add chunk request", |live| {
|
||||
live.inflight_requests.insert(InflightRequest::from(&chunk))
|
||||
}) {
|
||||
Some(true) => {}
|
||||
Some(false) => {
|
||||
// This request was already in-flight for this peer for this chunk.
|
||||
// This might happen in theory, but not very likely.
|
||||
//
|
||||
// Example:
|
||||
// someone stole a piece from us, and then died, the piece became "needed" again, and we reserved it
|
||||
// all before the piece request was processed by us.
|
||||
warn!("we already requested {:?} previously", chunk);
|
||||
continue;
|
||||
}
|
||||
// peer died
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
loop {
|
||||
match timeout(Duration::from_secs(10), sem.acquire()).await {
|
||||
Ok(acq) => break acq?.forget(),
|
||||
|
|
@ -1241,12 +1246,33 @@ impl PeerHandler {
|
|||
let full_piece_download_time = {
|
||||
let mut g = self.state.lock_write("mark_chunk_downloaded");
|
||||
|
||||
match g.inflight_pieces.get(&chunk_info.piece_index) {
|
||||
Some(InflightPiece { peer, .. }) if *peer == handle => {}
|
||||
Some(InflightPiece { peer, .. }) => {
|
||||
debug!(
|
||||
"in-flight piece {} was stolen by {}, ignoring",
|
||||
chunk_info.piece_index, peer
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
None => {
|
||||
debug!(
|
||||
"in-flight piece {} not found. it was probably completed by someone else",
|
||||
chunk_info.piece_index
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
match g.chunks.mark_chunk_downloaded(&piece) {
|
||||
Some(ChunkMarkingResult::Completed) => {
|
||||
debug!("piece={} done, will write and checksum", piece.index,);
|
||||
// This will prevent others from stealing it.
|
||||
g.remove_inflight_piece(chunk_info.piece_index)
|
||||
.map(|t| t.started.elapsed())
|
||||
{
|
||||
let piece = chunk_info.piece_index;
|
||||
g.inflight_pieces.remove(&piece)
|
||||
}
|
||||
.map(|t| t.started.elapsed())
|
||||
}
|
||||
Some(ChunkMarkingResult::PreviouslyCompleted) => {
|
||||
// TODO: we might need to send cancellations here.
|
||||
|
|
@ -1263,6 +1289,9 @@ impl PeerHandler {
|
|||
}
|
||||
};
|
||||
|
||||
// By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would
|
||||
// have fallen off above in one of the defensive checks.
|
||||
|
||||
self.spawner
|
||||
.spawn_block_in_place(move || {
|
||||
let index = piece.index;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue