This commit is contained in:
Igor Katson 2021-07-04 12:17:06 +01:00
parent 60c0c73005
commit b4f6d8b93d
2 changed files with 22 additions and 44 deletions

View file

@ -20,12 +20,14 @@ impl From<&ChunkInfo> for InflightRequest {
}
}
#[derive(Debug)]
pub enum PeerState {
Queued(SocketAddr),
Connecting(SocketAddr),
Queued,
Connecting,
Live(LivePeerState),
}
#[derive(Debug)]
pub struct LivePeerState {
pub peer_id: [u8; 20],
pub i_am_choked: bool,

View file

@ -67,9 +67,9 @@ impl PeerStates {
.values()
.fold(AggregatePeerStats::default(), |mut s, p| {
match p {
PeerState::Connecting(_) => s.connecting += 1,
PeerState::Connecting => s.connecting += 1,
PeerState::Live(_) => s.live += 1,
PeerState::Queued(_) => s.queued += 1,
PeerState::Queued => s.queued += 1,
};
s
})
@ -114,7 +114,7 @@ impl PeerStates {
if self.states.contains_key(&addr) {
return None;
}
self.states.insert(handle, PeerState::Queued(addr));
self.states.insert(handle, PeerState::Queued);
self.tx.insert(handle, Arc::new(tx));
Some(handle)
}
@ -268,26 +268,13 @@ impl TorrentState {
loop {
let (addr, out_rx) = peer_queue_rx.recv().await.unwrap();
{
// Update state to connecting.
match state.locked.write().peers.states.get_mut(&addr) {
Some(s) => match s {
PeerState::Queued(_) => *s = PeerState::Connecting(addr),
PeerState::Connecting(_) => {
warn!("did not expect to see the peer {} connecting", addr);
continue;
}
PeerState::Live(_) => {
warn!("did not expect to see the peer {} live", addr);
continue;
}
},
None => {
warn!("did not find peer state for {}", addr);
continue;
}
};
}
match state.locked.write().peers.states.get_mut(&addr) {
Some(s @ PeerState::Queued) => *s = PeerState::Connecting,
s => {
warn!("did not expect to see the peer in state {:?}", s);
continue;
}
};
state.peer_semaphore.acquire().await.unwrap().forget();
@ -429,7 +416,7 @@ impl TorrentState {
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let mut g = self.locked.write();
match g.peers.states.get_mut(&handle) {
Some(s @ &mut PeerState::Connecting(_)) => {
Some(s @ &mut PeerState::Connecting) => {
*s = PeerState::Live(LivePeerState::new(h.peer_id));
}
_ => {
@ -444,14 +431,10 @@ impl TorrentState {
Some(peer) => peer,
None => return false,
};
match peer {
PeerState::Connecting(_) => {}
PeerState::Live(l) => {
for req in l.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
if let PeerState::Live(l) = peer {
for req in l.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
PeerState::Queued(_) => {}
}
true
}
@ -541,14 +524,7 @@ impl TorrentState {
pub fn stats_snapshot(&self) -> StatsSnapshot {
let g = self.locked.read();
use Ordering::*;
let (live, connecting, queued) = g.peers.states.values().fold(
(0u32, 0u32, 0u32),
|(live, connecting, queued), p| match p {
PeerState::Connecting(_) => (live, connecting + 1, queued),
PeerState::Live(_) => (live + 1, connecting, queued),
PeerState::Queued(_) => (live, connecting, queued + 1),
},
);
let peer_stats = g.peers.stats();
let downloaded = self.stats.downloaded_and_checked.load(Relaxed);
let remaining = self.needed - downloaded;
StatsSnapshot {
@ -557,13 +533,13 @@ impl TorrentState {
downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.fetched_bytes.load(Relaxed),
live_peers: live,
live_peers: peer_stats.live as u32,
seen_peers: g.peers.seen.len() as u32,
connecting_peers: connecting,
connecting_peers: peer_stats.connecting as u32,
time: Instant::now(),
initially_needed_bytes: self.needed,
remaining_bytes: remaining,
queued_peers: queued,
queued_peers: peer_stats.queued as u32,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
}
}