From 49cb6f9d65e953f09b0c7d2149adc157f3fd5ecd Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Sun, 18 Jul 2021 15:53:23 +0100 Subject: [PATCH] Nothing much --- crates/dht/src/dht.rs | 23 +++++++++++++--- crates/dht/src/persistence.rs | 49 +++++++++++++++++++++++------------ crates/rqbit/src/main.rs | 32 +++++++++++------------ 3 files changed, 69 insertions(+), 35 deletions(-) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index aa1058e..da9a025 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -42,6 +42,7 @@ struct DhtState { next_transaction_id: u16, outstanding_requests: HashMap<(u16, SocketAddr), Request>, routing_table: RoutingTable, + listen_addr: SocketAddr, // This sender sends requests to the worker. // It is unbounded so that the methods on Dht state don't need to be async. @@ -61,6 +62,7 @@ impl DhtState { id: Id20, sender: UnboundedSender<(Message, SocketAddr)>, routing_table: Option, + listen_addr: SocketAddr, ) -> Self { let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id)); Self { @@ -69,6 +71,7 @@ impl DhtState { outstanding_requests: Default::default(), routing_table, sender, + listen_addr, seen_peers: Default::default(), get_peers_subscribers: Default::default(), made_requests: Default::default(), @@ -612,6 +615,7 @@ pub struct DhtConfig { pub peer_id: Option, pub bootstrap_addrs: Option>, pub routing_table: Option, + pub listen_addr: Option, } impl Dht { @@ -619,9 +623,17 @@ impl Dht { Self::with_config(DhtConfig::default()).await } pub async fn with_config(config: DhtConfig) -> anyhow::Result { - let socket = UdpSocket::bind("0.0.0.0:0") - .await - .context("error binding socket")?; + let socket = match config.listen_addr { + Some(addr) => UdpSocket::bind(addr).await, + None => UdpSocket::bind("0.0.0.0:0").await, + } + .context("error binding socket")?; + + let listen_addr = socket + .local_addr() + .context("cannot determine UDP listen addr")?; + info!("DHT listening on {:?}", listen_addr); + let peer_id = config.peer_id.unwrap_or_else(generate_peer_id); info!("starting up DHT with peer id {:?}", peer_id); let bootstrap_addrs = config @@ -633,6 +645,7 @@ impl Dht { peer_id, in_tx.clone(), config.routing_table, + listen_addr, ))); tokio::spawn({ @@ -663,6 +676,10 @@ impl Dht { }) } + pub fn listen_addr(&self) -> SocketAddr { + self.state.lock().listen_addr + } + pub fn stats(&self) -> DhtStats { self.state.lock().get_stats() } diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index 1cdbff0..4ed6b9e 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -1,11 +1,13 @@ // TODO: this now stores only the routing table, but we also need AT LEAST the same socket address... +use serde::{Deserialize, Serialize}; use std::fs::OpenOptions; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::time::Duration; use anyhow::Context; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace, warn}; use tokio::spawn; use crate::dht::{Dht, DhtConfig}; @@ -17,6 +19,12 @@ pub struct PersistentDhtConfig { pub config_filename: Option, } +#[derive(Serialize, Deserialize)] +struct DhtSerialize { + addr: SocketAddr, + table: Table, +} + pub struct PersistentDht { // config_filename: PathBuf, } @@ -29,7 +37,10 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result< .open(&tempfile_name) .with_context(|| format!("error opening {:?}", tempfile_name))?; - match dht.with_routing_table(|r| serde_json::to_writer(&mut file, r)) { + let addr = dht.listen_addr(); + match dht + .with_routing_table(|r| serde_json::to_writer(&mut file, &DhtSerialize { addr, table: r })) + { Ok(_) => { debug!("dumped DHT to {:?}", &tempfile_name); } @@ -63,29 +74,35 @@ impl PersistentDht { .with_context(|| format!("error creating dir {:?}", &parent))?; } - let routing_table = match OpenOptions::new().read(true).open(&config_filename) { - Ok(dht_json) => match serde_json::from_reader::<_, RoutingTable>(&dht_json) { - Ok(r) => { - info!("loaded DHT routing table from {:?}", &config_filename); - Some(r) + let de = match OpenOptions::new().read(true).open(&config_filename) { + Ok(dht_json) => { + match serde_json::from_reader::<_, DhtSerialize>(&dht_json) { + Ok(r) => { + info!("loaded DHT routing table from {:?}", &config_filename); + Some(r) + } + Err(e) => { + warn!( + "cannot deserialize routing table from file {:?}: {:#}", + &config_filename, e + ); + None + } } - Err(e) => { - warn!( - "cannot deserialize routing table from file {:?}: {:#}", - &config_filename, e - ); - None - } - }, + } Err(e) => match e.kind() { std::io::ErrorKind::NotFound => None, _ => return Err(e).with_context(|| format!("error reading {:?}", config_filename)), }, }; + let (listen_addr, routing_table) = de + .map(|de| (Some(de.addr), Some(de.table))) + .unwrap_or((None, None)); let peer_id = routing_table.as_ref().map(|r| r.id()); let dht_config = DhtConfig { peer_id, routing_table, + listen_addr, ..Default::default() }; let dht = Dht::with_config(dht_config).await?; @@ -104,8 +121,8 @@ impl PersistentDht { }; loop { + trace!("sleeping for {:?}", &dump_interval); tokio::time::sleep(dump_interval).await; - debug!("dumping DHT to {:?}", &config_filename); match dump_dht(&dht, &config_filename, &tempfile_name) { Ok(_) => debug!("dumped DHT to {:?}", &config_filename), diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 965ae09..e19dd3b 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -136,23 +136,23 @@ fn compute_only_files>( } fn init_logging(opts: &Opts) { - match opts.log_level.as_ref() { - Some(level) => { - let level_str = match level { - LogLevel::Trace => "trace", - LogLevel::Debug => "debug", - LogLevel::Info => "info", - LogLevel::Warn => "warn", - LogLevel::Error => "error", - }; - std::env::set_var("RUST_LOG", level_str); - } - None => { - if std::env::var_os("RUST_LOG").is_none() { + if std::env::var_os("RUST_LOG").is_none() { + match opts.log_level.as_ref() { + Some(level) => { + let level_str = match level { + LogLevel::Trace => "trace", + LogLevel::Debug => "debug", + LogLevel::Info => "info", + LogLevel::Warn => "warn", + LogLevel::Error => "error", + }; + std::env::set_var("RUST_LOG", level_str); + } + None => { std::env::set_var("RUST_LOG", "info"); - }; - } - }; + } + }; + } pretty_env_logger::init(); }