This commit is contained in:
Igor Katson 2021-07-04 12:11:02 +01:00
parent 64b1e47c77
commit 60c0c73005
3 changed files with 59 additions and 38 deletions

View file

@ -21,6 +21,7 @@ impl From<&ChunkInfo> for InflightRequest {
}
pub enum PeerState {
Queued(SocketAddr),
Connecting(SocketAddr),
Live(LivePeerState),
}

View file

@ -266,11 +266,12 @@ impl TorrentManager {
(stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64
};
info!(
"Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}",
"Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, queued: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}",
downloaded_pct,
SF::new(stats.downloaded_and_checked_bytes),
live_peer_stats.live,
live_peer_stats.connecting,
live_peer_stats.queued,
seen_peers_count,
SF::new(stats.fetched_bytes),
SF::new(stats.remaining_bytes),

View file

@ -56,6 +56,7 @@ pub struct PeerStates {
#[derive(Debug, Default)]
pub struct AggregatePeerStats {
pub queued: usize,
pub connecting: usize,
pub live: usize,
}
@ -68,6 +69,7 @@ impl PeerStates {
match p {
PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1,
PeerState::Queued(_) => s.queued += 1,
};
s
})
@ -112,7 +114,7 @@ impl PeerStates {
if self.states.contains_key(&addr) {
return None;
}
self.states.insert(handle, PeerState::Connecting(addr));
self.states.insert(handle, PeerState::Queued(addr));
self.tx.insert(handle, Arc::new(tx));
Some(handle)
}
@ -162,20 +164,18 @@ pub struct TorrentStateLocked {
}
#[derive(Default, Debug)]
pub struct AtomicStats {
pub have: AtomicU64,
pub downloaded_and_checked: AtomicU64,
pub uploaded: AtomicU64,
pub fetched_bytes: AtomicU64,
struct AtomicStats {
have: AtomicU64,
downloaded_and_checked: AtomicU64,
uploaded: AtomicU64,
fetched_bytes: AtomicU64,
pub downloaded_pieces: AtomicU64,
pub total_piece_download_ms: AtomicU64,
pub queued_peers: AtomicU64,
downloaded_pieces: AtomicU64,
total_piece_download_ms: AtomicU64,
}
impl AtomicStats {
pub fn average_piece_download_time(&self) -> Option<Duration> {
fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_pieces.load(Ordering::Relaxed);
let t = self.total_piece_download_ms.load(Ordering::Relaxed);
if d == 0 {
@ -224,6 +224,7 @@ pub struct TorrentState {
stats: AtomicStats,
spawner: BlockingSpawner,
peer_semaphore: Semaphore,
peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver<WriterRequest>)>,
}
@ -241,7 +242,6 @@ impl TorrentState {
spawner: BlockingSpawner,
) -> Arc<Self> {
let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel();
let peer_semaphore = Arc::new(Semaphore::new(128));
let state = Arc::new(TorrentState {
info_hash,
info,
@ -259,6 +259,7 @@ impl TorrentState {
lengths,
spawner,
peer_semaphore: Semaphore::new(128),
peer_queue_tx,
});
spawn("peer adder", {
@ -266,8 +267,29 @@ impl TorrentState {
async move {
loop {
let (addr, out_rx) = peer_queue_rx.recv().await.unwrap();
state.stats.queued_peers.fetch_sub(1, Ordering::Relaxed);
let permit = peer_semaphore.clone().acquire_owned().await.unwrap();
{
// Update state to connecting.
match state.locked.write().peers.states.get_mut(&addr) {
Some(s) => match s {
PeerState::Queued(_) => *s = PeerState::Connecting(addr),
PeerState::Connecting(_) => {
warn!("did not expect to see the peer {} connecting", addr);
continue;
}
PeerState::Live(_) => {
warn!("did not expect to see the peer {} live", addr);
continue;
}
},
None => {
warn!("did not find peer state for {}", addr);
continue;
}
};
}
state.peer_semaphore.acquire().await.unwrap().forget();
let handler = PeerHandler {
addr,
@ -282,7 +304,7 @@ impl TorrentState {
};
let state = peer_connection.into_handler().state;
state.drop_peer(addr);
drop(permit);
state.peer_semaphore.add_permits(1);
Ok::<_, anyhow::Error>(())
});
}
@ -305,11 +327,8 @@ impl TorrentState {
pub fn initially_needed(&self) -> u64 {
self.needed
}
fn stats(&self) -> &AtomicStats {
&self.stats
}
pub fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
let g = self.locked.read();
let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?;
for n in g.chunks.get_needed_pieces().iter_ones() {
@ -321,7 +340,7 @@ impl TorrentState {
None
}
pub fn am_i_choked(&self, peer_handle: PeerHandle) -> Option<bool> {
fn am_i_choked(&self, peer_handle: PeerHandle) -> Option<bool> {
self.locked
.read()
.peers
@ -329,7 +348,7 @@ impl TorrentState {
.map(|l| l.i_am_choked)
}
pub fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
if self.am_i_choked(peer_handle)? {
warn!("we are choked by {}, can't reserve next piece", peer_handle);
return None;
@ -358,11 +377,11 @@ impl TorrentState {
Some(n)
}
pub fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
self.get_next_needed_piece(handle).is_some()
}
pub fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
fn try_steal_old_slow_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let total = self.stats.downloaded_pieces.load(Ordering::Relaxed);
// heuristic for not enough precision in average time
@ -394,7 +413,7 @@ impl TorrentState {
None
}
pub fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom;
let g = self.locked.read();
@ -407,7 +426,7 @@ impl TorrentState {
.copied()
}
pub fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
let mut g = self.locked.write();
match g.peers.states.get_mut(&handle) {
Some(s @ &mut PeerState::Connecting(_)) => {
@ -419,7 +438,7 @@ impl TorrentState {
}
}
pub fn drop_peer(&self, handle: PeerHandle) -> bool {
fn drop_peer(&self, handle: PeerHandle) -> bool {
let mut g = self.locked.write();
let peer = match g.peers.drop_peer(handle) {
Some(peer) => peer,
@ -432,6 +451,7 @@ impl TorrentState {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
}
PeerState::Queued(_) => {}
}
true
}
@ -447,7 +467,7 @@ impl TorrentState {
self.needed - self.get_downloaded()
}
pub fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
let mut futures = Vec::new();
let g = self.locked.read();
@ -509,7 +529,6 @@ impl TorrentState {
None => return false,
};
self.stats.queued_peers.fetch_add(1, Ordering::Relaxed);
match self.peer_queue_tx.send((addr, out_rx)) {
Ok(_) => {}
Err(_) => {
@ -522,14 +541,14 @@ impl TorrentState {
pub fn stats_snapshot(&self) -> StatsSnapshot {
let g = self.locked.read();
use Ordering::*;
let (live, connecting) =
g.peers
.states
.values()
.fold((0u32, 0u32), |(live, connecting), p| match p {
PeerState::Connecting(_) => (live, connecting + 1),
PeerState::Live(_) => (live + 1, connecting),
});
let (live, connecting, queued) = g.peers.states.values().fold(
(0u32, 0u32, 0u32),
|(live, connecting, queued), p| match p {
PeerState::Connecting(_) => (live, connecting + 1, queued),
PeerState::Live(_) => (live + 1, connecting, queued),
PeerState::Queued(_) => (live, connecting, queued + 1),
},
);
let downloaded = self.stats.downloaded_and_checked.load(Relaxed);
let remaining = self.needed - downloaded;
StatsSnapshot {
@ -544,7 +563,7 @@ impl TorrentState {
time: Instant::now(),
initially_needed_bytes: self.needed,
remaining_bytes: remaining,
queued_peers: self.stats.queued_peers.load(Relaxed) as u32,
queued_peers: queued,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
}
}