From 41a2cd58b315af60a182c632c6d33bd731e9dcbf Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Mon, 12 Aug 2024 23:24:11 +0100 Subject: [PATCH] Store torrent bytes --- crates/librqbit/src/api.rs | 1 + crates/librqbit/src/dht_utils.rs | 3 +- crates/librqbit/src/peer_info_reader/mod.rs | 14 +++--- crates/librqbit/src/session.rs | 51 ++++++++++++++------- 4 files changed, 46 insertions(+), 23 deletions(-) diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 78ed72b..4a253ae 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -191,6 +191,7 @@ impl Api { only_files, seen_peers, output_folder, + .. }) => ApiAddTorrentResponse { id: None, output_folder: output_folder.to_string_lossy().into_owned(), diff --git a/crates/librqbit/src/dht_utils.rs b/crates/librqbit/src/dht_utils.rs index d348a7b..c01b552 100644 --- a/crates/librqbit/src/dht_utils.rs +++ b/crates/librqbit/src/dht_utils.rs @@ -16,6 +16,7 @@ use librqbit_core::hash_id::Id20; pub enum ReadMetainfoResult { Found { info: TorrentMetaV1Info, + bytes: ByteBufOwned, rx: Rx, seen: HashSet, }, @@ -80,7 +81,7 @@ pub async fn read_metainfo_from_peer_receiver + Unp }, done = unordered.next(), if !unordered.is_empty() => { match done { - Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs }, + Some(Ok(info)) => return ReadMetainfoResult::Found { info: info.0, bytes: info.1, seen, rx: addrs }, Some(Err(e)) => { debug!("{:#}", e); }, diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index dc3e0b9..a2b170d 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -32,9 +32,10 @@ pub(crate) async fn read_metainfo_from_peer( peer_connection_options: Option, spawner: BlockingSpawner, connector: Arc, -) -> anyhow::Result> { - let (result_tx, result_rx) = - tokio::sync::oneshot::channel::>>(); +) -> anyhow::Result { + let (result_tx, result_rx) = tokio::sync::oneshot::channel::< + anyhow::Result<(TorrentMetaV1Info, ByteBufOwned)>, + >(); let (writer_tx, writer_rx) = tokio::sync::mpsc::unbounded_channel::(); let handler = Handler { addr, @@ -135,13 +136,13 @@ impl HandlerLocked { } } +pub type TorrentAndBytes = (TorrentMetaV1Info, ByteBufOwned); + struct Handler { addr: SocketAddr, info_hash: Id20, writer_tx: UnboundedSender, - result_tx: Mutex< - Option>>>, - >, + result_tx: Mutex>>>, locked: RwLock>, } @@ -179,6 +180,7 @@ impl PeerConnectionHandler for Handler { if piece_ready { let buf = self.locked.write().take().unwrap().buffer; let info = from_bytes::>(&buf); + let info = info.map(|i| (i, ByteBufOwned(buf.into_boxed_slice()))); self.result_tx .lock() .take() diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 54ae996..e201682 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -207,7 +207,7 @@ pub struct Session { async fn torrent_from_url( reqwest_client: &reqwest::Client, url: &str, -) -> anyhow::Result { +) -> anyhow::Result<(TorrentMetaV1Owned, ByteBufOwned)> { let response = reqwest_client .get(url) .send() @@ -220,7 +220,10 @@ async fn torrent_from_url( .bytes() .await .with_context(|| format!("error reading response body from {url}"))?; - torrent_from_bytes(&b).context("error decoding torrent") + Ok(( + torrent_from_bytes(&b).context("error decoding torrent")?, + b.to_vec().into(), + )) } fn compute_only_files_regex>( @@ -344,6 +347,7 @@ pub struct ListOnlyResponse { pub only_files: Option>, pub output_folder: PathBuf, pub seen_peers: Vec, + pub torrent_bytes: ByteBufOwned, } #[allow(clippy::large_enum_variant)] @@ -478,6 +482,7 @@ pub(crate) struct CheckedIncomingConnection { struct InternalAddResult { info_hash: Id20, info: TorrentMetaV1Info, + torrent_bytes: ByteBufOwned, trackers: Vec, peer_rx: Option, initial_peers: Vec, @@ -942,7 +947,7 @@ impl Session { }; debug!(?info_hash, "querying DHT"); - let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver( + match read_metainfo_from_peer_receiver( self.peer_id, info_hash, opts.initial_peers.clone().unwrap_or_default(), @@ -952,22 +957,29 @@ impl Session { ) .await { - ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen), + ReadMetainfoResult::Found { + info, + bytes, + rx, + seen, + } => { + debug!(?info, "received result from DHT"); + InternalAddResult { + info_hash, + info, + torrent_bytes: bytes, + trackers: magnet.trackers.into_iter().unique().collect(), + peer_rx: Some(rx), + initial_peers: seen.into_iter().collect(), + } + } ReadMetainfoResult::ChannelClosed { .. } => { bail!("DHT died, no way to discover torrent metainfo") } - }; - debug!(?info, "received result from DHT"); - InternalAddResult { - info_hash, - info, - trackers: magnet.trackers.into_iter().unique().collect(), - peer_rx: Some(peer_rx), - initial_peers: initial_peers.into_iter().collect(), } } other => { - let torrent = match other { + let (torrent, bytes) = match other { AddTorrent::Url(url) if url.starts_with("http://") || url.starts_with("https://") => { @@ -979,10 +991,14 @@ impl Session { url ) } - AddTorrent::TorrentFileBytes(bytes) => { - torrent_from_bytes(&bytes).context("error decoding torrent")? + AddTorrent::TorrentFileBytes(bytes) => ( + torrent_from_bytes(&bytes).context("error decoding torrent")?, + ByteBufOwned::from(bytes.into_owned()), + ), + AddTorrent::TorrentInfo(t) => { + // TODO: this is lossy, as we don't store the bytes. + (*t, ByteBufOwned(Vec::new().into_boxed_slice())) } - AddTorrent::TorrentInfo(t) => *t, }; let trackers = torrent @@ -1015,6 +1031,7 @@ impl Session { InternalAddResult { info_hash: torrent.info_hash, info: torrent.info, + torrent_bytes: bytes, trackers, peer_rx, initial_peers: opts @@ -1070,6 +1087,7 @@ impl Session { trackers, peer_rx, initial_peers, + torrent_bytes, } = add_res; debug!("Torrent info: {:#?}", &info); @@ -1106,6 +1124,7 @@ impl Session { only_files, output_folder, seen_peers: initial_peers, + torrent_bytes, })); }