Change peer states to dashmap

This commit is contained in:
Igor Katson 2023-11-19 15:47:14 +00:00
parent a745257be2
commit 38c99023ac
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 261 additions and 269 deletions

14
Cargo.lock generated
View file

@ -364,6 +364,19 @@ dependencies = [
"winapi",
]
[[package]]
name = "dashmap"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -820,6 +833,7 @@ dependencies = [
"bitvec",
"byteorder",
"crypto-hash",
"dashmap",
"futures",
"hex 0.4.3",
"http",

View file

@ -56,6 +56,7 @@ futures = "0.3"
url = "2"
hex = "0.4"
backoff = "0.4.0"
dashmap = "5.5.3"
[dev-dependencies]
futures = {version = "0.3"}

View file

@ -115,6 +115,13 @@ impl PeerState {
}
}
pub fn get_live(&self) -> Option<&LivePeerState> {
match self {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match self {
PeerState::Live(l) => Some(l),

View file

@ -2,7 +2,7 @@
// to them, tracking peer state etc.
use std::{
collections::{HashMap, HashSet},
collections::HashMap,
fs::File,
net::SocketAddr,
path::PathBuf,
@ -17,6 +17,7 @@ use anyhow::{bail, Context};
use backoff::backoff::Backoff;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use dashmap::DashMap;
use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{
id20::Id20,
@ -56,9 +57,7 @@ pub struct InflightPiece {
#[derive(Default)]
pub struct PeerStates {
states: HashMap<PeerHandle, Peer>,
seen: HashSet<SocketAddr>,
inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
states: DashMap<PeerHandle, Peer>,
}
#[derive(Debug, Default)]
@ -73,11 +72,11 @@ pub struct AggregatePeerStats {
impl PeerStates {
pub fn stats(&self) -> AggregatePeerStats {
let mut stats = self
.states
.values()
self.states
.iter()
.fold(AggregatePeerStats::default(), |mut s, p| {
match &p.state {
s.seen += 1;
match &p.value().state {
PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1,
PeerState::Queued => s.queued += 1,
@ -85,114 +84,113 @@ impl PeerStates {
PeerState::NotNeeded => s.fully_have_and_we_are_finished += 1,
};
s
});
stats.seen = self.seen.len();
stats
})
}
pub fn add_if_not_seen(&mut self, addr: SocketAddr) -> Option<PeerHandle> {
if self.seen.contains(&addr) {
return None;
pub fn add_if_not_seen(&self, addr: SocketAddr) -> Option<PeerHandle> {
use dashmap::mapref::entry::Entry;
match self.states.entry(addr) {
Entry::Occupied(_) => None,
Entry::Vacant(vac) => {
vac.insert(Default::default());
Some(addr)
}
}
let handle = self.add(addr)?;
self.seen.insert(addr);
Some(handle)
}
pub fn seen(&self) -> &HashSet<SocketAddr> {
&self.seen
pub fn with_peer<R>(&self, addr: PeerHandle, f: impl FnOnce(&Peer) -> R) -> Option<R> {
self.states.get(&addr).map(|e| f(e.value()))
}
pub fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> {
if let PeerState::Live(ref l) = &self.states.get(&handle)?.state {
return Some(l);
}
None
pub fn with_peer_mut<R>(&self, addr: PeerHandle, f: impl FnOnce(&mut Peer) -> R) -> Option<R> {
self.states.get_mut(&addr).map(|mut e| f(e.value_mut()))
}
pub fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> {
if let PeerState::Live(ref mut l) = &mut self.states.get_mut(&handle)?.state {
return Some(l);
}
None
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.states.get(&addr).and_then(|e| match &e.value().state {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
}
pub fn try_get_live_mut(&mut self, handle: PeerHandle) -> anyhow::Result<&mut LivePeerState> {
self.get_live_mut(handle)
.ok_or_else(|| anyhow::anyhow!("peer dropped"))
pub fn with_live_mut<R>(
&self,
addr: PeerHandle,
f: impl FnOnce(&mut LivePeerState) -> R,
) -> Option<R> {
self.states
.get_mut(&addr)
.and_then(|mut e| match &mut e.value_mut().state {
PeerState::Live(l) => Some(f(l)),
_ => None,
})
}
pub fn add(&mut self, addr: SocketAddr) -> Option<PeerHandle> {
let handle = addr;
if self.states.contains_key(&addr) {
return None;
}
self.states.insert(handle, Default::default());
Some(handle)
pub fn add(&self, addr: SocketAddr) -> Option<PeerHandle> {
self.add_if_not_seen(addr)
}
pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option<Option<LivePeerState>> {
let peer = self.states.get_mut(&handle)?;
pub fn mark_peer_dead(&self, handle: PeerHandle) -> Option<Option<LivePeerState>> {
let mut peer = self.states.get_mut(&handle)?;
peer.state.to_dead()
}
pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<Peer> {
self.states.remove(&handle)
pub fn drop_peer(&self, handle: PeerHandle) -> Option<Peer> {
self.states.remove(&handle).map(|r| r.1)
}
pub fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
let live = self.get_live_mut(handle)?;
let prev = live.i_am_choked;
live.i_am_choked = is_choked;
Some(prev)
pub fn mark_i_am_choked(&self, handle: PeerHandle, is_choked: bool) -> Option<bool> {
self.with_live_mut(handle, |live| {
let prev = live.i_am_choked;
live.i_am_choked = is_choked;
prev
})
}
pub fn mark_peer_interested(
&mut self,
handle: PeerHandle,
is_interested: bool,
) -> Option<bool> {
let live = self.get_live_mut(handle)?;
let prev = live.peer_interested;
live.peer_interested = is_interested;
Some(prev)
pub fn mark_peer_interested(&self, handle: PeerHandle, is_interested: bool) -> Option<bool> {
self.with_live_mut(handle, |live| {
let prev = live.peer_interested;
live.peer_interested = is_interested;
prev
})
}
pub fn update_bitfield_from_vec(
&mut self,
&self,
handle: PeerHandle,
bitfield: Vec<u8>,
) -> Option<Option<BF>> {
let live = self.get_live_mut(handle)?;
let bitfield = BF::from_vec(bitfield);
let prev = live.bitfield.take();
live.bitfield = Some(bitfield);
Some(prev)
self.with_live_mut(handle, |live| {
let bitfield = BF::from_vec(bitfield);
let prev = live.bitfield.take();
live.bitfield = Some(bitfield);
prev
})
}
pub fn mark_peer_connecting(&mut self, h: PeerHandle) -> anyhow::Result<PeerRx> {
let peer = self
.states
.get_mut(&h)
.context("peer not found in states")?;
let rx = peer
.state
.queued_to_connecting()
.context("invalid peer state")?;
Ok(rx)
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> {
self.with_peer_mut(h, |peer| {
peer.state
.queued_to_connecting()
.context("invalid peer state")
})
.context("peer not found in states")?
}
pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> {
Some(self.get_live(handle)?.tx.clone())
}
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> {
self.inflight_pieces.remove(&piece)
self.with_live(handle, |live| live.tx.clone())
}
fn reset_peer_backoff(&mut self, handle: PeerHandle) {
let p = match self.states.get_mut(&handle) {
Some(p) => p,
None => return,
};
p.stats.backoff.reset();
fn reset_peer_backoff(&self, handle: PeerHandle) {
self.with_peer_mut(handle, |p| {
p.stats.backoff.reset();
});
}
fn mark_peer_not_needed(&mut self, handle: PeerHandle) -> Option<LivePeerState> {
fn mark_peer_not_needed(&self, handle: PeerHandle) -> Option<LivePeerState> {
self.states.get_mut(&handle)?.state.to_not_needed()
}
}
pub struct TorrentStateLocked {
pub peers: PeerStates,
pub chunks: ChunkTracker,
pub inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
}
impl TorrentStateLocked {
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> {
self.inflight_pieces.remove(&piece)
}
}
#[derive(Default, Debug)]
@ -254,6 +252,7 @@ pub struct TorrentStateOptions {
}
pub struct TorrentState {
peers: PeerStates,
info: TorrentMetaV1Info<ByteString>,
locked: Arc<RwLock<TorrentStateLocked>>,
files: Vec<Arc<Mutex<File>>>,
@ -296,9 +295,10 @@ impl TorrentState {
info_hash,
info,
peer_id,
peers: Default::default(),
locked: Arc::new(RwLock::new(TorrentStateLocked {
peers: Default::default(),
chunks: chunk_tracker,
inflight_pieces: Default::default(),
})),
files,
filenames,
@ -328,7 +328,7 @@ impl TorrentState {
spawner: BlockingSpawner,
) -> anyhow::Result<()> {
let state = self;
let rx = state.locked.write().peers.mark_peer_connecting(addr)?;
let rx = state.peers.mark_peer_connecting(addr)?;
let handler = PeerHandler {
addr,
@ -376,7 +376,7 @@ impl TorrentState {
let addr = peer_queue_rx.recv().await.unwrap();
if state.is_finished() {
debug!("ignoring peer {} as we are finished", addr);
state.locked.write().peers.mark_peer_not_needed(addr);
state.peers.mark_peer_not_needed(addr);
continue;
}
@ -409,52 +409,54 @@ impl TorrentState {
}
fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
let g = self.locked.read();
let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return self.lengths.validate_piece_index(n as u32);
self.peers.with_live_mut(peer_handle, |live| {
let g = self.locked.read();
let bf = live.bitfield.as_ref()?;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return self.lengths.validate_piece_index(n as u32);
}
}
}
None
None
})?
}
fn am_i_choked(&self, peer_handle: PeerHandle) -> Option<bool> {
self.locked
.read()
.peers
.get_live(peer_handle)
.map(|l| l.i_am_choked)
self.peers.with_live(peer_handle, |l| l.i_am_choked)
}
fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
if self.am_i_choked(peer_handle)? {
debug!("we are choked, can't reserve next piece");
return None;
}
let mut g = self.locked.write();
let n = {
let mut n_opt = None;
let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;
self.peers
.with_live_mut(peer_handle, |live| {
if live.i_am_choked {
debug!("we are choked, can't reserve next piece");
return None;
}
}
let mut g = self.locked.write();
let n = {
let mut n_opt = None;
let bf = live.bitfield.as_ref()?;
for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;
}
}
self.lengths.validate_piece_index(n_opt? as u32)?
};
g.peers.inflight_pieces.insert(
n,
InflightPiece {
peer: peer_handle,
started: Instant::now(),
},
);
g.chunks.reserve_needed_piece(n);
Some(n)
self.lengths.validate_piece_index(n_opt? as u32)?
};
g.inflight_pieces.insert(
n,
InflightPiece {
peer: peer_handle,
started: Instant::now(),
},
);
g.chunks.reserve_needed_piece(n);
Some(n)
})
.flatten()
}
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
@ -472,7 +474,6 @@ impl TorrentState {
let mut g = self.locked.write();
let (idx, elapsed, piece_req) = g
.peers
.inflight_pieces
.iter_mut()
// don't steal from myself
@ -496,40 +497,41 @@ impl TorrentState {
fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom;
let g = self.locked.read();
let pl = g.peers.get_live(handle)?;
g.peers
.inflight_pieces
.keys()
.filter(|p| !pl.inflight_requests.iter().any(|req| req.piece == **p))
.choose(&mut rng)
.copied()
self.peers
.with_live(handle, |live| {
let g = self.locked.read();
g.inflight_pieces
.keys()
.filter(|p| !live.inflight_requests.iter().any(|req| req.piece == **p))
.choose(&mut rng)
.copied()
})
.flatten()
}
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let mut g = self.locked.write();
let peer = match g.peers.states.get_mut(&handle) {
Some(peer) => peer,
None => {
warn!("peer was in a wrong state, can't set live");
return;
}
};
peer.state.connecting_to_live(Id20(h.peer_id));
let result = self.peers.with_peer_mut(handle, |p| {
p.state.connecting_to_live(Id20(h.peer_id)).is_some()
});
match result {
Some(true) => debug!("set peer to live"),
Some(false) => debug!("can't set peer live, it was in wrong state"),
None => debug!("can't set peer live, it disappeared"),
}
}
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle, error: Option<anyhow::Error>) {
let mut g = self.locked.write();
let peer = match g.peers.states.get_mut(&handle) {
let mut pe = match self.peers.states.get_mut(&handle) {
Some(peer) => peer,
None => {
warn!("bug: peer not found in table. Forgetting it forever");
return;
}
};
match std::mem::take(&mut peer.state) {
match std::mem::take(&mut pe.value_mut().state) {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
let mut g = self.locked.write();
for req in live.inflight_requests {
debug!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
@ -541,39 +543,25 @@ impl TorrentState {
}
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
warn!("bug: peer was in a wrong state, ignoring it forever");
g.peers.drop_peer(handle);
self.peers.drop_peer(handle);
return;
}
};
// Re-borrow as we were modifying states above
// (otherwise borrow checker rightfully says we're wrong).
let peer = g.peers.states.get_mut(&handle).unwrap();
if error.is_none() {
debug!("peer died without errors, not re-queueing");
peer.state = PeerState::NotNeeded;
pe.value_mut().state = PeerState::NotNeeded;
return;
}
if self.is_finished() {
debug!("torrent finished, not re-queueing");
peer.state = PeerState::NotNeeded;
pe.value_mut().state = PeerState::NotNeeded;
return;
}
peer.state = PeerState::Dead;
let backoff = {
let peer = match g.peers.states.get_mut(&handle) {
Some(p) => p,
None => {
warn!("bug: did not find peer in the list");
return;
}
};
peer.stats.backoff.next_backoff()
};
pe.value_mut().state = PeerState::Dead;
let backoff = pe.value_mut().stats.backoff.next_backoff();
if let Some(dur) = backoff {
let state = self.clone();
@ -587,27 +575,26 @@ impl TorrentState {
),
async move {
tokio::time::sleep(dur).await;
{
let mut g = state.locked.write();
let peer = match g.peers.states.get_mut(&handle) {
Some(p) => p,
None => bail!("bug: peer disappeared"),
};
match &peer.state {
PeerState::Dead => peer.state = PeerState::Queued,
other => bail!(
"peer is in unexpected state: {}. Expected dead",
other.name()
),
}
}
state
.peers
.with_peer_mut(handle, |peer| {
match &peer.state {
PeerState::Dead => peer.state = PeerState::Queued,
other => bail!(
"peer is in unexpected state: {}. Expected dead",
other.name()
),
};
Ok(())
})
.context("bug: peer disappeared")??;
state.peer_queue_tx.send(handle)?;
Ok::<_, anyhow::Error>(())
},
);
} else {
debug!("dropping peer, backoff exhausted");
g.peers.drop_peer(handle);
self.peers.drop_peer(handle);
}
}
@ -629,9 +616,8 @@ impl TorrentState {
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
let mut futures = Vec::new();
let g = self.locked.read();
for (_, peer) in g.peers.states.iter() {
match &peer.state {
for pe in self.peers.states.iter() {
match &pe.value().state {
PeerState::Live(live) => {
if !live.peer_interested {
continue;
@ -683,7 +669,7 @@ impl TorrentState {
}
pub fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool {
match self.locked.write().peers.add_if_not_seen(addr) {
match self.peers.add_if_not_seen(addr) {
Some(handle) => handle,
None => return false,
};
@ -693,13 +679,12 @@ impl TorrentState {
}
pub fn peer_stats_snapshot(&self) -> AggregatePeerStats {
self.locked.read().peers.stats()
self.peers.stats()
}
pub fn stats_snapshot(&self) -> StatsSnapshot {
let g = self.locked.read();
use Ordering::*;
let peer_stats = g.peers.stats();
let peer_stats = self.peers.stats();
let downloaded = self.stats.downloaded_and_checked.load(Relaxed);
let remaining = self.needed - downloaded;
StatsSnapshot {
@ -710,7 +695,7 @@ impl TorrentState {
uploaded_bytes: self.stats.uploaded.load(Relaxed),
total_bytes: self.have_plus_needed,
live_peers: peer_stats.live as u32,
seen_peers: g.peers.seen.len() as u32,
seen_peers: peer_stats.seen as u32,
connecting_peers: peer_stats.connecting as u32,
time: Instant::now(),
initially_needed_bytes: self.needed,
@ -833,7 +818,8 @@ impl PeerHandler {
);
}
g.peers
self.state
.peers
.clone_tx(peer_handle)
.context("peer died, dropping chunk that it requested")?
};
@ -847,17 +833,12 @@ impl PeerHandler {
}
fn on_have(&self, handle: PeerHandle, have: u32) {
if let Some(bitfield) = self
.state
.locked
.write()
.peers
.get_live_mut(handle)
.and_then(|l| l.bitfield.as_mut())
{
debug!("updated bitfield with have={}", have);
bitfield.set(have as usize, true)
}
self.state.peers.with_live_mut(handle, |live| {
if let Some(bitfield) = live.bitfield.as_mut() {
bitfield.set(have as usize, true);
debug!("updated bitfield with have={}", have);
}
});
}
fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> {
@ -869,19 +850,11 @@ impl PeerHandler {
);
}
self.state
.locked
.write()
.peers
.update_bitfield_from_vec(handle, bitfield.0);
if !self.state.am_i_interested_in_peer(handle) {
let tx = self
.state
.locked
.read()
.peers
.clone_tx(handle)
.context("peer dropped")?;
let tx = self.state.peers.clone_tx(handle).context("peer dropped")?;
tx.send(WriterRequest::Message(MessageOwned::Unchoke))?;
tx.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self.state.is_finished() {
@ -903,7 +876,7 @@ impl PeerHandler {
}
async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> {
let tx = match self.state.locked.read().peers.clone_tx(handle) {
let tx = match self.state.peers.clone_tx(handle) {
Some(tx) => tx,
None => return Ok(()),
};
@ -917,25 +890,21 @@ impl PeerHandler {
fn on_i_am_choked(&self, handle: PeerHandle) {
debug!("we are choked");
self.state
.locked
.write()
.peers
.mark_i_am_choked(handle, true);
self.state.peers.mark_i_am_choked(handle, true);
}
fn on_peer_interested(&self, handle: PeerHandle) {
debug!("peer is interested");
self.state
.locked
.write()
.peers
.mark_peer_interested(handle, true);
self.state.peers.mark_peer_interested(handle, true);
}
async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> {
let notify = match self.state.locked.read().peers.get_live(handle) {
Some(l) => l.have_notify.clone(),
let notify = match self
.state
.peers
.with_live(handle, |l| l.have_notify.clone())
{
Some(notify) => notify,
None => return Ok(()),
};
@ -984,29 +953,29 @@ impl PeerHandler {
},
};
let tx = match self.state.locked.read().peers.clone_tx(handle) {
Some(tx) => tx,
None => return Ok(()),
};
let sem = match self.state.locked.read().peers.get_live(handle) {
Some(live) => live.requests_sem.clone(),
let (tx, sem) = match self
.state
.peers
.with_live(handle, |l| (l.tx.clone(), l.requests_sem.clone()))
{
Some((tx, sem)) => (tx, sem),
None => return Ok(()),
};
for chunk in self.state.lengths.iter_chunk_infos(next) {
if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) {
continue;
}
if !self
.state
.locked
.write()
.peers
.try_get_live_mut(handle)?
.inflight_requests
.insert(InflightRequest::from(&chunk))
{
warn!("probably a bug, we already requested {:?}", chunk);
continue;
match self.state.peers.with_live_mut(handle, |l| {
l.inflight_requests.insert(InflightRequest::from(&chunk))
}) {
Some(true) => {}
Some(false) => {
warn!("probably a bug, we already requested {:?}", chunk);
continue;
}
None => bail!("peer dropped"),
}
let request = Request {
@ -1053,14 +1022,11 @@ impl PeerHandler {
fn on_i_am_unchoked(&self, handle: PeerHandle) {
debug!("we are unchoked");
let mut g = self.state.locked.write();
let live = match g.peers.get_live_mut(handle) {
Some(live) => live,
None => return,
};
live.i_am_choked = false;
live.have_notify.notify_waiters();
live.requests_sem.add_permits(16);
self.state.peers.with_live_mut(handle, |live| {
live.i_am_choked = false;
live.have_notify.notify_waiters();
live.requests_sem.add_permits(16);
});
}
fn on_received_piece(&self, handle: PeerHandle, piece: Piece<ByteBuf>) -> anyhow::Result<()> {
@ -1076,31 +1042,36 @@ impl PeerHandler {
};
let mut g = self.state.locked.write();
let h = g.peers.try_get_live_mut(handle)?;
h.requests_sem.add_permits(1);
self.state
.stats
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
.peers
.with_live_mut(handle, |h| {
h.requests_sem.add_permits(1);
if !h
.inflight_requests
.remove(&InflightRequest::from(&chunk_info))
{
anyhow::bail!(
"peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}",
&h.inflight_requests,
&piece,
);
}
self.state
.stats
.fetched_bytes
.fetch_add(piece.block.len() as u64, Ordering::Relaxed);
if !h
.inflight_requests
.remove(&InflightRequest::from(&chunk_info))
{
anyhow::bail!(
"peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}",
&h.inflight_requests,
&piece,
);
}
Ok(())
})
.context("peer not found")??;
let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => {
debug!("piece={} done, will write and checksum", piece.index,);
// This will prevent others from stealing it.
g.peers
.remove_inflight_piece(chunk_info.piece_index)
g.remove_inflight_piece(chunk_info.piece_index)
.map(|t| t.started.elapsed())
}
Some(ChunkMarkingResult::PreviouslyCompleted) => {
@ -1171,7 +1142,7 @@ impl PeerHandler {
let mut g = self.state.locked.write();
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
g.peers.reset_peer_backoff(handle);
self.state.peers.reset_peer_backoff(handle);
}
debug!("piece={} successfully downloaded and verified", index);
@ -1200,11 +1171,10 @@ impl PeerHandler {
}
fn disconnect_all_peers_that_have_full_torrent(&self) {
let mut g = self.state.locked.write();
for (_, peer) in g.peers.states.iter_mut() {
if let PeerState::Live(l) = &peer.state {
for mut pe in self.state.peers.states.iter_mut() {
if let PeerState::Live(l) = &pe.value().state {
if l.has_full_torrent(self.state.lengths.total_pieces() as usize) {
let live = peer.state.to_not_needed().unwrap();
let live = pe.value_mut().state.to_not_needed().unwrap();
let _ = live.tx.send(WriterRequest::Disconnect);
}
}