Made peer chunk requester the same task as manage_peer

This commit is contained in:
Igor Katson 2023-11-20 11:30:16 +00:00
parent 6d8e245103
commit 8c67127a85
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -478,6 +478,7 @@ impl TorrentState {
let handler = PeerHandler {
addr,
on_bitfield_notify: Arc::new(Default::default()),
state: state.clone(),
spawner,
};
@ -486,17 +487,23 @@ impl TorrentState {
read_write_timeout: state.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
state.peer_id,
handler,
&handler,
Some(options),
spawner,
);
let requester = handler.task_peer_chunk_requester(addr);
let res = peer_connection.manage_peer(rx).await;
let state = peer_connection.into_handler().state;
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer(rx) => {r}
};
let state = handler.state;
state.peer_semaphore.add_permits(1);
match res {
@ -898,11 +905,12 @@ impl TorrentState {
#[derive(Clone)]
struct PeerHandler {
state: Arc<TorrentState>,
on_bitfield_notify: Arc<Notify>,
addr: SocketAddr,
spawner: BlockingSpawner,
}
impl PeerConnectionHandler for PeerHandler {
impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_received_message(&self, message: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
match message {
Message::Request(request) => {
@ -1046,20 +1054,12 @@ impl PeerHandler {
return Ok(());
}
// Additional spawn per peer, not good.
spawn(
span!(
parent: None,
Level::ERROR,
"peer_chunk_requester",
peer = handle.to_string()
),
self.clone().task_peer_chunk_requester(handle),
);
self.on_bitfield_notify.notify_waiters();
Ok(())
}
async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> {
async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> {
self.on_bitfield_notify.notified().await;
let tx = match self.state.peers.clone_tx(handle) {
Some(tx) => tx,
None => return Ok(()),
@ -1068,21 +1068,7 @@ impl PeerHandler {
WriterRequest::Message(MessageOwned::Unchoke),
WriterRequest::Message(MessageOwned::Interested),
])?;
self.requester(handle).await?;
Ok::<_, anyhow::Error>(())
}
fn on_i_am_choked(&self, handle: PeerHandle) {
debug!("we are choked");
self.state.peers.mark_i_am_choked(handle, true);
}
fn on_peer_interested(&self, handle: PeerHandle) {
debug!("peer is interested");
self.state.peers.mark_peer_interested(handle, true);
}
async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> {
let notify = match self
.state
.peers
@ -1091,8 +1077,6 @@ impl PeerHandler {
Some(notify) => notify,
None => return Ok(()),
};
// TODO: this might dangle, same below.
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
@ -1195,6 +1179,16 @@ impl PeerHandler {
}
}
fn on_i_am_choked(&self, handle: PeerHandle) {
debug!("we are choked");
self.state.peers.mark_i_am_choked(handle, true);
}
fn on_peer_interested(&self, handle: PeerHandle) {
debug!("peer is interested");
self.state.peers.mark_peer_interested(handle, true);
}
fn reopen_read_only(&self) -> anyhow::Result<()> {
fn dummy_file() -> anyhow::Result<std::fs::File> {
#[cfg(target_os = "windows")]