Do not announce when listing torrents

This commit is contained in:
Igor Katson 2023-12-05 23:31:04 +00:00
parent c3eb03c72d
commit bc243143e5
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
5 changed files with 30 additions and 23 deletions

View file

@ -17,7 +17,7 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let dht = DhtBuilder::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash)?;
let mut stream = dht.get_peers(info_hash, None)?;
let stats_printer = async {
loop {

View file

@ -97,6 +97,7 @@ trait RecursiveRequestCallbacks: Sized + Send + Sync + 'static {
struct RecursiveRequestCallbacksGetPeers {
// Id20::from_str("00000fffffffffffffffffffffffffffffffffff").unwrap()
min_distance_to_announce: Id20,
announce_port: Option<u16>,
}
impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
@ -109,7 +110,7 @@ impl RecursiveRequestCallbacks for RecursiveRequestCallbacksGetPeers {
addr: SocketAddr,
resp: &anyhow::Result<ResponseOrError>,
) {
let announce_port = match req.dht.announce_port {
let announce_port = match self.announce_port {
Some(a) => a,
None => return,
};
@ -189,7 +190,7 @@ pub struct RequestPeersStream {
}
impl RequestPeersStream {
fn new(dht: Arc<DhtState>, info_hash: Id20) -> Self {
fn new(dht: Arc<DhtState>, info_hash: Id20, announce_port: Option<u16>) -> Self {
let (peer_tx, peer_rx) = unbounded_channel();
let (node_tx, node_rx) = unbounded_channel();
let rp = Arc::new(RecursiveRequest {
@ -206,6 +207,7 @@ impl RequestPeersStream {
"0000ffffffffffffffffffffffffffffffffffff",
)
.unwrap(),
announce_port,
},
});
let join_handle = rp.request_peers_forever(node_rx);
@ -534,7 +536,6 @@ pub struct DhtState {
worker_sender: UnboundedSender<WorkerSendRequest>,
pub(crate) peer_store: PeerStore,
announce_port: Option<u16>,
}
impl DhtState {
@ -544,7 +545,6 @@ impl DhtState {
routing_table: Option<RoutingTable>,
listen_addr: SocketAddr,
peer_store: PeerStore,
announce_port: Option<u16>,
) -> Self {
let routing_table = routing_table.unwrap_or_else(|| RoutingTable::new(id, None));
Self {
@ -556,7 +556,6 @@ impl DhtState {
listen_addr,
rate_limiter: make_rate_limiter(),
peer_store,
announce_port,
}
}
@ -1124,7 +1123,6 @@ pub struct DhtConfig {
pub bootstrap_addrs: Option<Vec<String>>,
pub routing_table: Option<RoutingTable>,
pub listen_addr: Option<SocketAddr>,
pub announce_port: Option<u16>,
pub peer_store: Option<PeerStore>,
}
@ -1160,7 +1158,6 @@ impl DhtState {
config.routing_table,
listen_addr,
config.peer_store.unwrap_or_else(|| PeerStore::new(peer_id)),
config.announce_port,
));
spawn(error_span!("dht"), {
@ -1174,8 +1171,16 @@ impl DhtState {
Ok(state)
}
pub fn get_peers(self: &Arc<Self>, info_hash: Id20) -> anyhow::Result<RequestPeersStream> {
Ok(RequestPeersStream::new(self.clone(), info_hash))
pub fn get_peers(
self: &Arc<Self>,
info_hash: Id20,
announce_port: Option<u16>,
) -> anyhow::Result<RequestPeersStream> {
Ok(RequestPeersStream::new(
self.clone(),
info_hash,
announce_port,
))
}
pub fn listen_addr(&self) -> SocketAddr {

View file

@ -20,7 +20,6 @@ use crate::{Dht, DhtConfig, DhtState};
pub struct PersistentDhtConfig {
pub dump_interval: Option<Duration>,
pub config_filename: Option<PathBuf>,
pub announce_port: Option<u16>,
}
#[derive(Serialize, Deserialize)]
@ -118,7 +117,6 @@ impl PersistentDht {
routing_table,
listen_addr,
peer_store,
announce_port: config.announce_port,
..Default::default()
};
let dht = DhtState::with_config(dht_config).await?;

View file

@ -108,7 +108,7 @@ mod tests {
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash).unwrap();
let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await
{

View file

@ -159,6 +159,8 @@ pub struct Session {
db: RwLock<SessionDatabase>,
output_folder: PathBuf,
tcp_listen_port: Option<u16>,
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
}
@ -392,14 +394,9 @@ impl Session {
None
} else {
let dht = if opts.disable_dht_persistence {
DhtBuilder::with_config(DhtConfig {
announce_port: tcp_listen_port,
..Default::default()
})
.await
DhtBuilder::with_config(DhtConfig::default()).await
} else {
let mut pdht_config = opts.dht_config.take().unwrap_or_default();
pdht_config.announce_port = tcp_listen_port;
let pdht_config = opts.dht_config.take().unwrap_or_default();
PersistentDht::create(Some(pdht_config)).await
}
.context("error initializing DHT")?;
@ -426,6 +423,7 @@ impl Session {
db: RwLock::new(Default::default()),
cancel_rx,
cancel_tx,
tcp_listen_port,
});
if let Some(tcp_listener) = tcp_listener {
@ -740,6 +738,12 @@ impl Session {
let opts = opts.unwrap_or_default();
let announce_port = if opts.list_only {
None
} else {
self.tcp_listen_port
};
let (info_hash, info, dht_rx, trackers, initial_peers) = match add {
AddTorrent::Url(magnet) if magnet.starts_with("magnet:") => {
let Magnet {
@ -751,7 +755,7 @@ impl Session {
.dht
.as_ref()
.context("magnet links without DHT are not supported")?
.get_peers(info_hash)?;
.get_peers(info_hash, announce_port)?;
let trackers = trackers
.into_iter()
@ -814,7 +818,7 @@ impl Session {
let dht_rx = match self.dht.as_ref() {
Some(dht) if !opts.paused && !opts.list_only => {
debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash)?)
Some(dht.get_peers(torrent.info_hash, announce_port)?)
}
_ => None,
};
@ -1047,7 +1051,7 @@ impl Session {
let peer_rx = self
.dht
.as_ref()
.map(|dht| dht.get_peers(handle.info_hash()))
.map(|dht| dht.get_peers(handle.info_hash(), self.tcp_listen_port))
.transpose()?;
handle.start(Default::default(), peer_rx, false)?;
Ok(())