A small refactor

This commit is contained in:
Igor Katson 2021-07-12 21:59:08 +01:00
parent 2eabebb5c3
commit 6eef3b9b66
25 changed files with 111 additions and 189 deletions

View file

@ -1,72 +0,0 @@
use std::{io::BufRead, io::BufReader, net::SocketAddr, process::Stdio, str::FromStr};
use log::info;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
// Collects seen peers for torrent
// Knows if they work or not.
// Informs subscribers of new peers discovered.
//
// Can discover metainfo quickly (limiting concurrency).
pub struct JsDht {
info_hash: [u8; 20],
}
static NODEJS_DISCOVER_SCRIPT: &str = r#"
const DHT = require('bittorrent-dht')
let dht = new DHT();
let infoHash = process.env["INFOHASH"];
dht.on('peer', function (peer, infoHash, from) {
console.log(peer.host + ':' + peer.port)
})
dht.lookup(infoHash)
"#;
fn infohash_hex(info_hash: [u8; 20]) -> String {
hex::encode(info_hash)
}
impl JsDht {
pub fn new(info_hash: [u8; 20]) -> Self {
Self { info_hash }
}
pub fn start_peer_discovery(self) -> anyhow::Result<UnboundedReceiver<SocketAddr>> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
std::thread::spawn(move || self.discover_peers_and_send(tx).unwrap());
Ok(rx)
}
fn discover_peers_and_send(self, tx: UnboundedSender<SocketAddr>) -> anyhow::Result<()> {
let mut cmd = std::process::Command::new("node");
cmd.arg("-e")
.arg(NODEJS_DISCOVER_SCRIPT)
.env("NODE_PATH", "/opt/homebrew/lib/node_modules")
.env("INFOHASH", infohash_hex(self.info_hash))
.stdout(Stdio::piped());
info!("Executing {:?}", &cmd);
let mut child = cmd.spawn()?;
let stdout = child.stdout.take().unwrap();
let mut stdout = BufReader::new(stdout);
let mut line = String::new();
loop {
line.clear();
let size = stdout.read_line(&mut line)?;
if size == 0 {
anyhow::bail!("node discover process was not supposed to close")
}
// Remove newline character;
line.pop();
let ipaddr = SocketAddr::from_str(&line)?;
if tx.send(ipaddr).is_err() {
anyhow::bail!("receiver closed")
}
}
}
}

View file

@ -1,2 +0,0 @@
pub mod inforead;
pub mod jsdht;

View file

@ -6,6 +6,7 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use log::debug;
use crate::peer_info_reader;
use librqbit_core::id20::Id20;
#[derive(Debug)]
pub enum ReadMetainfoResult<Rx> {
@ -20,8 +21,8 @@ pub enum ReadMetainfoResult<Rx> {
}
pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> + Unpin>(
peer_id: [u8; 20],
info_hash: [u8; 20],
peer_id: Id20,
info_hash: Id20,
mut addrs: A,
) -> ReadMetainfoResult<A> {
let mut seen = HashSet::<SocketAddr>::new();
@ -63,13 +64,11 @@ pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> +
#[cfg(test)]
mod tests {
use librqbit_core::{info_hash::decode_info_hash, peer_id::generate_peer_id};
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::dht::jsdht::JsDht;
use dht::{Dht, Id20};
use librqbit_core::peer_id::generate_peer_id;
use super::*;
use std::sync::Once;
use std::{str::FromStr, sync::Once};
static LOG_INIT: Once = Once::new();
@ -81,16 +80,13 @@ mod tests {
async fn read_metainfo_from_dht() {
init_logging();
let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
let peer_rx = JsDht::new(info_hash).start_peer_discovery().unwrap();
let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
let dht = Dht::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash).await;
let peer_id = generate_peer_id();
dbg!(
read_metainfo_from_peer_receiver(
peer_id,
info_hash,
UnboundedReceiverStream::new(peer_rx)
)
.await
);
match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await {
ReadMetainfoResult::Found { info, rx, seen } => dbg!(info),
ReadMetainfoResult::ChannelClosed { seen } => todo!("should not have happened"),
};
}
}

View file

@ -90,7 +90,7 @@ impl Inner {
.enumerate()
.map(|(id, mgr)| TorrentListResponseItem {
id,
info_hash: hex::encode(mgr.torrent_state().info_hash()),
info_hash: hex::encode(mgr.torrent_state().info_hash().0),
})
.collect(),
}
@ -98,7 +98,7 @@ impl Inner {
fn api_torrent_details(&self, idx: usize) -> Option<TorrentDetailsResponse> {
let handle = self.mgr_handle(idx)?;
let info_hash = hex::encode(handle.torrent_state().info_hash());
let info_hash = hex::encode(handle.torrent_state().info_hash().0);
let files = handle
.torrent_state()
.info()

View file

@ -1,5 +1,5 @@
pub mod chunk_tracker;
pub mod dht;
pub mod dht_utils;
pub mod file_ops;
pub mod http_api;
pub mod peer_connection;
@ -14,7 +14,6 @@ pub mod type_aliases;
pub use buffers::*;
pub use clone_to_owned::CloneToOwned;
pub use librqbit_core::info_hash::*;
pub use librqbit_core::magnet::*;
pub use librqbit_core::peer_id::*;
pub use librqbit_core::torrent_metainfo::*;

View file

@ -3,7 +3,7 @@ use std::{net::SocketAddr, time::Duration};
use anyhow::Context;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use librqbit_core::{lengths::ChunkInfo, peer_id::try_decode_peer_id};
use librqbit_core::{id20::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id};
use log::{debug, trace};
use peer_binary_protocol::{
extended::{handshake::ExtendedHandshake, ExtendedMessage},
@ -34,8 +34,8 @@ pub enum WriterRequest {
pub struct PeerConnection<H> {
handler: H,
addr: SocketAddr,
info_hash: [u8; 20],
peer_id: [u8; 20],
info_hash: Id20,
peer_id: Id20,
}
// async fn read_one<'a, R: AsyncReadExt + Unpin>(
@ -94,7 +94,7 @@ macro_rules! read_one {
}
impl<H: PeerConnectionHandler> PeerConnection<H> {
pub fn new(addr: SocketAddr, info_hash: [u8; 20], peer_id: [u8; 20], handler: H) -> Self {
pub fn new(addr: SocketAddr, info_hash: Id20, peer_id: Id20, handler: H) -> Self {
PeerConnection {
handler,
addr,
@ -112,7 +112,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut conn = match timeout(
Duration::from_secs(10),
Duration::from_secs(2),
tokio::net::TcpStream::connect(self.addr),
)
.await

View file

@ -4,6 +4,7 @@ use bencode::from_bytes;
use buffers::{ByteBuf, ByteString};
use librqbit_core::{
constants::CHUNK_SIZE,
id20::Id20,
lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo},
torrent_metainfo::TorrentMetaV1Info,
};
@ -20,8 +21,8 @@ use crate::peer_connection::{PeerConnection, PeerConnectionHandler, WriterReques
pub async fn read_metainfo_from_peer(
addr: SocketAddr,
peer_id: [u8; 20],
info_hash: [u8; 20],
peer_id: Id20,
info_hash: Id20,
) -> anyhow::Result<TorrentMetaV1Info<ByteString>> {
let (result_tx, result_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteString>>>();
@ -77,12 +78,7 @@ impl HandlerLocked {
CHUNK_SIZE as usize
}
}
fn record_piece(
&mut self,
index: u32,
data: &[u8],
info_hash: [u8; 20],
) -> anyhow::Result<bool> {
fn record_piece(&mut self, index: u32, data: &[u8], info_hash: Id20) -> anyhow::Result<bool> {
if index as usize >= self.total_pieces {
anyhow::bail!("wrong index");
}
@ -107,7 +103,7 @@ impl HandlerLocked {
// check metadata
let mut hash = Sha1::new();
hash.update(&self.buffer);
if hash.finish() != info_hash {
if hash.finish() != info_hash.0 {
anyhow::bail!("info checksum invalid");
}
Ok(true)
@ -119,7 +115,7 @@ impl HandlerLocked {
struct Handler {
addr: SocketAddr,
info_hash: [u8; 20],
info_hash: Id20,
writer_tx: UnboundedSender<WriterRequest>,
result_tx:
Mutex<Option<tokio::sync::oneshot::Sender<anyhow::Result<TorrentMetaV1Info<ByteString>>>>>,
@ -216,6 +212,7 @@ impl PeerConnectionHandler for Handler {
mod tests {
use std::{net::SocketAddr, str::FromStr, sync::Once};
use librqbit_core::id20::Id20;
use librqbit_core::peer_id::generate_peer_id;
use super::read_metainfo_from_peer;
@ -226,19 +223,13 @@ mod tests {
LOG_INIT.call_once(pretty_env_logger::init)
}
fn decode_info_hash(hash_str: &str) -> [u8; 20] {
let mut hash_arr = [0u8; 20];
hex::decode_to_slice(hash_str, &mut hash_arr).unwrap();
hash_arr
}
#[tokio::test]
async fn test_get_torrent_metadata_from_localhost_bittorrent_client() {
init_logging();
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
let peer_id = generate_peer_id();
let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7");
let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
dbg!(read_metainfo_from_peer(addr, peer_id, info_hash)
.await
.unwrap());

View file

@ -1,5 +1,6 @@
use std::{collections::HashSet, sync::Arc};
use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use tokio::sync::{Notify, Semaphore};
@ -29,7 +30,7 @@ pub enum PeerState {
#[derive(Debug)]
pub struct LivePeerState {
pub peer_id: [u8; 20],
pub peer_id: Id20,
pub i_am_choked: bool,
pub peer_interested: bool,
pub requests_sem: Arc<Semaphore>,
@ -39,7 +40,7 @@ pub struct LivePeerState {
}
impl LivePeerState {
pub fn new(peer_id: [u8; 20]) -> Self {
pub fn new(peer_id: Id20) -> Self {
LivePeerState {
peer_id,
i_am_choked: true,

View file

@ -11,7 +11,7 @@ use anyhow::Context;
use bencode::from_bytes;
use buffers::ByteString;
use librqbit_core::{
lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator,
id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator,
torrent_metainfo::TorrentMetaV1Info,
};
use log::{debug, info};
@ -29,11 +29,11 @@ use crate::{
};
pub struct TorrentManagerBuilder {
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
info_hash: Id20,
overwrite: bool,
output_folder: PathBuf,
only_files: Option<Vec<usize>>,
peer_id: Option<[u8; 20]>,
peer_id: Option<Id20>,
force_tracker_interval: Option<Duration>,
spawner: Option<BlockingSpawner>,
}
@ -41,7 +41,7 @@ pub struct TorrentManagerBuilder {
impl TorrentManagerBuilder {
pub fn new<P: AsRef<Path>>(
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
info_hash: Id20,
output_folder: P,
) -> Self {
Self {
@ -76,7 +76,7 @@ impl TorrentManagerBuilder {
self
}
pub fn peer_id(&mut self, peer_id: [u8; 20]) -> &mut Self {
pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self {
self.peer_id = Some(peer_id);
self
}
@ -150,12 +150,12 @@ impl TorrentManager {
#[allow(clippy::too_many_arguments)]
fn start<P: AsRef<Path>>(
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
info_hash: Id20,
out: P,
overwrite: bool,
only_files: Option<Vec<usize>>,
force_tracker_interval: Option<Duration>,
peer_id: Option<[u8; 20]>,
peer_id: Option<Id20>,
spawner: BlockingSpawner,
) -> anyhow::Result<TorrentManagerHandle> {
let files = {

View file

@ -14,7 +14,7 @@ use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::{
info_hash::InfoHash,
id20::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
torrent_metainfo::TorrentMetaV1Info,
};
@ -219,8 +219,8 @@ pub struct TorrentState {
info: TorrentMetaV1Info<ByteString>,
locked: Arc<RwLock<TorrentStateLocked>>,
files: Vec<Arc<Mutex<File>>>,
info_hash: [u8; 20],
peer_id: [u8; 20],
info_hash: Id20,
peer_id: Id20,
lengths: Lengths,
needed: u64,
stats: AtomicStats,
@ -234,8 +234,8 @@ impl TorrentState {
#[allow(clippy::too_many_arguments)]
pub fn new(
info: TorrentMetaV1Info<ByteString>,
info_hash: [u8; 20],
peer_id: [u8; 20],
info_hash: Id20,
peer_id: Id20,
files: Vec<Arc<Mutex<File>>>,
chunk_tracker: ChunkTracker,
lengths: Lengths,
@ -304,10 +304,10 @@ impl TorrentState {
pub fn info(&self) -> &TorrentMetaV1Info<ByteString> {
&self.info
}
pub fn info_hash(&self) -> InfoHash {
pub fn info_hash(&self) -> Id20 {
self.info_hash
}
pub fn peer_id(&self) -> [u8; 20] {
pub fn peer_id(&self) -> Id20 {
self.peer_id
}
pub fn file_ops(&self) -> FileOps<'_, Sha1> {

View file

@ -8,6 +8,8 @@ use std::{
str::FromStr,
};
use librqbit_core::id20::Id20;
#[derive(Clone, Copy)]
pub enum TrackerRequestEvent {
Started,
@ -16,8 +18,8 @@ pub enum TrackerRequestEvent {
}
pub struct TrackerRequest {
pub info_hash: [u8; 20],
pub peer_id: [u8; 20],
pub info_hash: Id20,
pub peer_id: Id20,
pub event: Option<TrackerRequestEvent>,
pub port: u16,
pub uploaded: u64,
@ -159,9 +161,9 @@ impl TrackerRequest {
use urlencoding as u;
let mut s = String::new();
s.push_str("info_hash=");
s.push_str(u::encode_binary(&self.info_hash).as_ref());
s.push_str(u::encode_binary(&self.info_hash.0).as_ref());
s.push_str("&peer_id=");
s.push_str(u::encode_binary(&self.peer_id).as_ref());
s.push_str(u::encode_binary(&self.peer_id.0).as_ref());
if let Some(event) = self.event {
write!(
s,
@ -201,12 +203,12 @@ mod tests {
use super::*;
#[test]
fn test_serialize() {
let info_hash = [
let info_hash = Id20([
1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
];
let peer_id = [
]);
let peer_id = Id20([
1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
];
]);
let request = TrackerRequest {
info_hash,
peer_id,