Small refactorings

This commit is contained in:
Igor Katson 2023-11-19 13:44:14 +00:00
parent 48a14823fa
commit d39479a251
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 56 additions and 19 deletions

View file

@ -76,6 +76,7 @@ pub enum PeerState {
Live(LivePeerState),
// There was an error, and it's waiting for exponential backoff.
Dead,
// We don't need to do anything with the peer any longer.
// The peer has the full torrent, and we have the full torrent, so no need
// to keep talking to it.
NotNeeded,
@ -103,7 +104,7 @@ impl PeerState {
}
}
fn take_live(&mut self) -> Option<LivePeerState> {
pub fn take_live(&mut self) -> Option<LivePeerState> {
if let PeerState::Live(_) = self {
match std::mem::take(self) {
PeerState::Live(l) => Some(l),
@ -152,10 +153,11 @@ impl PeerState {
}
}
pub fn live_to(&mut self, new_state: PeerState) -> Option<LivePeerState> {
let l = self.take_live()?;
*self = new_state;
Some(l)
pub fn to_not_needed(&mut self) -> Option<LivePeerState> {
match std::mem::replace(self, PeerState::NotNeeded) {
PeerState::Live(l) => Some(l),
_ => None,
}
}
}

View file

@ -184,6 +184,10 @@ impl PeerStates {
};
p.stats.backoff.reset();
}
fn mark_peer_not_needed(&mut self, handle: PeerHandle) -> Option<LivePeerState> {
self.states.get_mut(&handle)?.state.to_not_needed()
}
}
pub struct TorrentStateLocked {
@ -316,6 +320,11 @@ impl TorrentState {
async move {
loop {
let addr = peer_queue_rx.recv().await.unwrap();
if state.is_finished() {
debug!("ignoring peer {} as we are finished", addr);
state.locked.write().peers.mark_peer_not_needed(addr);
continue;
}
let permit = state.peer_semaphore.acquire().await.unwrap();
permit.forget();
@ -499,8 +508,16 @@ impl TorrentState {
fn on_peer_died(self: &Arc<Self>, handle: PeerHandle, error: Option<anyhow::Error>) {
let mut g = self.locked.write();
match g.peers.mark_peer_dead(handle) {
Some(Some(live)) => {
let peer = match g.peers.states.get_mut(&handle) {
Some(peer) => peer,
None => {
warn!("bug: peer not found in table. Forgetting it forever");
return;
}
};
match std::mem::take(&mut peer.state) {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
for req in live.inflight_requests {
debug!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
@ -510,17 +527,30 @@ impl TorrentState {
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
}
}
// Other valid state to transition to dead.
Some(None) => {}
// Peer was in an unexpected state.
None => return,
}
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
warn!("bug: peer was in a wrong state, ignoring it forever");
g.peers.drop_peer(handle);
return;
}
};
// Re-borrow as we were modifying states above
// (otherwise borrow checker rightfully says we're wrong).
let peer = g.peers.states.get_mut(&handle).unwrap();
if error.is_none() {
debug!("peer died without errors, not re-queueing");
peer.state = PeerState::NotNeeded;
return;
}
if self.is_finished() {
debug!("torrent finished, not re-queueing");
peer.state = PeerState::NotNeeded;
return;
}
peer.state = PeerState::Dead;
let backoff = {
let peer = match g.peers.states.get_mut(&handle) {
Some(p) => p,
@ -576,6 +606,10 @@ impl TorrentState {
self.stats.downloaded_and_checked.load(Ordering::Relaxed)
}
pub fn is_finished(&self) -> bool {
self.get_left_to_download() == 0
}
pub fn get_left_to_download(&self) -> u64 {
self.needed - self.get_downloaded()
}
@ -675,7 +709,7 @@ impl TorrentState {
}
pub async fn wait_until_completed(&self) {
if self.get_left_to_download() == 0 {
if self.is_finished() {
return;
}
self.finished_notify.notified().await;
@ -836,10 +870,11 @@ impl PeerHandler {
.peers
.clone_tx(handle)
.context("peer dropped")?;
tx.send(WriterRequest::Message(MessageOwned::Unchoke))
.context("peer dropped")?;
tx.send(WriterRequest::Message(MessageOwned::NotInterested))
.context("peer dropped")?;
tx.send(WriterRequest::Message(MessageOwned::Unchoke))?;
tx.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self.state.is_finished() {
tx.send(WriterRequest::Disconnect)?;
}
return Ok(());
}
@ -1129,7 +1164,7 @@ impl PeerHandler {
debug!("piece={} successfully downloaded and verified", index);
if self.state.get_left_to_download() == 0 {
if self.state.is_finished() {
self.state.finished_notify.notify_waiters();
self.disconnect_all_peers_that_have_full_torrent();
self.reopen_read_only()?;
@ -1157,7 +1192,7 @@ impl PeerHandler {
for (_, peer) in g.peers.states.iter_mut() {
if let PeerState::Live(l) = &peer.state {
if l.has_full_torrent(self.state.lengths.total_pieces() as usize) {
let live = peer.state.live_to(PeerState::NotNeeded).unwrap();
let live = peer.state.to_not_needed().unwrap();
let _ = live.tx.send(WriterRequest::Disconnect);
}
}