Create TCP listener

This commit is contained in:
Igor Katson 2023-12-05 14:48:19 +00:00
parent 626c529000
commit 71d49a88b6
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 111 additions and 23 deletions

1
Cargo.lock generated
View file

@ -1275,6 +1275,7 @@ dependencies = [
"librqbit-dht",
"librqbit-peer-protocol",
"librqbit-sha1-wrapper",
"librqbit-upnp",
"openssl",
"parking_lot",
"rand",

View file

@ -29,6 +29,7 @@ clone_to_owned = {path = "../clone_to_owned", package="librqbit-clone-to-owned",
peer_binary_protocol = {path = "../peer_binary_protocol", package="librqbit-peer-protocol", version = "3.2.1"}
sha1w = {path = "../sha1w", default-features=false, package="librqbit-sha1-wrapper", version="2.2.1"}
dht = {path = "../dht", package="librqbit-dht", version="4.0.0"}
librqbit-upnp = {path = "../upnp", version = "0.1.0"}
tokio = {version = "1", features = ["macros", "rt-multi-thread"]}
axum = {version = "0.7"}

View file

@ -23,6 +23,7 @@ use parking_lot::RwLock;
use reqwest::Url;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_with::serde_as;
use tokio::net::TcpListener;
use tracing::{debug, error, error_span, info, warn};
use crate::{
@ -147,6 +148,9 @@ pub struct Session {
spawner: BlockingSpawner,
db: RwLock<SessionDatabase>,
output_folder: PathBuf,
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
}
async fn torrent_from_url(url: &str) -> anyhow::Result<TorrentMetaV1Owned> {
@ -322,6 +326,23 @@ pub struct SessionOptions {
pub peer_id: Option<Id20>,
/// Configure default peer connection options. Can be overriden per torrent.
pub peer_opts: Option<PeerConnectionOptions>,
pub listen_port_range: Option<std::ops::Range<u16>>,
pub enable_upnp_port_forwarding: bool,
}
async fn create_tcp_listener(
port_range: std::ops::Range<u16>,
) -> anyhow::Result<(TcpListener, u16)> {
for port in port_range.clone() {
match TcpListener::bind(("0.0.0.0", port)).await {
Ok(l) => return Ok((l, port)),
Err(e) => {
debug!("error listening on port {port}: {e:#}")
}
}
}
bail!("no free TCP ports in range {port_range:?}");
}
impl Session {
@ -336,6 +357,16 @@ impl Session {
opts: SessionOptions,
) -> anyhow::Result<Arc<Self>> {
let peer_id = opts.peer_id.unwrap_or_else(generate_peer_id);
let (tcp_listener, port) = if let Some(port_range) = opts.listen_port_range {
let (l, p) = create_tcp_listener(port_range)
.await
.context("error listening on TCP")?;
(Some(l), Some(p))
} else {
(None, None)
};
let dht = if opts.disable_dht {
None
} else {
@ -355,6 +386,9 @@ impl Session {
.join("session.json"),
};
let spawner = BlockingSpawner::default();
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let session = Arc::new(Self {
persistence_filename,
peer_id,
@ -363,8 +397,28 @@ impl Session {
spawner,
output_folder,
db: RwLock::new(Default::default()),
cancel_rx,
cancel_tx,
});
if let Some(tcp_listener) = tcp_listener {
session.spawn(
"tcp listener",
error_span!("tcp_listen", port = port),
session.clone().task_tcp_listener(tcp_listener),
);
}
if let Some(listen_port) = port {
if opts.enable_upnp_port_forwarding {
session.spawn(
"upnp_forward",
error_span!("upnp_forward", port = listen_port),
session.clone().task_upnp_port_forwarder(listen_port),
);
}
}
if opts.persistence {
info!(
"will use {:?} for session persistence",
@ -375,36 +429,50 @@ impl Session {
format!("couldn't create directory {:?} for session storage", parent)
})?;
}
let session = session.clone();
spawn(
let persistence_task = session.clone().task_persistence();
session.spawn(
"session persistene",
error_span!("session_persistence"),
async move {
// Populate initial from the state filename
if let Err(e) = session.populate_from_stored().await {
error!("could not populate session from stored file: {:?}", e);
}
let session = Arc::downgrade(&session);
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let session = match session.upgrade() {
Some(s) => s,
None => break,
};
if let Err(e) = session.dump_to_disk() {
error!("error dumping session to disk: {:?}", e);
}
}
Ok(())
},
persistence_task,
);
}
Ok(session)
}
async fn task_persistence(self: Arc<Self>) -> anyhow::Result<()> {
// Populate initial from the state filename
if let Err(e) = self.populate_from_stored().await {
error!("could not populate session from stored file: {:?}", e);
}
let session = Arc::downgrade(&self);
drop(self);
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
let session = match session.upgrade() {
Some(s) => s,
None => break,
};
if let Err(e) = session.dump_to_disk() {
error!("error dumping session to disk: {:?}", e);
}
}
Ok(())
}
async fn task_tcp_listener(self: Arc<Self>, l: TcpListener) -> anyhow::Result<()> {
// TODO
Ok(())
}
async fn task_upnp_port_forwarder(self: Arc<Self>, port: u16) -> anyhow::Result<()> {
let pf = librqbit_upnp::UpnpPortForwarder::new(vec![port], None)?;
pf.run_forever().await
}
pub fn get_dht(&self) -> Option<&Dht> {
self.dht.as_ref()
}
@ -425,6 +493,24 @@ impl Session {
}
}
fn spawn(
&self,
name: &str,
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
let mut cancel_rx = self.cancel_rx.clone();
spawn(name, span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
debug!("task canceled");
Ok(())
}
}
});
}
async fn populate_from_stored(self: &Arc<Self>) -> anyhow::Result<()> {
let mut rdr = match std::fs::File::open(&self.persistence_filename) {
Ok(f) => BufReader::new(f),