Peer info reader done
This commit is contained in:
parent
46e87a9b80
commit
897517521e
7 changed files with 262 additions and 144 deletions
|
|
@ -3,7 +3,6 @@ name = "librqbit"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
authors = ["Igor Katson <igor.katson@gmail.com>"]
|
authors = ["Igor Katson <igor.katson@gmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
default-features = ["sha1-system"]
|
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["sha1-openssl"]
|
default = ["sha1-openssl"]
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ pub mod peer_binary_protocol;
|
||||||
pub mod peer_connection;
|
pub mod peer_connection;
|
||||||
pub mod peer_handler;
|
pub mod peer_handler;
|
||||||
pub mod peer_id;
|
pub mod peer_id;
|
||||||
|
pub mod peer_info_reader;
|
||||||
pub mod peer_state;
|
pub mod peer_state;
|
||||||
pub mod serde_bencode_de;
|
pub mod serde_bencode_de;
|
||||||
pub mod serde_bencode_ser;
|
pub mod serde_bencode_ser;
|
||||||
|
|
|
||||||
|
|
@ -548,38 +548,6 @@ impl Request {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use std::{
|
|
||||||
fs::File,
|
|
||||||
io::{Read, Write},
|
|
||||||
net::SocketAddr,
|
|
||||||
str::FromStr,
|
|
||||||
};
|
|
||||||
|
|
||||||
use log::info;
|
|
||||||
use parking_lot::RwLock;
|
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
bencode_value::BencodeValue,
|
|
||||||
lengths::ceil_div_u64,
|
|
||||||
peer_binary_protocol::extended::ut_metadata::UtMetadata,
|
|
||||||
peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest},
|
|
||||||
peer_id::generate_peer_id,
|
|
||||||
};
|
|
||||||
use std::sync::Once;
|
|
||||||
|
|
||||||
static LOG_INIT: Once = std::sync::Once::new();
|
|
||||||
|
|
||||||
fn init_logging() {
|
|
||||||
LOG_INIT.call_once(pretty_env_logger::init)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn decode_info_hash(hash_str: &str) -> [u8; 20] {
|
|
||||||
let mut hash_arr = [0u8; 20];
|
|
||||||
hex::decode_to_slice(hash_str, &mut hash_arr).unwrap();
|
|
||||||
hash_arr
|
|
||||||
}
|
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
#[test]
|
#[test]
|
||||||
fn test_handshake_serialize() {
|
fn test_handshake_serialize() {
|
||||||
|
|
@ -628,107 +596,4 @@ mod tests {
|
||||||
panic!("resources/test/extended-handshake.bin did not serialize exactly the same. Dumped to /tmp/test_deserialize_serialize_extended_is_same, you can compare with resources/test/extended-handshake.bin")
|
panic!("resources/test/extended-handshake.bin did not serialize exactly the same. Dumped to /tmp/test_deserialize_serialize_extended_is_same, you can compare with resources/test/extended-handshake.bin")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_connect_to_local_qbittorrent() {
|
|
||||||
init_logging();
|
|
||||||
|
|
||||||
struct Handler {
|
|
||||||
ehandshake: RwLock<Option<ExtendedHandshake<ByteString>>>,
|
|
||||||
tx: UnboundedSender<WriterRequest>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl PeerConnectionHandler for Handler {
|
|
||||||
fn get_have_bytes(&self) -> u64 {
|
|
||||||
0
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec<u8>) -> Option<usize> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_handshake(&self, handshake: Handshake) {
|
|
||||||
info!("received handshake: {:?}", handshake)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
|
|
||||||
info!("received message: {:?}", msg);
|
|
||||||
|
|
||||||
if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data {
|
|
||||||
piece,
|
|
||||||
total_size,
|
|
||||||
data,
|
|
||||||
})) = msg
|
|
||||||
{
|
|
||||||
// this just assumes piece come in the order requested.
|
|
||||||
let mut f = std::fs::OpenOptions::new()
|
|
||||||
.create(true)
|
|
||||||
.append(true)
|
|
||||||
.open("/tmp/torrent")
|
|
||||||
.unwrap();
|
|
||||||
f.write_all(&data).unwrap();
|
|
||||||
|
|
||||||
// test if it's the last piece
|
|
||||||
if data.len() < CHUNK_SIZE as usize {
|
|
||||||
let mut buf = Vec::new();
|
|
||||||
let mut f = File::open("/tmp/torrent").unwrap();
|
|
||||||
f.read_to_end(&mut buf).unwrap();
|
|
||||||
|
|
||||||
// let torrent: TorrentMetaV1Borrowed =
|
|
||||||
// crate::torrent_metainfo::torrent_from_bytes(&buf).unwrap();
|
|
||||||
let torrent: BencodeValue<ByteBuf> =
|
|
||||||
crate::bencode_value::dyn_from_bytes(&buf).unwrap();
|
|
||||||
dbg!(torrent);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_uploaded_bytes(&self, _bytes: u32) {}
|
|
||||||
|
|
||||||
fn read_chunk(&self, _chunk: &ChunkInfo, _buf: &mut [u8]) -> anyhow::Result<()> {
|
|
||||||
panic!("dude, why are you requesting chunks")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn on_extended_handshake(&self, extended_handshake: &ExtendedHandshake<ByteBuf>) {
|
|
||||||
self.ehandshake
|
|
||||||
.write()
|
|
||||||
.replace(extended_handshake.clone_to_owned());
|
|
||||||
self.tx
|
|
||||||
.send(WriterRequest::Message(Message::Unchoke))
|
|
||||||
.unwrap();
|
|
||||||
self.tx
|
|
||||||
.send(WriterRequest::Message(Message::Interested))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let total_metadata_chunks = ceil_div_u64(
|
|
||||||
extended_handshake.metadata_size.unwrap() as u64,
|
|
||||||
CHUNK_SIZE as u64,
|
|
||||||
);
|
|
||||||
|
|
||||||
for i in 0..total_metadata_chunks {
|
|
||||||
self.tx
|
|
||||||
.send(WriterRequest::Message(Message::Extended(
|
|
||||||
ExtendedMessage::UtMetadata(UtMetadata::Request(i as u32)),
|
|
||||||
)))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
|
|
||||||
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
|
|
||||||
let handler = Handler {
|
|
||||||
tx,
|
|
||||||
ehandshake: RwLock::new(None),
|
|
||||||
};
|
|
||||||
let peer_id = generate_peer_id();
|
|
||||||
let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7");
|
|
||||||
|
|
||||||
let conn = PeerConnection::new(addr, info_hash, peer_id, handler);
|
|
||||||
|
|
||||||
// tx.send(WriterRequest::Message(Message::Extended(ExtendedMessage)));
|
|
||||||
|
|
||||||
conn.manage_peer(rx).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,11 @@ use crate::{
|
||||||
pub trait PeerConnectionHandler {
|
pub trait PeerConnectionHandler {
|
||||||
fn get_have_bytes(&self) -> u64;
|
fn get_have_bytes(&self) -> u64;
|
||||||
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize>;
|
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize>;
|
||||||
fn on_handshake(&self, handshake: Handshake);
|
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>;
|
||||||
fn on_extended_handshake(&self, extended_handshake: &ExtendedHandshake<ByteBuf>);
|
fn on_extended_handshake(
|
||||||
|
&self,
|
||||||
|
extended_handshake: &ExtendedHandshake<ByteBuf>,
|
||||||
|
) -> anyhow::Result<()>;
|
||||||
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()>;
|
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()>;
|
||||||
fn on_uploaded_bytes(&self, bytes: u32);
|
fn on_uploaded_bytes(&self, bytes: u32);
|
||||||
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
|
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
|
||||||
|
|
@ -146,7 +149,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
let mut extended_handshake: Option<ExtendedHandshake<ByteString>> = None;
|
let mut extended_handshake: Option<ExtendedHandshake<ByteString>> = None;
|
||||||
let supports_extended = h.supports_extended();
|
let supports_extended = h.supports_extended();
|
||||||
|
|
||||||
self.handler.on_handshake(h);
|
self.handler.on_handshake(h)?;
|
||||||
if read_so_far > size {
|
if read_so_far > size {
|
||||||
read_buf.copy_within(size..read_so_far, 0);
|
read_buf.copy_within(size..read_so_far, 0);
|
||||||
}
|
}
|
||||||
|
|
@ -165,7 +168,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
match extended {
|
match extended {
|
||||||
Message::Extended(ExtendedMessage::Handshake(h)) => {
|
Message::Extended(ExtendedMessage::Handshake(h)) => {
|
||||||
trace!("received from {}: {:?}", self.addr, &h);
|
trace!("received from {}: {:?}", self.addr, &h);
|
||||||
self.handler.on_extended_handshake(&h);
|
self.handler.on_extended_handshake(&h)?;
|
||||||
extended_handshake = Some(h.clone_to_owned())
|
extended_handshake = Some(h.clone_to_owned())
|
||||||
}
|
}
|
||||||
other => anyhow::bail!("expected extended handshake, but got {:?}", other),
|
other => anyhow::bail!("expected extended handshake, but got {:?}", other),
|
||||||
|
|
|
||||||
247
crates/librqbit/src/peer_info_reader/mod.rs
Normal file
247
crates/librqbit/src/peer_info_reader/mod.rs
Normal file
|
|
@ -0,0 +1,247 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use crate::sha1w::ISha1;
|
||||||
|
use log::debug;
|
||||||
|
use parking_lot::{Mutex, RwLock};
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
buffers::{ByteBuf, ByteString},
|
||||||
|
constants::CHUNK_SIZE,
|
||||||
|
lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo},
|
||||||
|
peer_binary_protocol::{
|
||||||
|
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
|
||||||
|
Handshake, Message,
|
||||||
|
},
|
||||||
|
peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest},
|
||||||
|
serde_bencode_de::from_bytes,
|
||||||
|
torrent_metainfo::TorrentMetaV1Info,
|
||||||
|
type_aliases::Sha1,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn read_metainfo_from_peer(
|
||||||
|
addr: SocketAddr,
|
||||||
|
peer_id: [u8; 20],
|
||||||
|
info_hash: [u8; 20],
|
||||||
|
) -> anyhow::Result<TorrentMetaV1Info<ByteString>> {
|
||||||
|
let (result_tx, result_rx) =
|
||||||
|
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteString>>>();
|
||||||
|
let (writer_tx, writer_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>();
|
||||||
|
let handler = Handler {
|
||||||
|
addr,
|
||||||
|
info_hash,
|
||||||
|
writer_tx,
|
||||||
|
result_tx: Mutex::new(Some(result_tx)),
|
||||||
|
locked: RwLock::new(None),
|
||||||
|
};
|
||||||
|
let connection = PeerConnection::new(addr, info_hash, peer_id, handler);
|
||||||
|
|
||||||
|
let result_reader = async move { result_rx.await? };
|
||||||
|
let connection_runner = async move { connection.manage_peer(writer_rx).await };
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
result = result_reader => result,
|
||||||
|
whatever = connection_runner => match whatever {
|
||||||
|
Ok(_) => anyhow::bail!("connection runner completed first"),
|
||||||
|
Err(e) => Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct HandlerLocked {
|
||||||
|
metadata_size: u32,
|
||||||
|
total_pieces: usize,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
received_pieces: Vec<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HandlerLocked {
|
||||||
|
fn new(metadata_size: u32) -> anyhow::Result<Self> {
|
||||||
|
if metadata_size > 1024 * 1024 {
|
||||||
|
anyhow::bail!("metadata size {} is too big", metadata_size);
|
||||||
|
}
|
||||||
|
let buffer = vec![0u8; metadata_size as usize];
|
||||||
|
let total_pieces = ceil_div_u64(metadata_size as u64, CHUNK_SIZE as u64);
|
||||||
|
let received_pieces = vec![false; total_pieces as usize];
|
||||||
|
Ok(Self {
|
||||||
|
metadata_size,
|
||||||
|
received_pieces,
|
||||||
|
buffer,
|
||||||
|
total_pieces: total_pieces as usize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
fn piece_size(&self, index: u32) -> usize {
|
||||||
|
if index as usize == self.total_pieces - 1 {
|
||||||
|
last_element_size_u64(self.metadata_size as u64, CHUNK_SIZE as u64) as usize
|
||||||
|
} else {
|
||||||
|
CHUNK_SIZE as usize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn record_piece(
|
||||||
|
&mut self,
|
||||||
|
index: u32,
|
||||||
|
data: &[u8],
|
||||||
|
info_hash: [u8; 20],
|
||||||
|
) -> anyhow::Result<bool> {
|
||||||
|
if index as usize >= self.total_pieces {
|
||||||
|
anyhow::bail!("wrong index");
|
||||||
|
}
|
||||||
|
let offset = (index * CHUNK_SIZE) as usize;
|
||||||
|
let size = self.piece_size(index);
|
||||||
|
if data.len() != size {
|
||||||
|
anyhow::bail!(
|
||||||
|
"expected length of piece {} to be {}, but got {}",
|
||||||
|
index,
|
||||||
|
size,
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if self.received_pieces[index as usize] {
|
||||||
|
anyhow::bail!("already received piece {}", index);
|
||||||
|
}
|
||||||
|
let offset_end = offset + size;
|
||||||
|
(&mut self.buffer[offset..offset_end]).copy_from_slice(data);
|
||||||
|
self.received_pieces[index as usize] = true;
|
||||||
|
|
||||||
|
if self.received_pieces.iter().all(|p| *p) {
|
||||||
|
// check metadata
|
||||||
|
let mut hash = Sha1::new();
|
||||||
|
hash.update(&self.buffer);
|
||||||
|
if hash.finish() != info_hash {
|
||||||
|
anyhow::bail!("info checksum invalid");
|
||||||
|
}
|
||||||
|
Ok(true)
|
||||||
|
} else {
|
||||||
|
Ok(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Handler {
|
||||||
|
addr: SocketAddr,
|
||||||
|
info_hash: [u8; 20],
|
||||||
|
writer_tx: UnboundedSender<WriterRequest>,
|
||||||
|
result_tx:
|
||||||
|
Mutex<Option<tokio::sync::oneshot::Sender<anyhow::Result<TorrentMetaV1Info<ByteString>>>>>,
|
||||||
|
locked: RwLock<Option<HandlerLocked>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerConnectionHandler for Handler {
|
||||||
|
fn get_have_bytes(&self) -> u64 {
|
||||||
|
0
|
||||||
|
}
|
||||||
|
|
||||||
|
fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec<u8>) -> Option<usize> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
|
||||||
|
if !handshake.supports_extended() {
|
||||||
|
anyhow::bail!("this peer does not support extended handshaking, which is a prerequisite to download metadata")
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
|
||||||
|
debug!("{}: received message: {:?}", self.addr, msg);
|
||||||
|
|
||||||
|
if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data {
|
||||||
|
piece,
|
||||||
|
total_size: _,
|
||||||
|
data,
|
||||||
|
})) = msg
|
||||||
|
{
|
||||||
|
let piece_ready =
|
||||||
|
self.locked
|
||||||
|
.write()
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
|
.record_piece(piece, &data, self.info_hash)?;
|
||||||
|
if piece_ready {
|
||||||
|
let buf = self.locked.write().take().unwrap().buffer;
|
||||||
|
let info = from_bytes::<TorrentMetaV1Info<ByteString>>(&buf);
|
||||||
|
self.result_tx
|
||||||
|
.lock()
|
||||||
|
.take()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("oneshot is consumed"))?
|
||||||
|
.send(info)
|
||||||
|
.map_err(|_| {
|
||||||
|
anyhow::anyhow!("torrent info deserialized, but consumer closed")
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_uploaded_bytes(&self, _bytes: u32) {}
|
||||||
|
|
||||||
|
fn read_chunk(&self, _chunk: &ChunkInfo, _buf: &mut [u8]) -> anyhow::Result<()> {
|
||||||
|
anyhow::bail!("the peer is not supposed to be requesting chunks")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_extended_handshake(
|
||||||
|
&self,
|
||||||
|
extended_handshake: &ExtendedHandshake<ByteBuf>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let metadata_size = match extended_handshake.metadata_size {
|
||||||
|
Some(metadata_size) => metadata_size,
|
||||||
|
None => anyhow::bail!("peer does not have metadata_size"),
|
||||||
|
};
|
||||||
|
|
||||||
|
if extended_handshake.get_msgid(b"ut_metadata").is_none() {
|
||||||
|
anyhow::bail!("peer does not support ut_metadata");
|
||||||
|
}
|
||||||
|
|
||||||
|
self.writer_tx
|
||||||
|
.send(WriterRequest::Message(Message::Unchoke))?;
|
||||||
|
self.writer_tx
|
||||||
|
.send(WriterRequest::Message(Message::Interested))?;
|
||||||
|
|
||||||
|
let inner = HandlerLocked::new(metadata_size)?;
|
||||||
|
let total_pieces = inner.total_pieces;
|
||||||
|
|
||||||
|
self.locked.write().replace(inner);
|
||||||
|
|
||||||
|
for i in 0..total_pieces {
|
||||||
|
self.writer_tx
|
||||||
|
.send(WriterRequest::Message(Message::Extended(
|
||||||
|
ExtendedMessage::UtMetadata(UtMetadata::Request(i as u32)),
|
||||||
|
)))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use std::{net::SocketAddr, str::FromStr, sync::Once};
|
||||||
|
|
||||||
|
use crate::peer_id::generate_peer_id;
|
||||||
|
|
||||||
|
use super::read_metainfo_from_peer;
|
||||||
|
|
||||||
|
static LOG_INIT: Once = std::sync::Once::new();
|
||||||
|
|
||||||
|
fn init_logging() {
|
||||||
|
LOG_INIT.call_once(pretty_env_logger::init)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn decode_info_hash(hash_str: &str) -> [u8; 20] {
|
||||||
|
let mut hash_arr = [0u8; 20];
|
||||||
|
hex::decode_to_slice(hash_str, &mut hash_arr).unwrap();
|
||||||
|
hash_arr
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_get_torrent_metadata_from_localhost_bittorrent_client() {
|
||||||
|
init_logging();
|
||||||
|
|
||||||
|
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
|
||||||
|
let peer_id = generate_peer_id();
|
||||||
|
let info_hash = decode_info_hash("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7");
|
||||||
|
dbg!(read_metainfo_from_peer(addr, peer_id, info_hash)
|
||||||
|
.await
|
||||||
|
.unwrap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -245,7 +245,7 @@ impl<'ser, W: std::io::Write> serde::ser::SerializeStruct for SerializeStruct<'s
|
||||||
fn end(self) -> Result<Self::Ok, Self::Error> {
|
fn end(self) -> Result<Self::Ok, Self::Error> {
|
||||||
for (key, value) in self.tmp {
|
for (key, value) in self.tmp {
|
||||||
self.ser.write_bytes(key.as_bytes())?;
|
self.ser.write_bytes(key.as_bytes())?;
|
||||||
self.ser.write_raw(&dbg!(value))?;
|
self.ser.write_raw(&value)?;
|
||||||
}
|
}
|
||||||
self.ser.write_byte(b'e')
|
self.ser.write_byte(b'e')
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -505,8 +505,9 @@ impl PeerConnectionHandler for PeerHandler {
|
||||||
Some(len)
|
Some(len)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_handshake(&self, handshake: Handshake) {
|
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
|
||||||
self.state.set_peer_live(self.addr, handshake)
|
self.state.set_peer_live(self.addr, handshake);
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_uploaded_bytes(&self, bytes: u32) {
|
fn on_uploaded_bytes(&self, bytes: u32) {
|
||||||
|
|
@ -520,7 +521,9 @@ impl PeerConnectionHandler for PeerHandler {
|
||||||
self.state.file_ops().read_chunk(self.addr, chunk, buf)
|
self.state.file_ops().read_chunk(self.addr, chunk, buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_extended_handshake(&self, _: &ExtendedHandshake<ByteBuf>) {}
|
fn on_extended_handshake(&self, _: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PeerHandler {
|
impl PeerHandler {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue