diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index d85edf0..b59ffe5 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -31,7 +31,7 @@ use serde::Serialize; use sha1w::Sha1; use tokio::{ sync::{ - mpsc::{unbounded_channel, UnboundedSender}, + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, Notify, Semaphore, }, time::timeout, @@ -291,7 +291,7 @@ impl TorrentState { options: Option, ) -> Arc { let options = options.unwrap_or_default(); - let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel(); + let (peer_queue_tx, peer_queue_rx) = unbounded_channel(); let state = Arc::new(TorrentState { info_hash, info, @@ -315,68 +315,80 @@ impl TorrentState { peer_queue_tx, finished_notify: Notify::new(), }); - spawn(span!(Level::ERROR, "peer_adder"), { - let state = state.clone(); - async move { - loop { - let addr = peer_queue_rx.recv().await.unwrap(); - if state.is_finished() { - debug!("ignoring peer {} as we are finished", addr); - state.locked.write().peers.mark_peer_not_needed(addr); - continue; - } - - let permit = state.peer_semaphore.acquire().await.unwrap(); - permit.forget(); - spawn( - span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()), - { - let state = state.clone(); - async move { - let rx = state.locked.write().peers.mark_peer_connecting(addr)?; - - let handler = PeerHandler { - addr, - state: state.clone(), - spawner, - }; - let options = PeerConnectionOptions { - connect_timeout: state.options.peer_connect_timeout, - read_write_timeout: state.options.peer_read_write_timeout, - ..Default::default() - }; - let peer_connection = PeerConnection::new( - addr, - state.info_hash, - state.peer_id, - handler, - Some(options), - spawner, - ); - - let res = peer_connection.manage_peer(rx).await; - let state = peer_connection.into_handler().state; - state.peer_semaphore.add_permits(1); - - match res { - // We disconnected the peer ourselves as we don't need it - Ok(()) => { - state.on_peer_died(addr, None); - } - Err(e) => { - debug!("error managing peer: {:#}", e); - state.on_peer_died(addr, Some(e)); - } - } - Ok::<_, anyhow::Error>(()) - } - }, - ); - } - } - }); + spawn( + span!(Level::ERROR, "peer_adder"), + state.clone().task_peer_adder(peer_queue_rx, spawner), + ); state } + + pub async fn task_manage_peer( + self: Arc, + addr: SocketAddr, + spawner: BlockingSpawner, + ) -> anyhow::Result<()> { + let state = self; + let rx = state.locked.write().peers.mark_peer_connecting(addr)?; + + let handler = PeerHandler { + addr, + state: state.clone(), + spawner, + }; + let options = PeerConnectionOptions { + connect_timeout: state.options.peer_connect_timeout, + read_write_timeout: state.options.peer_read_write_timeout, + ..Default::default() + }; + let peer_connection = PeerConnection::new( + addr, + state.info_hash, + state.peer_id, + handler, + Some(options), + spawner, + ); + + let res = peer_connection.manage_peer(rx).await; + let state = peer_connection.into_handler().state; + state.peer_semaphore.add_permits(1); + + match res { + // We disconnected the peer ourselves as we don't need it + Ok(()) => { + state.on_peer_died(addr, None); + } + Err(e) => { + debug!("error managing peer: {:#}", e); + state.on_peer_died(addr, Some(e)); + } + } + Ok::<_, anyhow::Error>(()) + } + + pub async fn task_peer_adder( + self: Arc, + mut peer_queue_rx: UnboundedReceiver, + spawner: BlockingSpawner, + ) -> anyhow::Result<()> { + let state = self; + loop { + let addr = peer_queue_rx.recv().await.unwrap(); + if state.is_finished() { + debug!("ignoring peer {} as we are finished", addr); + state.locked.write().peers.mark_peer_not_needed(addr); + continue; + } + + let permit = state.peer_semaphore.acquire().await.unwrap(); + permit.forget(); + spawn( + span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()), + state.clone().task_manage_peer(addr, spawner), + ); + } + } + pub fn info(&self) -> &TorrentMetaV1Info { &self.info }