From 71d49a88b686e31217bcc9f904ab7168e7affef4 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Tue, 5 Dec 2023 14:48:19 +0000 Subject: [PATCH] Create TCP listener --- Cargo.lock | 1 + crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/session.rs | 132 +++++++++++++++++++++++++++------ 3 files changed, 111 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ec8066..b55d42f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1275,6 +1275,7 @@ dependencies = [ "librqbit-dht", "librqbit-peer-protocol", "librqbit-sha1-wrapper", + "librqbit-upnp", "openssl", "parking_lot", "rand", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 25fbe25..a22500d 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -29,6 +29,7 @@ clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned", peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.2.1"} sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"} dht = {path = "../dht", package="librqbit-dht", version="4.0.0"} +librqbit-upnp = {path = "../upnp", version = "0.1.0"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} axum = {version = "0.7"} diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 42c2166..c46dbd4 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -23,6 +23,7 @@ use parking_lot::RwLock; use reqwest::Url; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde_with::serde_as; +use tokio::net::TcpListener; use tracing::{debug, error, error_span, info, warn}; use crate::{ @@ -147,6 +148,9 @@ pub struct Session { spawner: BlockingSpawner, db: RwLock, output_folder: PathBuf, + + cancel_tx: tokio::sync::watch::Sender<()>, + cancel_rx: tokio::sync::watch::Receiver<()>, } async fn torrent_from_url(url: &str) -> anyhow::Result { @@ -322,6 +326,23 @@ pub struct SessionOptions { pub peer_id: Option, /// Configure default peer connection options. Can be overriden per torrent. pub peer_opts: Option, + + pub listen_port_range: Option>, + pub enable_upnp_port_forwarding: bool, +} + +async fn create_tcp_listener( + port_range: std::ops::Range, +) -> anyhow::Result<(TcpListener, u16)> { + for port in port_range.clone() { + match TcpListener::bind(("0.0.0.0", port)).await { + Ok(l) => return Ok((l, port)), + Err(e) => { + debug!("error listening on port {port}: {e:#}") + } + } + } + bail!("no free TCP ports in range {port_range:?}"); } impl Session { @@ -336,6 +357,16 @@ impl Session { opts: SessionOptions, ) -> anyhow::Result> { let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); + + let (tcp_listener, port) = if let Some(port_range) = opts.listen_port_range { + let (l, p) = create_tcp_listener(port_range) + .await + .context("error listening on TCP")?; + (Some(l), Some(p)) + } else { + (None, None) + }; + let dht = if opts.disable_dht { None } else { @@ -355,6 +386,9 @@ impl Session { .join("session.json"), }; let spawner = BlockingSpawner::default(); + + let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(()); + let session = Arc::new(Self { persistence_filename, peer_id, @@ -363,8 +397,28 @@ impl Session { spawner, output_folder, db: RwLock::new(Default::default()), + cancel_rx, + cancel_tx, }); + if let Some(tcp_listener) = tcp_listener { + session.spawn( + "tcp listener", + error_span!("tcp_listen", port = port), + session.clone().task_tcp_listener(tcp_listener), + ); + } + + if let Some(listen_port) = port { + if opts.enable_upnp_port_forwarding { + session.spawn( + "upnp_forward", + error_span!("upnp_forward", port = listen_port), + session.clone().task_upnp_port_forwarder(listen_port), + ); + } + } + if opts.persistence { info!( "will use {:?} for session persistence", @@ -375,36 +429,50 @@ impl Session { format!("couldn't create directory {:?} for session storage", parent) })?; } - let session = session.clone(); - spawn( + let persistence_task = session.clone().task_persistence(); + session.spawn( "session persistene", error_span!("session_persistence"), - async move { - // Populate initial from the state filename - if let Err(e) = session.populate_from_stored().await { - error!("could not populate session from stored file: {:?}", e); - } - - let session = Arc::downgrade(&session); - - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - let session = match session.upgrade() { - Some(s) => s, - None => break, - }; - if let Err(e) = session.dump_to_disk() { - error!("error dumping session to disk: {:?}", e); - } - } - - Ok(()) - }, + persistence_task, ); } Ok(session) } + + async fn task_persistence(self: Arc) -> anyhow::Result<()> { + // Populate initial from the state filename + if let Err(e) = self.populate_from_stored().await { + error!("could not populate session from stored file: {:?}", e); + } + + let session = Arc::downgrade(&self); + drop(self); + + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + let session = match session.upgrade() { + Some(s) => s, + None => break, + }; + if let Err(e) = session.dump_to_disk() { + error!("error dumping session to disk: {:?}", e); + } + } + + Ok(()) + } + + async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { + // TODO + Ok(()) + } + + async fn task_upnp_port_forwarder(self: Arc, port: u16) -> anyhow::Result<()> { + let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?; + pf.run_forever().await + } + pub fn get_dht(&self) -> Option<&Dht> { self.dht.as_ref() } @@ -425,6 +493,24 @@ impl Session { } } + fn spawn( + &self, + name: &str, + span: tracing::Span, + fut: impl std::future::Future> + Send + 'static, + ) { + let mut cancel_rx = self.cancel_rx.clone(); + spawn(name, span, async move { + tokio::select! { + r = fut => r, + _ = cancel_rx.changed() => { + debug!("task canceled"); + Ok(()) + } + } + }); + } + async fn populate_from_stored(self: &Arc) -> anyhow::Result<()> { let mut rdr = match std::fs::File::open(&self.persistence_filename) { Ok(f) => BufReader::new(f),