Split up a couple methods

This commit is contained in:
Igor Katson 2023-11-19 14:04:47 +00:00
parent d39479a251
commit a745257be2
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -31,7 +31,7 @@ use serde::Serialize;
use sha1w::Sha1;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedSender},
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Notify, Semaphore,
},
time::timeout,
@ -291,7 +291,7 @@ impl TorrentState {
options: Option<TorrentStateOptions>,
) -> Arc<Self> {
let options = options.unwrap_or_default();
let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel();
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let state = Arc::new(TorrentState {
info_hash,
info,
@ -315,68 +315,80 @@ impl TorrentState {
peer_queue_tx,
finished_notify: Notify::new(),
});
spawn(span!(Level::ERROR, "peer_adder"), {
let state = state.clone();
async move {
loop {
let addr = peer_queue_rx.recv().await.unwrap();
if state.is_finished() {
debug!("ignoring peer {} as we are finished", addr);
state.locked.write().peers.mark_peer_not_needed(addr);
continue;
}
let permit = state.peer_semaphore.acquire().await.unwrap();
permit.forget();
spawn(
span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()),
{
let state = state.clone();
async move {
let rx = state.locked.write().peers.mark_peer_connecting(addr)?;
let handler = PeerHandler {
addr,
state: state.clone(),
spawner,
};
let options = PeerConnectionOptions {
connect_timeout: state.options.peer_connect_timeout,
read_write_timeout: state.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
state.peer_id,
handler,
Some(options),
spawner,
);
let res = peer_connection.manage_peer(rx).await;
let state = peer_connection.into_handler().state;
state.peer_semaphore.add_permits(1);
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
state.on_peer_died(addr, None);
}
Err(e) => {
debug!("error managing peer: {:#}", e);
state.on_peer_died(addr, Some(e));
}
}
Ok::<_, anyhow::Error>(())
}
},
);
}
}
});
spawn(
span!(Level::ERROR, "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx, spawner),
);
state
}
pub async fn task_manage_peer(
self: Arc<Self>,
addr: SocketAddr,
spawner: BlockingSpawner,
) -> anyhow::Result<()> {
let state = self;
let rx = state.locked.write().peers.mark_peer_connecting(addr)?;
let handler = PeerHandler {
addr,
state: state.clone(),
spawner,
};
let options = PeerConnectionOptions {
connect_timeout: state.options.peer_connect_timeout,
read_write_timeout: state.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
state.peer_id,
handler,
Some(options),
spawner,
);
let res = peer_connection.manage_peer(rx).await;
let state = peer_connection.into_handler().state;
state.peer_semaphore.add_permits(1);
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
state.on_peer_died(addr, None);
}
Err(e) => {
debug!("error managing peer: {:#}", e);
state.on_peer_died(addr, Some(e));
}
}
Ok::<_, anyhow::Error>(())
}
pub async fn task_peer_adder(
self: Arc<Self>,
mut peer_queue_rx: UnboundedReceiver<SocketAddr>,
spawner: BlockingSpawner,
) -> anyhow::Result<()> {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.unwrap();
if state.is_finished() {
debug!("ignoring peer {} as we are finished", addr);
state.locked.write().peers.mark_peer_not_needed(addr);
continue;
}
let permit = state.peer_semaphore.acquire().await.unwrap();
permit.forget();
spawn(
span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()),
state.clone().task_manage_peer(addr, spawner),
);
}
}
pub fn info(&self) -> &TorrentMetaV1Info<ByteString> {
&self.info
}