diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index fb8308c..65d188f 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -527,6 +527,13 @@ impl Dht { } pub async fn get_peers(&self, info_hash: Id20) -> impl StreamExt { let (tx, rx) = unbounded_channel::(); + + // This is a hack to test localhost speeds, uncomment to test that quickly. + // + // tx.send(Response::Peer("127.0.0.1:27311".parse().unwrap())) + // .unwrap(); + // std::mem::forget(tx); + self.request_tx .send((Request::GetPeers(info_hash), tx)) .await diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index d5065d7..9a0e3cf 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -94,6 +94,9 @@ struct Opts { /// profilers work better with this one. #[clap(short, long)] single_thread_runtime: bool, + + #[clap(long = "disable-dht")] + disable_dht: bool, } fn compute_only_files>( @@ -169,14 +172,21 @@ fn main() -> anyhow::Result<()> { async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { let peer_id = generate_peer_id(); - let dht = Dht::new().await.context("error initializing DHT")?; + let dht = if opts.disable_dht { + None + } else { + Some(Dht::new().await.context("error initializing DHT")?) + }; if opts.torrent_path.starts_with("magnet:") { let Magnet { info_hash, trackers, } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; - let dht_rx = dht.get_peers(info_hash).await; + let dht_rx = dht + .ok_or_else(|| anyhow::anyhow!("magnet links without DHT are not supported"))? + .get_peers(info_hash) + .await; let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx).await { ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), @@ -189,7 +199,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> info_hash, info, peer_id, - dht_rx, + Some(dht_rx), initial_peers.into_iter().collect(), trackers .into_iter() @@ -212,7 +222,10 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> } else { torrent_from_file(&opts.torrent_path)? }; - let dht_rx = dht.get_peers(torrent.info_hash).await; + let dht_rx = match dht { + Some(dht) => Some(dht.get_peers(torrent.info_hash).await), + None => None, + }; let trackers = torrent .iter_announce() .filter_map(|tracker| { @@ -252,7 +265,7 @@ async fn main_info( info_hash: Id20, info: TorrentMetaV1Info, peer_id: Id20, - mut dht_peer_rx: impl StreamExt + Unpin + Send + Sync + 'static, + dht_peer_rx: Option + Unpin + Send + Sync + 'static>, initial_peers: Vec, trackers: Vec, spawner: BlockingSpawner, @@ -296,16 +309,19 @@ async fn main_info( for peer in initial_peers { handle.add_peer(peer); } - spawn("DHT peer adder", { - let handle = handle.clone(); - async move { - while let Some(peer) = dht_peer_rx.next().await { - handle.add_peer(peer); + if let Some(mut dht_peer_rx) = dht_peer_rx { + spawn("DHT peer adder", { + let handle = handle.clone(); + async move { + while let Some(peer) = dht_peer_rx.next().await { + handle.add_peer(peer); + } + warn!("dht was closed"); + Ok(()) } - warn!("dht was closed"); - Ok(()) - } - }); + }); + } + handle.wait_until_completed().await?; Ok(()) }