Peers now reconnect!

This commit is contained in:
Igor Katson 2023-11-17 23:30:07 +00:00
parent 55e692d476
commit 6ebf2120a4
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 105 additions and 20 deletions

21
Cargo.lock generated
View file

@ -146,6 +146,17 @@ dependencies = [
"tower-service",
]
[[package]]
name = "backoff"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1"
dependencies = [
"getrandom",
"instant",
"rand",
]
[[package]]
name = "backtrace"
version = "0.3.69"
@ -755,6 +766,15 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@ -825,6 +845,7 @@ version = "2.2.2"
dependencies = [
"anyhow",
"axum",
"backoff",
"bincode",
"bitvec",
"byteorder",

View file

@ -55,7 +55,8 @@ uuid = {version = "1.2", features = ["v4"]}
futures = "0.3"
url = "2"
hex = "0.4"
backoff = "0.4.0"
[dev-dependencies]
futures = {version = "0.3"}
pretty_env_logger = "0.5"
pretty_env_logger = "0.5"

View file

@ -1,5 +1,7 @@
use std::time::Duration;
use std::{collections::HashSet, sync::Arc};
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@ -27,9 +29,22 @@ impl From<&ChunkInfo> for InflightRequest {
pub type PeerRx = UnboundedReceiver<WriterRequest>;
pub type PeerTx = Arc<UnboundedSender<WriterRequest>>;
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct PeerStats {
pub unsuccessful_connection_attempts: usize,
pub backoff: ExponentialBackoff,
}
impl Default for PeerStats {
fn default() -> Self {
Self {
backoff: ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_secs(10))
.with_multiplier(6.)
.with_max_interval(Duration::from_secs(3600))
.with_max_elapsed_time(Some(Duration::from_secs(86400)))
.build(),
}
}
}
#[derive(Debug, Default)]
@ -44,6 +59,7 @@ pub enum PeerState {
Queued,
Connecting(PeerTx),
Live(LivePeerState),
Dead,
}
#[derive(Debug)]

View file

@ -11,6 +11,7 @@ use std::{
};
use anyhow::{bail, Context};
use backoff::backoff::Backoff;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt};
@ -28,7 +29,7 @@ use serde::Serialize;
use sha1w::Sha1;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
mpsc::{unbounded_channel, UnboundedSender},
Notify, Semaphore,
},
time::timeout,
@ -63,6 +64,7 @@ pub struct AggregatePeerStats {
pub connecting: usize,
pub live: usize,
pub seen: usize,
pub dead: usize,
}
impl PeerStates {
@ -75,6 +77,7 @@ impl PeerStates {
PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1,
PeerState::Queued => s.queued += 1,
PeerState::Dead => s.dead += 1,
};
s
});
@ -116,6 +119,13 @@ impl PeerStates {
self.states.insert(handle, Default::default());
Some(handle)
}
pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option<LivePeerState> {
let peer = self.states.get_mut(&handle)?;
match std::mem::replace(&mut peer.state, PeerState::Dead) {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<Peer> {
self.states.remove(&handle)
}
@ -169,6 +179,14 @@ impl PeerStates {
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option<InflightPiece> {
self.inflight_pieces.remove(&piece)
}
fn mark_peer_trustworthy(&mut self, handle: SocketAddr) {
let p = match self.states.get_mut(&handle) {
Some(p) => p,
None => return,
};
p.stats.backoff.reset();
}
}
pub struct TorrentStateLocked {
@ -332,7 +350,7 @@ impl TorrentState {
debug!("error managing peer {}: {:#}", addr, e)
};
let state = peer_connection.into_handler().state;
state.drop_peer(addr);
state.on_peer_died(addr);
state.peer_semaphore.add_permits(1);
Ok::<_, anyhow::Error>(())
}
@ -482,18 +500,45 @@ impl TorrentState {
*s = PeerState::Live(LivePeerState::new(Id20(h.peer_id), tx));
}
fn drop_peer(&self, handle: PeerHandle) -> bool {
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle) {
let mut g = self.locked.write();
let peer = match g.peers.drop_peer(handle) {
let live = match g.peers.mark_peer_dead(handle) {
Some(peer) => peer,
None => return false,
None => return,
};
if let PeerState::Live(l) = peer.state {
for req in l.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
for req in live.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
let backoff = g
.peers
.states
.get_mut(&handle)
.unwrap()
.stats
.backoff
.next_backoff();
if let Some(dur) = backoff {
let state = self.clone();
spawn("wait for peer", async move {
tokio::time::sleep(dur).await;
{
let mut g = state.locked.write();
let peer = match g.peers.states.get_mut(&handle) {
Some(p) => p,
None => bail!("bug: peer disappeared"),
};
match &peer.state {
PeerState::Dead => peer.state = PeerState::Queued,
_ => bail!("peer in unexpected state"),
}
}
state.peer_queue_tx.send(handle)?;
Ok::<_, anyhow::Error>(())
});
} else {
g.peers.drop_peer(handle);
}
true
}
pub fn get_uploaded(&self) -> u64 {
@ -511,7 +556,7 @@ impl TorrentState {
let mut futures = Vec::new();
let g = self.locked.read();
for (handle, peer) in g.peers.states.iter() {
for (_, peer) in g.peers.states.iter() {
match &peer.state {
PeerState::Live(live) => {
if !live.peer_interested {
@ -1064,11 +1109,12 @@ impl PeerHandler {
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
self.state
.locked
.write()
.chunks
.mark_piece_downloaded(chunk_info.piece_index);
{
let mut g = self.state.locked.write();
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
g.peers.mark_peer_trustworthy(handle);
}
debug!(
"piece={} successfully downloaded and verified from {}",

View file

@ -223,7 +223,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
(progress as f64 / total as f64) * 100f64
};
info!(
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}}}",
"[{}]: {:.2}% ({:.2}), down speed {:.2} MiB/s, fetched {}, remaining {:.2} of {:.2}, uploaded {:.2}, peers: {{live: {}, connecting: {}, queued: {}, seen: {}, dead: {}}}",
idx,
downloaded_pct,
SF::new(progress),
@ -236,6 +236,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
peer_stats.connecting,
peer_stats.queued,
peer_stats.seen,
peer_stats.dead
);
},
}