diff --git a/Cargo.lock b/Cargo.lock index 86ced0f..d22b608 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,6 +444,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.4" @@ -605,6 +611,7 @@ dependencies = [ "bitvec", "byteorder", "futures", + "hex", "log", "openssl", "parking_lot", @@ -613,6 +620,7 @@ dependencies = [ "serde", "size_format", "tokio", + "url", "urlencoding", "uuid", "warp", diff --git a/crates/librqbit/Cargo.lock b/crates/librqbit/Cargo.lock index f9f28ee..aa6e4f2 100644 --- a/crates/librqbit/Cargo.lock +++ b/crates/librqbit/Cargo.lock @@ -618,6 +618,7 @@ dependencies = [ "sha1", "size_format", "tokio", + "url", "urlencoding", "uuid", "warp", diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 6d97656..606e99c 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -34,6 +34,8 @@ sha1 = {version = "0.6", optional=true} uuid = {version = "0.8", features = ["v4"]} futures = "0.3" +url = "2" +hex = "0.4" [dev-dependencies] futures = {version = "0.3"} diff --git a/crates/librqbit/src/dht/inforead.rs b/crates/librqbit/src/dht/inforead.rs new file mode 100644 index 0000000..9841003 --- /dev/null +++ b/crates/librqbit/src/dht/inforead.rs @@ -0,0 +1,84 @@ +use std::net::SocketAddr; + +use futures::{stream::FuturesUnordered, StreamExt}; +use log::warn; +use tokio::sync::mpsc::UnboundedReceiver; + +use crate::{buffers::ByteString, peer_info_reader, torrent_metainfo::TorrentMetaV1Info}; + +#[derive(Debug)] +pub enum ReadMetainfoResult { + Found { + info: TorrentMetaV1Info, + rx: UnboundedReceiver, + seen: Vec, + }, + ChannelClosed { + seen: Vec, + }, +} + +pub async fn read_metainfo_from_peer_receiver( + peer_id: [u8; 20], + info_hash: [u8; 20], + mut addrs: UnboundedReceiver, +) -> ReadMetainfoResult { + let mut seen = Vec::::new(); + let first_addr = match addrs.recv().await { + Some(addr) => addr, + None => return ReadMetainfoResult::ChannelClosed { seen }, + }; + seen.push(first_addr); + + let mut unordered = FuturesUnordered::new(); + unordered.push(peer_info_reader::read_metainfo_from_peer( + first_addr, peer_id, info_hash, + )); + + loop { + tokio::select! { + next_addr = addrs.recv() => { + match next_addr { + Some(addr) => { + seen.push(addr); + unordered.push(peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)); + }, + None => return ReadMetainfoResult::ChannelClosed { seen }, + } + }, + done = unordered.next(), if !unordered.is_empty() => { + match done { + Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs }, + Some(Err(e)) => { + warn!("error in peer_info_reader::read_metainfo_from_peer: {}", e); + }, + None => unreachable!() + } + } + }; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::info_hash::decode_info_hash; + use crate::{dht::jsdht::JsDht, peer_id::generate_peer_id}; + use std::sync::Once; + + static LOG_INIT: Once = Once::new(); + + fn init_logging() { + LOG_INIT.call_once(pretty_env_logger::init) + } + + #[tokio::test] + async fn read_metainfo_from_dht() { + init_logging(); + + let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); + let peer_rx = JsDht::new(info_hash).start_peer_discovery().unwrap(); + let peer_id = generate_peer_id(); + dbg!(read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await); + } +} diff --git a/crates/librqbit/src/dht/jsdht.rs b/crates/librqbit/src/dht/jsdht.rs new file mode 100644 index 0000000..dc7268d --- /dev/null +++ b/crates/librqbit/src/dht/jsdht.rs @@ -0,0 +1,72 @@ +use std::{io::BufRead, io::BufReader, net::SocketAddr, process::Stdio, str::FromStr}; + +use log::info; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +// Collects seen peers for torrent +// Knows if they work or not. +// Informs subscribers of new peers discovered. +// +// Can discover metainfo quickly (limiting concurrency). + +pub struct JsDht { + info_hash: [u8; 20], +} + +static NODEJS_DISCOVER_SCRIPT: &str = r#" +const DHT = require('bittorrent-dht') + +let dht = new DHT(); +let infoHash = process.env["INFOHASH"]; + +dht.on('peer', function (peer, infoHash, from) { + console.log(peer.host + ':' + peer.port) +}) + +dht.lookup(infoHash) +"#; + +fn infohash_hex(info_hash: [u8; 20]) -> String { + hex::encode(info_hash) +} + +impl JsDht { + pub fn new(info_hash: [u8; 20]) -> Self { + Self { info_hash } + } + pub fn start_peer_discovery(self) -> anyhow::Result> { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + std::thread::spawn(move || self.discover_peers_and_send(tx).unwrap()); + Ok(rx) + } + fn discover_peers_and_send(self, tx: UnboundedSender) -> anyhow::Result<()> { + let mut cmd = std::process::Command::new("node"); + cmd.arg("-e") + .arg(NODEJS_DISCOVER_SCRIPT) + .env("NODE_PATH", "/opt/homebrew/lib/node_modules") + .env("INFOHASH", infohash_hex(self.info_hash)) + .stdout(Stdio::piped()); + + info!("Executing {:?}", &cmd); + + let mut child = cmd.spawn()?; + + let stdout = child.stdout.take().unwrap(); + let mut stdout = BufReader::new(stdout); + let mut line = String::new(); + loop { + line.clear(); + let size = stdout.read_line(&mut line)?; + if size == 0 { + anyhow::bail!("node discover process was not supposed to close") + } + // Remove newline character; + line.pop(); + + let ipaddr = SocketAddr::from_str(&line)?; + if tx.send(ipaddr).is_err() { + anyhow::bail!("receiver closed") + } + } + } +} diff --git a/crates/librqbit/src/dht/mod.rs b/crates/librqbit/src/dht/mod.rs new file mode 100644 index 0000000..8f0756c --- /dev/null +++ b/crates/librqbit/src/dht/mod.rs @@ -0,0 +1,2 @@ +pub mod inforead; +pub mod jsdht; diff --git a/crates/librqbit/src/info_hash.rs b/crates/librqbit/src/info_hash.rs new file mode 100644 index 0000000..6acdb03 --- /dev/null +++ b/crates/librqbit/src/info_hash.rs @@ -0,0 +1,7 @@ +pub type InfoHash = [u8; 20]; + +pub fn decode_info_hash(hash_str: &str) -> anyhow::Result { + let mut hash_arr = [0u8; 20]; + hex::decode_to_slice(hash_str, &mut hash_arr)?; + Ok(hash_arr) +} diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 377a61e..979c2dc 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -3,9 +3,12 @@ pub mod buffers; pub mod chunk_tracker; pub mod clone_to_owned; pub mod constants; +pub mod dht; pub mod file_ops; pub mod http_api; +pub mod info_hash; pub mod lengths; +pub mod magnet; pub mod peer_binary_protocol; pub mod peer_connection; pub mod peer_handler; diff --git a/crates/librqbit/src/magnet.rs b/crates/librqbit/src/magnet.rs new file mode 100644 index 0000000..d298326 --- /dev/null +++ b/crates/librqbit/src/magnet.rs @@ -0,0 +1,50 @@ +use crate::info_hash::{decode_info_hash, InfoHash}; +use anyhow::Context; + +pub struct Magnet { + pub info_hash: InfoHash, + pub trackers: Vec, +} + +impl Magnet { + pub fn parse(url: &str) -> anyhow::Result { + let url = url::Url::parse(url).context("magnet link must be a valid URL")?; + if url.scheme() != "magnet" { + anyhow::bail!("expected scheme magnet"); + } + let mut info_hash: Option = None; + let mut trackers = Vec::::new(); + for (key, value) in url.query_pairs() { + match key.as_ref() { + "xt" => match value.as_ref().strip_prefix("urn:btih:") { + Some(infohash) => { + info_hash.replace(decode_info_hash(infohash)?); + } + None => anyhow::bail!("expected xt to start with urn:btih:"), + }, + "tr" => trackers.push(value.into()), + _ => {} + } + } + match info_hash { + Some(info_hash) => { + return Ok(Magnet { + info_hash, + trackers, + }) + } + None => { + anyhow::bail!("did not find infohash") + } + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_parse_magnet_as_url() { + let magnet = "magnet:?xt=urn:btih:a621779b5e3d486e127c3efbca9b6f8d135f52e5&dn=rutor.info_%D0%92%D0%BE%D0%B9%D0%BD%D0%B0+%D0%B1%D1%83%D0%B4%D1%83%D1%89%D0%B5%D0%B3%D0%BE+%2F+The+Tomorrow+War+%282021%29+WEB-DLRip+%D0%BE%D1%82+MegaPeer+%7C+P+%7C+NewComers&tr=udp://opentor.org:2710&tr=udp://opentor.org:2710&tr=http://retracker.local/announce"; + dbg!(url::Url::parse(magnet).unwrap()); + } +} diff --git a/src/main.rs b/src/main.rs index e9aff44..897eca5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,20 @@ -use std::{fs::File, io::Read, time::Duration}; +use std::{fs::File, io::Read, net::SocketAddr, time::Duration}; use anyhow::Context; use clap::Clap; use librqbit::{ - spawn_utils::BlockingSpawner, + buffers::ByteString, + dht::{inforead::read_metainfo_from_peer_receiver, jsdht::JsDht}, + info_hash::InfoHash, + magnet, + peer_id::generate_peer_id, + spawn_utils::{spawn, BlockingSpawner}, torrent_manager::TorrentManagerBuilder, - torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Owned}, + torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned}, }; use log::{info, warn}; use reqwest::Url; +use tokio::sync::mpsc::UnboundedReceiver; async fn torrent_from_url(url: &str) -> anyhow::Result { let response = reqwest::get(url) @@ -87,13 +93,13 @@ struct Opts { single_thread_runtime: bool, } -fn compute_only_files( - torrent: &TorrentMetaV1Owned, +fn compute_only_files + AsRef<[u8]>>( + torrent: &TorrentMetaV1Info, filename_re: &str, ) -> anyhow::Result> { let filename_re = regex::Regex::new(&filename_re).context("filename regex is incorrect")?; let mut only_files = Vec::new(); - for (idx, (filename, _)) in torrent.info.iter_filenames_and_lengths().enumerate() { + for (idx, (filename, _)) in torrent.iter_filenames_and_lengths().enumerate() { let full_path = filename .to_pathbuf() .with_context(|| format!("filename of file {} is not valid utf8", idx))?; @@ -159,7 +165,48 @@ fn main() -> anyhow::Result<()> { .max_blocking_threads(8) .build()?; - rt.block_on(async move { + rt.block_on(async_main(opts, spawner)) +} + +async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { + let peer_id = generate_peer_id(); + if opts.torrent_path.starts_with("magnet:") { + let magnet::Magnet { + info_hash, + trackers, + } = magnet::Magnet::parse(&opts.torrent_path) + .context("provided path is not a valid magnet URL")?; + let dht_rx = JsDht::new(info_hash).start_peer_discovery()?; + let (info, dht_rx, initial_peers) = + match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx).await { + librqbit::dht::inforead::ReadMetainfoResult::Found { info, rx, seen } => { + (info, rx, seen) + } + librqbit::dht::inforead::ReadMetainfoResult::ChannelClosed { seen } => { + anyhow::bail!("DHT died, no way to discover torrent metainfo") + } + }; + main_info( + opts, + info_hash, + info, + peer_id, + dht_rx, + initial_peers, + trackers + .into_iter() + .filter_map(|url| match reqwest::Url::parse(&url) { + Ok(url) => Some(url), + Err(e) => { + warn!("error parsing tracker {} as url", url); + None + } + }) + .collect(), + spawner, + ) + .await + } else { let torrent = if opts.torrent_path.starts_with("http://") || opts.torrent_path.starts_with("https://") { @@ -167,18 +214,7 @@ fn main() -> anyhow::Result<()> { } else { torrent_from_file(&opts.torrent_path)? }; - - info!("Torrent metadata: {:#?}", &torrent); - if opts.list { - return Ok(()); - } - - let only_files = if let Some(filename_re) = opts.only_files_matching_regex { - Some(compute_only_files(&torrent, &filename_re)?) - } else { - None - }; - + let dht_rx = JsDht::new(torrent.info_hash).start_peer_discovery()?; let trackers = torrent .iter_announce() .filter_map(|tracker| { @@ -198,25 +234,65 @@ fn main() -> anyhow::Result<()> { } }) .collect::>(); - - let mut builder = - TorrentManagerBuilder::new(torrent.info, torrent.info_hash, opts.output_folder); - builder.overwrite(opts.overwrite).spawner(spawner); - if let Some(only_files) = only_files { - builder.only_files(only_files); - } - - if let Some(interval) = opts.force_tracker_interval { - builder.force_tracker_interval(Duration::from_secs(interval)); - } - - let handle = builder.start_manager()?; - - for url in trackers { - handle.add_tracker(url); - } - - handle.wait_until_completed().await?; - Ok(()) - }) + main_info( + opts, + torrent.info_hash, + torrent.info, + peer_id, + dht_rx, + Vec::new(), + trackers, + spawner, + ) + .await + } +} + +#[allow(clippy::too_many_arguments)] +async fn main_info( + opts: Opts, + info_hash: InfoHash, + info: TorrentMetaV1Info, + peer_id: [u8; 20], + mut dht_peer_rx: UnboundedReceiver, + initial_peers: Vec, + trackers: Vec, + spawner: BlockingSpawner, +) -> anyhow::Result<()> { + info!("Torrent info: {:#?}", &info); + if opts.list { + return Ok(()); + } + let only_files = if let Some(filename_re) = opts.only_files_matching_regex { + Some(compute_only_files(&info, &filename_re)?) + } else { + None + }; + let mut builder = TorrentManagerBuilder::new(info, info_hash, opts.output_folder); + builder.overwrite(opts.overwrite).spawner(spawner); + if let Some(only_files) = only_files { + builder.only_files(only_files); + } + if let Some(interval) = opts.force_tracker_interval { + builder.force_tracker_interval(Duration::from_secs(interval)); + } + let handle = builder.start_manager()?; + for url in trackers { + handle.add_tracker(url); + } + for peer in initial_peers { + handle.add_peer(peer); + } + spawn("peer adder", { + let handle = handle.clone(); + async move { + while let Some(peer) = dht_peer_rx.recv().await { + handle.add_peer(peer); + } + warn!("dht was closed"); + Ok(()) + } + }); + handle.wait_until_completed().await?; + Ok(()) }