Handshake clone to owned
This commit is contained in:
parent
41fb3bfd37
commit
9c7cf61e1a
4 changed files with 56 additions and 21 deletions
|
|
@ -23,7 +23,7 @@ pub trait PeerConnectionHandler {
|
||||||
fn on_connected(&self, _connection_time: Duration) {}
|
fn on_connected(&self, _connection_time: Duration) {}
|
||||||
fn get_have_bytes(&self) -> u64;
|
fn get_have_bytes(&self) -> u64;
|
||||||
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
|
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
|
||||||
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>;
|
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()>;
|
||||||
fn on_extended_handshake(
|
fn on_extended_handshake(
|
||||||
&self,
|
&self,
|
||||||
extended_handshake: &ExtendedHandshake<ByteBuf>,
|
extended_handshake: &ExtendedHandshake<ByteBuf>,
|
||||||
|
|
@ -120,7 +120,16 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn manage_peer(
|
pub async fn manage_peer_incoming(
|
||||||
|
&self,
|
||||||
|
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
||||||
|
handshake: Handshake<ByteString>,
|
||||||
|
socket: tokio::net::TcpSocket,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn manage_peer_outgoing(
|
||||||
&self,
|
&self,
|
||||||
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ pub(crate) async fn read_metainfo_from_peer(
|
||||||
);
|
);
|
||||||
|
|
||||||
let result_reader = async move { result_rx.await? };
|
let result_reader = async move { result_rx.await? };
|
||||||
let connection_runner = async move { connection.manage_peer(writer_rx).await };
|
let connection_runner = async move { connection.manage_peer_outgoing(writer_rx).await };
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = result_reader => result,
|
result = result_reader => result,
|
||||||
|
|
@ -145,7 +145,7 @@ impl PeerConnectionHandler for Handler {
|
||||||
Ok(0)
|
Ok(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
|
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
|
||||||
if !handshake.supports_extended() {
|
if !handshake.supports_extended() {
|
||||||
anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata")
|
anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -404,7 +404,7 @@ impl TorrentStateLive {
|
||||||
.fetch_add(1, Ordering::Relaxed);
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
let res = tokio::select! {
|
let res = tokio::select! {
|
||||||
r = requester => {r}
|
r = requester => {r}
|
||||||
r = peer_connection.manage_peer(rx) => {r}
|
r = peer_connection.manage_peer_outgoing(rx) => {r}
|
||||||
};
|
};
|
||||||
|
|
||||||
handler.state.peer_semaphore.add_permits(1);
|
handler.state.peer_semaphore.add_permits(1);
|
||||||
|
|
@ -502,7 +502,7 @@ impl TorrentStateLive {
|
||||||
matches!(self.get_next_needed_piece(handle), Ok(Some(_)))
|
matches!(self.get_next_needed_piece(handle), Ok(Some(_)))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
|
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
|
||||||
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
|
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| {
|
||||||
p.state
|
p.state
|
||||||
.connecting_to_live(Id20(h.peer_id), &self.peers.stats)
|
.connecting_to_live(Id20(h.peer_id), &self.peers.stats)
|
||||||
|
|
@ -771,7 +771,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
Ok(len)
|
Ok(len)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
|
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
|
||||||
self.state.set_peer_live(self.addr, handshake);
|
self.state.set_peer_live(self.addr, handshake);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
pub mod extended;
|
pub mod extended;
|
||||||
|
|
||||||
use bincode::Options;
|
use bincode::Options;
|
||||||
use buffers::{ByteBuf, ByteString};
|
use buffers::{ByteBuf, ByteBufT, ByteString};
|
||||||
use byteorder::{ByteOrder, BE};
|
use byteorder::{ByteOrder, BE};
|
||||||
use clone_to_owned::CloneToOwned;
|
use clone_to_owned::CloneToOwned;
|
||||||
use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo};
|
use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo};
|
||||||
|
|
@ -472,8 +472,8 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub struct Handshake<'a> {
|
pub struct Handshake<ByteBuf> {
|
||||||
pub pstr: &'a str,
|
pub pstr: ByteBuf,
|
||||||
pub reserved: [u8; 8],
|
pub reserved: [u8; 8],
|
||||||
pub info_hash: [u8; 20],
|
pub info_hash: [u8; 20],
|
||||||
pub peer_id: [u8; 20],
|
pub peer_id: [u8; 20],
|
||||||
|
|
@ -485,8 +485,8 @@ fn bopts() -> impl bincode::Options {
|
||||||
.with_big_endian()
|
.with_big_endian()
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Handshake<'a> {
|
impl Handshake<ByteBuf<'static>> {
|
||||||
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<'static> {
|
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<ByteBuf<'static>> {
|
||||||
debug_assert_eq!(PSTR_BT1.len(), 19);
|
debug_assert_eq!(PSTR_BT1.len(), 19);
|
||||||
|
|
||||||
let mut reserved: u64 = 0;
|
let mut reserved: u64 = 0;
|
||||||
|
|
@ -496,19 +496,16 @@ impl<'a> Handshake<'a> {
|
||||||
BE::write_u64(&mut reserved_arr, reserved);
|
BE::write_u64(&mut reserved_arr, reserved);
|
||||||
|
|
||||||
Handshake {
|
Handshake {
|
||||||
pstr: PSTR_BT1,
|
pstr: ByteBuf(PSTR_BT1.as_bytes()),
|
||||||
reserved: reserved_arr,
|
reserved: reserved_arr,
|
||||||
info_hash: info_hash.0,
|
info_hash: info_hash.0,
|
||||||
peer_id: peer_id.0,
|
peer_id: peer_id.0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub fn supports_extended(&self) -> bool {
|
|
||||||
self.reserved[5] & 0x10 > 0
|
pub fn deserialize(
|
||||||
}
|
b: &[u8],
|
||||||
fn bopts() -> impl bincode::Options {
|
) -> Result<(Handshake<ByteBuf<'_>>, usize), MessageDeserializeError> {
|
||||||
bincode::DefaultOptions::new()
|
|
||||||
}
|
|
||||||
pub fn deserialize(b: &[u8]) -> Result<(Handshake<'_>, usize), MessageDeserializeError> {
|
|
||||||
let pstr_len = *b
|
let pstr_len = *b
|
||||||
.first()
|
.first()
|
||||||
.ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?;
|
.ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?;
|
||||||
|
|
@ -526,11 +523,40 @@ impl<'a> Handshake<'a> {
|
||||||
expected_len,
|
expected_len,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
pub fn serialize(&self, buf: &mut Vec<u8>) {
|
}
|
||||||
|
|
||||||
|
impl<B> Handshake<B> {
|
||||||
|
pub fn supports_extended(&self) -> bool {
|
||||||
|
self.reserved[5] & 0x10 > 0
|
||||||
|
}
|
||||||
|
fn bopts() -> impl bincode::Options {
|
||||||
|
bincode::DefaultOptions::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn serialize(&self, buf: &mut Vec<u8>)
|
||||||
|
where
|
||||||
|
B: Serialize,
|
||||||
|
{
|
||||||
Self::bopts().serialize_into(buf, &self).unwrap()
|
Self::bopts().serialize_into(buf, &self).unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<B> CloneToOwned for Handshake<B>
|
||||||
|
where
|
||||||
|
B: CloneToOwned,
|
||||||
|
{
|
||||||
|
type Target = Handshake<<B as CloneToOwned>::Target>;
|
||||||
|
|
||||||
|
fn clone_to_owned(&self) -> Self::Target {
|
||||||
|
Handshake {
|
||||||
|
pstr: self.pstr.clone_to_owned(),
|
||||||
|
reserved: self.reserved,
|
||||||
|
info_hash: self.info_hash,
|
||||||
|
peer_id: self.peer_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
|
||||||
pub struct Request {
|
pub struct Request {
|
||||||
pub index: u32,
|
pub index: u32,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue