Add a semaphore around peers

This commit is contained in:
Igor Katson 2021-07-04 11:36:16 +01:00
parent 9472d66bf9
commit 64b1e47c77
4 changed files with 96 additions and 48 deletions

View file

@ -2,7 +2,6 @@ use std::sync::Arc;
use librqbit_core::speed_estimator::SpeedEstimator;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::time::Instant;
use warp::Filter;
@ -21,22 +20,22 @@ pub async fn make_and_run_http_api(
let dump_stats = warp::path("stats").map({
let state = state.clone();
let start_time = Instant::now();
let initial_downloaded_and_checked =
state.stats().downloaded_and_checked.load(Ordering::Relaxed);
let initial_downloaded_and_checked = state.stats_snapshot().downloaded_and_checked_bytes;
move || {
let snapshot = state.stats_snapshot();
let mut buf = Vec::new();
writeln!(buf, "{:#?}", state.stats_snapshot()).unwrap();
writeln!(
buf,
"Average download time: {:?}",
state.stats().average_piece_download_time()
snapshot.average_piece_download_time()
)
.unwrap();
// Poor mans download speed computation
let elapsed = start_time.elapsed();
let downloaded_bytes = state.stats().downloaded_and_checked.load(Ordering::Relaxed)
- initial_downloaded_and_checked;
let downloaded_bytes =
snapshot.downloaded_and_checked_bytes - initial_downloaded_and_checked;
let downloaded_mb = downloaded_bytes as f64 / 1024f64 / 1024f64;
writeln!(
buf,

View file

@ -111,9 +111,16 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut conn = tokio::net::TcpStream::connect(self.addr)
.await
.context("error connecting")?;
let mut conn = match timeout(
Duration::from_secs(10),
tokio::net::TcpStream::connect(self.addr),
)
.await
{
Ok(conn) => conn.context("error connecting")?,
Err(_) => anyhow::bail!("timeout connecting to {}", self.addr),
};
let mut write_buf = Vec::<u8>::with_capacity(PIECE_MESSAGE_DEFAULT_LEN);
let handshake = Handshake::new(self.info_hash, self.peer_id);
handshake.serialize(&mut write_buf);

View file

@ -209,7 +209,7 @@ impl TorrentManager {
lengths,
);
let state = Arc::new(TorrentState::new(
let state = TorrentState::new(
info,
info_hash,
peer_id,
@ -219,7 +219,7 @@ impl TorrentManager {
initial_check_results.have_bytes,
initial_check_results.needed_bytes,
spawner,
));
);
let estimator = Arc::new(SpeedEstimator::new(5));
@ -242,7 +242,7 @@ impl TorrentManager {
let state = mgr.state.clone();
async move {
loop {
let downloaded = state.stats().downloaded_and_checked.load(Ordering::Relaxed);
let downloaded = state.stats_snapshot().downloaded_and_checked_bytes;
let needed = state.initially_needed();
let remaining = needed - downloaded;
estimator.add_snapshot(downloaded, remaining, Instant::now());
@ -258,33 +258,25 @@ impl TorrentManager {
loop {
let live_peer_stats = self.state.locked.read().peers.stats();
let seen_peers_count = self.state.locked.read().peers.seen().len();
let have = self.state.stats().have.load(Ordering::Relaxed);
let fetched = self.state.stats().fetched_bytes.load(Ordering::Relaxed);
let stats = self.state.stats_snapshot();
let needed = self.state.initially_needed();
let downloaded = self
.state
.stats()
.downloaded_and_checked
.load(Ordering::Relaxed);
let remaining = needed - downloaded;
let uploaded = self.state.stats().uploaded.load(Ordering::Relaxed);
let downloaded_pct = if downloaded == needed {
let downloaded_pct = if stats.remaining_bytes == 0 {
100f64
} else {
(downloaded as f64 / needed as f64) * 100f64
(stats.downloaded_and_checked_bytes as f64 / needed as f64) * 100f64
};
info!(
"Stats: downloaded {:.2}% ({:.2}), peers {{live: {}, connecting: {}, seen: {}}}, fetched {}, remaining {:.2} out of {:.2}, uploaded {:.2}, total have {:.2}",
downloaded_pct,
SF::new(downloaded),
SF::new(stats.downloaded_and_checked_bytes),
live_peer_stats.live,
live_peer_stats.connecting,
seen_peers_count,
SF::new(fetched),
SF::new(remaining),
SF::new(stats.fetched_bytes),
SF::new(stats.remaining_bytes),
SF::new(needed),
SF::new(uploaded),
SF::new(have)
SF::new(stats.uploaded_bytes),
SF::new(stats.have_bytes)
);
tokio::time::sleep(Duration::from_secs(1)).await;
}

View file

@ -24,7 +24,13 @@ use peer_binary_protocol::{
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
};
use sha1w::Sha1;
use tokio::{sync::mpsc::UnboundedSender, time::timeout};
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Semaphore,
},
time::timeout,
};
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
@ -164,6 +170,8 @@ pub struct AtomicStats {
pub downloaded_pieces: AtomicU64,
pub total_piece_download_ms: AtomicU64,
pub queued_peers: AtomicU64,
}
impl AtomicStats {
@ -190,6 +198,19 @@ pub struct StatsSnapshot {
pub seen_peers: u32,
pub connecting_peers: u32,
pub time: Instant,
pub queued_peers: u32,
total_piece_download_ms: u64,
}
impl StatsSnapshot {
pub fn average_piece_download_time(&self) -> Option<Duration> {
let d = self.downloaded_and_checked_pieces;
let t = self.total_piece_download_ms;
if d == 0 {
return None;
}
Some(Duration::from_secs_f64(t as f64 / d as f64 / 1000f64))
}
}
pub struct TorrentState {
@ -201,8 +222,9 @@ pub struct TorrentState {
lengths: Lengths,
needed: u64,
stats: AtomicStats,
spawner: BlockingSpawner,
peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver<WriterRequest>)>,
}
impl TorrentState {
@ -217,8 +239,10 @@ impl TorrentState {
have_bytes: u64,
needed_bytes: u64,
spawner: BlockingSpawner,
) -> Self {
TorrentState {
) -> Arc<Self> {
let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel();
let peer_semaphore = Arc::new(Semaphore::new(128));
let state = Arc::new(TorrentState {
info_hash,
info,
peer_id,
@ -234,7 +258,37 @@ impl TorrentState {
needed: needed_bytes,
lengths,
spawner,
}
peer_queue_tx,
});
spawn("peer adder", {
let state = state.clone();
async move {
loop {
let (addr, out_rx) = peer_queue_rx.recv().await.unwrap();
state.stats.queued_peers.fetch_sub(1, Ordering::Relaxed);
let permit = peer_semaphore.clone().acquire_owned().await.unwrap();
let handler = PeerHandler {
addr,
state: state.clone(),
spawner: state.spawner,
};
let peer_connection =
PeerConnection::new(addr, state.info_hash, state.peer_id, handler);
spawn(format!("manage_peer({})", addr), async move {
if let Err(e) = peer_connection.manage_peer(out_rx).await {
debug!("error managing peer {}: {:#}", addr, e)
};
let state = peer_connection.into_handler().state;
state.drop_peer(addr);
drop(permit);
Ok::<_, anyhow::Error>(())
});
}
}
});
state
}
pub fn info(&self) -> &TorrentMetaV1Info<ByteString> {
&self.info
@ -251,7 +305,7 @@ impl TorrentState {
pub fn initially_needed(&self) -> u64 {
self.needed
}
pub fn stats(&self) -> &AtomicStats {
fn stats(&self) -> &AtomicStats {
&self.stats
}
@ -450,24 +504,18 @@ impl TorrentState {
pub fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool {
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>();
let handle = match self.locked.write().peers.add_if_not_seen(addr, out_tx) {
match self.locked.write().peers.add_if_not_seen(addr, out_tx) {
Some(handle) => handle,
None => return false,
};
let handler = PeerHandler {
addr,
state: self.clone(),
spawner: self.spawner,
};
let peer_connection = PeerConnection::new(addr, self.info_hash, self.peer_id, handler);
spawn(format!("manage_peer({})", handle), async move {
if let Err(e) = peer_connection.manage_peer(out_rx).await {
debug!("error managing peer {}: {:#}", handle, e)
};
peer_connection.into_handler().state.drop_peer(handle);
Ok::<_, anyhow::Error>(())
});
self.stats.queued_peers.fetch_add(1, Ordering::Relaxed);
match self.peer_queue_tx.send((addr, out_rx)) {
Ok(_) => {}
Err(_) => {
warn!("peer adder died, can't add peer")
}
}
true
}
@ -496,6 +544,8 @@ impl TorrentState {
time: Instant::now(),
initially_needed_bytes: self.needed,
remaining_bytes: remaining,
queued_peers: self.stats.queued_peers.load(Relaxed) as u32,
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
}
}
}