Accepting TCP connections + publishing = works. Still yet to pass it to the live torrent

This commit is contained in:
Igor Katson 2023-12-05 16:35:16 +00:00
parent 71d49a88b6
commit 41fb3bfd37
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 93 additions and 11 deletions

View file

@ -1,6 +1,6 @@
use std::{
cmp::Reverse,
net::SocketAddr,
net::{SocketAddr, SocketAddrV4},
sync::{
atomic::{AtomicU16, Ordering},
Arc,
@ -1059,7 +1059,8 @@ pub struct DhtConfig {
pub bootstrap_addrs: Option<Vec<String>>,
pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>,
pub(crate) peer_store: Option<PeerStore>,
pub announce_addr: Option<SocketAddr>,
pub peer_store: Option<PeerStore>,
}
impl DhtState {

View file

@ -16,10 +16,11 @@ use crate::peer_store::PeerStore;
use crate::routing_table::RoutingTable;
use crate::{Dht, DhtConfig, DhtState};
#[derive(Default, Clone)]
#[derive(Default)]
pub struct PersistentDhtConfig {
pub dump_interval: Option<Duration>,
pub config_filename: Option<PathBuf>,
pub announce_addr: Option<SocketAddr>,
}
#[derive(Serialize, Deserialize)]
@ -111,11 +112,13 @@ impl PersistentDht {
.map(|de| (Some(de.addr), Some(de.table), de.peer_store))
.unwrap_or((None, None, None));
let peer_id = routing_table.as_ref().map(|r| r.id());
let dht_config = DhtConfig {
peer_id,
routing_table,
listen_addr,
peer_store,
announce_addr: config.announce_addr,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;

View file

@ -42,7 +42,7 @@ anyhow = "1"
itertools = "0.12"
http = "1"
regex = "1"
reqwest = {version="0.11.22", default-features=false}
reqwest = {version="0.11.22", default-features=false, features = ["json"]}
urlencoding = "2"
byteorder = "1"
bincode = "1"

View file

@ -2,7 +2,7 @@ use std::{
borrow::Cow,
collections::{HashMap, HashSet},
io::{BufReader, BufWriter, Read},
net::SocketAddr,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
path::PathBuf,
str::FromStr,
sync::Arc,
@ -12,7 +12,9 @@ use std::{
use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBufT, ByteString};
use dht::{Dht, DhtBuilder, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream};
use dht::{
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream,
};
use librqbit_core::{
directories::get_configuration_directory,
magnet::Magnet,
@ -345,6 +347,34 @@ async fn create_tcp_listener(
bail!("no free TCP ports in range {port_range:?}");
}
async fn get_public_announce_addr(port: u16) -> anyhow::Result<SocketAddr> {
async fn get_ipify() -> anyhow::Result<Ipv4Addr> {
#[derive(Deserialize)]
struct Data {
ip: Ipv4Addr,
}
let resp: Data = reqwest::get("https://api.ipify.org?format=json")
.await
.context("error getting public IP address")?
.error_for_status()?
.json()
.await?;
Ok(resp.ip)
}
async fn get_public_ip() -> anyhow::Result<Ipv4Addr> {
get_ipify().await
}
let ip = get_public_ip()
.await
.context("error getting public IP address")?;
let addr = SocketAddr::V4(SocketAddrV4::new(ip, port));
info!("using public IP address {addr} to publish on DHT");
Ok(addr)
}
impl Session {
/// Create a new session. The passed in folder will be used as a default unless overriden per torrent.
pub async fn new(output_folder: PathBuf) -> anyhow::Result<Arc<Self>> {
@ -354,7 +384,7 @@ impl Session {
/// Create a new session with options.
pub async fn new_with_opts(
output_folder: PathBuf,
opts: SessionOptions,
mut opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
@ -362,6 +392,7 @@ impl Session {
let (l, p) = create_tcp_listener(port_range)
.await
.context("error listening on TCP")?;
info!("Listening on 0.0.0.0:{p} for incoming peer connections");
(Some(l), Some(p))
} else {
(None, None)
@ -370,10 +401,25 @@ impl Session {
let dht = if opts.disable_dht {
None
} else {
let dht = if opts.disable_dht_persistence {
DhtBuilder::new().await
let announce_addr = if let Some(port) = port {
Some(
get_public_announce_addr(port)
.await
.context("error getting public announce address")?,
)
} else {
PersistentDht::create(opts.dht_config).await
None
};
let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig {
announce_addr,
..Default::default()
})
.await
} else {
let mut pdht_config = opts.dht_config.take().unwrap_or_default();
pdht_config.announce_addr = announce_addr;
PersistentDht::create(Some(pdht_config)).await
}
.context("error initializing DHT")?;
Some(dht)
@ -464,7 +510,12 @@ impl Session {
}
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
// TODO
let mut buf = vec![0u8; 4096];
loop {
let (stream, addr) = l.accept().await.context("error accepting")?;
info!("accepted connection from {addr}");
}
Ok(())
}
@ -511,6 +562,10 @@ impl Session {
});
}
fn stop(&self) {
let _ = self.cancel_tx.send(());
}
async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> {
let mut rdr = match std::fs::File::open(&self.persistence_filename) {
Ok(f) => BufReader::new(f),

View file

@ -71,6 +71,22 @@ struct Opts {
#[arg(short = 't', long)]
worker_threads: Option<usize>,
// Enable to listen on 0.0.0.0 on TCP for torrent requests.
#[arg(long = "tcp-listen", default_value = "true")]
tcp_listen: bool,
/// The minimal port to listen for incoming connections.
#[arg(long = "tcp-min-port", default_value = "4240")]
tcp_listen_min_port: u16,
/// The maximal port to listen for incoming connections.
#[arg(long = "tcp-max-port", default_value = "4260")]
tcp_listen_max_port: u16,
/// If set, will try to publish the chosen port through upnp on your router.
#[arg(long = "enable-upnp", default_value = "true")]
enable_upnp: bool,
#[command(subcommand)]
subcommand: SubCommand,
}
@ -311,6 +327,12 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
read_write_timeout: Some(opts.peer_read_write_timeout),
..Default::default()
}),
listen_port_range: if opts.tcp_listen {
Some(opts.tcp_listen_min_port..opts.tcp_listen_max_port)
} else {
None
},
enable_upnp_port_forwarding: opts.enable_upnp,
};
let stats_printer = |session: Arc<Session>| async move {
@ -371,6 +393,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
sopts.persistence = !start_opts.disable_persistence;
sopts.persistence_filename =
start_opts.persistence_filename.clone().map(PathBuf::from);
let session =
Session::new_with_opts(PathBuf::from(&start_opts.output_folder), sopts)
.await