From 38c99023ac2522a19a1c047756588c6167b7dbb9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 19 Nov 2023 15:47:14 +0000 Subject: [PATCH] Change peer states to dashmap --- Cargo.lock | 14 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/peer_state.rs | 7 + crates/librqbit/src/torrent_state.rs | 508 +++++++++++++-------------- 4 files changed, 261 insertions(+), 269 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9d75cc8..0bc6010 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -364,6 +364,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -820,6 +833,7 @@ dependencies = [ "bitvec", "byteorder", "crypto-hash", + "dashmap", "futures", "hex 0.4.3", "http", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index e878704..f5706a7 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -56,6 +56,7 @@ futures = "0.3" url = "2" hex = "0.4" backoff = "0.4.0" +dashmap = "5.5.3" [dev-dependencies] futures = {version = "0.3"} diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 1f2beeb..3e997ed 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -115,6 +115,13 @@ impl PeerState { } } + pub fn get_live(&self) -> Option<&LivePeerState> { + match self { + PeerState::Live(l) => Some(l), + _ => None, + } + } + pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { match self { PeerState::Live(l) => Some(l), diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index b59ffe5..6e3b361 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -2,7 +2,7 @@ // to them, tracking peer state etc. use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, fs::File, net::SocketAddr, path::PathBuf, @@ -17,6 +17,7 @@ use anyhow::{bail, Context}; use backoff::backoff::Backoff; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; +use dashmap::DashMap; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ id20::Id20, @@ -56,9 +57,7 @@ pub struct InflightPiece { #[derive(Default)] pub struct PeerStates { - states: HashMap, - seen: HashSet, - inflight_pieces: HashMap, + states: DashMap, } #[derive(Debug, Default)] @@ -73,11 +72,11 @@ pub struct AggregatePeerStats { impl PeerStates { pub fn stats(&self) -> AggregatePeerStats { - let mut stats = self - .states - .values() + self.states + .iter() .fold(AggregatePeerStats::default(), |mut s, p| { - match &p.state { + s.seen += 1; + match &p.value().state { PeerState::Connecting(_) => s.connecting += 1, PeerState::Live(_) => s.live += 1, PeerState::Queued => s.queued += 1, @@ -85,114 +84,113 @@ impl PeerStates { PeerState::NotNeeded => s.fully_have_and_we_are_finished += 1, }; s - }); - stats.seen = self.seen.len(); - stats + }) } - pub fn add_if_not_seen(&mut self, addr: SocketAddr) -> Option { - if self.seen.contains(&addr) { - return None; + pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option { + use dashmap::mapref::entry::Entry; + match self.states.entry(addr) { + Entry::Occupied(_) => None, + Entry::Vacant(vac) => { + vac.insert(Default::default()); + Some(addr) + } } - let handle = self.add(addr)?; - self.seen.insert(addr); - Some(handle) } - pub fn seen(&self) -> &HashSet { - &self.seen + pub fn with_peer(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option { + self.states.get(&addr).map(|e| f(e.value())) } - pub fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> { - if let PeerState::Live(ref l) = &self.states.get(&handle)?.state { - return Some(l); - } - None + + pub fn with_peer_mut(&self, addr: PeerHandle, f: impl FnOnce(&mut Peer) -> R) -> Option { + self.states.get_mut(&addr).map(|mut e| f(e.value_mut())) } - pub fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> { - if let PeerState::Live(ref mut l) = &mut self.states.get_mut(&handle)?.state { - return Some(l); - } - None + pub fn with_live(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option { + self.states.get(&addr).and_then(|e| match &e.value().state { + PeerState::Live(l) => Some(f(l)), + _ => None, + }) } - pub fn try_get_live_mut(&mut self, handle: PeerHandle) -> anyhow::Result<&mut LivePeerState> { - self.get_live_mut(handle) - .ok_or_else(|| anyhow::anyhow!("peer dropped")) + pub fn with_live_mut( + &self, + addr: PeerHandle, + f: impl FnOnce(&mut LivePeerState) -> R, + ) -> Option { + self.states + .get_mut(&addr) + .and_then(|mut e| match &mut e.value_mut().state { + PeerState::Live(l) => Some(f(l)), + _ => None, + }) } - pub fn add(&mut self, addr: SocketAddr) -> Option { - let handle = addr; - if self.states.contains_key(&addr) { - return None; - } - self.states.insert(handle, Default::default()); - Some(handle) + + pub fn add(&self, addr: SocketAddr) -> Option { + self.add_if_not_seen(addr) } - pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option> { - let peer = self.states.get_mut(&handle)?; + pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option> { + let mut peer = self.states.get_mut(&handle)?; peer.state.to_dead() } - pub fn drop_peer(&mut self, handle: PeerHandle) -> Option { - self.states.remove(&handle) + pub fn drop_peer(&self, handle: PeerHandle) -> Option { + self.states.remove(&handle).map(|r| r.1) } - pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { - let live = self.get_live_mut(handle)?; - let prev = live.i_am_choked; - live.i_am_choked = is_choked; - Some(prev) + pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option { + self.with_live_mut(handle, |live| { + let prev = live.i_am_choked; + live.i_am_choked = is_choked; + prev + }) } - pub fn mark_peer_interested( - &mut self, - handle: PeerHandle, - is_interested: bool, - ) -> Option { - let live = self.get_live_mut(handle)?; - let prev = live.peer_interested; - live.peer_interested = is_interested; - Some(prev) + pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option { + self.with_live_mut(handle, |live| { + let prev = live.peer_interested; + live.peer_interested = is_interested; + prev + }) } pub fn update_bitfield_from_vec( - &mut self, + &self, handle: PeerHandle, bitfield: Vec, ) -> Option> { - let live = self.get_live_mut(handle)?; - let bitfield = BF::from_vec(bitfield); - let prev = live.bitfield.take(); - live.bitfield = Some(bitfield); - Some(prev) + self.with_live_mut(handle, |live| { + let bitfield = BF::from_vec(bitfield); + let prev = live.bitfield.take(); + live.bitfield = Some(bitfield); + prev + }) } - pub fn mark_peer_connecting(&mut self, h: PeerHandle) -> anyhow::Result { - let peer = self - .states - .get_mut(&h) - .context("peer not found in states")?; - let rx = peer - .state - .queued_to_connecting() - .context("invalid peer state")?; - Ok(rx) + pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { + self.with_peer_mut(h, |peer| { + peer.state + .queued_to_connecting() + .context("invalid peer state") + }) + .context("peer not found in states")? } pub fn clone_tx(&self, handle: PeerHandle) -> Option { - Some(self.get_live(handle)?.tx.clone()) - } - pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { - self.inflight_pieces.remove(&piece) + self.with_live(handle, |live| live.tx.clone()) } - fn reset_peer_backoff(&mut self, handle: PeerHandle) { - let p = match self.states.get_mut(&handle) { - Some(p) => p, - None => return, - }; - p.stats.backoff.reset(); + fn reset_peer_backoff(&self, handle: PeerHandle) { + self.with_peer_mut(handle, |p| { + p.stats.backoff.reset(); + }); } - fn mark_peer_not_needed(&mut self, handle: PeerHandle) -> Option { + fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option { self.states.get_mut(&handle)?.state.to_not_needed() } } pub struct TorrentStateLocked { - pub peers: PeerStates, pub chunks: ChunkTracker, + pub inflight_pieces: HashMap, +} + +impl TorrentStateLocked { + pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { + self.inflight_pieces.remove(&piece) + } } #[derive(Default, Debug)] @@ -254,6 +252,7 @@ pub struct TorrentStateOptions { } pub struct TorrentState { + peers: PeerStates, info: TorrentMetaV1Info, locked: Arc>, files: Vec>>, @@ -296,9 +295,10 @@ impl TorrentState { info_hash, info, peer_id, + peers: Default::default(), locked: Arc::new(RwLock::new(TorrentStateLocked { - peers: Default::default(), chunks: chunk_tracker, + inflight_pieces: Default::default(), })), files, filenames, @@ -328,7 +328,7 @@ impl TorrentState { spawner: BlockingSpawner, ) -> anyhow::Result<()> { let state = self; - let rx = state.locked.write().peers.mark_peer_connecting(addr)?; + let rx = state.peers.mark_peer_connecting(addr)?; let handler = PeerHandler { addr, @@ -376,7 +376,7 @@ impl TorrentState { let addr = peer_queue_rx.recv().await.unwrap(); if state.is_finished() { debug!("ignoring peer {} as we are finished", addr); - state.locked.write().peers.mark_peer_not_needed(addr); + state.peers.mark_peer_not_needed(addr); continue; } @@ -409,52 +409,54 @@ impl TorrentState { } fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { - let g = self.locked.read(); - let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - // in theory it should be safe without validation, but whatever. - return self.lengths.validate_piece_index(n as u32); + self.peers.with_live_mut(peer_handle, |live| { + let g = self.locked.read(); + let bf = live.bitfield.as_ref()?; + for n in g.chunks.iter_needed_pieces() { + if bf.get(n).map(|v| *v) == Some(true) { + // in theory it should be safe without validation, but whatever. + return self.lengths.validate_piece_index(n as u32); + } } - } - None + None + })? } fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { - self.locked - .read() - .peers - .get_live(peer_handle) - .map(|l| l.i_am_choked) + self.peers.with_live(peer_handle, |l| l.i_am_choked) } fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { - if self.am_i_choked(peer_handle)? { - debug!("we are choked, can't reserve next piece"); - return None; - } - let mut g = self.locked.write(); - let n = { - let mut n_opt = None; - let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; - for n in g.chunks.iter_needed_pieces() { - if bf.get(n).map(|v| *v) == Some(true) { - n_opt = Some(n); - break; + self.peers + .with_live_mut(peer_handle, |live| { + if live.i_am_choked { + debug!("we are choked, can't reserve next piece"); + return None; } - } + let mut g = self.locked.write(); + let n = { + let mut n_opt = None; + let bf = live.bitfield.as_ref()?; + for n in g.chunks.iter_needed_pieces() { + if bf.get(n).map(|v| *v) == Some(true) { + n_opt = Some(n); + break; + } + } - self.lengths.validate_piece_index(n_opt? as u32)? - }; - g.peers.inflight_pieces.insert( - n, - InflightPiece { - peer: peer_handle, - started: Instant::now(), - }, - ); - g.chunks.reserve_needed_piece(n); - Some(n) + self.lengths.validate_piece_index(n_opt? as u32)? + }; + g.inflight_pieces.insert( + n, + InflightPiece { + peer: peer_handle, + started: Instant::now(), + }, + ); + g.chunks.reserve_needed_piece(n); + Some(n) + }) + .flatten() } fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { @@ -472,7 +474,6 @@ impl TorrentState { let mut g = self.locked.write(); let (idx, elapsed, piece_req) = g - .peers .inflight_pieces .iter_mut() // don't steal from myself @@ -496,40 +497,41 @@ impl TorrentState { fn try_steal_piece(&self, handle: PeerHandle) -> Option { let mut rng = rand::thread_rng(); use rand::seq::IteratorRandom; - let g = self.locked.read(); - let pl = g.peers.get_live(handle)?; - g.peers - .inflight_pieces - .keys() - .filter(|p| !pl.inflight_requests.iter().any(|req| req.piece == **p)) - .choose(&mut rng) - .copied() + self.peers + .with_live(handle, |live| { + let g = self.locked.read(); + g.inflight_pieces + .keys() + .filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p)) + .choose(&mut rng) + .copied() + }) + .flatten() } fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { - let mut g = self.locked.write(); - let peer = match g.peers.states.get_mut(&handle) { - Some(peer) => peer, - None => { - warn!("peer was in a wrong state, can't set live"); - return; - } - }; - peer.state.connecting_to_live(Id20(h.peer_id)); + let result = self.peers.with_peer_mut(handle, |p| { + p.state.connecting_to_live(Id20(h.peer_id)).is_some() + }); + match result { + Some(true) => debug!("set peer to live"), + Some(false) => debug!("can't set peer live, it was in wrong state"), + None => debug!("can't set peer live, it disappeared"), + } } fn on_peer_died(self: &Arc, handle: PeerHandle, error: Option) { - let mut g = self.locked.write(); - let peer = match g.peers.states.get_mut(&handle) { + let mut pe = match self.peers.states.get_mut(&handle) { Some(peer) => peer, None => { warn!("bug: peer not found in table. Forgetting it forever"); return; } }; - match std::mem::take(&mut peer.state) { + match std::mem::take(&mut pe.value_mut().state) { PeerState::Connecting(_) => {} PeerState::Live(live) => { + let mut g = self.locked.write(); for req in live.inflight_requests { debug!( "peer dead, marking chunk request cancelled, index={}, chunk={}", @@ -541,39 +543,25 @@ impl TorrentState { } PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => { warn!("bug: peer was in a wrong state, ignoring it forever"); - g.peers.drop_peer(handle); + self.peers.drop_peer(handle); return; } }; - // Re-borrow as we were modifying states above - // (otherwise borrow checker rightfully says we're wrong). - let peer = g.peers.states.get_mut(&handle).unwrap(); - if error.is_none() { debug!("peer died without errors, not re-queueing"); - peer.state = PeerState::NotNeeded; + pe.value_mut().state = PeerState::NotNeeded; return; } if self.is_finished() { debug!("torrent finished, not re-queueing"); - peer.state = PeerState::NotNeeded; + pe.value_mut().state = PeerState::NotNeeded; return; } - peer.state = PeerState::Dead; - let backoff = { - let peer = match g.peers.states.get_mut(&handle) { - Some(p) => p, - None => { - warn!("bug: did not find peer in the list"); - return; - } - }; - - peer.stats.backoff.next_backoff() - }; + pe.value_mut().state = PeerState::Dead; + let backoff = pe.value_mut().stats.backoff.next_backoff(); if let Some(dur) = backoff { let state = self.clone(); @@ -587,27 +575,26 @@ impl TorrentState { ), async move { tokio::time::sleep(dur).await; - { - let mut g = state.locked.write(); - let peer = match g.peers.states.get_mut(&handle) { - Some(p) => p, - None => bail!("bug: peer disappeared"), - }; - match &peer.state { - PeerState::Dead => peer.state = PeerState::Queued, - other => bail!( - "peer is in unexpected state: {}. Expected dead", - other.name() - ), - } - } + state + .peers + .with_peer_mut(handle, |peer| { + match &peer.state { + PeerState::Dead => peer.state = PeerState::Queued, + other => bail!( + "peer is in unexpected state: {}. Expected dead", + other.name() + ), + }; + Ok(()) + }) + .context("bug: peer disappeared")??; state.peer_queue_tx.send(handle)?; Ok::<_, anyhow::Error>(()) }, ); } else { debug!("dropping peer, backoff exhausted"); - g.peers.drop_peer(handle); + self.peers.drop_peer(handle); } } @@ -629,9 +616,8 @@ impl TorrentState { fn maybe_transmit_haves(&self, index: ValidPieceIndex) { let mut futures = Vec::new(); - let g = self.locked.read(); - for (_, peer) in g.peers.states.iter() { - match &peer.state { + for pe in self.peers.states.iter() { + match &pe.value().state { PeerState::Live(live) => { if !live.peer_interested { continue; @@ -683,7 +669,7 @@ impl TorrentState { } pub fn add_peer_if_not_seen(self: &Arc, addr: SocketAddr) -> bool { - match self.locked.write().peers.add_if_not_seen(addr) { + match self.peers.add_if_not_seen(addr) { Some(handle) => handle, None => return false, }; @@ -693,13 +679,12 @@ impl TorrentState { } pub fn peer_stats_snapshot(&self) -> AggregatePeerStats { - self.locked.read().peers.stats() + self.peers.stats() } pub fn stats_snapshot(&self) -> StatsSnapshot { - let g = self.locked.read(); use Ordering::*; - let peer_stats = g.peers.stats(); + let peer_stats = self.peers.stats(); let downloaded = self.stats.downloaded_and_checked.load(Relaxed); let remaining = self.needed - downloaded; StatsSnapshot { @@ -710,7 +695,7 @@ impl TorrentState { uploaded_bytes: self.stats.uploaded.load(Relaxed), total_bytes: self.have_plus_needed, live_peers: peer_stats.live as u32, - seen_peers: g.peers.seen.len() as u32, + seen_peers: peer_stats.seen as u32, connecting_peers: peer_stats.connecting as u32, time: Instant::now(), initially_needed_bytes: self.needed, @@ -833,7 +818,8 @@ impl PeerHandler { ); } - g.peers + self.state + .peers .clone_tx(peer_handle) .context("peer died, dropping chunk that it requested")? }; @@ -847,17 +833,12 @@ impl PeerHandler { } fn on_have(&self, handle: PeerHandle, have: u32) { - if let Some(bitfield) = self - .state - .locked - .write() - .peers - .get_live_mut(handle) - .and_then(|l| l.bitfield.as_mut()) - { - debug!("updated bitfield with have={}", have); - bitfield.set(have as usize, true) - } + self.state.peers.with_live_mut(handle, |live| { + if let Some(bitfield) = live.bitfield.as_mut() { + bitfield.set(have as usize, true); + debug!("updated bitfield with have={}", have); + } + }); } fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> { @@ -869,19 +850,11 @@ impl PeerHandler { ); } self.state - .locked - .write() .peers .update_bitfield_from_vec(handle, bitfield.0); if !self.state.am_i_interested_in_peer(handle) { - let tx = self - .state - .locked - .read() - .peers - .clone_tx(handle) - .context("peer dropped")?; + let tx = self.state.peers.clone_tx(handle).context("peer dropped")?; tx.send(WriterRequest::Message(MessageOwned::Unchoke))?; tx.send(WriterRequest::Message(MessageOwned::NotInterested))?; if self.state.is_finished() { @@ -903,7 +876,7 @@ impl PeerHandler { } async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { - let tx = match self.state.locked.read().peers.clone_tx(handle) { + let tx = match self.state.peers.clone_tx(handle) { Some(tx) => tx, None => return Ok(()), }; @@ -917,25 +890,21 @@ impl PeerHandler { fn on_i_am_choked(&self, handle: PeerHandle) { debug!("we are choked"); - self.state - .locked - .write() - .peers - .mark_i_am_choked(handle, true); + self.state.peers.mark_i_am_choked(handle, true); } fn on_peer_interested(&self, handle: PeerHandle) { debug!("peer is interested"); - self.state - .locked - .write() - .peers - .mark_peer_interested(handle, true); + self.state.peers.mark_peer_interested(handle, true); } async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { - let notify = match self.state.locked.read().peers.get_live(handle) { - Some(l) => l.have_notify.clone(), + let notify = match self + .state + .peers + .with_live(handle, |l| l.have_notify.clone()) + { + Some(notify) => notify, None => return Ok(()), }; @@ -984,29 +953,29 @@ impl PeerHandler { }, }; - let tx = match self.state.locked.read().peers.clone_tx(handle) { - Some(tx) => tx, - None => return Ok(()), - }; - let sem = match self.state.locked.read().peers.get_live(handle) { - Some(live) => live.requests_sem.clone(), + let (tx, sem) = match self + .state + .peers + .with_live(handle, |l| (l.tx.clone(), l.requests_sem.clone())) + { + Some((tx, sem)) => (tx, sem), None => return Ok(()), }; + for chunk in self.state.lengths.iter_chunk_infos(next) { if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) { continue; } - if !self - .state - .locked - .write() - .peers - .try_get_live_mut(handle)? - .inflight_requests - .insert(InflightRequest::from(&chunk)) - { - warn!("probably a bug, we already requested {:?}", chunk); - continue; + + match self.state.peers.with_live_mut(handle, |l| { + l.inflight_requests.insert(InflightRequest::from(&chunk)) + }) { + Some(true) => {} + Some(false) => { + warn!("probably a bug, we already requested {:?}", chunk); + continue; + } + None => bail!("peer dropped"), } let request = Request { @@ -1053,14 +1022,11 @@ impl PeerHandler { fn on_i_am_unchoked(&self, handle: PeerHandle) { debug!("we are unchoked"); - let mut g = self.state.locked.write(); - let live = match g.peers.get_live_mut(handle) { - Some(live) => live, - None => return, - }; - live.i_am_choked = false; - live.have_notify.notify_waiters(); - live.requests_sem.add_permits(16); + self.state.peers.with_live_mut(handle, |live| { + live.i_am_choked = false; + live.have_notify.notify_waiters(); + live.requests_sem.add_permits(16); + }); } fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> anyhow::Result<()> { @@ -1076,31 +1042,36 @@ impl PeerHandler { }; let mut g = self.state.locked.write(); - let h = g.peers.try_get_live_mut(handle)?; - h.requests_sem.add_permits(1); self.state - .stats - .fetched_bytes - .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + .peers + .with_live_mut(handle, |h| { + h.requests_sem.add_permits(1); - if !h - .inflight_requests - .remove(&InflightRequest::from(&chunk_info)) - { - anyhow::bail!( - "peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}", - &h.inflight_requests, - &piece, - ); - } + self.state + .stats + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + + if !h + .inflight_requests + .remove(&InflightRequest::from(&chunk_info)) + { + anyhow::bail!( + "peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}", + &h.inflight_requests, + &piece, + ); + } + Ok(()) + }) + .context("peer not found")??; let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { debug!("piece={} done, will write and checksum", piece.index,); // This will prevent others from stealing it. - g.peers - .remove_inflight_piece(chunk_info.piece_index) + g.remove_inflight_piece(chunk_info.piece_index) .map(|t| t.started.elapsed()) } Some(ChunkMarkingResult::PreviouslyCompleted) => { @@ -1171,7 +1142,7 @@ impl PeerHandler { let mut g = self.state.locked.write(); g.chunks.mark_piece_downloaded(chunk_info.piece_index); - g.peers.reset_peer_backoff(handle); + self.state.peers.reset_peer_backoff(handle); } debug!("piece={} successfully downloaded and verified", index); @@ -1200,11 +1171,10 @@ impl PeerHandler { } fn disconnect_all_peers_that_have_full_torrent(&self) { - let mut g = self.state.locked.write(); - for (_, peer) in g.peers.states.iter_mut() { - if let PeerState::Live(l) = &peer.state { + for mut pe in self.state.peers.states.iter_mut() { + if let PeerState::Live(l) = &pe.value().state { if l.has_full_torrent(self.state.lengths.total_pieces() as usize) { - let live = peer.state.to_not_needed().unwrap(); + let live = pe.value_mut().state.to_not_needed().unwrap(); let _ = live.tx.send(WriterRequest::Disconnect); } }