Not even sure what I'm doing

This commit is contained in:
Igor Katson 2023-11-18 10:08:12 +00:00
parent 2203ffe4a9
commit db12bba7a6
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 78 additions and 38 deletions

View file

@ -1,6 +1,7 @@
use std::time::Duration; use std::time::Duration;
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use anyhow::Context;
use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder};
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
@ -27,7 +28,20 @@ impl From<&ChunkInfo> for InflightRequest {
// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak. // TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak.
pub type PeerRx = UnboundedReceiver<WriterRequest>; pub type PeerRx = UnboundedReceiver<WriterRequest>;
pub type PeerTx = Arc<UnboundedSender<WriterRequest>>; pub type PeerTx = UnboundedSender<WriterRequest>;
pub trait SendMany {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()>;
}
impl SendMany for PeerTx {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()> {
requests
.into_iter()
.try_for_each(|r| self.send(r))
.context("peer dropped")
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct PeerStats { pub struct PeerStats {
@ -64,10 +78,20 @@ pub enum PeerState {
Dead, Dead,
// The peer has the full torrent, and we have the full torrent, so no need // The peer has the full torrent, and we have the full torrent, so no need
// to keep talking to it. // to keep talking to it.
FullyHaveNoLongerNeeded, NotNeeded,
} }
impl PeerState { impl PeerState {
pub fn name(&self) -> &'static str {
match self {
PeerState::Queued => "queued",
PeerState::Connecting(_) => "connecting",
PeerState::Live(_) => "live",
PeerState::Dead => "dead",
PeerState::NotNeeded => "not needed",
}
}
fn take_connecting(&mut self) -> Option<PeerTx> { fn take_connecting(&mut self) -> Option<PeerTx> {
if let PeerState::Connecting(_) = self { if let PeerState::Connecting(_) = self {
match std::mem::take(self) { match std::mem::take(self) {
@ -100,7 +124,7 @@ impl PeerState {
pub fn queued_to_connecting(&mut self) -> Option<PeerRx> { pub fn queued_to_connecting(&mut self) -> Option<PeerRx> {
if let PeerState::Queued = self { if let PeerState::Queued = self {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
*self = PeerState::Connecting(Arc::new(tx)); *self = PeerState::Connecting(tx);
Some(rx) Some(rx)
} else { } else {
None None
@ -120,9 +144,10 @@ impl PeerState {
false false
} }
pub fn to_dead(&mut self) -> Option<LivePeerState> { pub fn to_dead(&mut self) -> Option<Option<LivePeerState>> {
match std::mem::replace(self, PeerState::Dead) { match std::mem::replace(self, PeerState::Dead) {
PeerState::Live(l) => Some(l), PeerState::Live(l) => Some(Some(l)),
PeerState::Connecting(_) => Some(None),
_ => None, _ => None,
} }
} }

View file

@ -1,3 +1,6 @@
// The main logic of rqbit is here - connecting to peers, reading and writing messages
// to them, tracking peer state etc.
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fs::File, fs::File,
@ -41,7 +44,7 @@ use crate::{
peer_connection::{ peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest, PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
}, },
peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx}, peer_state::{InflightRequest, LivePeerState, Peer, PeerRx, PeerState, PeerTx, SendMany},
spawn_utils::{spawn, BlockingSpawner}, spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF}, type_aliases::{PeerHandle, BF},
}; };
@ -79,7 +82,7 @@ impl PeerStates {
PeerState::Live(_) => s.live += 1, PeerState::Live(_) => s.live += 1,
PeerState::Queued => s.queued += 1, PeerState::Queued => s.queued += 1,
PeerState::Dead => s.dead += 1, PeerState::Dead => s.dead += 1,
PeerState::FullyHaveNoLongerNeeded => s.fully_have_and_we_are_finished += 1, PeerState::NotNeeded => s.fully_have_and_we_are_finished += 1,
}; };
s s
}); });
@ -121,7 +124,7 @@ impl PeerStates {
self.states.insert(handle, Default::default()); self.states.insert(handle, Default::default());
Some(handle) Some(handle)
} }
pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option<LivePeerState> { pub fn mark_peer_dead(&mut self, handle: PeerHandle) -> Option<Option<LivePeerState>> {
let peer = self.states.get_mut(&handle)?; let peer = self.states.get_mut(&handle)?;
peer.state.to_dead() peer.state.to_dead()
} }
@ -174,7 +177,7 @@ impl PeerStates {
self.inflight_pieces.remove(&piece) self.inflight_pieces.remove(&piece)
} }
fn mark_peer_trustworthy(&mut self, handle: SocketAddr) { fn reset_peer_backoff(&mut self, handle: PeerHandle) {
let p = match self.states.get_mut(&handle) { let p = match self.states.get_mut(&handle) {
Some(p) => p, Some(p) => p,
None => return, None => return,
@ -340,12 +343,20 @@ impl TorrentState {
spawner, spawner,
); );
if let Err(e) = peer_connection.manage_peer(rx).await { let res = peer_connection.manage_peer(rx).await;
debug!("error managing peer {}: {:#}", addr, e)
};
let state = peer_connection.into_handler().state; let state = peer_connection.into_handler().state;
state.on_peer_died(addr);
state.peer_semaphore.add_permits(1); state.peer_semaphore.add_permits(1);
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
state.on_peer_died(addr, None);
}
Err(e) => {
debug!("error managing peer {}: {:#}", addr, e);
state.on_peer_died(addr, Some(e));
}
}
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
} }
}); });
@ -483,12 +494,22 @@ impl TorrentState {
peer.state.connecting_to_live(Id20(h.peer_id)); peer.state.connecting_to_live(Id20(h.peer_id));
} }
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle) { fn on_peer_died(self: &Arc<Self>, handle: PeerHandle, error: Option<anyhow::Error>) {
let mut g = self.locked.write(); let mut g = self.locked.write();
if let Some(live) = g.peers.mark_peer_dead(handle) { match g.peers.mark_peer_dead(handle) {
for req in live.inflight_requests { Some(Some(live)) => {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); for req in live.inflight_requests {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
} }
// Other valid state to transition to dead.
Some(None) => {}
// Peer was in an unexpected state.
None => return,
}
if error.is_none() {
return;
} }
let backoff = g let backoff = g
@ -502,17 +523,21 @@ impl TorrentState {
if let Some(dur) = backoff { if let Some(dur) = backoff {
let state = self.clone(); let state = self.clone();
spawn("wait for peer", async move { spawn(format!("wait_for_peer({handle}, {dur:?})"), async move {
tokio::time::sleep(dur).await; tokio::time::sleep(dur).await;
{ {
let mut g = state.locked.write(); let mut g = state.locked.write();
let peer = match g.peers.states.get_mut(&handle) { let peer = match g.peers.states.get_mut(&handle) {
Some(p) => p, Some(p) => p,
None => bail!("bug: peer disappeared"), None => bail!("bug: peer {} disappeared", handle),
}; };
match &peer.state { match &peer.state {
PeerState::Dead => peer.state = PeerState::Queued, PeerState::Dead => peer.state = PeerState::Queued,
_ => bail!("peer in unexpected state"), other => bail!(
"peer {} in unexpected state: {}. Expected dead",
handle,
other.name()
),
} }
} }
state.peer_queue_tx.send(handle)?; state.peer_queue_tx.send(handle)?;
@ -554,7 +579,7 @@ impl TorrentState {
continue; continue;
} }
let tx = Arc::downgrade(&live.tx); let tx = live.tx.downgrade();
futures.push(async move { futures.push(async move {
if let Some(tx) = tx.upgrade() { if let Some(tx) = tx.upgrade() {
if tx if tx
@ -586,18 +611,12 @@ impl TorrentState {
} }
pub fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool { pub fn add_peer_if_not_seen(self: &Arc<Self>, addr: SocketAddr) -> bool {
// let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>();
match self.locked.write().peers.add_if_not_seen(addr) { match self.locked.write().peers.add_if_not_seen(addr) {
Some(handle) => handle, Some(handle) => handle,
None => return false, None => return false,
}; };
match self.peer_queue_tx.send(addr) { let _ = self.peer_queue_tx.send(addr);
Ok(_) => {}
Err(_) => {
warn!("peer adder died, can't add peer")
}
}
true true
} }
@ -818,11 +837,10 @@ impl PeerHandler {
Some(tx) => tx, Some(tx) => tx,
None => return Ok(()), None => return Ok(()),
}; };
tx.send(WriterRequest::Message(MessageOwned::Unchoke)) tx.send_many([
.context("peer dropped")?; WriterRequest::Message(MessageOwned::Unchoke),
tx.send(WriterRequest::Message(MessageOwned::Interested)) WriterRequest::Message(MessageOwned::Interested),
.context("peer dropped")?; ])?;
self.requester(handle).await?; self.requester(handle).await?;
Ok::<_, anyhow::Error>(()) Ok::<_, anyhow::Error>(())
} }
@ -1095,7 +1113,7 @@ impl PeerHandler {
let mut g = self.state.locked.write(); let mut g = self.state.locked.write();
g.chunks.mark_piece_downloaded(chunk_info.piece_index); g.chunks.mark_piece_downloaded(chunk_info.piece_index);
g.peers.mark_peer_trustworthy(handle); g.peers.reset_peer_backoff(handle);
} }
debug!( debug!(
@ -1134,10 +1152,7 @@ impl PeerHandler {
for (_, peer) in g.peers.states.iter_mut() { for (_, peer) in g.peers.states.iter_mut() {
if let PeerState::Live(l) = &peer.state { if let PeerState::Live(l) = &peer.state {
if l.has_full_torrent(self.state.lengths.total_pieces() as usize) { if l.has_full_torrent(self.state.lengths.total_pieces() as usize) {
let live = peer let live = peer.state.live_to(PeerState::NotNeeded).unwrap();
.state
.live_to(PeerState::FullyHaveNoLongerNeeded)
.unwrap();
let _ = live.tx.send(WriterRequest::Disconnect); let _ = live.tx.send(WriterRequest::Disconnect);
} }
} }