Split stuff into more files
This commit is contained in:
parent
48dcf2d1bd
commit
46e87a9b80
6 changed files with 399 additions and 368 deletions
141
crates/librqbit/src/peer_binary_protocol/extended/handshake.rs
Normal file
141
crates/librqbit/src/peer_binary_protocol/extended/handshake.rs
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
};
|
||||
|
||||
use byteorder::ByteOrder;
|
||||
use byteorder::BE;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
|
||||
use crate::{
|
||||
buffers::ByteBuf, clone_to_owned::CloneToOwned, peer_binary_protocol::MY_EXTENDED_UT_METADATA,
|
||||
};
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Default)]
|
||||
pub struct ExtendedHandshake<ByteBuf: Eq + std::hash::Hash> {
|
||||
#[serde(bound(deserialize = "ByteBuf: From<&'de [u8]>"))]
|
||||
pub m: HashMap<ByteBuf, u8>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub p: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub v: Option<ByteBuf>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub yourip: Option<YourIP>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ipv6: Option<ByteBuf>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ipv4: Option<ByteBuf>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub reqq: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub metadata_size: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub complete_ago: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub upload_only: Option<u32>,
|
||||
}
|
||||
|
||||
impl ExtendedHandshake<ByteBuf<'static>> {
|
||||
pub fn new() -> Self {
|
||||
let mut features = HashMap::new();
|
||||
features.insert(ByteBuf(b"ut_metadata"), MY_EXTENDED_UT_METADATA);
|
||||
Self {
|
||||
m: features,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<ByteBuf: Eq + std::hash::Hash> ExtendedHandshake<ByteBuf> {
|
||||
pub fn get_msgid(&self, msg_type: &[u8]) -> Option<u8>
|
||||
where
|
||||
ByteBuf: AsRef<[u8]>,
|
||||
{
|
||||
self.m.iter().find_map(|(k, v)| {
|
||||
if k.as_ref() == msg_type {
|
||||
Some(*v)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf>
|
||||
where
|
||||
ByteBuf: CloneToOwned + Eq + std::hash::Hash,
|
||||
<ByteBuf as CloneToOwned>::Target: Eq + std::hash::Hash,
|
||||
{
|
||||
type Target = ExtendedHandshake<<ByteBuf as CloneToOwned>::Target>;
|
||||
|
||||
fn clone_to_owned(&self) -> Self::Target {
|
||||
ExtendedHandshake {
|
||||
m: self.m.clone_to_owned(),
|
||||
p: self.p,
|
||||
v: self.v.clone_to_owned(),
|
||||
yourip: self.yourip,
|
||||
ipv6: self.ipv6.clone_to_owned(),
|
||||
ipv4: self.ipv4.clone_to_owned(),
|
||||
reqq: self.reqq,
|
||||
metadata_size: self.metadata_size,
|
||||
complete_ago: self.complete_ago,
|
||||
upload_only: self.upload_only,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct YourIP(pub IpAddr);
|
||||
|
||||
impl Serialize for YourIP {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match self.0 {
|
||||
IpAddr::V4(ipv4) => {
|
||||
let buf = ipv4.octets();
|
||||
serializer.serialize_bytes(&buf)
|
||||
}
|
||||
IpAddr::V6(_) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for YourIP {
|
||||
fn deserialize<D>(de: D) -> Result<YourIP, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct Visitor {}
|
||||
impl<'de> serde::de::Visitor<'de> for Visitor {
|
||||
type Value = YourIP;
|
||||
|
||||
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "expecting 4 bytes of ipv4 or 16 bytes of ipv6")
|
||||
}
|
||||
|
||||
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
if v.len() == 4 {
|
||||
return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3]))));
|
||||
} else if v.len() == 16 {
|
||||
return Ok(YourIP(IpAddr::V6(Ipv6Addr::new(
|
||||
BE::read_u16(&v[..2]),
|
||||
BE::read_u16(&v[2..4]),
|
||||
BE::read_u16(&v[4..6]),
|
||||
BE::read_u16(&v[6..8]),
|
||||
BE::read_u16(&v[8..10]),
|
||||
BE::read_u16(&v[10..12]),
|
||||
BE::read_u16(&v[12..14]),
|
||||
BE::read_u16(&v[14..]),
|
||||
))));
|
||||
}
|
||||
Err(E::custom("expected 4 or 16 byte address"))
|
||||
}
|
||||
}
|
||||
de.deserialize_bytes(Visitor {})
|
||||
}
|
||||
}
|
||||
95
crates/librqbit/src/peer_binary_protocol/extended/mod.rs
Normal file
95
crates/librqbit/src/peer_binary_protocol/extended/mod.rs
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{bencode_value::BencodeValue, buffers::ByteString, clone_to_owned::CloneToOwned};
|
||||
|
||||
use self::{handshake::ExtendedHandshake, ut_metadata::UtMetadata};
|
||||
|
||||
use super::MessageDeserializeError;
|
||||
|
||||
pub mod handshake;
|
||||
pub mod ut_metadata;
|
||||
|
||||
use super::MY_EXTENDED_UT_METADATA;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ExtendedMessage<ByteBuf: std::hash::Hash + Eq> {
|
||||
Handshake(ExtendedHandshake<ByteBuf>),
|
||||
UtMetadata(UtMetadata<ByteBuf>),
|
||||
Dyn(u8, BencodeValue<ByteBuf>),
|
||||
}
|
||||
|
||||
impl<ByteBuf> CloneToOwned for ExtendedMessage<ByteBuf>
|
||||
where
|
||||
ByteBuf: CloneToOwned + std::hash::Hash + Eq,
|
||||
<ByteBuf as CloneToOwned>::Target: std::hash::Hash + Eq,
|
||||
{
|
||||
type Target = ExtendedMessage<<ByteBuf as CloneToOwned>::Target>;
|
||||
|
||||
fn clone_to_owned(&self) -> Self::Target {
|
||||
match self {
|
||||
ExtendedMessage::Handshake(h) => ExtendedMessage::Handshake(h.clone_to_owned()),
|
||||
ExtendedMessage::Dyn(u, d) => ExtendedMessage::Dyn(*u, d.clone_to_owned()),
|
||||
ExtendedMessage::UtMetadata(m) => ExtendedMessage::UtMetadata(m.clone_to_owned()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf> {
|
||||
pub fn serialize(
|
||||
&self,
|
||||
out: &mut Vec<u8>,
|
||||
extended_handshake: Option<&ExtendedHandshake<ByteString>>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
ByteBuf: AsRef<[u8]>,
|
||||
{
|
||||
match self {
|
||||
ExtendedMessage::Dyn(msg_id, v) => {
|
||||
out.push(*msg_id);
|
||||
crate::serde_bencode_ser::bencode_serialize_to_writer(v, out)?;
|
||||
}
|
||||
ExtendedMessage::Handshake(h) => {
|
||||
out.push(0);
|
||||
crate::serde_bencode_ser::bencode_serialize_to_writer(h, out)?;
|
||||
}
|
||||
ExtendedMessage::UtMetadata(u) => {
|
||||
let h = extended_handshake.ok_or_else(|| {
|
||||
anyhow::anyhow!("need peer's handshake to serialize ut_metadata")
|
||||
})?;
|
||||
let emsg_id = h
|
||||
.get_msgid(b"ut_metadata")
|
||||
.ok_or_else(|| anyhow::anyhow!("peer doesn't support ut_metadata"))?;
|
||||
out.push(emsg_id);
|
||||
u.serialize(out);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn deserialize(mut buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
|
||||
where
|
||||
ByteBuf: Deserialize<'a> + From<&'a [u8]>,
|
||||
{
|
||||
use crate::serde_bencode_de::from_bytes;
|
||||
|
||||
let emsg_id = buf.get(0).copied().ok_or_else(|| {
|
||||
MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"cannot deserialize extended message: can't read first byte"
|
||||
))
|
||||
})?;
|
||||
|
||||
buf = &buf.get(1..).ok_or_else(|| {
|
||||
MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"cannot deserialize extended message: buffer empty"
|
||||
))
|
||||
})?;
|
||||
|
||||
match emsg_id {
|
||||
0 => Ok(ExtendedMessage::Handshake(from_bytes(&buf)?)),
|
||||
MY_EXTENDED_UT_METADATA => {
|
||||
Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(&buf)?))
|
||||
}
|
||||
_ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(&buf)?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
142
crates/librqbit/src/peer_binary_protocol/extended/ut_metadata.rs
Normal file
142
crates/librqbit/src/peer_binary_protocol/extended/ut_metadata.rs
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
use std::io::Write;
|
||||
|
||||
use crate::{
|
||||
clone_to_owned::CloneToOwned, peer_binary_protocol::MessageDeserializeError,
|
||||
serde_bencode_de::BencodeDeserializer, serde_bencode_ser::bencode_serialize_to_writer,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum UtMetadata<ByteBuf> {
|
||||
Request(u32),
|
||||
Data {
|
||||
piece: u32,
|
||||
total_size: u32,
|
||||
data: ByteBuf,
|
||||
},
|
||||
Reject(u32),
|
||||
}
|
||||
|
||||
impl<ByteBuf: CloneToOwned> CloneToOwned for UtMetadata<ByteBuf> {
|
||||
type Target = UtMetadata<<ByteBuf as CloneToOwned>::Target>;
|
||||
|
||||
fn clone_to_owned(&self) -> Self::Target {
|
||||
match self {
|
||||
UtMetadata::Request(req) => UtMetadata::Request(*req),
|
||||
UtMetadata::Data {
|
||||
piece,
|
||||
total_size,
|
||||
data,
|
||||
} => UtMetadata::Data {
|
||||
piece: *piece,
|
||||
total_size: *total_size,
|
||||
data: data.clone_to_owned(),
|
||||
},
|
||||
UtMetadata::Reject(piece) => UtMetadata::Reject(*piece),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, ByteBuf: 'a> UtMetadata<ByteBuf> {
|
||||
pub fn serialize(&self, buf: &mut Vec<u8>)
|
||||
where
|
||||
ByteBuf: AsRef<[u8]>,
|
||||
{
|
||||
#[derive(Serialize)]
|
||||
struct Message {
|
||||
msg_type: u32,
|
||||
piece: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
total_size: Option<u32>,
|
||||
}
|
||||
match self {
|
||||
UtMetadata::Request(piece) => {
|
||||
let message = Message {
|
||||
msg_type: 0,
|
||||
piece: *piece,
|
||||
total_size: None,
|
||||
};
|
||||
bencode_serialize_to_writer(message, buf).unwrap()
|
||||
}
|
||||
UtMetadata::Data {
|
||||
piece,
|
||||
total_size,
|
||||
data,
|
||||
} => {
|
||||
let message = Message {
|
||||
msg_type: 1,
|
||||
piece: *piece,
|
||||
total_size: Some(*total_size),
|
||||
};
|
||||
bencode_serialize_to_writer(message, buf).unwrap();
|
||||
buf.write_all(data.as_ref()).unwrap();
|
||||
}
|
||||
UtMetadata::Reject(piece) => {
|
||||
let message = Message {
|
||||
msg_type: 2,
|
||||
piece: *piece,
|
||||
total_size: None,
|
||||
};
|
||||
bencode_serialize_to_writer(message, buf).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn deserialize(buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
|
||||
where
|
||||
ByteBuf: From<&'a [u8]>,
|
||||
{
|
||||
let mut de = BencodeDeserializer::new_from_buf(buf);
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Message {
|
||||
msg_type: u32,
|
||||
piece: u32,
|
||||
total_size: Option<u32>,
|
||||
}
|
||||
|
||||
let message =
|
||||
Message::deserialize(&mut de).map_err(|e| MessageDeserializeError::Other(e.into()))?;
|
||||
let remaining = de.into_remaining();
|
||||
|
||||
match message.msg_type {
|
||||
// request
|
||||
0 => {
|
||||
if !remaining.is_empty() {
|
||||
return Err(MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"trailing bytes when decoding UtMetadata"
|
||||
)));
|
||||
}
|
||||
Ok(UtMetadata::Request(message.piece))
|
||||
}
|
||||
// data
|
||||
1 => {
|
||||
let total_size = message.total_size.ok_or_else(|| {
|
||||
MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"expected key total_size to be present in UtMetadata \"data\" message"
|
||||
))
|
||||
})?;
|
||||
Ok(UtMetadata::Data {
|
||||
piece: message.piece,
|
||||
total_size,
|
||||
data: ByteBuf::from(remaining),
|
||||
})
|
||||
}
|
||||
// reject
|
||||
2 => {
|
||||
if !remaining.is_empty() {
|
||||
return Err(MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"trailing bytes when decoding UtMetadata"
|
||||
)));
|
||||
}
|
||||
Ok(UtMetadata::Reject(message.piece))
|
||||
}
|
||||
other => {
|
||||
return Err(MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"unrecognized ut_metadata message type {}",
|
||||
other
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,24 +1,18 @@
|
|||
use std::{
|
||||
collections::HashMap,
|
||||
io::Write,
|
||||
marker::PhantomData,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
};
|
||||
pub mod extended;
|
||||
|
||||
use bincode::Options;
|
||||
use byteorder::{ByteOrder, BE};
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
bencode_value::BencodeValue,
|
||||
buffers::{ByteBuf, ByteString},
|
||||
clone_to_owned::CloneToOwned,
|
||||
constants::CHUNK_SIZE,
|
||||
lengths::ChunkInfo,
|
||||
serde_bencode_de::BencodeDeserializer,
|
||||
serde_bencode_ser,
|
||||
};
|
||||
|
||||
use self::extended::{handshake::ExtendedHandshake, ExtendedMessage};
|
||||
|
||||
const INTEGER_LEN: usize = 4;
|
||||
const MSGID_LEN: usize = 1;
|
||||
const PREAMBLE_LEN: usize = INTEGER_LEN + MSGID_LEN;
|
||||
|
|
@ -48,7 +42,7 @@ const MSGID_REQUEST: u8 = 6;
|
|||
const MSGID_PIECE: u8 = 7;
|
||||
const MSGID_EXTENDED: u8 = 20;
|
||||
|
||||
const MY_EXTENDED_UT_METADATA: u8 = 3;
|
||||
pub const MY_EXTENDED_UT_METADATA: u8 = 3;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MessageDeserializeError {
|
||||
|
|
@ -552,365 +546,25 @@ impl Request {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum UtMetadata<ByteBuf> {
|
||||
Request(u32),
|
||||
Data {
|
||||
piece: u32,
|
||||
total_size: u32,
|
||||
data: ByteBuf,
|
||||
},
|
||||
Reject(u32),
|
||||
}
|
||||
|
||||
impl<ByteBuf: CloneToOwned> CloneToOwned for UtMetadata<ByteBuf> {
|
||||
type Target = UtMetadata<<ByteBuf as CloneToOwned>::Target>;
|
||||
|
||||
fn clone_to_owned(&self) -> Self::Target {
|
||||
match self {
|
||||
UtMetadata::Request(req) => UtMetadata::Request(*req),
|
||||
UtMetadata::Data {
|
||||
piece,
|
||||
total_size,
|
||||
data,
|
||||
} => UtMetadata::Data {
|
||||
piece: *piece,
|
||||
total_size: *total_size,
|
||||
data: data.clone_to_owned(),
|
||||
},
|
||||
UtMetadata::Reject(piece) => UtMetadata::Reject(*piece),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, ByteBuf: 'a> UtMetadata<ByteBuf> {
|
||||
fn serialize(&self, buf: &mut Vec<u8>)
|
||||
where
|
||||
ByteBuf: AsRef<[u8]>,
|
||||
{
|
||||
#[derive(Serialize)]
|
||||
struct Message {
|
||||
msg_type: u32,
|
||||
piece: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
total_size: Option<u32>,
|
||||
}
|
||||
match self {
|
||||
UtMetadata::Request(piece) => {
|
||||
let message = Message {
|
||||
msg_type: 0,
|
||||
piece: *piece,
|
||||
total_size: None,
|
||||
};
|
||||
serde_bencode_ser::bencode_serialize_to_writer(message, buf).unwrap()
|
||||
}
|
||||
UtMetadata::Data {
|
||||
piece,
|
||||
total_size,
|
||||
data,
|
||||
} => {
|
||||
let message = Message {
|
||||
msg_type: 1,
|
||||
piece: *piece,
|
||||
total_size: Some(*total_size),
|
||||
};
|
||||
serde_bencode_ser::bencode_serialize_to_writer(message, buf).unwrap();
|
||||
buf.write_all(data.as_ref()).unwrap();
|
||||
}
|
||||
UtMetadata::Reject(piece) => {
|
||||
let message = Message {
|
||||
msg_type: 2,
|
||||
piece: *piece,
|
||||
total_size: None,
|
||||
};
|
||||
serde_bencode_ser::bencode_serialize_to_writer(message, buf).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
fn deserialize(buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
|
||||
where
|
||||
ByteBuf: From<&'a [u8]>,
|
||||
{
|
||||
let mut de = BencodeDeserializer::new_from_buf(buf);
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Message {
|
||||
msg_type: u32,
|
||||
piece: u32,
|
||||
total_size: Option<u32>,
|
||||
}
|
||||
|
||||
let message =
|
||||
Message::deserialize(&mut de).map_err(|e| MessageDeserializeError::Other(e.into()))?;
|
||||
let remaining = de.into_remaining();
|
||||
|
||||
match message.msg_type {
|
||||
// request
|
||||
0 => {
|
||||
if !remaining.is_empty() {
|
||||
return Err(MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"trailing bytes when decoding UtMetadata"
|
||||
)));
|
||||
}
|
||||
Ok(UtMetadata::Request(message.piece))
|
||||
}
|
||||
// data
|
||||
1 => {
|
||||
let total_size = message.total_size.ok_or_else(|| {
|
||||
MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"expected key total_size to be present in UtMetadata \"data\" message"
|
||||
))
|
||||
})?;
|
||||
Ok(UtMetadata::Data {
|
||||
piece: message.piece,
|
||||
total_size,
|
||||
data: ByteBuf::from(remaining),
|
||||
})
|
||||
}
|
||||
// reject
|
||||
2 => {
|
||||
if !remaining.is_empty() {
|
||||
return Err(MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"trailing bytes when decoding UtMetadata"
|
||||
)));
|
||||
}
|
||||
Ok(UtMetadata::Reject(message.piece))
|
||||
}
|
||||
other => {
|
||||
return Err(MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"unrecognized ut_metadata message type {}",
|
||||
other
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ExtendedMessage<ByteBuf: std::hash::Hash + Eq> {
|
||||
Handshake(ExtendedHandshake<ByteBuf>),
|
||||
UtMetadata(UtMetadata<ByteBuf>),
|
||||
Dyn(u8, BencodeValue<ByteBuf>),
|
||||
}
|
||||
|
||||
impl<ByteBuf> CloneToOwned for ExtendedMessage<ByteBuf>
|
||||
where
|
||||
ByteBuf: CloneToOwned + std::hash::Hash + Eq,
|
||||
<ByteBuf as CloneToOwned>::Target: std::hash::Hash + Eq,
|
||||
{
|
||||
type Target = ExtendedMessage<<ByteBuf as CloneToOwned>::Target>;
|
||||
|
||||
fn clone_to_owned(&self) -> Self::Target {
|
||||
match self {
|
||||
ExtendedMessage::Handshake(h) => ExtendedMessage::Handshake(h.clone_to_owned()),
|
||||
ExtendedMessage::Dyn(u, d) => ExtendedMessage::Dyn(*u, d.clone_to_owned()),
|
||||
ExtendedMessage::UtMetadata(m) => ExtendedMessage::UtMetadata(m.clone_to_owned()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, ByteBuf: 'a + std::hash::Hash + Eq + Serialize> ExtendedMessage<ByteBuf> {
|
||||
fn serialize(
|
||||
&self,
|
||||
out: &mut Vec<u8>,
|
||||
extended_handshake: Option<&ExtendedHandshake<ByteString>>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
ByteBuf: AsRef<[u8]>,
|
||||
{
|
||||
match self {
|
||||
ExtendedMessage::Dyn(msg_id, v) => {
|
||||
out.push(*msg_id);
|
||||
crate::serde_bencode_ser::bencode_serialize_to_writer(v, out)?;
|
||||
}
|
||||
ExtendedMessage::Handshake(h) => {
|
||||
out.push(0);
|
||||
crate::serde_bencode_ser::bencode_serialize_to_writer(h, out)?;
|
||||
}
|
||||
ExtendedMessage::UtMetadata(u) => {
|
||||
let h = extended_handshake.ok_or_else(|| {
|
||||
anyhow::anyhow!("need peer's handshake to serialize ut_metadata")
|
||||
})?;
|
||||
let emsg_id = h
|
||||
.get_msgid(b"ut_metadata")
|
||||
.ok_or_else(|| anyhow::anyhow!("peer doesn't support ut_metadata"))?;
|
||||
out.push(emsg_id);
|
||||
u.serialize(out);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn deserialize(mut buf: &'a [u8]) -> Result<Self, MessageDeserializeError>
|
||||
where
|
||||
ByteBuf: Deserialize<'a> + From<&'a [u8]>,
|
||||
{
|
||||
use crate::serde_bencode_de::from_bytes;
|
||||
|
||||
let emsg_id = buf.get(0).copied().ok_or_else(|| {
|
||||
MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"cannot deserialize extended message: can't read first byte"
|
||||
))
|
||||
})?;
|
||||
|
||||
buf = &buf.get(1..).ok_or_else(|| {
|
||||
MessageDeserializeError::Other(anyhow::anyhow!(
|
||||
"cannot deserialize extended message: buffer empty"
|
||||
))
|
||||
})?;
|
||||
|
||||
match emsg_id {
|
||||
0 => Ok(ExtendedMessage::Handshake(from_bytes(&buf)?)),
|
||||
MY_EXTENDED_UT_METADATA => {
|
||||
Ok(ExtendedMessage::UtMetadata(UtMetadata::deserialize(&buf)?))
|
||||
}
|
||||
_ => Ok(ExtendedMessage::Dyn(emsg_id, from_bytes(&buf)?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct YourIP(pub IpAddr);
|
||||
|
||||
impl Serialize for YourIP {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match self.0 {
|
||||
IpAddr::V4(ipv4) => {
|
||||
let buf = ipv4.octets();
|
||||
serializer.serialize_bytes(&buf)
|
||||
}
|
||||
IpAddr::V6(_) => todo!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for YourIP {
|
||||
fn deserialize<D>(de: D) -> Result<YourIP, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct Visitor {}
|
||||
impl<'de> serde::de::Visitor<'de> for Visitor {
|
||||
type Value = YourIP;
|
||||
|
||||
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "expecting 4 bytes of ipv4 or 16 bytes of ipv6")
|
||||
}
|
||||
|
||||
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
if v.len() == 4 {
|
||||
return Ok(YourIP(IpAddr::V4(Ipv4Addr::new(v[0], v[1], v[2], v[3]))));
|
||||
} else if v.len() == 16 {
|
||||
return Ok(YourIP(IpAddr::V6(Ipv6Addr::new(
|
||||
BE::read_u16(&v[..2]),
|
||||
BE::read_u16(&v[2..4]),
|
||||
BE::read_u16(&v[4..6]),
|
||||
BE::read_u16(&v[6..8]),
|
||||
BE::read_u16(&v[8..10]),
|
||||
BE::read_u16(&v[10..12]),
|
||||
BE::read_u16(&v[12..14]),
|
||||
BE::read_u16(&v[14..]),
|
||||
))));
|
||||
}
|
||||
Err(E::custom("expected 4 or 16 byte address"))
|
||||
}
|
||||
}
|
||||
de.deserialize_bytes(Visitor {})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Default)]
|
||||
pub struct ExtendedHandshake<ByteBuf: Eq + std::hash::Hash> {
|
||||
#[serde(bound(deserialize = "ByteBuf: From<&'de [u8]>"))]
|
||||
pub m: HashMap<ByteBuf, u8>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub p: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub v: Option<ByteBuf>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub yourip: Option<YourIP>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ipv6: Option<ByteBuf>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ipv4: Option<ByteBuf>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub reqq: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub metadata_size: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub complete_ago: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub upload_only: Option<u32>,
|
||||
}
|
||||
|
||||
impl ExtendedHandshake<ByteBuf<'static>> {
|
||||
pub fn new() -> Self {
|
||||
let mut features = HashMap::new();
|
||||
features.insert(ByteBuf(b"ut_metadata"), MY_EXTENDED_UT_METADATA);
|
||||
Self {
|
||||
m: features,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<ByteBuf: Eq + std::hash::Hash> ExtendedHandshake<ByteBuf> {
|
||||
fn get_msgid(&self, msg_type: &[u8]) -> Option<u8>
|
||||
where
|
||||
ByteBuf: AsRef<[u8]>,
|
||||
{
|
||||
self.m.iter().find_map(|(k, v)| {
|
||||
if k.as_ref() == msg_type {
|
||||
Some(*v)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<ByteBuf> CloneToOwned for ExtendedHandshake<ByteBuf>
|
||||
where
|
||||
ByteBuf: CloneToOwned + Eq + std::hash::Hash,
|
||||
<ByteBuf as CloneToOwned>::Target: Eq + std::hash::Hash,
|
||||
{
|
||||
type Target = ExtendedHandshake<<ByteBuf as CloneToOwned>::Target>;
|
||||
|
||||
fn clone_to_owned(&self) -> Self::Target {
|
||||
ExtendedHandshake {
|
||||
m: self.m.clone_to_owned(),
|
||||
p: self.p,
|
||||
v: self.v.clone_to_owned(),
|
||||
yourip: self.yourip,
|
||||
ipv6: self.ipv6.clone_to_owned(),
|
||||
ipv4: self.ipv4.clone_to_owned(),
|
||||
reqq: self.reqq,
|
||||
metadata_size: self.metadata_size,
|
||||
complete_ago: self.complete_ago,
|
||||
upload_only: self.upload_only,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{fs::File, io::Read, net::SocketAddr, str::FromStr};
|
||||
use std::{
|
||||
fs::File,
|
||||
io::{Read, Write},
|
||||
net::SocketAddr,
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use log::info;
|
||||
use parking_lot::RwLock;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
|
||||
use crate::{
|
||||
bencode_value::BencodeValue,
|
||||
lengths::ceil_div_u64,
|
||||
peer_binary_protocol::extended::ut_metadata::UtMetadata,
|
||||
peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest},
|
||||
peer_id::generate_peer_id,
|
||||
torrent_metainfo::TorrentMetaV1Borrowed,
|
||||
};
|
||||
use std::sync::Once;
|
||||
|
||||
|
|
@ -2,15 +2,16 @@ use std::{net::SocketAddr, time::Duration};
|
|||
|
||||
use anyhow::Context;
|
||||
use log::{debug, trace};
|
||||
use tokio::{io::AsyncReadExt, time::timeout};
|
||||
use tokio::time::timeout;
|
||||
|
||||
use crate::{
|
||||
buffers::{ByteBuf, ByteString},
|
||||
clone_to_owned::CloneToOwned,
|
||||
lengths::ChunkInfo,
|
||||
peer_binary_protocol::{
|
||||
serialize_piece_preamble, ExtendedHandshake, ExtendedMessage, Handshake, Message,
|
||||
MessageBorrowed, MessageDeserializeError, MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
|
||||
extended::{handshake::ExtendedHandshake, ExtendedMessage},
|
||||
serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError,
|
||||
MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
|
||||
},
|
||||
peer_id::try_decode_peer_id,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -21,7 +21,9 @@ use crate::{
|
|||
clone_to_owned::CloneToOwned,
|
||||
file_ops::FileOps,
|
||||
lengths::{Lengths, ValidPieceIndex},
|
||||
peer_binary_protocol::{Handshake, Message, MessageOwned, Piece, Request},
|
||||
peer_binary_protocol::{
|
||||
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
|
||||
},
|
||||
peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest},
|
||||
peer_state::{InflightRequest, LivePeerState, PeerState},
|
||||
spawn_utils::{spawn, BlockingSpawner},
|
||||
|
|
@ -518,11 +520,7 @@ impl PeerConnectionHandler for PeerHandler {
|
|||
self.state.file_ops().read_chunk(self.addr, chunk, buf)
|
||||
}
|
||||
|
||||
fn on_extended_handshake(
|
||||
&self,
|
||||
extended_handshake: &crate::peer_binary_protocol::ExtendedHandshake<ByteBuf>,
|
||||
) {
|
||||
}
|
||||
fn on_extended_handshake(&self, _: &ExtendedHandshake<ByteBuf>) {}
|
||||
}
|
||||
|
||||
impl PeerHandler {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue