The sucker now works with dht too! Albeit hacky JS one, but still!
This commit is contained in:
parent
b4ade5eb13
commit
2061fe56bb
10 changed files with 345 additions and 40 deletions
84
crates/librqbit/src/dht/inforead.rs
Normal file
84
crates/librqbit/src/dht/inforead.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
use std::net::SocketAddr;
|
||||
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use log::warn;
|
||||
use tokio::sync::mpsc::UnboundedReceiver;
|
||||
|
||||
use crate::{buffers::ByteString, peer_info_reader, torrent_metainfo::TorrentMetaV1Info};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ReadMetainfoResult {
|
||||
Found {
|
||||
info: TorrentMetaV1Info<ByteString>,
|
||||
rx: UnboundedReceiver<SocketAddr>,
|
||||
seen: Vec<SocketAddr>,
|
||||
},
|
||||
ChannelClosed {
|
||||
seen: Vec<SocketAddr>,
|
||||
},
|
||||
}
|
||||
|
||||
pub async fn read_metainfo_from_peer_receiver(
|
||||
peer_id: [u8; 20],
|
||||
info_hash: [u8; 20],
|
||||
mut addrs: UnboundedReceiver<SocketAddr>,
|
||||
) -> ReadMetainfoResult {
|
||||
let mut seen = Vec::<SocketAddr>::new();
|
||||
let first_addr = match addrs.recv().await {
|
||||
Some(addr) => addr,
|
||||
None => return ReadMetainfoResult::ChannelClosed { seen },
|
||||
};
|
||||
seen.push(first_addr);
|
||||
|
||||
let mut unordered = FuturesUnordered::new();
|
||||
unordered.push(peer_info_reader::read_metainfo_from_peer(
|
||||
first_addr, peer_id, info_hash,
|
||||
));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
next_addr = addrs.recv() => {
|
||||
match next_addr {
|
||||
Some(addr) => {
|
||||
seen.push(addr);
|
||||
unordered.push(peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash));
|
||||
},
|
||||
None => return ReadMetainfoResult::ChannelClosed { seen },
|
||||
}
|
||||
},
|
||||
done = unordered.next(), if !unordered.is_empty() => {
|
||||
match done {
|
||||
Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs },
|
||||
Some(Err(e)) => {
|
||||
warn!("error in peer_info_reader::read_metainfo_from_peer: {}", e);
|
||||
},
|
||||
None => unreachable!()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::info_hash::decode_info_hash;
|
||||
use crate::{dht::jsdht::JsDht, peer_id::generate_peer_id};
|
||||
use std::sync::Once;
|
||||
|
||||
static LOG_INIT: Once = Once::new();
|
||||
|
||||
fn init_logging() {
|
||||
LOG_INIT.call_once(pretty_env_logger::init)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_metainfo_from_dht() {
|
||||
init_logging();
|
||||
|
||||
let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
|
||||
let peer_rx = JsDht::new(info_hash).start_peer_discovery().unwrap();
|
||||
let peer_id = generate_peer_id();
|
||||
dbg!(read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await);
|
||||
}
|
||||
}
|
||||
72
crates/librqbit/src/dht/jsdht.rs
Normal file
72
crates/librqbit/src/dht/jsdht.rs
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
use std::{io::BufRead, io::BufReader, net::SocketAddr, process::Stdio, str::FromStr};
|
||||
|
||||
use log::info;
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
|
||||
// Collects seen peers for torrent
|
||||
// Knows if they work or not.
|
||||
// Informs subscribers of new peers discovered.
|
||||
//
|
||||
// Can discover metainfo quickly (limiting concurrency).
|
||||
|
||||
pub struct JsDht {
|
||||
info_hash: [u8; 20],
|
||||
}
|
||||
|
||||
static NODEJS_DISCOVER_SCRIPT: &str = r#"
|
||||
const DHT = require('bittorrent-dht')
|
||||
|
||||
let dht = new DHT();
|
||||
let infoHash = process.env["INFOHASH"];
|
||||
|
||||
dht.on('peer', function (peer, infoHash, from) {
|
||||
console.log(peer.host + ':' + peer.port)
|
||||
})
|
||||
|
||||
dht.lookup(infoHash)
|
||||
"#;
|
||||
|
||||
fn infohash_hex(info_hash: [u8; 20]) -> String {
|
||||
hex::encode(info_hash)
|
||||
}
|
||||
|
||||
impl JsDht {
|
||||
pub fn new(info_hash: [u8; 20]) -> Self {
|
||||
Self { info_hash }
|
||||
}
|
||||
pub fn start_peer_discovery(self) -> anyhow::Result<UnboundedReceiver<SocketAddr>> {
|
||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
std::thread::spawn(move || self.discover_peers_and_send(tx).unwrap());
|
||||
Ok(rx)
|
||||
}
|
||||
fn discover_peers_and_send(self, tx: UnboundedSender<SocketAddr>) -> anyhow::Result<()> {
|
||||
let mut cmd = std::process::Command::new("node");
|
||||
cmd.arg("-e")
|
||||
.arg(NODEJS_DISCOVER_SCRIPT)
|
||||
.env("NODE_PATH", "/opt/homebrew/lib/node_modules")
|
||||
.env("INFOHASH", infohash_hex(self.info_hash))
|
||||
.stdout(Stdio::piped());
|
||||
|
||||
info!("Executing {:?}", &cmd);
|
||||
|
||||
let mut child = cmd.spawn()?;
|
||||
|
||||
let stdout = child.stdout.take().unwrap();
|
||||
let mut stdout = BufReader::new(stdout);
|
||||
let mut line = String::new();
|
||||
loop {
|
||||
line.clear();
|
||||
let size = stdout.read_line(&mut line)?;
|
||||
if size == 0 {
|
||||
anyhow::bail!("node discover process was not supposed to close")
|
||||
}
|
||||
// Remove newline character;
|
||||
line.pop();
|
||||
|
||||
let ipaddr = SocketAddr::from_str(&line)?;
|
||||
if tx.send(ipaddr).is_err() {
|
||||
anyhow::bail!("receiver closed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
2
crates/librqbit/src/dht/mod.rs
Normal file
2
crates/librqbit/src/dht/mod.rs
Normal file
|
|
@ -0,0 +1,2 @@
|
|||
pub mod inforead;
|
||||
pub mod jsdht;
|
||||
Loading…
Add table
Add a link
Reference in a new issue