From c7ed475f54594b4fd134f9f29f2014fbff28b023 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Wed, 14 Aug 2024 12:08:46 +0100 Subject: [PATCH] Use bytes crate for zerocopy and memory re-use (#182) * Use bytes. Not yet zerocopy everywhere but compiles * Actually zerocopy * Actually zerocopy * Not actually storing the torrent on disk now --- Cargo.lock | 8 ++ crates/bencode/Cargo.toml | 1 + crates/bencode/src/bencode_value.rs | 9 +- crates/buffers/Cargo.toml | 1 + crates/buffers/src/lib.rs | 34 ++++++-- crates/clone_to_owned/Cargo.toml | 1 + crates/clone_to_owned/src/lib.rs | 24 ++++-- crates/dht/Cargo.toml | 1 + crates/dht/src/bprotocol.rs | 5 +- crates/librqbit/src/peer_connection.rs | 2 +- crates/librqbit/src/peer_info_reader/mod.rs | 12 ++- crates/librqbit/src/session.rs | 86 +++++++++++++------ crates/librqbit/src/torrent_state/live/mod.rs | 6 +- crates/librqbit_core/Cargo.toml | 1 + crates/librqbit_core/src/torrent_metainfo.rs | 36 ++++---- crates/peer_binary_protocol/Cargo.toml | 1 + .../src/extended/handshake.rs | 11 +-- .../peer_binary_protocol/src/extended/mod.rs | 13 ++- .../src/extended/ut_metadata.rs | 8 +- crates/peer_binary_protocol/src/lib.rs | 17 ++-- 20 files changed, 182 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8d90173..09eec36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1363,6 +1363,7 @@ name = "librqbit-bencode" version = "2.2.3" dependencies = [ "anyhow", + "bytes", "librqbit-buffers", "librqbit-clone-to-owned", "librqbit-sha1-wrapper", @@ -1373,6 +1374,7 @@ dependencies = [ name = "librqbit-buffers" version = "3.0.1" dependencies = [ + "bytes", "librqbit-clone-to-owned", "serde", ] @@ -1380,12 +1382,16 @@ dependencies = [ [[package]] name = "librqbit-clone-to-owned" version = "2.2.1" +dependencies = [ + "bytes", +] [[package]] name = "librqbit-core" version = "3.9.0" dependencies = [ "anyhow", + "bytes", "data-encoding", "directories", "hex 0.4.3", @@ -1409,6 +1415,7 @@ version = "5.0.4" dependencies = [ "anyhow", "backoff", + "bytes", "chrono", "dashmap", "futures", @@ -1437,6 +1444,7 @@ dependencies = [ "bincode", "bitvec", "byteorder", + "bytes", "librqbit-bencode", "librqbit-buffers", "librqbit-clone-to-owned", diff --git a/crates/bencode/Cargo.toml b/crates/bencode/Cargo.toml index c633e9d..f8674ad 100644 --- a/crates/bencode/Cargo.toml +++ b/crates/bencode/Cargo.toml @@ -16,3 +16,4 @@ buffers = { path = "../buffers", package = "librqbit-buffers", version = "3.0.1" clone_to_owned = { path = "../clone_to_owned", package = "librqbit-clone-to-owned", version = "2.2.1" } anyhow = "1" sha1w = { path = "../sha1w", default-features = false, package = "librqbit-sha1-wrapper", version = "3.0.0" } +bytes = "1.7.1" diff --git a/crates/bencode/src/bencode_value.rs b/crates/bencode/src/bencode_value.rs index f507447..7e0ab98 100644 --- a/crates/bencode/src/bencode_value.rs +++ b/crates/bencode/src/bencode_value.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, marker::PhantomData}; use buffers::{ByteBuf, ByteBufOwned}; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::Deserializer; @@ -122,12 +123,12 @@ where { type Target = BencodeValue<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { match self { - BencodeValue::Bytes(b) => BencodeValue::Bytes(b.clone_to_owned()), + BencodeValue::Bytes(b) => BencodeValue::Bytes(b.clone_to_owned(within_buffer)), BencodeValue::Integer(i) => BencodeValue::Integer(*i), - BencodeValue::List(l) => BencodeValue::List(l.clone_to_owned()), - BencodeValue::Dict(d) => BencodeValue::Dict(d.clone_to_owned()), + BencodeValue::List(l) => BencodeValue::List(l.clone_to_owned(within_buffer)), + BencodeValue::Dict(d) => BencodeValue::Dict(d.clone_to_owned(within_buffer)), } } } diff --git a/crates/buffers/Cargo.toml b/crates/buffers/Cargo.toml index f8289ce..5bdcdd3 100644 --- a/crates/buffers/Cargo.toml +++ b/crates/buffers/Cargo.toml @@ -12,3 +12,4 @@ readme = "README.md" [dependencies] serde = { version = "1", features = ["derive"] } clone_to_owned = { path = "../clone_to_owned", package = "librqbit-clone-to-owned", version = "2.2.1" } +bytes = "1" diff --git a/crates/buffers/src/lib.rs b/crates/buffers/src/lib.rs index 51385a7..2561678 100644 --- a/crates/buffers/src/lib.rs +++ b/crates/buffers/src/lib.rs @@ -3,12 +3,13 @@ // // Not useful outside of librqbit. +use bytes::Bytes; use serde::{Deserialize, Deserializer}; use clone_to_owned::CloneToOwned; #[derive(Default, PartialEq, Eq, Hash, Clone, PartialOrd, Ord)] -pub struct ByteBufOwned(pub Box<[u8]>); +pub struct ByteBufOwned(pub bytes::Bytes); #[derive(Default, Deserialize, PartialEq, Eq, Hash, Clone, PartialOrd, Ord)] #[serde(transparent)] @@ -90,15 +91,30 @@ impl std::fmt::Display for ByteBufOwned { impl<'a> CloneToOwned for ByteBuf<'a> { type Target = ByteBufOwned; - fn clone_to_owned(&self) -> Self::Target { - ByteBufOwned(self.as_slice().to_owned().into_boxed_slice()) + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { + // Try zero-copy from the provided buffer. + if let Some(within_buffer) = within_buffer { + let haystack = within_buffer.as_ptr() as usize; + let haystack_end = haystack + within_buffer.len(); + let needle = self.0.as_ptr() as usize; + let needle_end = needle + self.0.len(); + + if needle >= haystack && needle_end <= haystack_end { + return ByteBufOwned(within_buffer.slice_ref(self.0.as_ref())); + } else { + #[cfg(debug_assertions)] + panic!("bug: broken buffers! not inside within_buffer"); + } + } + + ByteBufOwned(Bytes::copy_from_slice(self.0)) } } impl CloneToOwned for ByteBufOwned { type Target = ByteBufOwned; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, _within_buffer: Option<&Bytes>) -> Self::Target { ByteBufOwned(self.0.clone()) } } @@ -139,13 +155,19 @@ impl<'a> From<&'a [u8]> for ByteBuf<'a> { impl<'a> From<&'a [u8]> for ByteBufOwned { fn from(b: &'a [u8]) -> Self { - Self(b.into()) + Self(b.to_owned().into()) } } impl From> for ByteBufOwned { fn from(b: Vec) -> Self { - Self(b.into_boxed_slice()) + Self(b.into()) + } +} + +impl From for ByteBufOwned { + fn from(b: Bytes) -> Self { + Self(b) } } diff --git a/crates/clone_to_owned/Cargo.toml b/crates/clone_to_owned/Cargo.toml index 7af8c4b..c180c4f 100644 --- a/crates/clone_to_owned/Cargo.toml +++ b/crates/clone_to_owned/Cargo.toml @@ -10,3 +10,4 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bytes = "1.7.1" diff --git a/crates/clone_to_owned/src/lib.rs b/crates/clone_to_owned/src/lib.rs index 561033f..0d15970 100644 --- a/crates/clone_to_owned/src/lib.rs +++ b/crates/clone_to_owned/src/lib.rs @@ -6,12 +6,13 @@ // This lets us express types like TorrentMetaInfo<&[u8]> for zero-copy metadata about a bencode buffer in memory, // but to have one-line conversion for it into TorrentMetaInfo> so that we can store it later somewhere. +use bytes::Bytes; use std::collections::HashMap; pub trait CloneToOwned { type Target; - fn clone_to_owned(&self) -> Self::Target; + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target; } impl CloneToOwned for Option @@ -20,8 +21,8 @@ where { type Target = Option<::Target>; - fn clone_to_owned(&self) -> Self::Target { - self.as_ref().map(|i| i.clone_to_owned()) + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { + self.as_ref().map(|i| i.clone_to_owned(within_buffer)) } } @@ -31,15 +32,17 @@ where { type Target = Vec<::Target>; - fn clone_to_owned(&self) -> Self::Target { - self.iter().map(|i| i.clone_to_owned()).collect() + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { + self.iter() + .map(|i| i.clone_to_owned(within_buffer)) + .collect() } } impl CloneToOwned for u8 { type Target = u8; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, _within_buffer: Option<&Bytes>) -> Self::Target { *self } } @@ -47,7 +50,7 @@ impl CloneToOwned for u8 { impl CloneToOwned for u32 { type Target = u32; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, _within_buffer: Option<&Bytes>) -> Self::Target { *self } } @@ -60,10 +63,13 @@ where { type Target = HashMap<::Target, ::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { let mut result = HashMap::with_capacity(self.capacity()); for (k, v) in self { - result.insert(k.clone_to_owned(), v.clone_to_owned()); + result.insert( + k.clone_to_owned(within_buffer), + v.clone_to_owned(within_buffer), + ); } result } diff --git a/crates/dht/Cargo.toml b/crates/dht/Cargo.toml index 8709748..99291a8 100644 --- a/crates/dht/Cargo.toml +++ b/crates/dht/Cargo.toml @@ -35,6 +35,7 @@ clone_to_owned = { path = "../clone_to_owned", package = "librqbit-clone-to-owne librqbit-core = { path = "../librqbit_core", version = "3.9.0" } chrono = { version = "0.4.31", features = ["serde"] } tokio-util = "0.7.10" +bytes = "1.7.1" [dev-dependencies] tracing-subscriber = "0.3" diff --git a/crates/dht/src/bprotocol.rs b/crates/dht/src/bprotocol.rs index 49ed116..4395f82 100644 --- a/crates/dht/src/bprotocol.rs +++ b/crates/dht/src/bprotocol.rs @@ -5,6 +5,7 @@ use std::{ }; use bencode::{ByteBuf, ByteBufOwned}; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use librqbit_core::hash_id::Id20; use serde::{ @@ -73,10 +74,10 @@ where { type Target = ErrorDescription<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { ErrorDescription { code: self.code, - description: self.description.clone_to_owned(), + description: self.description.clone_to_owned(within_buffer), } } } diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 3409a54..0091f34 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -382,7 +382,7 @@ impl PeerConnection { trace!("received: {:?}", &message); if let Message::Extended(ExtendedMessage::Handshake(h)) = &message { - *extended_handshake_ref.write() = Some(h.clone_to_owned()); + *extended_handshake_ref.write() = Some(h.clone_to_owned(None)); self.handler.on_extended_handshake(h)?; trace!("remembered extended handshake for future serializing"); } else { diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index aff3e44..9815f5e 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc}; use bencode::from_bytes; use buffers::{ByteBuf, ByteBufOwned}; +use bytes::Bytes; use librqbit_core::{ constants::CHUNK_SIZE, hash_id::Id20, @@ -178,9 +179,14 @@ impl PeerConnectionHandler for Handler { .unwrap() .record_piece(piece, &data, self.info_hash)?; if piece_ready { - let buf = self.locked.write().take().unwrap().buffer; - let info = from_bytes::>(&buf); - let info = info.map(|i| (i, ByteBufOwned(buf.into_boxed_slice()))); + let buf = Bytes::from(self.locked.write().take().unwrap().buffer); + let info = from_bytes::>(&buf) + .map(|i| { + use clone_to_owned::CloneToOwned; + i.clone_to_owned(Some(&buf)) + }) + .map(|i| (i, ByteBufOwned(buf))); + self.result_tx .lock() .take() diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index f6e27d0..29f1c3d 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -45,7 +45,8 @@ use librqbit_core::{ peer_id::generate_peer_id, spawn_utils::spawn_with_cancel, torrent_metainfo::{ - torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned, + torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Borrowed, TorrentMetaV1Info, + TorrentMetaV1Owned, }, }; use parking_lot::RwLock; @@ -61,7 +62,7 @@ pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"]; pub type TorrentId = usize; -fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result { +fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result { debug!( "all fields in torrent: {:#?}", bencode::dyn_from_bytes::(bytes) @@ -120,7 +121,11 @@ impl SessionDatabase { .map(|u| u.to_string()) .collect(), info_hash: torrent.info_hash().as_string(), - torrent_bytes: torrent.info.torrent_bytes.clone(), + // TODO: this could take up too much space / time / resources to write on interval. + // Store this outside the JSON file + // + // torrent_bytes: torrent.info.torrent_bytes.clone(), + torrent_bytes: Bytes::new(), info: torrent.info().info.clone(), only_files: torrent.only_files().clone(), is_paused: torrent @@ -251,8 +256,10 @@ async fn torrent_from_url( .await .with_context(|| format!("error reading response body from {url}"))?; Ok(( - torrent_from_bytes(&b).context("error decoding torrent")?, - b.to_vec().into(), + torrent_from_bytes(&b) + .context("error decoding torrent")? + .clone_to_owned(Some(&b)), + b.into(), )) } @@ -415,7 +422,7 @@ pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result pub enum AddTorrent<'a> { Url(Cow<'a, str>), TorrentFileBytes(Cow<'a, [u8]>), - TorrentInfo(Box), + TorrentInfo(Box, Bytes), } impl<'a> AddTorrent<'a> { @@ -448,7 +455,7 @@ impl<'a> AddTorrent<'a> { match self { Self::Url(s) => s.into_owned().into_bytes(), Self::TorrentFileBytes(b) => b.into_owned(), - Self::TorrentInfo(_) => unimplemented!(), + Self::TorrentInfo(..) => unimplemented!(), } } } @@ -752,7 +759,7 @@ impl Session { } }; - let handshake = h.clone_to_owned(); + let handshake = h.clone_to_owned(None); return Ok(( live, @@ -877,24 +884,42 @@ impl Session { .into_iter() .map(|t| ByteBufOwned::from(t.into_bytes())) .collect(); - let info = TorrentMetaV1Owned { - announce: trackers.first().cloned(), - announce_list: vec![trackers], - info: storrent.info, - comment: None, - created_by: None, - encoding: None, - publisher: None, - publisher_url: None, - creation_date: None, - info_hash: Id20::from_str(&storrent.info_hash)?, + + let torrent_bytes = storrent.torrent_bytes; + + let info = if !torrent_bytes.is_empty() { + torrent_from_bytes(&torrent_bytes) + .map(|t| t.clone_to_owned(Some(&torrent_bytes))) + .ok() + } else { + None }; + let info = match info { + Some(info) => info, + None => { + let info_hash = Id20::from_str(&storrent.info_hash)?; + debug!(?info_hash, "torrent added before 6.1.0, need to readd"); + TorrentMetaV1Owned { + announce: trackers.first().cloned(), + announce_list: vec![trackers], + info: storrent.info, + comment: None, + created_by: None, + encoding: None, + publisher: None, + publisher_url: None, + creation_date: None, + info_hash, + } + } + }; + futures.push({ let session = self.clone(); async move { session .add_torrent( - AddTorrent::TorrentInfo(Box::new(info)), + AddTorrent::TorrentInfo(Box::new(info), torrent_bytes), Some(AddTorrentOptions { paused: storrent.is_paused, output_folder: Some( @@ -1041,14 +1066,19 @@ impl Session { url ) } - AddTorrent::TorrentFileBytes(bytes) => ( - torrent_from_bytes(&bytes).context("error decoding torrent")?, - ByteBufOwned::from(bytes.into_owned()), - ), - AddTorrent::TorrentInfo(t) => { - // TODO: this is lossy, as we don't store the bytes. - (*t, ByteBufOwned(Vec::new().into_boxed_slice())) + AddTorrent::TorrentFileBytes(bytes) => { + let bytes = match bytes { + Cow::Borrowed(b) => ::bytes::Bytes::copy_from_slice(b), + Cow::Owned(v) => ::bytes::Bytes::from(v), + }; + ( + torrent_from_bytes(&bytes) + .map(|t| t.clone_to_owned(Some(&bytes))) + .context("error decoding torrent")?, + ByteBufOwned(bytes), + ) } + AddTorrent::TorrentInfo(t, bytes) => (*t, bytes.into()), }; let trackers = torrent @@ -1081,7 +1111,7 @@ impl Session { InternalAddResult { info_hash: torrent.info_hash, info: torrent.info, - torrent_bytes: Bytes::from(bytes.0), + torrent_bytes: bytes.0, trackers, peer_rx, initial_peers: opts diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index bd23503..596a5d1 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -779,7 +779,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { .context("on_download_request")?; } Message::Bitfield(b) => self - .on_bitfield(b.clone_to_owned()) + .on_bitfield(b.clone_to_owned(None)) .context("on_bitfield")?, Message::Choke => self.on_i_am_choked(), Message::Unchoke => self.on_i_am_unchoked(), @@ -1127,7 +1127,7 @@ impl PeerHandler { } self.state .peers - .update_bitfield_from_vec(self.addr, bitfield.0); + .update_bitfield_from_vec(self.addr, bitfield.0.to_vec().into_boxed_slice()); self.on_bitfield_notify.notify_waiters(); Ok(()) } @@ -1480,7 +1480,7 @@ impl PeerHandler { let state = self.state.clone(); let addr = self.addr; let counters = self.counters.clone(); - let piece = piece.clone_to_owned(); + let piece = piece.clone_to_owned(None); let tx = self.tx.clone(); let span = tracing::error_span!("deferred_write"); diff --git a/crates/librqbit_core/Cargo.toml b/crates/librqbit_core/Cargo.toml index 37045d4..89d4d1a 100644 --- a/crates/librqbit_core/Cargo.toml +++ b/crates/librqbit_core/Cargo.toml @@ -26,6 +26,7 @@ itertools = "0.12" directories = "5" tokio-util = "0.7.10" data-encoding = "2.6.0" +bytes = "1.7.1" [dev-dependencies] diff --git a/crates/librqbit_core/src/torrent_metainfo.rs b/crates/librqbit_core/src/torrent_metainfo.rs index 0ca37f6..b1caf55 100644 --- a/crates/librqbit_core/src/torrent_metainfo.rs +++ b/crates/librqbit_core/src/torrent_metainfo.rs @@ -1,11 +1,11 @@ -use std::{iter::once, path::PathBuf}; - use anyhow::Context; use bencode::BencodeDeserializer; use buffers::{ByteBuf, ByteBufOwned}; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use itertools::Either; use serde::{Deserialize, Serialize}; +use std::{iter::once, path::PathBuf}; use crate::{hash_id::Id20, lengths::Lengths}; @@ -274,10 +274,10 @@ where { type Target = TorrentMetaV1File<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { TorrentMetaV1File { length: self.length, - path: self.path.clone_to_owned(), + path: self.path.clone_to_owned(within_buffer), } } } @@ -288,14 +288,14 @@ where { type Target = TorrentMetaV1Info<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { TorrentMetaV1Info { - name: self.name.clone_to_owned(), - pieces: self.pieces.clone_to_owned(), + name: self.name.clone_to_owned(within_buffer), + pieces: self.pieces.clone_to_owned(within_buffer), piece_length: self.piece_length, length: self.length, - md5sum: self.md5sum.clone_to_owned(), - files: self.files.clone_to_owned(), + md5sum: self.md5sum.clone_to_owned(within_buffer), + files: self.files.clone_to_owned(within_buffer), } } } @@ -306,16 +306,16 @@ where { type Target = TorrentMetaV1<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { TorrentMetaV1 { - announce: self.announce.clone_to_owned(), - announce_list: self.announce_list.clone_to_owned(), - info: self.info.clone_to_owned(), - comment: self.comment.clone_to_owned(), - created_by: self.created_by.clone_to_owned(), - encoding: self.encoding.clone_to_owned(), - publisher: self.publisher.clone_to_owned(), - publisher_url: self.publisher_url.clone_to_owned(), + announce: self.announce.clone_to_owned(within_buffer), + announce_list: self.announce_list.clone_to_owned(within_buffer), + info: self.info.clone_to_owned(within_buffer), + comment: self.comment.clone_to_owned(within_buffer), + created_by: self.created_by.clone_to_owned(within_buffer), + encoding: self.encoding.clone_to_owned(within_buffer), + publisher: self.publisher.clone_to_owned(within_buffer), + publisher_url: self.publisher_url.clone_to_owned(within_buffer), creation_date: self.creation_date, info_hash: self.info_hash, } diff --git a/crates/peer_binary_protocol/Cargo.toml b/crates/peer_binary_protocol/Cargo.toml index c5e7714..d6de7dd 100644 --- a/crates/peer_binary_protocol/Cargo.toml +++ b/crates/peer_binary_protocol/Cargo.toml @@ -20,3 +20,4 @@ clone_to_owned = { path = "../clone_to_owned", package = "librqbit-clone-to-owne librqbit-core = { path = "../librqbit_core", version = "3.9.0" } bitvec = "1" anyhow = "1" +bytes = "1.7.1" diff --git a/crates/peer_binary_protocol/src/extended/handshake.rs b/crates/peer_binary_protocol/src/extended/handshake.rs index 62f0955..e9a686f 100644 --- a/crates/peer_binary_protocol/src/extended/handshake.rs +++ b/crates/peer_binary_protocol/src/extended/handshake.rs @@ -6,6 +6,7 @@ use std::{ use buffers::ByteBuf; use byteorder::ByteOrder; use byteorder::BE; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Deserializer, Serialize}; @@ -75,14 +76,14 @@ where { type Target = ExtendedHandshake<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { ExtendedHandshake { - m: self.m.clone_to_owned(), + m: self.m.clone_to_owned(within_buffer), p: self.p, - v: self.v.clone_to_owned(), + v: self.v.clone_to_owned(within_buffer), yourip: self.yourip, - ipv6: self.ipv6.clone_to_owned(), - ipv4: self.ipv4.clone_to_owned(), + ipv6: self.ipv6.clone_to_owned(within_buffer), + ipv4: self.ipv4.clone_to_owned(within_buffer), reqq: self.reqq, metadata_size: self.metadata_size, complete_ago: self.complete_ago, diff --git a/crates/peer_binary_protocol/src/extended/mod.rs b/crates/peer_binary_protocol/src/extended/mod.rs index 9ad8f32..2ac4607 100644 --- a/crates/peer_binary_protocol/src/extended/mod.rs +++ b/crates/peer_binary_protocol/src/extended/mod.rs @@ -1,6 +1,7 @@ use bencode::bencode_serialize_to_writer; use bencode::from_bytes; use bencode::BencodeValue; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::{Deserialize, Serialize}; @@ -27,11 +28,15 @@ where { type Target = ExtendedMessage<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { match self { - ExtendedMessage::Handshake(h) => ExtendedMessage::Handshake(h.clone_to_owned()), - ExtendedMessage::Dyn(u, d) => ExtendedMessage::Dyn(*u, d.clone_to_owned()), - ExtendedMessage::UtMetadata(m) => ExtendedMessage::UtMetadata(m.clone_to_owned()), + ExtendedMessage::Handshake(h) => { + ExtendedMessage::Handshake(h.clone_to_owned(within_buffer)) + } + ExtendedMessage::Dyn(u, d) => ExtendedMessage::Dyn(*u, d.clone_to_owned(within_buffer)), + ExtendedMessage::UtMetadata(m) => { + ExtendedMessage::UtMetadata(m.clone_to_owned(within_buffer)) + } } } } diff --git a/crates/peer_binary_protocol/src/extended/ut_metadata.rs b/crates/peer_binary_protocol/src/extended/ut_metadata.rs index 56d7924..ac7e58b 100644 --- a/crates/peer_binary_protocol/src/extended/ut_metadata.rs +++ b/crates/peer_binary_protocol/src/extended/ut_metadata.rs @@ -1,10 +1,10 @@ -use std::io::Write; - use bencode::bencode_serialize_to_writer; use bencode::BencodeDeserializer; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use serde::Deserialize; use serde::Serialize; +use std::io::Write; use crate::MessageDeserializeError; @@ -22,7 +22,7 @@ pub enum UtMetadata { impl CloneToOwned for UtMetadata { type Target = UtMetadata<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { match self { UtMetadata::Request(req) => UtMetadata::Request(*req), UtMetadata::Data { @@ -32,7 +32,7 @@ impl CloneToOwned for UtMetadata { } => UtMetadata::Data { piece: *piece, total_size: *total_size, - data: data.clone_to_owned(), + data: data.clone_to_owned(within_buffer), }, UtMetadata::Reject(piece) => UtMetadata::Reject(*piece), } diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 1756d01..b187072 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -7,6 +7,7 @@ pub mod extended; use bincode::Options; use buffers::{ByteBuf, ByteBufOwned}; use byteorder::{ByteOrder, BE}; +use bytes::Bytes; use clone_to_owned::CloneToOwned; use librqbit_core::{constants::CHUNK_SIZE, hash_id::Id20, lengths::ChunkInfo}; use serde::{Deserialize, Serialize}; @@ -84,11 +85,11 @@ pub struct Piece { impl CloneToOwned for Piece { type Target = Piece; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { Piece { index: self.index, begin: self.begin, - block: self.block.clone_to_owned(), + block: self.block.clone_to_owned(within_buffer), } } } @@ -211,23 +212,23 @@ where { type Target = Message<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { match self { Message::Request(req) => Message::Request(*req), Message::Cancel(req) => Message::Cancel(*req), - Message::Bitfield(b) => Message::Bitfield(b.clone_to_owned()), + Message::Bitfield(b) => Message::Bitfield(b.clone_to_owned(within_buffer)), Message::Choke => Message::Choke, Message::Unchoke => Message::Unchoke, Message::Interested => Message::Interested, Message::Piece(piece) => Message::Piece(Piece { index: piece.index, begin: piece.begin, - block: piece.block.clone_to_owned(), + block: piece.block.clone_to_owned(within_buffer), }), Message::KeepAlive => Message::KeepAlive, Message::Have(v) => Message::Have(*v), Message::NotInterested => Message::NotInterested, - Message::Extended(e) => Message::Extended(e.clone_to_owned()), + Message::Extended(e) => Message::Extended(e.clone_to_owned(within_buffer)), } } } @@ -585,9 +586,9 @@ where { type Target = Handshake<::Target>; - fn clone_to_owned(&self) -> Self::Target { + fn clone_to_owned(&self, within_buffer: Option<&Bytes>) -> Self::Target { Handshake { - pstr: self.pstr.clone_to_owned(), + pstr: self.pstr.clone_to_owned(within_buffer), reserved: self.reserved, info_hash: self.info_hash, peer_id: self.peer_id,