diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index c780f1c..1dbc0b1 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -1,6 +1,6 @@ use std::{ cmp::Reverse, - net::SocketAddr, + net::{SocketAddr, SocketAddrV4}, sync::{ atomic::{AtomicU16, Ordering}, Arc, @@ -1059,7 +1059,8 @@ pub struct DhtConfig { pub bootstrap_addrs: Option>, pub routing_table: Option, pub listen_addr: Option, - pub(crate) peer_store: Option, + pub announce_addr: Option, + pub peer_store: Option, } impl DhtState { diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 49d478e..1e39c05 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -16,10 +16,11 @@ use crate::peer_store::PeerStore; use crate::routing_table::RoutingTable; use crate::{Dht, DhtConfig, DhtState}; -#[derive(Default, Clone)] +#[derive(Default)] pub struct PersistentDhtConfig { pub dump_interval: Option, pub config_filename: Option, + pub announce_addr: Option, } #[derive(Serialize, Deserialize)] @@ -111,11 +112,13 @@ impl PersistentDht { .map(|de| (Some(de.addr), Some(de.table), de.peer_store)) .unwrap_or((None, None, None)); let peer_id = routing_table.as_ref().map(|r| r.id()); + let dht_config = DhtConfig { peer_id, routing_table, listen_addr, peer_store, + announce_addr: config.announce_addr, ..Default::default() }; let dht = DhtState::with_config(dht_config).await?; diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index a22500d..87f940c 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -42,7 +42,7 @@ anyhow = "1" itertools = "0.12" http = "1" regex = "1" -reqwest = {version="0.11.22", default-features=false} +reqwest = {version="0.11.22", default-features=false, features = ["json"]} urlencoding = "2" byteorder = "1" bincode = "1" diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index c46dbd4..04a8924 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, collections::{HashMap, HashSet}, io::{BufReader, BufWriter, Read}, - net::SocketAddr, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::PathBuf, str::FromStr, sync::Arc, @@ -12,7 +12,9 @@ use std::{ use anyhow::{bail, Context}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use buffers::{ByteBufT, ByteString}; -use dht::{Dht, DhtBuilder, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream}; +use dht::{ + Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, +}; use librqbit_core::{ directories::get_configuration_directory, magnet::Magnet, @@ -345,6 +347,34 @@ async fn create_tcp_listener( bail!("no free TCP ports in range {port_range:?}"); } +async fn get_public_announce_addr(port: u16) -> anyhow::Result { + async fn get_ipify() -> anyhow::Result { + #[derive(Deserialize)] + struct Data { + ip: Ipv4Addr, + } + let resp: Data = reqwest::get("https://api.ipify.org?format=json") + .await + .context("error getting public IP address")? + .error_for_status()? + .json() + .await?; + Ok(resp.ip) + } + + async fn get_public_ip() -> anyhow::Result { + get_ipify().await + } + + let ip = get_public_ip() + .await + .context("error getting public IP address")?; + + let addr = SocketAddr::V4(SocketAddrV4::new(ip, port)); + info!("using public IP address {addr} to publish on DHT"); + Ok(addr) +} + impl Session { /// Create a new session. The passed in folder will be used as a default unless overriden per torrent. pub async fn new(output_folder: PathBuf) -> anyhow::Result> { @@ -354,7 +384,7 @@ impl Session { /// Create a new session with options. pub async fn new_with_opts( output_folder: PathBuf, - opts: SessionOptions, + mut opts: SessionOptions, ) -> anyhow::Result> { let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id); @@ -362,6 +392,7 @@ impl Session { let (l, p) = create_tcp_listener(port_range) .await .context("error listening on TCP")?; + info!("Listening on 0.0.0.0:{p} for incoming peer connections"); (Some(l), Some(p)) } else { (None, None) @@ -370,10 +401,25 @@ impl Session { let dht = if opts.disable_dht { None } else { - let dht = if opts.disable_dht_persistence { - DhtBuilder::new().await + let announce_addr = if let Some(port) = port { + Some( + get_public_announce_addr(port) + .await + .context("error getting public announce address")?, + ) } else { - PersistentDht::create(opts.dht_config).await + None + }; + let dht = if opts.disable_dht_persistence { + DhtBuilder::with_config(DhtConfig { + announce_addr, + ..Default::default() + }) + .await + } else { + let mut pdht_config = opts.dht_config.take().unwrap_or_default(); + pdht_config.announce_addr = announce_addr; + PersistentDht::create(Some(pdht_config)).await } .context("error initializing DHT")?; Some(dht) @@ -464,7 +510,12 @@ impl Session { } async fn task_tcp_listener(self: Arc, l: TcpListener) -> anyhow::Result<()> { - // TODO + let mut buf = vec![0u8; 4096]; + + loop { + let (stream, addr) = l.accept().await.context("error accepting")?; + info!("accepted connection from {addr}"); + } Ok(()) } @@ -511,6 +562,10 @@ impl Session { }); } + fn stop(&self) { + let _ = self.cancel_tx.send(()); + } + async fn populate_from_stored(self: &Arc) -> anyhow::Result<()> { let mut rdr = match std::fs::File::open(&self.persistence_filename) { Ok(f) => BufReader::new(f), diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 056ccd6..85b60ca 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -71,6 +71,22 @@ struct Opts { #[arg(short = 't', long)] worker_threads: Option, + // Enable to listen on 0.0.0.0 on TCP for torrent requests. + #[arg(long = "tcp-listen", default_value = "true")] + tcp_listen: bool, + + /// The minimal port to listen for incoming connections. + #[arg(long = "tcp-min-port", default_value = "4240")] + tcp_listen_min_port: u16, + + /// The maximal port to listen for incoming connections. + #[arg(long = "tcp-max-port", default_value = "4260")] + tcp_listen_max_port: u16, + + /// If set, will try to publish the chosen port through upnp on your router. + #[arg(long = "enable-upnp", default_value = "true")] + enable_upnp: bool, + #[command(subcommand)] subcommand: SubCommand, } @@ -311,6 +327,12 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { read_write_timeout: Some(opts.peer_read_write_timeout), ..Default::default() }), + listen_port_range: if opts.tcp_listen { + Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port) + } else { + None + }, + enable_upnp_port_forwarding: opts.enable_upnp, }; let stats_printer = |session: Arc| async move { @@ -371,6 +393,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { sopts.persistence = !start_opts.disable_persistence; sopts.persistence_filename = start_opts.persistence_filename.clone().map(PathBuf::from); + let session = Session::new_with_opts(PathBuf::from(&start_opts.output_folder), sopts) .await