From 6eef3b9b6650b58695f5c6bbe22bf44d71d0d6f1 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Jul 2021 21:59:08 +0100 Subject: [PATCH] A small refactor --- Cargo.lock | 1 + crates/dht/src/dht.rs | 8 ++- crates/dht/src/id20.rs | 1 - crates/dht/src/lib.rs | 2 + crates/dht/src/main.rs | 4 +- crates/librqbit/Cargo.toml | 1 + crates/librqbit/src/dht/jsdht.rs | 72 ------------------- crates/librqbit/src/dht/mod.rs | 2 - .../src/{dht/inforead.rs => dht_utils.rs} | 30 ++++---- crates/librqbit/src/http_api.rs | 4 +- crates/librqbit/src/lib.rs | 3 +- crates/librqbit/src/peer_connection.rs | 10 +-- crates/librqbit/src/peer_info_reader/mod.rs | 25 +++---- crates/librqbit/src/peer_state.rs | 5 +- crates/librqbit/src/torrent_manager.rs | 14 ++-- crates/librqbit/src/torrent_state.rs | 14 ++-- crates/librqbit/src/tracker_comms.rs | 18 ++--- crates/librqbit_core/src/id20.rs | 5 +- crates/librqbit_core/src/info_hash.rs | 7 -- crates/librqbit_core/src/lib.rs | 1 - crates/librqbit_core/src/magnet.rs | 11 +-- crates/librqbit_core/src/peer_id.rs | 11 +-- crates/librqbit_core/src/torrent_metainfo.rs | 13 ++-- crates/peer_binary_protocol/src/lib.rs | 16 ++--- crates/rqbit/src/main.rs | 22 +++--- 25 files changed, 111 insertions(+), 189 deletions(-) delete mode 100644 crates/dht/src/id20.rs delete mode 100644 crates/librqbit/src/dht/jsdht.rs delete mode 100644 crates/librqbit/src/dht/mod.rs rename crates/librqbit/src/{dht/inforead.rs => dht_utils.rs} (77%) delete mode 100644 crates/librqbit_core/src/info_hash.rs diff --git a/Cargo.lock b/Cargo.lock index ef988fa..16ebca5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -692,6 +692,7 @@ dependencies = [ "byteorder", "clone_to_owned", "crypto-hash", + "dht", "futures", "hex 0.4.3", "librqbit_core", diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index 63d0661..6d90908 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -8,6 +8,7 @@ use crate::{ self, CompactNodeInfo, FindNodeRequest, GetPeersRequest, Message, MessageKind, Node, }, routing_table::{InsertResult, RoutingTable}, + DHT_BOOTSTRAP, }; use anyhow::Context; use bencode::ByteString; @@ -495,12 +496,15 @@ impl DhtWorker { } impl Dht { - pub async fn new(bootstrap_addrs: &[&str]) -> anyhow::Result { + pub async fn new() -> anyhow::Result { + Self::with_bootstrap_addrs(DHT_BOOTSTRAP).await + } + pub async fn with_bootstrap_addrs(bootstrap_addrs: &[&str]) -> anyhow::Result { let (request_tx, request_rx) = channel(1); let socket = UdpSocket::bind("0.0.0.0:0") .await .context("error binding socket")?; - let peer_id = Id20(generate_peer_id()); + let peer_id = generate_peer_id(); info!("starting up DHT with peer id {:?}", peer_id); let bootstrap_addrs = bootstrap_addrs .iter() diff --git a/crates/dht/src/id20.rs b/crates/dht/src/id20.rs deleted file mode 100644 index 8b13789..0000000 --- a/crates/dht/src/id20.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/dht/src/lib.rs b/crates/dht/src/lib.rs index 7069dea..193af3d 100644 --- a/crates/dht/src/lib.rs +++ b/crates/dht/src/lib.rs @@ -4,3 +4,5 @@ mod routing_table; pub use dht::Dht; pub use librqbit_core::id20::Id20; + +pub static DHT_BOOTSTRAP: &[&str] = &["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"]; diff --git a/crates/dht/src/main.rs b/crates/dht/src/main.rs index 18115a6..a4abbc9 100644 --- a/crates/dht/src/main.rs +++ b/crates/dht/src/main.rs @@ -9,9 +9,7 @@ async fn main() -> anyhow::Result<()> { pretty_env_logger::init(); let info_hash = Id20::from_str("64a980abe6e448226bb930ba061592e44c3781a1").unwrap(); - let dht = Dht::new(&["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"]) - .await - .context("error initializing dht")?; + let dht = Dht::new().await.context("error initializing DHT")?; let mut stream = dht.get_peers(info_hash).await; let mut seen = HashSet::new(); while let Some(peer) = stream.next().await { diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 57f3cc8..6b86dd4 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -19,6 +19,7 @@ librqbit_core = {path = "../librqbit_core"} clone_to_owned = {path = "../clone_to_owned"} peer_binary_protocol = {path = "../peer_binary_protocol"} sha1w = {path = "../sha1w"} +dht = {path = "../dht"} tokio = {version = "1", features = ["macros", "rt-multi-thread"]} tokio-stream = "0.1" diff --git a/crates/librqbit/src/dht/jsdht.rs b/crates/librqbit/src/dht/jsdht.rs deleted file mode 100644 index dc7268d..0000000 --- a/crates/librqbit/src/dht/jsdht.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::{io::BufRead, io::BufReader, net::SocketAddr, process::Stdio, str::FromStr}; - -use log::info; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - -// Collects seen peers for torrent -// Knows if they work or not. -// Informs subscribers of new peers discovered. -// -// Can discover metainfo quickly (limiting concurrency). - -pub struct JsDht { - info_hash: [u8; 20], -} - -static NODEJS_DISCOVER_SCRIPT: &str = r#" -const DHT = require('bittorrent-dht') - -let dht = new DHT(); -let infoHash = process.env["INFOHASH"]; - -dht.on('peer', function (peer, infoHash, from) { - console.log(peer.host + ':' + peer.port) -}) - -dht.lookup(infoHash) -"#; - -fn infohash_hex(info_hash: [u8; 20]) -> String { - hex::encode(info_hash) -} - -impl JsDht { - pub fn new(info_hash: [u8; 20]) -> Self { - Self { info_hash } - } - pub fn start_peer_discovery(self) -> anyhow::Result> { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - std::thread::spawn(move || self.discover_peers_and_send(tx).unwrap()); - Ok(rx) - } - fn discover_peers_and_send(self, tx: UnboundedSender) -> anyhow::Result<()> { - let mut cmd = std::process::Command::new("node"); - cmd.arg("-e") - .arg(NODEJS_DISCOVER_SCRIPT) - .env("NODE_PATH", "/opt/homebrew/lib/node_modules") - .env("INFOHASH", infohash_hex(self.info_hash)) - .stdout(Stdio::piped()); - - info!("Executing {:?}", &cmd); - - let mut child = cmd.spawn()?; - - let stdout = child.stdout.take().unwrap(); - let mut stdout = BufReader::new(stdout); - let mut line = String::new(); - loop { - line.clear(); - let size = stdout.read_line(&mut line)?; - if size == 0 { - anyhow::bail!("node discover process was not supposed to close") - } - // Remove newline character; - line.pop(); - - let ipaddr = SocketAddr::from_str(&line)?; - if tx.send(ipaddr).is_err() { - anyhow::bail!("receiver closed") - } - } - } -} diff --git a/crates/librqbit/src/dht/mod.rs b/crates/librqbit/src/dht/mod.rs deleted file mode 100644 index 8f0756c..0000000 --- a/crates/librqbit/src/dht/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod inforead; -pub mod jsdht; diff --git a/crates/librqbit/src/dht/inforead.rs b/crates/librqbit/src/dht_utils.rs similarity index 77% rename from crates/librqbit/src/dht/inforead.rs rename to crates/librqbit/src/dht_utils.rs index d243cdd..27ced68 100644 --- a/crates/librqbit/src/dht/inforead.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -6,6 +6,7 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info; use log::debug; use crate::peer_info_reader; +use librqbit_core::id20::Id20; #[derive(Debug)] pub enum ReadMetainfoResult { @@ -20,8 +21,8 @@ pub enum ReadMetainfoResult { } pub async fn read_metainfo_from_peer_receiver + Unpin>( - peer_id: [u8; 20], - info_hash: [u8; 20], + peer_id: Id20, + info_hash: Id20, mut addrs: A, ) -> ReadMetainfoResult { let mut seen = HashSet::::new(); @@ -63,13 +64,11 @@ pub async fn read_metainfo_from_peer_receiver + #[cfg(test)] mod tests { - use librqbit_core::{info_hash::decode_info_hash, peer_id::generate_peer_id}; - use tokio_stream::wrappers::UnboundedReceiverStream; - - use crate::dht::jsdht::JsDht; + use dht::{Dht, Id20}; + use librqbit_core::peer_id::generate_peer_id; use super::*; - use std::sync::Once; + use std::{str::FromStr, sync::Once}; static LOG_INIT: Once = Once::new(); @@ -81,16 +80,13 @@ mod tests { async fn read_metainfo_from_dht() { init_logging(); - let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); - let peer_rx = JsDht::new(info_hash).start_peer_discovery().unwrap(); + let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); + let dht = Dht::new().await.unwrap(); + let peer_rx = dht.get_peers(info_hash).await; let peer_id = generate_peer_id(); - dbg!( - read_metainfo_from_peer_receiver( - peer_id, - info_hash, - UnboundedReceiverStream::new(peer_rx) - ) - .await - ); + match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await { + ReadMetainfoResult::Found { info, rx, seen } => dbg!(info), + ReadMetainfoResult::ChannelClosed { seen } => todo!("should not have happened"), + }; } } diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 9833508..79b8408 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -90,7 +90,7 @@ impl Inner { .enumerate() .map(|(id, mgr)| TorrentListResponseItem { id, - info_hash: hex::encode(mgr.torrent_state().info_hash()), + info_hash: hex::encode(mgr.torrent_state().info_hash().0), }) .collect(), } @@ -98,7 +98,7 @@ impl Inner { fn api_torrent_details(&self, idx: usize) -> Option { let handle = self.mgr_handle(idx)?; - let info_hash = hex::encode(handle.torrent_state().info_hash()); + let info_hash = hex::encode(handle.torrent_state().info_hash().0); let files = handle .torrent_state() .info() diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 9a6fbc5..5def0b3 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -1,5 +1,5 @@ pub mod chunk_tracker; -pub mod dht; +pub mod dht_utils; pub mod file_ops; pub mod http_api; pub mod peer_connection; @@ -14,7 +14,6 @@ pub mod type_aliases; pub use buffers::*; pub use clone_to_owned::CloneToOwned; -pub use librqbit_core::info_hash::*; pub use librqbit_core::magnet::*; pub use librqbit_core::peer_id::*; pub use librqbit_core::torrent_metainfo::*; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index c9fc308..8236016 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, time::Duration}; use anyhow::Context; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; -use librqbit_core::{lengths::ChunkInfo, peer_id::try_decode_peer_id}; +use librqbit_core::{id20::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id}; use log::{debug, trace}; use peer_binary_protocol::{ extended::{handshake::ExtendedHandshake, ExtendedMessage}, @@ -34,8 +34,8 @@ pub enum WriterRequest { pub struct PeerConnection { handler: H, addr: SocketAddr, - info_hash: [u8; 20], - peer_id: [u8; 20], + info_hash: Id20, + peer_id: Id20, } // async fn read_one<'a, R: AsyncReadExt + Unpin>( @@ -94,7 +94,7 @@ macro_rules! read_one { } impl PeerConnection { - pub fn new(addr: SocketAddr, info_hash: [u8; 20], peer_id: [u8; 20], handler: H) -> Self { + pub fn new(addr: SocketAddr, info_hash: Id20, peer_id: Id20, handler: H) -> Self { PeerConnection { handler, addr, @@ -112,7 +112,7 @@ impl PeerConnection { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; let mut conn = match timeout( - Duration::from_secs(10), + Duration::from_secs(2), tokio::net::TcpStream::connect(self.addr), ) .await diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index 0e9c267..71c9cc2 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -4,6 +4,7 @@ use bencode::from_bytes; use buffers::{ByteBuf, ByteString}; use librqbit_core::{ constants::CHUNK_SIZE, + id20::Id20, lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo}, torrent_metainfo::TorrentMetaV1Info, }; @@ -20,8 +21,8 @@ use crate::peer_connection::{PeerConnection, PeerConnectionHandler, WriterReques pub async fn read_metainfo_from_peer( addr: SocketAddr, - peer_id: [u8; 20], - info_hash: [u8; 20], + peer_id: Id20, + info_hash: Id20, ) -> anyhow::Result> { let (result_tx, result_rx) = tokio::sync::oneshot::channel::>>(); @@ -77,12 +78,7 @@ impl HandlerLocked { CHUNK_SIZE as usize } } - fn record_piece( - &mut self, - index: u32, - data: &[u8], - info_hash: [u8; 20], - ) -> anyhow::Result { + fn record_piece(&mut self, index: u32, data: &[u8], info_hash: Id20) -> anyhow::Result { if index as usize >= self.total_pieces { anyhow::bail!("wrong index"); } @@ -107,7 +103,7 @@ impl HandlerLocked { // check metadata let mut hash = Sha1::new(); hash.update(&self.buffer); - if hash.finish() != info_hash { + if hash.finish() != info_hash.0 { anyhow::bail!("info checksum invalid"); } Ok(true) @@ -119,7 +115,7 @@ impl HandlerLocked { struct Handler { addr: SocketAddr, - info_hash: [u8; 20], + info_hash: Id20, writer_tx: UnboundedSender, result_tx: Mutex>>>>, @@ -216,6 +212,7 @@ impl PeerConnectionHandler for Handler { mod tests { use std::{net::SocketAddr, str::FromStr, sync::Once}; + use librqbit_core::id20::Id20; use librqbit_core::peer_id::generate_peer_id; use super::read_metainfo_from_peer; @@ -226,19 +223,13 @@ mod tests { LOG_INIT.call_once(pretty_env_logger::init) } - fn decode_info_hash(hash_str: &str) -> [u8; 20] { - let mut hash_arr = [0u8; 20]; - hex::decode_to_slice(hash_str, &mut hash_arr).unwrap(); - hash_arr - } - #[tokio::test] async fn test_get_torrent_metadata_from_localhost_bittorrent_client() { init_logging(); let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap(); let peer_id = generate_peer_id(); - let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7"); + let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap(); dbg!(read_metainfo_from_peer(addr, peer_id, info_hash) .await .unwrap()); diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index b03e70f..27bfa63 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,5 +1,6 @@ use std::{collections::HashSet, sync::Arc}; +use librqbit_core::id20::Id20; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use tokio::sync::{Notify, Semaphore}; @@ -29,7 +30,7 @@ pub enum PeerState { #[derive(Debug)] pub struct LivePeerState { - pub peer_id: [u8; 20], + pub peer_id: Id20, pub i_am_choked: bool, pub peer_interested: bool, pub requests_sem: Arc, @@ -39,7 +40,7 @@ pub struct LivePeerState { } impl LivePeerState { - pub fn new(peer_id: [u8; 20]) -> Self { + pub fn new(peer_id: Id20) -> Self { LivePeerState { peer_id, i_am_choked: true, diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 78d467d..51fea6d 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -11,7 +11,7 @@ use anyhow::Context; use bencode::from_bytes; use buffers::ByteString; use librqbit_core::{ - lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, + id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator, torrent_metainfo::TorrentMetaV1Info, }; use log::{debug, info}; @@ -29,11 +29,11 @@ use crate::{ }; pub struct TorrentManagerBuilder { info: TorrentMetaV1Info, - info_hash: [u8; 20], + info_hash: Id20, overwrite: bool, output_folder: PathBuf, only_files: Option>, - peer_id: Option<[u8; 20]>, + peer_id: Option, force_tracker_interval: Option, spawner: Option, } @@ -41,7 +41,7 @@ pub struct TorrentManagerBuilder { impl TorrentManagerBuilder { pub fn new>( info: TorrentMetaV1Info, - info_hash: [u8; 20], + info_hash: Id20, output_folder: P, ) -> Self { Self { @@ -76,7 +76,7 @@ impl TorrentManagerBuilder { self } - pub fn peer_id(&mut self, peer_id: [u8; 20]) -> &mut Self { + pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self { self.peer_id = Some(peer_id); self } @@ -150,12 +150,12 @@ impl TorrentManager { #[allow(clippy::too_many_arguments)] fn start>( info: TorrentMetaV1Info, - info_hash: [u8; 20], + info_hash: Id20, out: P, overwrite: bool, only_files: Option>, force_tracker_interval: Option, - peer_id: Option<[u8; 20]>, + peer_id: Option, spawner: BlockingSpawner, ) -> anyhow::Result { let files = { diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 2b96efe..8432195 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -14,7 +14,7 @@ use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; use librqbit_core::{ - info_hash::InfoHash, + id20::Id20, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, torrent_metainfo::TorrentMetaV1Info, }; @@ -219,8 +219,8 @@ pub struct TorrentState { info: TorrentMetaV1Info, locked: Arc>, files: Vec>>, - info_hash: [u8; 20], - peer_id: [u8; 20], + info_hash: Id20, + peer_id: Id20, lengths: Lengths, needed: u64, stats: AtomicStats, @@ -234,8 +234,8 @@ impl TorrentState { #[allow(clippy::too_many_arguments)] pub fn new( info: TorrentMetaV1Info, - info_hash: [u8; 20], - peer_id: [u8; 20], + info_hash: Id20, + peer_id: Id20, files: Vec>>, chunk_tracker: ChunkTracker, lengths: Lengths, @@ -304,10 +304,10 @@ impl TorrentState { pub fn info(&self) -> &TorrentMetaV1Info { &self.info } - pub fn info_hash(&self) -> InfoHash { + pub fn info_hash(&self) -> Id20 { self.info_hash } - pub fn peer_id(&self) -> [u8; 20] { + pub fn peer_id(&self) -> Id20 { self.peer_id } pub fn file_ops(&self) -> FileOps<'_, Sha1> { diff --git a/crates/librqbit/src/tracker_comms.rs b/crates/librqbit/src/tracker_comms.rs index 9827ba6..3d26a37 100644 --- a/crates/librqbit/src/tracker_comms.rs +++ b/crates/librqbit/src/tracker_comms.rs @@ -8,6 +8,8 @@ use std::{ str::FromStr, }; +use librqbit_core::id20::Id20; + #[derive(Clone, Copy)] pub enum TrackerRequestEvent { Started, @@ -16,8 +18,8 @@ pub enum TrackerRequestEvent { } pub struct TrackerRequest { - pub info_hash: [u8; 20], - pub peer_id: [u8; 20], + pub info_hash: Id20, + pub peer_id: Id20, pub event: Option, pub port: u16, pub uploaded: u64, @@ -159,9 +161,9 @@ impl TrackerRequest { use urlencoding as u; let mut s = String::new(); s.push_str("info_hash="); - s.push_str(u::encode_binary(&self.info_hash).as_ref()); + s.push_str(u::encode_binary(&self.info_hash.0).as_ref()); s.push_str("&peer_id="); - s.push_str(u::encode_binary(&self.peer_id).as_ref()); + s.push_str(u::encode_binary(&self.peer_id.0).as_ref()); if let Some(event) = self.event { write!( s, @@ -201,12 +203,12 @@ mod tests { use super::*; #[test] fn test_serialize() { - let info_hash = [ + let info_hash = Id20([ 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]; - let peer_id = [ + ]); + let peer_id = Id20([ 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]; + ]); let request = TrackerRequest { info_hash, peer_id, diff --git a/crates/librqbit_core/src/id20.rs b/crates/librqbit_core/src/id20.rs index 6d7bc01..c1dc6eb 100644 --- a/crates/librqbit_core/src/id20.rs +++ b/crates/librqbit_core/src/id20.rs @@ -2,7 +2,7 @@ use std::{cmp::Ordering, str::FromStr}; use serde::{Deserialize, Deserializer, Serialize}; -#[derive(Clone, Copy, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Default)] pub struct Id20(pub [u8; 20]); impl FromStr for Id20 { @@ -67,6 +67,9 @@ impl<'de> Deserialize<'de> for Id20 { } impl Id20 { + pub fn to_string(&self) -> String { + hex::encode(self.0) + } pub fn distance(&self, other: &Id20) -> Id20 { let mut xor = [0u8; 20]; for (idx, (s, o)) in self diff --git a/crates/librqbit_core/src/info_hash.rs b/crates/librqbit_core/src/info_hash.rs deleted file mode 100644 index 6acdb03..0000000 --- a/crates/librqbit_core/src/info_hash.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub type InfoHash = [u8; 20]; - -pub fn decode_info_hash(hash_str: &str) -> anyhow::Result { - let mut hash_arr = [0u8; 20]; - hex::decode_to_slice(hash_str, &mut hash_arr)?; - Ok(hash_arr) -} diff --git a/crates/librqbit_core/src/lib.rs b/crates/librqbit_core/src/lib.rs index 9ddde49..1c4d034 100644 --- a/crates/librqbit_core/src/lib.rs +++ b/crates/librqbit_core/src/lib.rs @@ -1,6 +1,5 @@ pub mod constants; pub mod id20; -pub mod info_hash; pub mod lengths; pub mod magnet; pub mod peer_id; diff --git a/crates/librqbit_core/src/magnet.rs b/crates/librqbit_core/src/magnet.rs index be46172..5b5739e 100644 --- a/crates/librqbit_core/src/magnet.rs +++ b/crates/librqbit_core/src/magnet.rs @@ -1,8 +1,11 @@ -use crate::info_hash::{decode_info_hash, InfoHash}; +use std::str::FromStr; + use anyhow::Context; +use crate::id20::Id20; + pub struct Magnet { - pub info_hash: InfoHash, + pub info_hash: Id20, pub trackers: Vec, } @@ -12,13 +15,13 @@ impl Magnet { if url.scheme() != "magnet" { anyhow::bail!("expected scheme magnet"); } - let mut info_hash: Option = None; + let mut info_hash: Option = None; let mut trackers = Vec::::new(); for (key, value) in url.query_pairs() { match key.as_ref() { "xt" => match value.as_ref().strip_prefix("urn:btih:") { Some(infohash) => { - info_hash.replace(decode_info_hash(infohash)?); + info_hash.replace(Id20::from_str(infohash)?); } None => anyhow::bail!("expected xt to start with urn:btih:"), }, diff --git a/crates/librqbit_core/src/peer_id.rs b/crates/librqbit_core/src/peer_id.rs index 633f1b4..86bf796 100644 --- a/crates/librqbit_core/src/peer_id.rs +++ b/crates/librqbit_core/src/peer_id.rs @@ -1,3 +1,5 @@ +use crate::id20::Id20; + #[derive(Debug)] pub enum AzureusStyleKind { Deluge, @@ -23,7 +25,8 @@ impl AzureusStyleKind { } } -fn try_decode_azureus_style(p: &[u8; 20]) -> Option { +fn try_decode_azureus_style(p: &Id20) -> Option { + let p = p.0; if !(p[0] == b'-' && p[7] == b'-') { return None; } @@ -40,11 +43,11 @@ pub enum PeerId { AzureusStyle(AzureusStyle), } -pub fn try_decode_peer_id(p: [u8; 20]) -> Option { +pub fn try_decode_peer_id(p: Id20) -> Option { Some(PeerId::AzureusStyle(try_decode_azureus_style(&p)?)) } -pub fn generate_peer_id() -> [u8; 20] { +pub fn generate_peer_id() -> Id20 { let mut peer_id = [0u8; 20]; let u = uuid::Uuid::new_v4(); @@ -52,5 +55,5 @@ pub fn generate_peer_id() -> [u8; 20] { (&mut peer_id[..8]).copy_from_slice(b"-rQ0001-"); - peer_id + Id20(peer_id) } diff --git a/crates/librqbit_core/src/torrent_metainfo.rs b/crates/librqbit_core/src/torrent_metainfo.rs index 19e3d72..afc60a6 100644 --- a/crates/librqbit_core/src/torrent_metainfo.rs +++ b/crates/librqbit_core/src/torrent_metainfo.rs @@ -5,6 +5,8 @@ use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use serde::Deserialize; +use crate::id20::Id20; + pub type TorrentMetaV1Borrowed<'a> = TorrentMetaV1>; pub type TorrentMetaV1Owned = TorrentMetaV1; @@ -14,7 +16,10 @@ pub fn torrent_from_bytes<'de, ByteBuf: Deserialize<'de>>( let mut de = BencodeDeserializer::new_from_buf(buf); de.is_torrent_info = true; let mut t = TorrentMetaV1::deserialize(&mut de)?; - t.info_hash = de.torrent_info_digest.unwrap(); + t.info_hash = Id20( + de.torrent_info_digest + .ok_or_else(|| anyhow::anyhow!("programming error"))?, + ); Ok(t) } @@ -35,7 +40,7 @@ pub struct TorrentMetaV1 { pub creation_date: Option, #[serde(skip)] - pub info_hash: [u8; 20], + pub info_hash: Id20, } impl TorrentMetaV1 { @@ -292,8 +297,8 @@ mod tests { let torrent: TorrentMetaV1Borrowed = torrent_from_bytes(&buf).unwrap(); assert_eq!( - torrent.info_hash, - *b"\x64\xa9\x80\xab\xe6\xe4\x48\x22\x6b\xb9\x30\xba\x06\x15\x92\xe4\x4c\x37\x81\xa1" + torrent.info_hash.to_string(), + "64a980abe6e448226bb930ba061592e44c3781a1" ); } } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 844ad8f..675c497 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -4,7 +4,7 @@ use bincode::Options; use buffers::{ByteBuf, ByteString}; use byteorder::{ByteOrder, BE}; use clone_to_owned::CloneToOwned; -use librqbit_core::{constants::CHUNK_SIZE, lengths::ChunkInfo}; +use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo}; use serde::{Deserialize, Serialize}; use self::extended::{handshake::ExtendedHandshake, ExtendedMessage}; @@ -474,8 +474,8 @@ where pub struct Handshake<'a> { pub pstr: &'a str, pub reserved: [u8; 8], - pub info_hash: [u8; 20], - pub peer_id: [u8; 20], + pub info_hash: Id20, + pub peer_id: Id20, } fn bopts() -> impl bincode::Options { @@ -485,7 +485,7 @@ fn bopts() -> impl bincode::Options { } impl<'a> Handshake<'a> { - pub fn new(info_hash: [u8; 20], peer_id: [u8; 20]) -> Handshake<'static> { + pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<'static> { debug_assert_eq!(PSTR_BT1.len(), 19); let mut reserved: u64 = 0; @@ -552,12 +552,12 @@ mod tests { use super::*; #[test] fn test_handshake_serialize() { - let info_hash = [ + let info_hash = Id20([ 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]; - let peer_id = [ + ]); + let peer_id = Id20([ 1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, - ]; + ]); let mut buf = Vec::new(); Handshake::new(info_hash, peer_id).serialize(&mut buf); assert_eq!(buf.len(), 20 + 20 + 8 + 19 + 1); diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 3cb67c5..d5065d7 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -5,12 +5,12 @@ use clap::Clap; use dht::{Dht, Id20}; use futures::StreamExt; use librqbit::{ - dht::inforead::read_metainfo_from_peer_receiver, + dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, generate_peer_id, spawn_utils::{spawn, BlockingSpawner}, torrent_from_bytes, torrent_manager::TorrentManagerBuilder, - ByteString, InfoHash, Magnet, TorrentMetaV1Info, TorrentMetaV1Owned, + ByteString, Magnet, TorrentMetaV1Info, TorrentMetaV1Owned, }; use log::{info, warn}; use reqwest::Url; @@ -169,22 +169,18 @@ fn main() -> anyhow::Result<()> { async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> { let peer_id = generate_peer_id(); - let dht = Dht::new(&["dht.transmissionbt.com:6881", "dht.libtorrent.org:25401"]) - .await - .context("error initializing DHT")?; + let dht = Dht::new().await.context("error initializing DHT")?; if opts.torrent_path.starts_with("magnet:") { let Magnet { info_hash, trackers, } = Magnet::parse(&opts.torrent_path).context("provided path is not a valid magnet URL")?; - let dht_rx = dht.get_peers(Id20(info_hash)).await; + let dht_rx = dht.get_peers(info_hash).await; let (info, dht_rx, initial_peers) = match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx).await { - librqbit::dht::inforead::ReadMetainfoResult::Found { info, rx, seen } => { - (info, rx, seen) - } - librqbit::dht::inforead::ReadMetainfoResult::ChannelClosed { .. } => { + ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::ChannelClosed { .. } => { anyhow::bail!("DHT died, no way to discover torrent metainfo") } }; @@ -216,7 +212,7 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> } else { torrent_from_file(&opts.torrent_path)? }; - let dht_rx = dht.get_peers(Id20(torrent.info_hash)).await; + let dht_rx = dht.get_peers(torrent.info_hash).await; let trackers = torrent .iter_announce() .filter_map(|tracker| { @@ -253,9 +249,9 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> #[allow(clippy::too_many_arguments)] async fn main_info( opts: Opts, - info_hash: InfoHash, + info_hash: Id20, info: TorrentMetaV1Info, - peer_id: [u8; 20], + peer_id: Id20, mut dht_peer_rx: impl StreamExt + Unpin + Send + Sync + 'static, initial_peers: Vec, trackers: Vec,