From 46e87a9b8006677348f6605df1d51642ae1f49e6 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 2 Jul 2021 22:32:55 +0100 Subject: [PATCH] Split stuff into more files --- .../extended/handshake.rs | 141 +++++++ .../src/peer_binary_protocol/extended/mod.rs | 95 +++++ .../extended/ut_metadata.rs | 142 +++++++ .../mod.rs} | 372 +----------------- crates/librqbit/src/peer_connection.rs | 7 +- crates/librqbit/src/torrent_state.rs | 10 +- 6 files changed, 399 insertions(+), 368 deletions(-) create mode 100644 crates/librqbit/src/peer_binary_protocol/extended/handshake.rs create mode 100644 crates/librqbit/src/peer_binary_protocol/extended/mod.rs create mode 100644 crates/librqbit/src/peer_binary_protocol/extended/ut_metadata.rs rename crates/librqbit/src/{peer_binary_protocol.rs => peer_binary_protocol/mod.rs} (67%) diff --git a/crates/librqbit/src/peer_binary_protocol/extended/handshake.rs b/crates/librqbit/src/peer_binary_protocol/extended/handshake.rs new file mode 100644 index 0000000..54e8c72 --- /dev/null +++ b/crates/librqbit/src/peer_binary_protocol/extended/handshake.rs @@ -0,0 +1,141 @@ +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, Ipv6Addr}, +}; + +use byteorder::ByteOrder; +use byteorder::BE; +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::{ + buffers::ByteBuf, clone_to_owned::CloneToOwned, peer_binary_protocol::MY_EXTENDED_UT_METADATA, +}; + +#[derive(Deserialize, Serialize, Debug, Default)] +pub struct ExtendedHandshake { + #[serde(bound(deserialize = "ByteBuf: From<&'de [u8]>"))] + pub m: HashMap, + #[serde(skip_serializing_if = "Option::is_none")] + pub p: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub v: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub yourip: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ipv6: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ipv4: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub reqq: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub complete_ago: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub upload_only: Option, +} + +impl ExtendedHandshake> { + pub fn new() -> Self { + let mut features = HashMap::new(); + features.insert(ByteBuf(b"ut_metadata"), MY_EXTENDED_UT_METADATA); + Self { + m: features, + ..Default::default() + } + } +} + +impl ExtendedHandshake { + pub fn get_msgid(&self, msg_type: &[u8]) -> Option + where + ByteBuf: AsRef<[u8]>, + { + self.m.iter().find_map(|(k, v)| { + if k.as_ref() == msg_type { + Some(*v) + } else { + None + } + }) + } +} + +impl CloneToOwned for ExtendedHandshake +where + ByteBuf: CloneToOwned + Eq + std::hash::Hash, + ::Target: Eq + std::hash::Hash, +{ + type Target = ExtendedHandshake<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + ExtendedHandshake { + m: self.m.clone_to_owned(), + p: self.p, + v: self.v.clone_to_owned(), + yourip: self.yourip, + ipv6: self.ipv6.clone_to_owned(), + ipv4: self.ipv4.clone_to_owned(), + reqq: self.reqq, + metadata_size: self.metadata_size, + complete_ago: self.complete_ago, + upload_only: self.upload_only, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct YourIP(pub IpAddr); + +impl Serialize for YourIP { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self.0 { + IpAddr::V4(ipv4) => { + let buf = ipv4.octets(); + serializer.serialize_bytes(&buf) + } + IpAddr::V6(_) => todo!(), + } + } +} + +impl<'de> Deserialize<'de> for YourIP { + fn deserialize(de: D) -> Result + where + D: Deserializer<'de>, + { + struct Visitor {} + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = YourIP; + + fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "expecting 4 bytes of ipv4 or 16 bytes of ipv6") + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: serde::de::Error, + { + if v.len() == 4 { + return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3])))); + } else if v.len() == 16 { + return Ok(YourIP(IpAddr::V6(Ipv6Addr::new( + BE::read_u16(&v[..2]), + BE::read_u16(&v[2..4]), + BE::read_u16(&v[4..6]), + BE::read_u16(&v[6..8]), + BE::read_u16(&v[8..10]), + BE::read_u16(&v[10..12]), + BE::read_u16(&v[12..14]), + BE::read_u16(&v[14..]), + )))); + } + Err(E::custom("expected 4 or 16 byte address")) + } + } + de.deserialize_bytes(Visitor {}) + } +} diff --git a/crates/librqbit/src/peer_binary_protocol/extended/mod.rs b/crates/librqbit/src/peer_binary_protocol/extended/mod.rs new file mode 100644 index 0000000..31adbcc --- /dev/null +++ b/crates/librqbit/src/peer_binary_protocol/extended/mod.rs @@ -0,0 +1,95 @@ +use serde::{Deserialize, Serialize}; + +use crate::{bencode_value::BencodeValue, buffers::ByteString, clone_to_owned::CloneToOwned}; + +use self::{handshake::ExtendedHandshake, ut_metadata::UtMetadata}; + +use super::MessageDeserializeError; + +pub mod handshake; +pub mod ut_metadata; + +use super::MY_EXTENDED_UT_METADATA; + +#[derive(Debug)] +pub enum ExtendedMessage { + Handshake(ExtendedHandshake), + UtMetadata(UtMetadata), + Dyn(u8, BencodeValue), +} + +impl CloneToOwned for ExtendedMessage +where + ByteBuf: CloneToOwned + std::hash::Hash + Eq, + ::Target: std::hash::Hash + Eq, +{ + type Target = ExtendedMessage<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + match self { + ExtendedMessage::Handshake(h) => ExtendedMessage::Handshake(h.clone_to_owned()), + ExtendedMessage::Dyn(u, d) => ExtendedMessage::Dyn(*u, d.clone_to_owned()), + ExtendedMessage::UtMetadata(m) => ExtendedMessage::UtMetadata(m.clone_to_owned()), + } + } +} + +impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { + pub fn serialize( + &self, + out: &mut Vec, + extended_handshake: Option<&ExtendedHandshake>, + ) -> anyhow::Result<()> + where + ByteBuf: AsRef<[u8]>, + { + match self { + ExtendedMessage::Dyn(msg_id, v) => { + out.push(*msg_id); + crate::serde_bencode_ser::bencode_serialize_to_writer(v, out)?; + } + ExtendedMessage::Handshake(h) => { + out.push(0); + crate::serde_bencode_ser::bencode_serialize_to_writer(h, out)?; + } + ExtendedMessage::UtMetadata(u) => { + let h = extended_handshake.ok_or_else(|| { + anyhow::anyhow!("need peer's handshake to serialize ut_metadata") + })?; + let emsg_id = h + .get_msgid(b"ut_metadata") + .ok_or_else(|| anyhow::anyhow!("peer doesn't support ut_metadata"))?; + out.push(emsg_id); + u.serialize(out); + } + } + Ok(()) + } + + pub fn deserialize(mut buf: &'a [u8]) -> Result + where + ByteBuf: Deserialize<'a> + From<&'a [u8]>, + { + use crate::serde_bencode_de::from_bytes; + + let emsg_id = buf.get(0).copied().ok_or_else(|| { + MessageDeserializeError::Other(anyhow::anyhow!( + "cannot deserialize extended message: can't read first byte" + )) + })?; + + buf = &buf.get(1..).ok_or_else(|| { + MessageDeserializeError::Other(anyhow::anyhow!( + "cannot deserialize extended message: buffer empty" + )) + })?; + + match emsg_id { + 0 => Ok(ExtendedMessage::Handshake(from_bytes(&buf)?)), + MY_EXTENDED_UT_METADATA => { + Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(&buf)?)) + } + _ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(&buf)?)), + } + } +} diff --git a/crates/librqbit/src/peer_binary_protocol/extended/ut_metadata.rs b/crates/librqbit/src/peer_binary_protocol/extended/ut_metadata.rs new file mode 100644 index 0000000..c80f0b7 --- /dev/null +++ b/crates/librqbit/src/peer_binary_protocol/extended/ut_metadata.rs @@ -0,0 +1,142 @@ +use std::io::Write; + +use crate::{ + clone_to_owned::CloneToOwned, peer_binary_protocol::MessageDeserializeError, + serde_bencode_de::BencodeDeserializer, serde_bencode_ser::bencode_serialize_to_writer, +}; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug)] +pub enum UtMetadata { + Request(u32), + Data { + piece: u32, + total_size: u32, + data: ByteBuf, + }, + Reject(u32), +} + +impl CloneToOwned for UtMetadata { + type Target = UtMetadata<::Target>; + + fn clone_to_owned(&self) -> Self::Target { + match self { + UtMetadata::Request(req) => UtMetadata::Request(*req), + UtMetadata::Data { + piece, + total_size, + data, + } => UtMetadata::Data { + piece: *piece, + total_size: *total_size, + data: data.clone_to_owned(), + }, + UtMetadata::Reject(piece) => UtMetadata::Reject(*piece), + } + } +} + +impl<'a, ByteBuf: 'a> UtMetadata { + pub fn serialize(&self, buf: &mut Vec) + where + ByteBuf: AsRef<[u8]>, + { + #[derive(Serialize)] + struct Message { + msg_type: u32, + piece: u32, + #[serde(skip_serializing_if = "Option::is_none")] + total_size: Option, + } + match self { + UtMetadata::Request(piece) => { + let message = Message { + msg_type: 0, + piece: *piece, + total_size: None, + }; + bencode_serialize_to_writer(message, buf).unwrap() + } + UtMetadata::Data { + piece, + total_size, + data, + } => { + let message = Message { + msg_type: 1, + piece: *piece, + total_size: Some(*total_size), + }; + bencode_serialize_to_writer(message, buf).unwrap(); + buf.write_all(data.as_ref()).unwrap(); + } + UtMetadata::Reject(piece) => { + let message = Message { + msg_type: 2, + piece: *piece, + total_size: None, + }; + bencode_serialize_to_writer(message, buf).unwrap(); + } + } + } + pub fn deserialize(buf: &'a [u8]) -> Result + where + ByteBuf: From<&'a [u8]>, + { + let mut de = BencodeDeserializer::new_from_buf(buf); + + #[derive(Deserialize)] + struct Message { + msg_type: u32, + piece: u32, + total_size: Option, + } + + let message = + Message::deserialize(&mut de).map_err(|e| MessageDeserializeError::Other(e.into()))?; + let remaining = de.into_remaining(); + + match message.msg_type { + // request + 0 => { + if !remaining.is_empty() { + return Err(MessageDeserializeError::Other(anyhow::anyhow!( + "trailing bytes when decoding UtMetadata" + ))); + } + Ok(UtMetadata::Request(message.piece)) + } + // data + 1 => { + let total_size = message.total_size.ok_or_else(|| { + MessageDeserializeError::Other(anyhow::anyhow!( + "expected key total_size to be present in UtMetadata \"data\" message" + )) + })?; + Ok(UtMetadata::Data { + piece: message.piece, + total_size, + data: ByteBuf::from(remaining), + }) + } + // reject + 2 => { + if !remaining.is_empty() { + return Err(MessageDeserializeError::Other(anyhow::anyhow!( + "trailing bytes when decoding UtMetadata" + ))); + } + Ok(UtMetadata::Reject(message.piece)) + } + other => { + return Err(MessageDeserializeError::Other(anyhow::anyhow!( + "unrecognized ut_metadata message type {}", + other + ))) + } + } + } +} diff --git a/crates/librqbit/src/peer_binary_protocol.rs b/crates/librqbit/src/peer_binary_protocol/mod.rs similarity index 67% rename from crates/librqbit/src/peer_binary_protocol.rs rename to crates/librqbit/src/peer_binary_protocol/mod.rs index 46eecb3..5a2024f 100644 --- a/crates/librqbit/src/peer_binary_protocol.rs +++ b/crates/librqbit/src/peer_binary_protocol/mod.rs @@ -1,24 +1,18 @@ -use std::{ - collections::HashMap, - io::Write, - marker::PhantomData, - net::{IpAddr, Ipv4Addr, Ipv6Addr}, -}; +pub mod extended; use bincode::Options; use byteorder::{ByteOrder, BE}; -use serde::{Deserialize, Deserializer, Serialize}; +use serde::{Deserialize, Serialize}; use crate::{ - bencode_value::BencodeValue, buffers::{ByteBuf, ByteString}, clone_to_owned::CloneToOwned, constants::CHUNK_SIZE, lengths::ChunkInfo, - serde_bencode_de::BencodeDeserializer, - serde_bencode_ser, }; +use self::extended::{handshake::ExtendedHandshake, ExtendedMessage}; + const INTEGER_LEN: usize = 4; const MSGID_LEN: usize = 1; const PREAMBLE_LEN: usize = INTEGER_LEN + MSGID_LEN; @@ -48,7 +42,7 @@ const MSGID_REQUEST: u8 = 6; const MSGID_PIECE: u8 = 7; const MSGID_EXTENDED: u8 = 20; -const MY_EXTENDED_UT_METADATA: u8 = 3; +pub const MY_EXTENDED_UT_METADATA: u8 = 3; #[derive(Debug)] pub enum MessageDeserializeError { @@ -552,365 +546,25 @@ impl Request { } } -#[derive(Debug)] -pub enum UtMetadata { - Request(u32), - Data { - piece: u32, - total_size: u32, - data: ByteBuf, - }, - Reject(u32), -} - -impl CloneToOwned for UtMetadata { - type Target = UtMetadata<::Target>; - - fn clone_to_owned(&self) -> Self::Target { - match self { - UtMetadata::Request(req) => UtMetadata::Request(*req), - UtMetadata::Data { - piece, - total_size, - data, - } => UtMetadata::Data { - piece: *piece, - total_size: *total_size, - data: data.clone_to_owned(), - }, - UtMetadata::Reject(piece) => UtMetadata::Reject(*piece), - } - } -} - -impl<'a, ByteBuf: 'a> UtMetadata { - fn serialize(&self, buf: &mut Vec) - where - ByteBuf: AsRef<[u8]>, - { - #[derive(Serialize)] - struct Message { - msg_type: u32, - piece: u32, - #[serde(skip_serializing_if = "Option::is_none")] - total_size: Option, - } - match self { - UtMetadata::Request(piece) => { - let message = Message { - msg_type: 0, - piece: *piece, - total_size: None, - }; - serde_bencode_ser::bencode_serialize_to_writer(message, buf).unwrap() - } - UtMetadata::Data { - piece, - total_size, - data, - } => { - let message = Message { - msg_type: 1, - piece: *piece, - total_size: Some(*total_size), - }; - serde_bencode_ser::bencode_serialize_to_writer(message, buf).unwrap(); - buf.write_all(data.as_ref()).unwrap(); - } - UtMetadata::Reject(piece) => { - let message = Message { - msg_type: 2, - piece: *piece, - total_size: None, - }; - serde_bencode_ser::bencode_serialize_to_writer(message, buf).unwrap(); - } - } - } - fn deserialize(buf: &'a [u8]) -> Result - where - ByteBuf: From<&'a [u8]>, - { - let mut de = BencodeDeserializer::new_from_buf(buf); - - #[derive(Deserialize)] - struct Message { - msg_type: u32, - piece: u32, - total_size: Option, - } - - let message = - Message::deserialize(&mut de).map_err(|e| MessageDeserializeError::Other(e.into()))?; - let remaining = de.into_remaining(); - - match message.msg_type { - // request - 0 => { - if !remaining.is_empty() { - return Err(MessageDeserializeError::Other(anyhow::anyhow!( - "trailing bytes when decoding UtMetadata" - ))); - } - Ok(UtMetadata::Request(message.piece)) - } - // data - 1 => { - let total_size = message.total_size.ok_or_else(|| { - MessageDeserializeError::Other(anyhow::anyhow!( - "expected key total_size to be present in UtMetadata \"data\" message" - )) - })?; - Ok(UtMetadata::Data { - piece: message.piece, - total_size, - data: ByteBuf::from(remaining), - }) - } - // reject - 2 => { - if !remaining.is_empty() { - return Err(MessageDeserializeError::Other(anyhow::anyhow!( - "trailing bytes when decoding UtMetadata" - ))); - } - Ok(UtMetadata::Reject(message.piece)) - } - other => { - return Err(MessageDeserializeError::Other(anyhow::anyhow!( - "unrecognized ut_metadata message type {}", - other - ))) - } - } - } -} - -#[derive(Debug)] -pub enum ExtendedMessage { - Handshake(ExtendedHandshake), - UtMetadata(UtMetadata), - Dyn(u8, BencodeValue), -} - -impl CloneToOwned for ExtendedMessage -where - ByteBuf: CloneToOwned + std::hash::Hash + Eq, - ::Target: std::hash::Hash + Eq, -{ - type Target = ExtendedMessage<::Target>; - - fn clone_to_owned(&self) -> Self::Target { - match self { - ExtendedMessage::Handshake(h) => ExtendedMessage::Handshake(h.clone_to_owned()), - ExtendedMessage::Dyn(u, d) => ExtendedMessage::Dyn(*u, d.clone_to_owned()), - ExtendedMessage::UtMetadata(m) => ExtendedMessage::UtMetadata(m.clone_to_owned()), - } - } -} - -impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage { - fn serialize( - &self, - out: &mut Vec, - extended_handshake: Option<&ExtendedHandshake>, - ) -> anyhow::Result<()> - where - ByteBuf: AsRef<[u8]>, - { - match self { - ExtendedMessage::Dyn(msg_id, v) => { - out.push(*msg_id); - crate::serde_bencode_ser::bencode_serialize_to_writer(v, out)?; - } - ExtendedMessage::Handshake(h) => { - out.push(0); - crate::serde_bencode_ser::bencode_serialize_to_writer(h, out)?; - } - ExtendedMessage::UtMetadata(u) => { - let h = extended_handshake.ok_or_else(|| { - anyhow::anyhow!("need peer's handshake to serialize ut_metadata") - })?; - let emsg_id = h - .get_msgid(b"ut_metadata") - .ok_or_else(|| anyhow::anyhow!("peer doesn't support ut_metadata"))?; - out.push(emsg_id); - u.serialize(out); - } - } - Ok(()) - } - - fn deserialize(mut buf: &'a [u8]) -> Result - where - ByteBuf: Deserialize<'a> + From<&'a [u8]>, - { - use crate::serde_bencode_de::from_bytes; - - let emsg_id = buf.get(0).copied().ok_or_else(|| { - MessageDeserializeError::Other(anyhow::anyhow!( - "cannot deserialize extended message: can't read first byte" - )) - })?; - - buf = &buf.get(1..).ok_or_else(|| { - MessageDeserializeError::Other(anyhow::anyhow!( - "cannot deserialize extended message: buffer empty" - )) - })?; - - match emsg_id { - 0 => Ok(ExtendedMessage::Handshake(from_bytes(&buf)?)), - MY_EXTENDED_UT_METADATA => { - Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(&buf)?)) - } - _ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(&buf)?)), - } - } -} - -#[derive(Debug, Clone, Copy)] -pub struct YourIP(pub IpAddr); - -impl Serialize for YourIP { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - match self.0 { - IpAddr::V4(ipv4) => { - let buf = ipv4.octets(); - serializer.serialize_bytes(&buf) - } - IpAddr::V6(_) => todo!(), - } - } -} - -impl<'de> Deserialize<'de> for YourIP { - fn deserialize(de: D) -> Result - where - D: Deserializer<'de>, - { - struct Visitor {} - impl<'de> serde::de::Visitor<'de> for Visitor { - type Value = YourIP; - - fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "expecting 4 bytes of ipv4 or 16 bytes of ipv6") - } - - fn visit_bytes(self, v: &[u8]) -> Result - where - E: serde::de::Error, - { - if v.len() == 4 { - return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3])))); - } else if v.len() == 16 { - return Ok(YourIP(IpAddr::V6(Ipv6Addr::new( - BE::read_u16(&v[..2]), - BE::read_u16(&v[2..4]), - BE::read_u16(&v[4..6]), - BE::read_u16(&v[6..8]), - BE::read_u16(&v[8..10]), - BE::read_u16(&v[10..12]), - BE::read_u16(&v[12..14]), - BE::read_u16(&v[14..]), - )))); - } - Err(E::custom("expected 4 or 16 byte address")) - } - } - de.deserialize_bytes(Visitor {}) - } -} - -#[derive(Deserialize, Serialize, Debug, Default)] -pub struct ExtendedHandshake { - #[serde(bound(deserialize = "ByteBuf: From<&'de [u8]>"))] - pub m: HashMap, - #[serde(skip_serializing_if = "Option::is_none")] - pub p: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub v: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub yourip: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub ipv6: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub ipv4: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub reqq: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub metadata_size: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub complete_ago: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub upload_only: Option, -} - -impl ExtendedHandshake> { - pub fn new() -> Self { - let mut features = HashMap::new(); - features.insert(ByteBuf(b"ut_metadata"), MY_EXTENDED_UT_METADATA); - Self { - m: features, - ..Default::default() - } - } -} - -impl ExtendedHandshake { - fn get_msgid(&self, msg_type: &[u8]) -> Option - where - ByteBuf: AsRef<[u8]>, - { - self.m.iter().find_map(|(k, v)| { - if k.as_ref() == msg_type { - Some(*v) - } else { - None - } - }) - } -} - -impl CloneToOwned for ExtendedHandshake -where - ByteBuf: CloneToOwned + Eq + std::hash::Hash, - ::Target: Eq + std::hash::Hash, -{ - type Target = ExtendedHandshake<::Target>; - - fn clone_to_owned(&self) -> Self::Target { - ExtendedHandshake { - m: self.m.clone_to_owned(), - p: self.p, - v: self.v.clone_to_owned(), - yourip: self.yourip, - ipv6: self.ipv6.clone_to_owned(), - ipv4: self.ipv4.clone_to_owned(), - reqq: self.reqq, - metadata_size: self.metadata_size, - complete_ago: self.complete_ago, - upload_only: self.upload_only, - } - } -} - #[cfg(test)] mod tests { - use std::{fs::File, io::Read, net::SocketAddr, str::FromStr}; + use std::{ + fs::File, + io::{Read, Write}, + net::SocketAddr, + str::FromStr, + }; use log::info; use parking_lot::RwLock; use tokio::sync::mpsc::UnboundedSender; use crate::{ + bencode_value::BencodeValue, lengths::ceil_div_u64, + peer_binary_protocol::extended::ut_metadata::UtMetadata, peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}, peer_id::generate_peer_id, - torrent_metainfo::TorrentMetaV1Borrowed, }; use std::sync::Once; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index d237342..7416b8b 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -2,15 +2,16 @@ use std::{net::SocketAddr, time::Duration}; use anyhow::Context; use log::{debug, trace}; -use tokio::{io::AsyncReadExt, time::timeout}; +use tokio::time::timeout; use crate::{ buffers::{ByteBuf, ByteString}, clone_to_owned::CloneToOwned, lengths::ChunkInfo, peer_binary_protocol::{ - serialize_piece_preamble, ExtendedHandshake, ExtendedMessage, Handshake, Message, - MessageBorrowed, MessageDeserializeError, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, + extended::{handshake::ExtendedHandshake, ExtendedMessage}, + serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError, + MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }, peer_id::try_decode_peer_id, }; diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index aa8e012..00cee8a 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -21,7 +21,9 @@ use crate::{ clone_to_owned::CloneToOwned, file_ops::FileOps, lengths::{Lengths, ValidPieceIndex}, - peer_binary_protocol::{Handshake, Message, MessageOwned, Piece, Request}, + peer_binary_protocol::{ + extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, + }, peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}, peer_state::{InflightRequest, LivePeerState, PeerState}, spawn_utils::{spawn, BlockingSpawner}, @@ -518,11 +520,7 @@ impl PeerConnectionHandler for PeerHandler { self.state.file_ops().read_chunk(self.addr, chunk, buf) } - fn on_extended_handshake( - &self, - extended_handshake: &crate::peer_binary_protocol::ExtendedHandshake, - ) { - } + fn on_extended_handshake(&self, _: &ExtendedHandshake) {} } impl PeerHandler {