From 2eabebb5c32df252abe0afa38de6001d00b99e49 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Jul 2021 19:55:23 +0100 Subject: [PATCH] Replaced DHT with custom one! Lets see if it works --- Cargo.lock | 193 ++-------------------------- crates/dht/Cargo.toml | 1 - crates/dht/src/bprotocol.rs | 3 +- crates/dht/src/dht.rs | 3 +- crates/dht/src/id20.rs | 130 ------------------- crates/dht/src/lib.rs | 10 +- crates/dht/src/main.rs | 4 +- crates/dht/src/routing_table.rs | 6 +- crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/dht/inforead.rs | 38 +++--- crates/librqbit_core/src/id20.rs | 131 +++++++++++++++++++ crates/librqbit_core/src/lib.rs | 1 + crates/rqbit/Cargo.toml | 2 + crates/rqbit/src/main.rs | 19 ++- 14 files changed, 192 insertions(+), 350 deletions(-) create mode 100644 crates/librqbit_core/src/id20.rs diff --git a/Cargo.lock b/Cargo.lock index 9acc6be..ef988fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,32 +11,12 @@ dependencies = [ "memchr", ] -[[package]] -name = "ansi_term" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" -dependencies = [ - "winapi", -] - [[package]] name = "anyhow" version = "1.0.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15af2628f6890fe2609a3b91bef4c83450512802e59489f9c1cb1fa5df064a61" -[[package]] -name = "async-trait" -version = "0.1.50" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atty" version = "0.2.14" @@ -155,21 +135,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "clap" -version = "2.33.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" -dependencies = [ - "ansi_term", - "atty", - "bitflags", - "strsim 0.8.0", - "textwrap 0.11.0", - "unicode-width", - "vec_map", -] - [[package]] name = "clap" version = "3.0.0-beta.2" @@ -182,9 +147,9 @@ dependencies = [ "indexmap", "lazy_static", "os_str_bytes", - "strsim 0.10.0", + "strsim", "termcolor", - "textwrap 0.12.1", + "textwrap", "unicode-width", "vec_map", ] @@ -206,15 +171,6 @@ dependencies = [ name = "clone_to_owned" version = "0.1.0" -[[package]] -name = "cloudabi" -version = "0.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" -dependencies = [ - "bitflags", -] - [[package]] name = "commoncrypto" version = "0.2.0" @@ -277,9 +233,8 @@ dependencies = [ "anyhow", "bencode", "clone_to_owned", - "futures 0.3.15", + "futures", "hex 0.4.3", - "kad", "librqbit_core", "log", "parking_lot", @@ -352,24 +307,12 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - [[package]] name = "funty" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1847abb9cb65d566acd5942e94aea9c8f547ad02c98e1649326fc0e8910b8b1e" -[[package]] -name = "futures" -version = "0.1.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" - [[package]] name = "futures" version = "0.3.15" @@ -443,15 +386,6 @@ version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" -[[package]] -name = "futures-timer" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5cedfe9b6dc756220782cc1ba5bcb1fa091cdcba155e40d3556159c3db58043" -dependencies = [ - "futures 0.1.31", -] - [[package]] name = "futures-util" version = "0.3.15" @@ -734,23 +668,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kad" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ed647905c649de73b7df476658b34898472f49245de662a0041bbfefc5cdcb" -dependencies = [ - "async-trait", - "futures 0.3.15", - "futures-timer", - "humantime", - "log", - "num", - "rand 0.5.6", - "structopt", - "tracing", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -775,7 +692,7 @@ dependencies = [ "byteorder", "clone_to_owned", "crypto-hash", - "futures 0.3.15", + "futures", "hex 0.4.3", "librqbit_core", "log", @@ -791,6 +708,7 @@ dependencies = [ "sha1w", "size_format", "tokio", + "tokio-stream", "url", "urlencoding", "uuid", @@ -932,7 +850,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36" dependencies = [ - "num-bigint", "num-complex", "num-integer", "num-iter", @@ -940,18 +857,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-bigint" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", - "rand 0.5.6", -] - [[package]] name = "num-complex" version = "0.2.4" @@ -960,7 +865,6 @@ checksum = "b6b19411a9719e753aff12e5187b74d60d3dc449ec3f4dc21e3989c3f554bc95" dependencies = [ "autocfg", "num-traits", - "rand 0.5.6", ] [[package]] @@ -991,7 +895,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" dependencies = [ "autocfg", - "num-bigint", "num-integer", "num-traits", ] @@ -1232,19 +1135,6 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "643f8f41a8ebc4c5dc4515c82bb8abd397b527fc20fd681b7c011c2aee5d44fb" -[[package]] -name = "rand" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" -dependencies = [ - "cloudabi", - "fuchsia-cprng", - "libc", - "rand_core 0.3.1", - "winapi", -] - [[package]] name = "rand" version = "0.7.3" @@ -1290,21 +1180,6 @@ dependencies = [ "rand_core 0.6.3", ] -[[package]] -name = "rand_core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" -dependencies = [ - "rand_core 0.4.2", -] - -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - [[package]] name = "rand_core" version = "0.5.1" @@ -1415,8 +1290,9 @@ name = "rqbit" version = "0.1.0" dependencies = [ "anyhow", - "clap 3.0.0-beta.2", - "futures 0.3.15", + "clap", + "dht", + "futures", "librqbit", "log", "pretty_env_logger", @@ -1585,42 +1461,12 @@ dependencies = [ "winapi", ] -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - [[package]] name = "strsim" version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" -[[package]] -name = "structopt" -version = "0.3.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69b041cdcb67226aca307e6e7be44c8806423d83e018bd662360a93dabce4d71" -dependencies = [ - "clap 2.33.3", - "lazy_static", - "structopt-derive", -] - -[[package]] -name = "structopt-derive" -version = "0.4.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7813934aecf5f51a54775e00068c237de98489463968231a51746bbbc03f9c10" -dependencies = [ - "heck", - "proc-macro-error", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "syn" version = "1.0.73" @@ -1661,15 +1507,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "textwrap" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] - [[package]] name = "textwrap" version = "0.12.1" @@ -1795,21 +1632,9 @@ dependencies = [ "cfg-if", "log", "pin-project-lite", - "tracing-attributes", "tracing-core", ] -[[package]] -name = "tracing-attributes" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "tracing-core" version = "0.1.18" @@ -1972,7 +1797,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "332d47745e9a0c38636dbd454729b147d16bd1ed08ae67b3ab281c4506771054" dependencies = [ "bytes", - "futures 0.3.15", + "futures", "headers", "http", "hyper", diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index 5e2aa63..0ad6cfb 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -kad = "0.6" tokio = {version = "1", features = ["macros", "rt-multi-thread", "net", "sync"]} tokio-stream = "0.1" serde = {version = "1", features = ["derive"]} diff --git a/crates/dht/src/bprotocol.rs b/crates/dht/src/bprotocol.rs index f2b31a3..236b30a 100644 --- a/crates/dht/src/bprotocol.rs +++ b/crates/dht/src/bprotocol.rs @@ -6,13 +6,12 @@ use std::{ use bencode::ByteBuf; use clone_to_owned::CloneToOwned; +use librqbit_core::id20::Id20; use serde::{ de::{IgnoredAny, Unexpected}, Deserialize, Deserializer, Serialize, }; -use crate::id20::Id20; - #[derive(Debug)] enum MessageType { Request, diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 6a6fbf2..63d0661 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -7,13 +7,12 @@ use crate::{ bprotocol::{ self, CompactNodeInfo, FindNodeRequest, GetPeersRequest, Message, MessageKind, Node, }, - id20::Id20, routing_table::{InsertResult, RoutingTable}, }; use anyhow::Context; use bencode::ByteString; use futures::{stream::FuturesUnordered, StreamExt}; -use librqbit_core::peer_id::generate_peer_id; +use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; use log::{debug, info, trace, warn}; use parking_lot::Mutex; use tokio::{ diff --git a/crates/dht/src/id20.rs b/crates/dht/src/id20.rs index 6d7bc01..8b13789 100644 --- a/crates/dht/src/id20.rs +++ b/crates/dht/src/id20.rs @@ -1,131 +1 @@ -use std::{cmp::Ordering, str::FromStr}; -use serde::{Deserialize, Deserializer, Serialize}; - -#[derive(Clone, Copy, PartialEq, Eq, Hash)] -pub struct Id20(pub [u8; 20]); - -impl FromStr for Id20 { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - let mut out = [0u8; 20]; - if s.len() != 40 { - anyhow::bail!("expected a hex string of length 40") - }; - hex::decode_to_slice(s, &mut out)?; - Ok(Id20(out)) - } -} - -impl std::fmt::Debug for Id20 { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "<")?; - for byte in self.0 { - write!(f, "{:02x?}", byte)?; - } - write!(f, ">")?; - Ok(()) - } -} - -impl Serialize for Id20 { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_bytes(&self.0) - } -} - -impl<'de> Deserialize<'de> for Id20 { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct Visitor; - impl<'de> serde::de::Visitor<'de> for Visitor { - type Value = Id20; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(formatter, "a 20 byte slice") - } - fn visit_bytes(self, v: &[u8]) -> Result - where - E: serde::de::Error, - { - if v.len() != 20 { - return Err(E::invalid_length(20, &self)); - } - let mut buf = [0u8; 20]; - buf.copy_from_slice(&v); - Ok(Id20(buf)) - } - } - deserializer.deserialize_bytes(Visitor {}) - } -} - -impl Id20 { - pub fn distance(&self, other: &Id20) -> Id20 { - let mut xor = [0u8; 20]; - for (idx, (s, o)) in self - .0 - .iter() - .copied() - .zip(other.0.iter().copied()) - .enumerate() - { - xor[idx] = s ^ o; - } - Id20(xor) - } - pub fn set_bit(&mut self, bit: u8, value: bool) { - let n = &mut self.0[(bit / 8) as usize]; - if value { - *n |= 1 << (7 - bit % 8) - } else { - let mask = !(1 << (7 - bit % 8)); - *n &= mask; - } - } - pub fn set_bits_range(&mut self, r: std::ops::Range, value: bool) { - for bit in r { - self.set_bit(bit, value) - } - } -} - -impl Ord for Id20 { - fn cmp(&self, other: &Id20) -> Ordering { - for (s, o) in self.0.iter().copied().zip(other.0.iter().copied()) { - match s.cmp(&o) { - Ordering::Less => return Ordering::Less, - Ordering::Equal => continue, - Ordering::Greater => return Ordering::Greater, - } - } - Ordering::Equal - } -} - -impl PartialOrd for Id20 { - fn partial_cmp(&self, other: &Id20) -> Option { - Some(self.cmp(other)) - } -} - -#[cfg(test)] -mod tests { - use super::Id20; - - #[test] - fn test_set_bit_range() { - let mut id = Id20([0u8; 20]); - id.set_bits_range(9..17, true); - assert_eq!( - id, - Id20([0, 127, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) - ) - } -} diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 3658052..7069dea 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -1,4 +1,6 @@ -pub mod bprotocol; -pub mod dht; -pub mod id20; -pub mod routing_table; +mod bprotocol; +mod dht; +mod routing_table; + +pub use dht::Dht; +pub use librqbit_core::id20::Id20; diff --git a/crates/dht/src/main.rs b/crates/dht/src/main.rs index 16fed0a..18115a6 100644 --- a/crates/dht/src/main.rs +++ b/crates/dht/src/main.rs @@ -1,7 +1,7 @@ -use std::{collections::HashSet, str::FromStr, time::Duration}; +use std::{collections::HashSet, str::FromStr}; use anyhow::Context; -use dht::{dht::Dht, id20::Id20}; +use dht::{Dht, Id20}; use tokio_stream::StreamExt; #[tokio::main] diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index e284f99..cdeaf33 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -3,10 +3,9 @@ use std::{ time::{Duration, Instant}, }; +use librqbit_core::id20::Id20; use log::debug; -use crate::id20::Id20; - #[derive(Debug)] enum BucketTreeNode { Leaf(Vec), @@ -426,9 +425,10 @@ impl RoutingTable { mod tests { use std::net::SocketAddrV4; + use librqbit_core::id20::Id20; use rand::Rng; - use crate::{id20::Id20, routing_table::compute_split_start_end}; + use crate::routing_table::compute_split_start_end; use super::RoutingTable; diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index d058412..57f3cc8 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -21,6 +21,7 @@ peer_binary_protocol = {path = "../peer_binary_protocol"} sha1w = {path = "../sha1w"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} +tokio-stream = "0.1" serde = {version = "1", features=["derive"]} serde_json = "1" anyhow = "1" diff --git a/crates/librqbit/src/dht/inforead.rs b/crates/librqbit/src/dht/inforead.rs index 5385b32..d243cdd 100644 --- a/crates/librqbit/src/dht/inforead.rs +++ b/crates/librqbit/src/dht/inforead.rs @@ -1,36 +1,35 @@ -use std::net::SocketAddr; +use std::{collections::HashSet, net::SocketAddr}; use buffers::ByteString; -use futures::{stream::FuturesUnordered, Stream, StreamExt}; +use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use log::debug; -use tokio::sync::mpsc::UnboundedReceiver; use crate::peer_info_reader; #[derive(Debug)] -pub enum ReadMetainfoResult { +pub enum ReadMetainfoResult { Found { info: TorrentMetaV1Info, - rx: UnboundedReceiver, - seen: Vec, + rx: Rx, + seen: HashSet, }, ChannelClosed { - seen: Vec, + seen: HashSet, }, } -pub async fn read_metainfo_from_peer_receiver( +pub async fn read_metainfo_from_peer_receiver + Unpin>( peer_id: [u8; 20], info_hash: [u8; 20], - mut addrs: impl StreamExt + Unpin, -) -> ReadMetainfoResult { - let mut seen = Vec::::new(); + mut addrs: A, +) -> ReadMetainfoResult { + let mut seen = HashSet::::new(); let first_addr = match addrs.next().await { Some(addr) => addr, None => return ReadMetainfoResult::ChannelClosed { seen }, }; - seen.push(first_addr); + seen.insert(first_addr); let mut unordered = FuturesUnordered::new(); unordered.push(peer_info_reader::read_metainfo_from_peer( @@ -42,8 +41,9 @@ pub async fn read_metainfo_from_peer_receiver( next_addr = addrs.next() => { match next_addr { Some(addr) => { - seen.push(addr); - unordered.push(peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)); + if seen.insert(addr) { + unordered.push(peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)); + } }, None => return ReadMetainfoResult::ChannelClosed { seen }, } @@ -64,6 +64,7 @@ pub async fn read_metainfo_from_peer_receiver( #[cfg(test)] mod tests { use librqbit_core::{info_hash::decode_info_hash, peer_id::generate_peer_id}; + use tokio_stream::wrappers::UnboundedReceiverStream; use crate::dht::jsdht::JsDht; @@ -83,6 +84,13 @@ mod tests { let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); let peer_rx = JsDht::new(info_hash).start_peer_discovery().unwrap(); let peer_id = generate_peer_id(); - dbg!(read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await); + dbg!( + read_metainfo_from_peer_receiver( + peer_id, + info_hash, + UnboundedReceiverStream::new(peer_rx) + ) + .await + ); } } diff --git a/crates/librqbit_core/src/id20.rs b/crates/librqbit_core/src/id20.rs new file mode 100644 index 0000000..6d7bc01 --- /dev/null +++ b/crates/librqbit_core/src/id20.rs @@ -0,0 +1,131 @@ +use std::{cmp::Ordering, str::FromStr}; + +use serde::{Deserialize, Deserializer, Serialize}; + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct Id20(pub [u8; 20]); + +impl FromStr for Id20 { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let mut out = [0u8; 20]; + if s.len() != 40 { + anyhow::bail!("expected a hex string of length 40") + }; + hex::decode_to_slice(s, &mut out)?; + Ok(Id20(out)) + } +} + +impl std::fmt::Debug for Id20 { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "<")?; + for byte in self.0 { + write!(f, "{:02x?}", byte)?; + } + write!(f, ">")?; + Ok(()) + } +} + +impl Serialize for Id20 { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_bytes(&self.0) + } +} + +impl<'de> Deserialize<'de> for Id20 { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor; + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = Id20; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a 20 byte slice") + } + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + if v.len() != 20 { + return Err(E::invalid_length(20, &self)); + } + let mut buf = [0u8; 20]; + buf.copy_from_slice(&v); + Ok(Id20(buf)) + } + } + deserializer.deserialize_bytes(Visitor {}) + } +} + +impl Id20 { + pub fn distance(&self, other: &Id20) -> Id20 { + let mut xor = [0u8; 20]; + for (idx, (s, o)) in self + .0 + .iter() + .copied() + .zip(other.0.iter().copied()) + .enumerate() + { + xor[idx] = s ^ o; + } + Id20(xor) + } + pub fn set_bit(&mut self, bit: u8, value: bool) { + let n = &mut self.0[(bit / 8) as usize]; + if value { + *n |= 1 << (7 - bit % 8) + } else { + let mask = !(1 << (7 - bit % 8)); + *n &= mask; + } + } + pub fn set_bits_range(&mut self, r: std::ops::Range, value: bool) { + for bit in r { + self.set_bit(bit, value) + } + } +} + +impl Ord for Id20 { + fn cmp(&self, other: &Id20) -> Ordering { + for (s, o) in self.0.iter().copied().zip(other.0.iter().copied()) { + match s.cmp(&o) { + Ordering::Less => return Ordering::Less, + Ordering::Equal => continue, + Ordering::Greater => return Ordering::Greater, + } + } + Ordering::Equal + } +} + +impl PartialOrd for Id20 { + fn partial_cmp(&self, other: &Id20) -> Option { + Some(self.cmp(other)) + } +} + +#[cfg(test)] +mod tests { + use super::Id20; + + #[test] + fn test_set_bit_range() { + let mut id = Id20([0u8; 20]); + id.set_bits_range(9..17, true); + assert_eq!( + id, + Id20([0, 127, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) + ) + } +} diff --git a/crates/librqbit_core/src/lib.rs b/crates/librqbit_core/src/lib.rs index 44c4b75..9ddde49 100644 --- a/crates/librqbit_core/src/lib.rs +++ b/crates/librqbit_core/src/lib.rs @@ -1,4 +1,5 @@ pub mod constants; +pub mod id20; pub mod info_hash; pub mod lengths; pub mod magnet; diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index 6a7ef15..d83f485 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -9,6 +9,7 @@ rt-single-thread = [] [dependencies] librqbit = {path="../librqbit"} +dht = {path="../dht"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} anyhow = "1" clap = "3.0.0-beta.2" @@ -16,6 +17,7 @@ log = "0.4" pretty_env_logger = "0.4" reqwest = "0.11" regex = "1" +futures = "0.3" [dev-dependencies] futures = {version = "0.3"} \ No newline at end of file diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 27aad84..3cb67c5 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -2,8 +2,10 @@ use std::{fs::File, io::Read, net::SocketAddr, time::Duration}; use anyhow::Context; use clap::Clap; +use dht::{Dht, Id20}; +use futures::StreamExt; use librqbit::{ - dht::{inforead::read_metainfo_from_peer_receiver, jsdht::JsDht}, + dht::inforead::read_metainfo_from_peer_receiver, generate_peer_id, spawn_utils::{spawn, BlockingSpawner}, torrent_from_bytes, @@ -12,7 +14,6 @@ use librqbit::{ }; use log::{info, warn}; use reqwest::Url; -use tokio::sync::mpsc::UnboundedReceiver; async fn torrent_from_url(url: &str) -> anyhow::Result { let response = reqwest::get(url) @@ -168,12 +169,16 @@ fn main() -> anyhow::Result<()> { async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { let peer_id = generate_peer_id(); + let dht = Dht::new(&["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"]) + .await + .context("error initializing DHT")?; + if opts.torrent_path.starts_with("magnet:") { let Magnet { info_hash, trackers, } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; - let dht_rx = JsDht::new(info_hash).start_peer_discovery()?; + let dht_rx = dht.get_peers(Id20(info_hash)).await; let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx).await { librqbit::dht::inforead::ReadMetainfoResult::Found { info, rx, seen } => { @@ -189,7 +194,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> info, peer_id, dht_rx, - initial_peers, + initial_peers.into_iter().collect(), trackers .into_iter() .filter_map(|url| match reqwest::Url::parse(&url) { @@ -211,7 +216,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> } else { torrent_from_file(&opts.torrent_path)? }; - let dht_rx = JsDht::new(torrent.info_hash).start_peer_discovery()?; + let dht_rx = dht.get_peers(Id20(torrent.info_hash)).await; let trackers = torrent .iter_announce() .filter_map(|tracker| { @@ -251,7 +256,7 @@ async fn main_info( info_hash: InfoHash, info: TorrentMetaV1Info, peer_id: [u8; 20], - mut dht_peer_rx: UnboundedReceiver, + mut dht_peer_rx: impl StreamExt + Unpin + Send + Sync + 'static, initial_peers: Vec, trackers: Vec, spawner: BlockingSpawner, @@ -298,7 +303,7 @@ async fn main_info( spawn("DHT peer adder", { let handle = handle.clone(); async move { - while let Some(peer) = dht_peer_rx.recv().await { + while let Some(peer) = dht_peer_rx.next().await { handle.add_peer(peer); } warn!("dht was closed");