diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index 7eb09ec..6d97656 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -3,7 +3,6 @@ name = "librqbit" version = "0.1.0" authors = ["Igor Katson "] edition = "2018" -default-features = ["sha1-system"] [features] default = ["sha1-openssl"] diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 54453ac..377a61e 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -10,6 +10,7 @@ pub mod peer_binary_protocol; pub mod peer_connection; pub mod peer_handler; pub mod peer_id; +pub mod peer_info_reader; pub mod peer_state; pub mod serde_bencode_de; pub mod serde_bencode_ser; diff --git a/crates/librqbit/src/peer_binary_protocol/mod.rs b/crates/librqbit/src/peer_binary_protocol/mod.rs index 5a2024f..e197e08 100644 --- a/crates/librqbit/src/peer_binary_protocol/mod.rs +++ b/crates/librqbit/src/peer_binary_protocol/mod.rs @@ -548,38 +548,6 @@ impl Request { #[cfg(test)] mod tests { - use std::{ - fs::File, - io::{Read, Write}, - net::SocketAddr, - str::FromStr, - }; - - use log::info; - use parking_lot::RwLock; - use tokio::sync::mpsc::UnboundedSender; - - use crate::{ - bencode_value::BencodeValue, - lengths::ceil_div_u64, - peer_binary_protocol::extended::ut_metadata::UtMetadata, - peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}, - peer_id::generate_peer_id, - }; - use std::sync::Once; - - static LOG_INIT: Once = std::sync::Once::new(); - - fn init_logging() { - LOG_INIT.call_once(pretty_env_logger::init) - } - - fn decode_info_hash(hash_str: &str) -> [u8; 20] { - let mut hash_arr = [0u8; 20]; - hex::decode_to_slice(hash_str, &mut hash_arr).unwrap(); - hash_arr - } - use super::*; #[test] fn test_handshake_serialize() { @@ -628,107 +596,4 @@ mod tests { panic!("resources/test/extended-handshake.bin did not serialize exactly the same. Dumped to /tmp/test_deserialize_serialize_extended_is_same, you can compare with resources/test/extended-handshake.bin") } } - - #[tokio::test] - async fn test_connect_to_local_qbittorrent() { - init_logging(); - - struct Handler { - ehandshake: RwLock>>, - tx: UnboundedSender, - } - - impl PeerConnectionHandler for Handler { - fn get_have_bytes(&self) -> u64 { - 0 - } - - fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec) -> Option { - None - } - - fn on_handshake(&self, handshake: Handshake) { - info!("received handshake: {:?}", handshake) - } - - fn on_received_message(&self, msg: Message>) -> anyhow::Result<()> { - info!("received message: {:?}", msg); - - if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data { - piece, - total_size, - data, - })) = msg - { - // this just assumes piece come in the order requested. - let mut f = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open("/tmp/torrent") - .unwrap(); - f.write_all(&data).unwrap(); - - // test if it's the last piece - if data.len() < CHUNK_SIZE as usize { - let mut buf = Vec::new(); - let mut f = File::open("/tmp/torrent").unwrap(); - f.read_to_end(&mut buf).unwrap(); - - // let torrent: TorrentMetaV1Borrowed = - // crate::torrent_metainfo::torrent_from_bytes(&buf).unwrap(); - let torrent: BencodeValue = - crate::bencode_value::dyn_from_bytes(&buf).unwrap(); - dbg!(torrent); - } - } - Ok(()) - } - - fn on_uploaded_bytes(&self, _bytes: u32) {} - - fn read_chunk(&self, _chunk: &ChunkInfo, _buf: &mut [u8]) -> anyhow::Result<()> { - panic!("dude, why are you requesting chunks") - } - - fn on_extended_handshake(&self, extended_handshake: &ExtendedHandshake) { - self.ehandshake - .write() - .replace(extended_handshake.clone_to_owned()); - self.tx - .send(WriterRequest::Message(Message::Unchoke)) - .unwrap(); - self.tx - .send(WriterRequest::Message(Message::Interested)) - .unwrap(); - - let total_metadata_chunks = ceil_div_u64( - extended_handshake.metadata_size.unwrap() as u64, - CHUNK_SIZE as u64, - ); - - for i in 0..total_metadata_chunks { - self.tx - .send(WriterRequest::Message(Message::Extended( - ExtendedMessage::UtMetadata(UtMetadata::Request(i as u32)), - ))) - .unwrap() - } - } - } - - let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let handler = Handler { - tx, - ehandshake: RwLock::new(None), - }; - let peer_id = generate_peer_id(); - let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7"); - - let conn = PeerConnection::new(addr, info_hash, peer_id, handler); - - // tx.send(WriterRequest::Message(Message::Extended(ExtendedMessage))); - - conn.manage_peer(rx).await.unwrap(); - } } diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 7416b8b..899978c 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -19,8 +19,11 @@ use crate::{ pub trait PeerConnectionHandler { fn get_have_bytes(&self) -> u64; fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option; - fn on_handshake(&self, handshake: Handshake); - fn on_extended_handshake(&self, extended_handshake: &ExtendedHandshake); + fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>; + fn on_extended_handshake( + &self, + extended_handshake: &ExtendedHandshake, + ) -> anyhow::Result<()>; fn on_received_message(&self, msg: Message>) -> anyhow::Result<()>; fn on_uploaded_bytes(&self, bytes: u32); fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>; @@ -146,7 +149,7 @@ impl PeerConnection { let mut extended_handshake: Option> = None; let supports_extended = h.supports_extended(); - self.handler.on_handshake(h); + self.handler.on_handshake(h)?; if read_so_far > size { read_buf.copy_within(size..read_so_far, 0); } @@ -165,7 +168,7 @@ impl PeerConnection { match extended { Message::Extended(ExtendedMessage::Handshake(h)) => { trace!("received from {}: {:?}", self.addr, &h); - self.handler.on_extended_handshake(&h); + self.handler.on_extended_handshake(&h)?; extended_handshake = Some(h.clone_to_owned()) } other => anyhow::bail!("expected extended handshake, but got {:?}", other), diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs new file mode 100644 index 0000000..f378de2 --- /dev/null +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -0,0 +1,247 @@ +use std::net::SocketAddr; + +use crate::sha1w::ISha1; +use log::debug; +use parking_lot::{Mutex, RwLock}; +use tokio::sync::mpsc::UnboundedSender; + +use crate::{ + buffers::{ByteBuf, ByteString}, + constants::CHUNK_SIZE, + lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo}, + peer_binary_protocol::{ + extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage}, + Handshake, Message, + }, + peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}, + serde_bencode_de::from_bytes, + torrent_metainfo::TorrentMetaV1Info, + type_aliases::Sha1, +}; + +pub async fn read_metainfo_from_peer( + addr: SocketAddr, + peer_id: [u8; 20], + info_hash: [u8; 20], +) -> anyhow::Result> { + let (result_tx, result_rx) = + tokio::sync::oneshot::channel::>>(); + let (writer_tx, writer_rx) = tokio::sync::mpsc::unbounded_channel::(); + let handler = Handler { + addr, + info_hash, + writer_tx, + result_tx: Mutex::new(Some(result_tx)), + locked: RwLock::new(None), + }; + let connection = PeerConnection::new(addr, info_hash, peer_id, handler); + + let result_reader = async move { result_rx.await? }; + let connection_runner = async move { connection.manage_peer(writer_rx).await }; + + tokio::select! { + result = result_reader => result, + whatever = connection_runner => match whatever { + Ok(_) => anyhow::bail!("connection runner completed first"), + Err(e) => Err(e) + } + } +} + +#[derive(Default)] +struct HandlerLocked { + metadata_size: u32, + total_pieces: usize, + buffer: Vec, + received_pieces: Vec, +} + +impl HandlerLocked { + fn new(metadata_size: u32) -> anyhow::Result { + if metadata_size > 1024 * 1024 { + anyhow::bail!("metadata size {} is too big", metadata_size); + } + let buffer = vec![0u8; metadata_size as usize]; + let total_pieces = ceil_div_u64(metadata_size as u64, CHUNK_SIZE as u64); + let received_pieces = vec![false; total_pieces as usize]; + Ok(Self { + metadata_size, + received_pieces, + buffer, + total_pieces: total_pieces as usize, + }) + } + fn piece_size(&self, index: u32) -> usize { + if index as usize == self.total_pieces - 1 { + last_element_size_u64(self.metadata_size as u64, CHUNK_SIZE as u64) as usize + } else { + CHUNK_SIZE as usize + } + } + fn record_piece( + &mut self, + index: u32, + data: &[u8], + info_hash: [u8; 20], + ) -> anyhow::Result { + if index as usize >= self.total_pieces { + anyhow::bail!("wrong index"); + } + let offset = (index * CHUNK_SIZE) as usize; + let size = self.piece_size(index); + if data.len() != size { + anyhow::bail!( + "expected length of piece {} to be {}, but got {}", + index, + size, + data.len() + ); + } + if self.received_pieces[index as usize] { + anyhow::bail!("already received piece {}", index); + } + let offset_end = offset + size; + (&mut self.buffer[offset..offset_end]).copy_from_slice(data); + self.received_pieces[index as usize] = true; + + if self.received_pieces.iter().all(|p| *p) { + // check metadata + let mut hash = Sha1::new(); + hash.update(&self.buffer); + if hash.finish() != info_hash { + anyhow::bail!("info checksum invalid"); + } + Ok(true) + } else { + Ok(false) + } + } +} + +struct Handler { + addr: SocketAddr, + info_hash: [u8; 20], + writer_tx: UnboundedSender, + result_tx: + Mutex>>>>, + locked: RwLock>, +} + +impl PeerConnectionHandler for Handler { + fn get_have_bytes(&self) -> u64 { + 0 + } + + fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec) -> Option { + None + } + + fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { + if !handshake.supports_extended() { + anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata") + } + Ok(()) + } + + fn on_received_message(&self, msg: Message>) -> anyhow::Result<()> { + debug!("{}: received message: {:?}", self.addr, msg); + + if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data { + piece, + total_size: _, + data, + })) = msg + { + let piece_ready = + self.locked + .write() + .as_mut() + .unwrap() + .record_piece(piece, &data, self.info_hash)?; + if piece_ready { + let buf = self.locked.write().take().unwrap().buffer; + let info = from_bytes::>(&buf); + self.result_tx + .lock() + .take() + .ok_or_else(|| anyhow::anyhow!("oneshot is consumed"))? + .send(info) + .map_err(|_| { + anyhow::anyhow!("torrent info deserialized, but consumer closed") + })?; + } + } + Ok(()) + } + + fn on_uploaded_bytes(&self, _bytes: u32) {} + + fn read_chunk(&self, _chunk: &ChunkInfo, _buf: &mut [u8]) -> anyhow::Result<()> { + anyhow::bail!("the peer is not supposed to be requesting chunks") + } + + fn on_extended_handshake( + &self, + extended_handshake: &ExtendedHandshake, + ) -> anyhow::Result<()> { + let metadata_size = match extended_handshake.metadata_size { + Some(metadata_size) => metadata_size, + None => anyhow::bail!("peer does not have metadata_size"), + }; + + if extended_handshake.get_msgid(b"ut_metadata").is_none() { + anyhow::bail!("peer does not support ut_metadata"); + } + + self.writer_tx + .send(WriterRequest::Message(Message::Unchoke))?; + self.writer_tx + .send(WriterRequest::Message(Message::Interested))?; + + let inner = HandlerLocked::new(metadata_size)?; + let total_pieces = inner.total_pieces; + + self.locked.write().replace(inner); + + for i in 0..total_pieces { + self.writer_tx + .send(WriterRequest::Message(Message::Extended( + ExtendedMessage::UtMetadata(UtMetadata::Request(i as u32)), + )))?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::{net::SocketAddr, str::FromStr, sync::Once}; + + use crate::peer_id::generate_peer_id; + + use super::read_metainfo_from_peer; + + static LOG_INIT: Once = std::sync::Once::new(); + + fn init_logging() { + LOG_INIT.call_once(pretty_env_logger::init) + } + + fn decode_info_hash(hash_str: &str) -> [u8; 20] { + let mut hash_arr = [0u8; 20]; + hex::decode_to_slice(hash_str, &mut hash_arr).unwrap(); + hash_arr + } + + #[tokio::test] + async fn test_get_torrent_metadata_from_localhost_bittorrent_client() { + init_logging(); + + let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap(); + let peer_id = generate_peer_id(); + let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7"); + dbg!(read_metainfo_from_peer(addr, peer_id, info_hash) + .await + .unwrap()); + } +} diff --git a/crates/librqbit/src/serde_bencode_ser.rs b/crates/librqbit/src/serde_bencode_ser.rs index afc5273..036d7c9 100644 --- a/crates/librqbit/src/serde_bencode_ser.rs +++ b/crates/librqbit/src/serde_bencode_ser.rs @@ -245,7 +245,7 @@ impl<'ser, W: std::io::Write> serde::ser::SerializeStruct for SerializeStruct<'s fn end(self) -> Result { for (key, value) in self.tmp { self.ser.write_bytes(key.as_bytes())?; - self.ser.write_raw(&dbg!(value))?; + self.ser.write_raw(&value)?; } self.ser.write_byte(b'e') } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 00cee8a..08a7234 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -505,8 +505,9 @@ impl PeerConnectionHandler for PeerHandler { Some(len) } - fn on_handshake(&self, handshake: Handshake) { - self.state.set_peer_live(self.addr, handshake) + fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { + self.state.set_peer_live(self.addr, handshake); + Ok(()) } fn on_uploaded_bytes(&self, bytes: u32) { @@ -520,7 +521,9 @@ impl PeerConnectionHandler for PeerHandler { self.state.file_ops().read_chunk(self.addr, chunk, buf) } - fn on_extended_handshake(&self, _: &ExtendedHandshake) {} + fn on_extended_handshake(&self, _: &ExtendedHandshake) -> anyhow::Result<()> { + Ok(()) + } } impl PeerHandler {