Fixed bugs
This commit is contained in:
parent
6ebf2120a4
commit
2203ffe4a9
7 changed files with 536 additions and 44 deletions
|
|
@ -31,6 +31,7 @@ pub trait PeerConnectionHandler {
|
|||
pub enum WriterRequest {
|
||||
Message(MessageOwned),
|
||||
ReadChunkRequest(ChunkInfo),
|
||||
Disconnect,
|
||||
}
|
||||
|
||||
#[derive(Default, Copy, Clone)]
|
||||
|
|
@ -247,6 +248,9 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
uploaded_add = Some(chunk.size);
|
||||
full_len
|
||||
}
|
||||
WriterRequest::Disconnect => {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
debug!("sending to {}: {:?}, length={}", self.addr, &req, len);
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use std::{collections::HashSet, sync::Arc};
|
|||
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
|
||||
use librqbit_core::id20::Id20;
|
||||
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::{Notify, Semaphore};
|
||||
|
||||
use crate::peer_connection::WriterRequest;
|
||||
|
|
@ -56,10 +56,82 @@ pub struct Peer {
|
|||
#[derive(Debug, Default)]
|
||||
pub enum PeerState {
|
||||
#[default]
|
||||
// Will be tried to be connected as soon as possible.
|
||||
Queued,
|
||||
Connecting(PeerTx),
|
||||
Live(LivePeerState),
|
||||
// There was an error, and it's waiting for exponential backoff.
|
||||
Dead,
|
||||
// The peer has the full torrent, and we have the full torrent, so no need
|
||||
// to keep talking to it.
|
||||
FullyHaveNoLongerNeeded,
|
||||
}
|
||||
|
||||
impl PeerState {
|
||||
fn take_connecting(&mut self) -> Option<PeerTx> {
|
||||
if let PeerState::Connecting(_) = self {
|
||||
match std::mem::take(self) {
|
||||
PeerState::Connecting(tx) => Some(tx),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn take_live(&mut self) -> Option<LivePeerState> {
|
||||
if let PeerState::Live(_) = self {
|
||||
match std::mem::take(self) {
|
||||
PeerState::Live(l) => Some(l),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
|
||||
match self {
|
||||
PeerState::Live(l) => Some(l),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn queued_to_connecting(&mut self) -> Option<PeerRx> {
|
||||
if let PeerState::Queued = self {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
*self = PeerState::Connecting(Arc::new(tx));
|
||||
Some(rx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
pub fn connecting_to_live(&mut self, peer_id: Id20) -> Option<&mut LivePeerState> {
|
||||
let tx = self.take_connecting()?;
|
||||
*self = PeerState::Live(LivePeerState::new(peer_id, tx));
|
||||
self.get_live_mut()
|
||||
}
|
||||
|
||||
pub fn dead_to_queued(&mut self) -> bool {
|
||||
if let PeerState::Dead = self {
|
||||
*self = PeerState::Queued;
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn to_dead(&mut self) -> Option<LivePeerState> {
|
||||
match std::mem::replace(self, PeerState::Dead) {
|
||||
PeerState::Live(l) => Some(l),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn live_to(&mut self, new_state: PeerState) -> Option<LivePeerState> {
|
||||
let l = self.take_live()?;
|
||||
*self = new_state;
|
||||
Some(l)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
|
@ -87,4 +159,12 @@ impl LivePeerState {
|
|||
tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_full_torrent(&self, total_pieces: usize) -> bool {
|
||||
let bf = match self.bitfield.as_ref() {
|
||||
Some(bf) => bf,
|
||||
None => return false,
|
||||
};
|
||||
bf.get(0..total_pieces).map_or(false, |s| s.all())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ pub struct AggregatePeerStats {
|
|||
pub live: usize,
|
||||
pub seen: usize,
|
||||
pub dead: usize,
|
||||
pub fully_have_and_we_are_finished: usize,
|
||||
}
|
||||
|
||||
impl PeerStates {
|
||||
|
|
@ -78,6 +79,7 @@ impl PeerStates {
|
|||
PeerState::Live(_) => s.live += 1,
|
||||
PeerState::Queued => s.queued += 1,
|
||||
PeerState::Dead => s.dead += 1,
|
||||
PeerState::FullyHaveNoLongerNeeded => s.fully_have_and_we_are_finished += 1,
|
||||
};
|
||||
s
|
||||
});
|
||||
|
|
@ -121,10 +123,7 @@ impl PeerStates {
|
|||
}
|
||||
pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option<LivePeerState> {
|
||||
let peer = self.states.get_mut(&handle)?;
|
||||
match std::mem::replace(&mut peer.state, PeerState::Dead) {
|
||||
PeerState::Live(l) => Some(l),
|
||||
_ => None,
|
||||
}
|
||||
peer.state.to_dead()
|
||||
}
|
||||
pub fn drop_peer(&mut self, handle: PeerHandle) -> Option<Peer> {
|
||||
self.states.remove(&handle)
|
||||
|
|
@ -161,16 +160,11 @@ impl PeerStates {
|
|||
.states
|
||||
.get_mut(&h)
|
||||
.context("peer not found in states")?;
|
||||
match &mut peer.state {
|
||||
s @ PeerState::Queued => {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
*s = PeerState::Connecting(Arc::new(tx));
|
||||
Ok(rx)
|
||||
}
|
||||
s => {
|
||||
bail!("did not expect to see the peer in state {:?}", s);
|
||||
}
|
||||
}
|
||||
let rx = peer
|
||||
.state
|
||||
.queued_to_connecting()
|
||||
.context("invalid peer state")?;
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> {
|
||||
|
|
@ -486,29 +480,17 @@ impl TorrentState {
|
|||
return;
|
||||
}
|
||||
};
|
||||
let s = match &mut peer.state {
|
||||
s @ PeerState::Connecting(_) => s,
|
||||
_ => {
|
||||
warn!("peer {} was in a wrong state", handle);
|
||||
return;
|
||||
}
|
||||
};
|
||||
let tx = match std::mem::take(s) {
|
||||
PeerState::Connecting(tx) => tx,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
*s = PeerState::Live(LivePeerState::new(Id20(h.peer_id), tx));
|
||||
peer.state.connecting_to_live(Id20(h.peer_id));
|
||||
}
|
||||
|
||||
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle) {
|
||||
let mut g = self.locked.write();
|
||||
let live = match g.peers.mark_peer_dead(handle) {
|
||||
Some(peer) => peer,
|
||||
None => return,
|
||||
};
|
||||
for req in live.inflight_requests {
|
||||
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
|
||||
if let Some(live) = g.peers.mark_peer_dead(handle) {
|
||||
for req in live.inflight_requests {
|
||||
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
|
||||
}
|
||||
}
|
||||
|
||||
let backoff = g
|
||||
.peers
|
||||
.states
|
||||
|
|
@ -1123,6 +1105,7 @@ impl PeerHandler {
|
|||
|
||||
if self.state.get_left_to_download() == 0 {
|
||||
self.state.finished_notify.notify_waiters();
|
||||
self.disconnect_all_peers_that_have_full_torrent();
|
||||
self.reopen_read_only()?;
|
||||
}
|
||||
|
||||
|
|
@ -1145,4 +1128,19 @@ impl PeerHandler {
|
|||
.with_context(|| format!("error processing received chunk {chunk_info:?}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn disconnect_all_peers_that_have_full_torrent(&self) {
|
||||
let mut g = self.state.locked.write();
|
||||
for (_, peer) in g.peers.states.iter_mut() {
|
||||
if let PeerState::Live(l) = &peer.state {
|
||||
if l.has_full_torrent(self.state.lengths.total_pieces() as usize) {
|
||||
let live = peer
|
||||
.state
|
||||
.live_to(PeerState::FullyHaveNoLongerNeeded)
|
||||
.unwrap();
|
||||
let _ = live.tx.send(WriterRequest::Disconnect);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue