Use bytes crate for zerocopy and memory re-use (#182)
* Use bytes. Not yet zerocopy everywhere but compiles * Actually zerocopy * Actually zerocopy * Not actually storing the torrent on disk now
This commit is contained in:
parent
3cc9e444b1
commit
c7ed475f54
20 changed files with 182 additions and 95 deletions
|
|
@ -382,7 +382,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
trace!("received: {:?}", &message);
|
||||
|
||||
if let Message::Extended(ExtendedMessage::Handshake(h)) = &message {
|
||||
*extended_handshake_ref.write() = Some(h.clone_to_owned());
|
||||
*extended_handshake_ref.write() = Some(h.clone_to_owned(None));
|
||||
self.handler.on_extended_handshake(h)?;
|
||||
trace!("remembered extended handshake for future serializing");
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::{net::SocketAddr, sync::Arc};
|
|||
|
||||
use bencode::from_bytes;
|
||||
use buffers::{ByteBuf, ByteBufOwned};
|
||||
use bytes::Bytes;
|
||||
use librqbit_core::{
|
||||
constants::CHUNK_SIZE,
|
||||
hash_id::Id20,
|
||||
|
|
@ -178,9 +179,14 @@ impl PeerConnectionHandler for Handler {
|
|||
.unwrap()
|
||||
.record_piece(piece, &data, self.info_hash)?;
|
||||
if piece_ready {
|
||||
let buf = self.locked.write().take().unwrap().buffer;
|
||||
let info = from_bytes::<TorrentMetaV1Info<ByteBufOwned>>(&buf);
|
||||
let info = info.map(|i| (i, ByteBufOwned(buf.into_boxed_slice())));
|
||||
let buf = Bytes::from(self.locked.write().take().unwrap().buffer);
|
||||
let info = from_bytes::<TorrentMetaV1Info<ByteBuf>>(&buf)
|
||||
.map(|i| {
|
||||
use clone_to_owned::CloneToOwned;
|
||||
i.clone_to_owned(Some(&buf))
|
||||
})
|
||||
.map(|i| (i, ByteBufOwned(buf)));
|
||||
|
||||
self.result_tx
|
||||
.lock()
|
||||
.take()
|
||||
|
|
|
|||
|
|
@ -45,7 +45,8 @@ use librqbit_core::{
|
|||
peer_id::generate_peer_id,
|
||||
spawn_utils::spawn_with_cancel,
|
||||
torrent_metainfo::{
|
||||
torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned,
|
||||
torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Borrowed, TorrentMetaV1Info,
|
||||
TorrentMetaV1Owned,
|
||||
},
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
|
|
@ -61,7 +62,7 @@ pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
|
|||
|
||||
pub type TorrentId = usize;
|
||||
|
||||
fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result<TorrentMetaV1Owned> {
|
||||
fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result<TorrentMetaV1Borrowed> {
|
||||
debug!(
|
||||
"all fields in torrent: {:#?}",
|
||||
bencode::dyn_from_bytes::<ByteBuf>(bytes)
|
||||
|
|
@ -120,7 +121,11 @@ impl SessionDatabase {
|
|||
.map(|u| u.to_string())
|
||||
.collect(),
|
||||
info_hash: torrent.info_hash().as_string(),
|
||||
torrent_bytes: torrent.info.torrent_bytes.clone(),
|
||||
// TODO: this could take up too much space / time / resources to write on interval.
|
||||
// Store this outside the JSON file
|
||||
//
|
||||
// torrent_bytes: torrent.info.torrent_bytes.clone(),
|
||||
torrent_bytes: Bytes::new(),
|
||||
info: torrent.info().info.clone(),
|
||||
only_files: torrent.only_files().clone(),
|
||||
is_paused: torrent
|
||||
|
|
@ -251,8 +256,10 @@ async fn torrent_from_url(
|
|||
.await
|
||||
.with_context(|| format!("error reading response body from {url}"))?;
|
||||
Ok((
|
||||
torrent_from_bytes(&b).context("error decoding torrent")?,
|
||||
b.to_vec().into(),
|
||||
torrent_from_bytes(&b)
|
||||
.context("error decoding torrent")?
|
||||
.clone_to_owned(Some(&b)),
|
||||
b.into(),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
@ -415,7 +422,7 @@ pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>
|
|||
pub enum AddTorrent<'a> {
|
||||
Url(Cow<'a, str>),
|
||||
TorrentFileBytes(Cow<'a, [u8]>),
|
||||
TorrentInfo(Box<TorrentMetaV1Owned>),
|
||||
TorrentInfo(Box<TorrentMetaV1Owned>, Bytes),
|
||||
}
|
||||
|
||||
impl<'a> AddTorrent<'a> {
|
||||
|
|
@ -448,7 +455,7 @@ impl<'a> AddTorrent<'a> {
|
|||
match self {
|
||||
Self::Url(s) => s.into_owned().into_bytes(),
|
||||
Self::TorrentFileBytes(b) => b.into_owned(),
|
||||
Self::TorrentInfo(_) => unimplemented!(),
|
||||
Self::TorrentInfo(..) => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -752,7 +759,7 @@ impl Session {
|
|||
}
|
||||
};
|
||||
|
||||
let handshake = h.clone_to_owned();
|
||||
let handshake = h.clone_to_owned(None);
|
||||
|
||||
return Ok((
|
||||
live,
|
||||
|
|
@ -877,24 +884,42 @@ impl Session {
|
|||
.into_iter()
|
||||
.map(|t| ByteBufOwned::from(t.into_bytes()))
|
||||
.collect();
|
||||
let info = TorrentMetaV1Owned {
|
||||
announce: trackers.first().cloned(),
|
||||
announce_list: vec![trackers],
|
||||
info: storrent.info,
|
||||
comment: None,
|
||||
created_by: None,
|
||||
encoding: None,
|
||||
publisher: None,
|
||||
publisher_url: None,
|
||||
creation_date: None,
|
||||
info_hash: Id20::from_str(&storrent.info_hash)?,
|
||||
|
||||
let torrent_bytes = storrent.torrent_bytes;
|
||||
|
||||
let info = if !torrent_bytes.is_empty() {
|
||||
torrent_from_bytes(&torrent_bytes)
|
||||
.map(|t| t.clone_to_owned(Some(&torrent_bytes)))
|
||||
.ok()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let info = match info {
|
||||
Some(info) => info,
|
||||
None => {
|
||||
let info_hash = Id20::from_str(&storrent.info_hash)?;
|
||||
debug!(?info_hash, "torrent added before 6.1.0, need to readd");
|
||||
TorrentMetaV1Owned {
|
||||
announce: trackers.first().cloned(),
|
||||
announce_list: vec![trackers],
|
||||
info: storrent.info,
|
||||
comment: None,
|
||||
created_by: None,
|
||||
encoding: None,
|
||||
publisher: None,
|
||||
publisher_url: None,
|
||||
creation_date: None,
|
||||
info_hash,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
futures.push({
|
||||
let session = self.clone();
|
||||
async move {
|
||||
session
|
||||
.add_torrent(
|
||||
AddTorrent::TorrentInfo(Box::new(info)),
|
||||
AddTorrent::TorrentInfo(Box::new(info), torrent_bytes),
|
||||
Some(AddTorrentOptions {
|
||||
paused: storrent.is_paused,
|
||||
output_folder: Some(
|
||||
|
|
@ -1041,14 +1066,19 @@ impl Session {
|
|||
url
|
||||
)
|
||||
}
|
||||
AddTorrent::TorrentFileBytes(bytes) => (
|
||||
torrent_from_bytes(&bytes).context("error decoding torrent")?,
|
||||
ByteBufOwned::from(bytes.into_owned()),
|
||||
),
|
||||
AddTorrent::TorrentInfo(t) => {
|
||||
// TODO: this is lossy, as we don't store the bytes.
|
||||
(*t, ByteBufOwned(Vec::new().into_boxed_slice()))
|
||||
AddTorrent::TorrentFileBytes(bytes) => {
|
||||
let bytes = match bytes {
|
||||
Cow::Borrowed(b) => ::bytes::Bytes::copy_from_slice(b),
|
||||
Cow::Owned(v) => ::bytes::Bytes::from(v),
|
||||
};
|
||||
(
|
||||
torrent_from_bytes(&bytes)
|
||||
.map(|t| t.clone_to_owned(Some(&bytes)))
|
||||
.context("error decoding torrent")?,
|
||||
ByteBufOwned(bytes),
|
||||
)
|
||||
}
|
||||
AddTorrent::TorrentInfo(t, bytes) => (*t, bytes.into()),
|
||||
};
|
||||
|
||||
let trackers = torrent
|
||||
|
|
@ -1081,7 +1111,7 @@ impl Session {
|
|||
InternalAddResult {
|
||||
info_hash: torrent.info_hash,
|
||||
info: torrent.info,
|
||||
torrent_bytes: Bytes::from(bytes.0),
|
||||
torrent_bytes: bytes.0,
|
||||
trackers,
|
||||
peer_rx,
|
||||
initial_peers: opts
|
||||
|
|
|
|||
|
|
@ -779,7 +779,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
|||
.context("on_download_request")?;
|
||||
}
|
||||
Message::Bitfield(b) => self
|
||||
.on_bitfield(b.clone_to_owned())
|
||||
.on_bitfield(b.clone_to_owned(None))
|
||||
.context("on_bitfield")?,
|
||||
Message::Choke => self.on_i_am_choked(),
|
||||
Message::Unchoke => self.on_i_am_unchoked(),
|
||||
|
|
@ -1127,7 +1127,7 @@ impl PeerHandler {
|
|||
}
|
||||
self.state
|
||||
.peers
|
||||
.update_bitfield_from_vec(self.addr, bitfield.0);
|
||||
.update_bitfield_from_vec(self.addr, bitfield.0.to_vec().into_boxed_slice());
|
||||
self.on_bitfield_notify.notify_waiters();
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1480,7 +1480,7 @@ impl PeerHandler {
|
|||
let state = self.state.clone();
|
||||
let addr = self.addr;
|
||||
let counters = self.counters.clone();
|
||||
let piece = piece.clone_to_owned();
|
||||
let piece = piece.clone_to_owned(None);
|
||||
let tx = self.tx.clone();
|
||||
|
||||
let span = tracing::error_span!("deferred_write");
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue