Move tx into peer

This commit is contained in:
Igor Katson 2023-11-20 11:42:37 +00:00
parent f1cc9162e9
commit 2693c0eb71
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 29 additions and 36 deletions

View file

@ -183,11 +183,15 @@ impl PeerStateNoMut {
} }
} }
pub fn queued_to_connecting(&mut self, counters: &AggregatePeerStatsAtomic) -> Option<PeerRx> { pub fn queued_to_connecting(
&mut self,
counters: &AggregatePeerStatsAtomic,
) -> Option<(PeerRx, PeerTx)> {
if let PeerState::Queued = &self.0 { if let PeerState::Queued = &self.0 {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
let tx_2 = tx.clone();
self.set(PeerState::Connecting(tx), counters); self.set(PeerState::Connecting(tx), counters);
Some(rx) Some((rx, tx_2))
} else { } else {
None None
} }

View file

@ -205,7 +205,7 @@ impl PeerStates {
live.bitfield = BF::from_vec(bitfield); live.bitfield = BF::from_vec(bitfield);
}) })
} }
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> { pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<(PeerRx, PeerTx)> {
let rx = self let rx = self
.with_peer_mut(h, "mark_peer_connecting", |peer| { .with_peer_mut(h, "mark_peer_connecting", |peer| {
peer.state peer.state
@ -216,10 +216,6 @@ impl PeerStates {
Ok(rx) Ok(rx)
} }
pub fn clone_tx(&self, handle: PeerHandle) -> Option<PeerTx> {
self.with_live(handle, |live| live.tx.clone())
}
fn reset_peer_backoff(&self, handle: PeerHandle) { fn reset_peer_backoff(&self, handle: PeerHandle) {
self.with_peer_mut(handle, "reset_peer_backoff", |p| { self.with_peer_mut(handle, "reset_peer_backoff", |p| {
p.stats.backoff.reset(); p.stats.backoff.reset();
@ -474,7 +470,7 @@ impl TorrentState {
spawner: BlockingSpawner, spawner: BlockingSpawner,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let state = self; let state = self;
let rx = state.peers.mark_peer_connecting(addr)?; let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
let handler = PeerHandler { let handler = PeerHandler {
addr, addr,
@ -482,6 +478,7 @@ impl TorrentState {
have_notify: Default::default(), have_notify: Default::default(),
requests_sem: Semaphore::new(0), requests_sem: Semaphore::new(0),
state: state.clone(), state: state.clone(),
tx,
spawner, spawner,
}; };
let options = PeerConnectionOptions { let options = PeerConnectionOptions {
@ -918,6 +915,8 @@ struct PeerHandler {
addr: SocketAddr, addr: SocketAddr,
spawner: BlockingSpawner, spawner: BlockingSpawner,
tx: PeerTx,
} }
impl<'a> PeerConnectionHandler for &'a PeerHandler { impl<'a> PeerConnectionHandler for &'a PeerHandler {
@ -1008,31 +1007,24 @@ impl PeerHandler {
} }
}; };
let tx = { if !self
if !self .state
.state .lock_read("is_chunk_ready_to_upload")
.lock_read("is_chunk_ready_to_upload") .chunks
.chunks .is_chunk_ready_to_upload(&chunk_info)
.is_chunk_ready_to_upload(&chunk_info) {
{ anyhow::bail!(
anyhow::bail!( "got request for a chunk that is not ready to upload. chunk {:?}",
"got request for a chunk that is not ready to upload. chunk {:?}", &chunk_info
&chunk_info );
); }
}
self.state
.peers
.clone_tx(peer_handle)
.context("peer died, dropping chunk that it requested")?
};
// TODO: this is not super efficient as it does copying multiple times. // TODO: this is not super efficient as it does copying multiple times.
// Theoretically, this could be done in the sending code, so that it reads straight into // Theoretically, this could be done in the sending code, so that it reads straight into
// the send buffer. // the send buffer.
let request = WriterRequest::ReadChunkRequest(chunk_info); let request = WriterRequest::ReadChunkRequest(chunk_info);
debug!("sending {:?}", &request); debug!("sending {:?}", &request);
Ok::<_, anyhow::Error>(tx.send(request)?) Ok::<_, anyhow::Error>(self.tx.send(request)?)
} }
fn on_have(&self, handle: PeerHandle, have: u32) { fn on_have(&self, handle: PeerHandle, have: u32) {
@ -1055,11 +1047,12 @@ impl PeerHandler {
.update_bitfield_from_vec(handle, bitfield.0); .update_bitfield_from_vec(handle, bitfield.0);
if !self.state.am_i_interested_in_peer(handle) { if !self.state.am_i_interested_in_peer(handle) {
let tx = self.state.peers.clone_tx(handle).context("peer dropped")?; self.tx
tx.send(WriterRequest::Message(MessageOwned::Unchoke))?; .send(WriterRequest::Message(MessageOwned::Unchoke))?;
tx.send(WriterRequest::Message(MessageOwned::NotInterested))?; self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self.state.is_finished() { if self.state.is_finished() {
tx.send(WriterRequest::Disconnect)?; self.tx.send(WriterRequest::Disconnect)?;
} }
return Ok(()); return Ok(());
} }
@ -1070,11 +1063,7 @@ impl PeerHandler {
async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> { async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> {
self.on_bitfield_notify.notified().await; self.on_bitfield_notify.notified().await;
let tx = match self.state.peers.clone_tx(handle) { self.tx.send_many([
Some(tx) => tx,
None => return Ok(()),
};
tx.send_many([
WriterRequest::Message(MessageOwned::Unchoke), WriterRequest::Message(MessageOwned::Unchoke),
WriterRequest::Message(MessageOwned::Interested), WriterRequest::Message(MessageOwned::Interested),
])?; ])?;