Store torrent bytes

This commit is contained in:
Igor Katson 2024-08-12 23:24:11 +01:00
parent 5740d3ebe9
commit 41a2cd58b3
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 46 additions and 23 deletions

View file

@ -191,6 +191,7 @@ impl Api {
only_files,
seen_peers,
output_folder,
..
}) => ApiAddTorrentResponse {
id: None,
output_folder: output_folder.to_string_lossy().into_owned(),

View file

@ -16,6 +16,7 @@ use librqbit_core::hash_id::Id20;
pub enum ReadMetainfoResult<Rx> {
Found {
info: TorrentMetaV1Info<ByteBufOwned>,
bytes: ByteBufOwned,
rx: Rx,
seen: HashSet<SocketAddr>,
},
@ -80,7 +81,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
},
done = unordered.next(), if !unordered.is_empty() => {
match done {
Some(Ok(info)) => return ReadMetainfoResult::Found { info, seen, rx: addrs },
Some(Ok(info)) => return ReadMetainfoResult::Found { info: info.0, bytes: info.1, seen, rx: addrs },
Some(Err(e)) => {
debug!("{:#}", e);
},

View file

@ -32,9 +32,10 @@ pub(crate) async fn read_metainfo_from_peer(
peer_connection_options: Option<PeerConnectionOptions>,
spawner: BlockingSpawner,
connector: Arc<StreamConnector>,
) -> anyhow::Result<TorrentMetaV1Info<ByteBufOwned>> {
let (result_tx, result_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteBufOwned>>>();
) -> anyhow::Result<TorrentAndBytes> {
let (result_tx, result_rx) = tokio::sync::oneshot::channel::<
anyhow::Result<(TorrentMetaV1Info<ByteBufOwned>, ByteBufOwned)>,
>();
let (writer_tx, writer_rx) = tokio::sync::mpsc::unbounded_channel::<WriterRequest>();
let handler = Handler {
addr,
@ -135,13 +136,13 @@ impl HandlerLocked {
}
}
pub type TorrentAndBytes = (TorrentMetaV1Info<ByteBufOwned>, ByteBufOwned);
struct Handler {
addr: SocketAddr,
info_hash: Id20,
writer_tx: UnboundedSender<WriterRequest>,
result_tx: Mutex<
Option<tokio::sync::oneshot::Sender<anyhow::Result<TorrentMetaV1Info<ByteBufOwned>>>>,
>,
result_tx: Mutex<Option<tokio::sync::oneshot::Sender<anyhow::Result<TorrentAndBytes>>>>,
locked: RwLock<Option<HandlerLocked>>,
}
@ -179,6 +180,7 @@ impl PeerConnectionHandler for Handler {
if piece_ready {
let buf = self.locked.write().take().unwrap().buffer;
let info = from_bytes::<TorrentMetaV1Info<ByteBufOwned>>(&buf);
let info = info.map(|i| (i, ByteBufOwned(buf.into_boxed_slice())));
self.result_tx
.lock()
.take()

View file

@ -207,7 +207,7 @@ pub struct Session {
async fn torrent_from_url(
reqwest_client: &reqwest::Client,
url: &str,
) -> anyhow::Result<TorrentMetaV1Owned> {
) -> anyhow::Result<(TorrentMetaV1Owned, ByteBufOwned)> {
let response = reqwest_client
.get(url)
.send()
@ -220,7 +220,10 @@ async fn torrent_from_url(
.bytes()
.await
.with_context(|| format!("error reading response body from {url}"))?;
torrent_from_bytes(&b).context("error decoding torrent")
Ok((
torrent_from_bytes(&b).context("error decoding torrent")?,
b.to_vec().into(),
))
}
fn compute_only_files_regex<ByteBuf: AsRef<[u8]>>(
@ -344,6 +347,7 @@ pub struct ListOnlyResponse {
pub only_files: Option<Vec<usize>>,
pub output_folder: PathBuf,
pub seen_peers: Vec<SocketAddr>,
pub torrent_bytes: ByteBufOwned,
}
#[allow(clippy::large_enum_variant)]
@ -478,6 +482,7 @@ pub(crate) struct CheckedIncomingConnection {
struct InternalAddResult {
info_hash: Id20,
info: TorrentMetaV1Info<ByteBufOwned>,
torrent_bytes: ByteBufOwned,
trackers: Vec<String>,
peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>,
@ -942,7 +947,7 @@ impl Session {
};
debug!(?info_hash, "querying DHT");
let (info, peer_rx, initial_peers) = match read_metainfo_from_peer_receiver(
match read_metainfo_from_peer_receiver(
self.peer_id,
info_hash,
opts.initial_peers.clone().unwrap_or_default(),
@ -952,22 +957,29 @@ impl Session {
)
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::Found {
info,
bytes,
rx,
seen,
} => {
debug!(?info, "received result from DHT");
InternalAddResult {
info_hash,
info,
torrent_bytes: bytes,
trackers: magnet.trackers.into_iter().unique().collect(),
peer_rx: Some(rx),
initial_peers: seen.into_iter().collect(),
}
}
ReadMetainfoResult::ChannelClosed { .. } => {
bail!("DHT died, no way to discover torrent metainfo")
}
};
debug!(?info, "received result from DHT");
InternalAddResult {
info_hash,
info,
trackers: magnet.trackers.into_iter().unique().collect(),
peer_rx: Some(peer_rx),
initial_peers: initial_peers.into_iter().collect(),
}
}
other => {
let torrent = match other {
let (torrent, bytes) = match other {
AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") =>
{
@ -979,10 +991,14 @@ impl Session {
url
)
}
AddTorrent::TorrentFileBytes(bytes) => {
torrent_from_bytes(&bytes).context("error decoding torrent")?
AddTorrent::TorrentFileBytes(bytes) => (
torrent_from_bytes(&bytes).context("error decoding torrent")?,
ByteBufOwned::from(bytes.into_owned()),
),
AddTorrent::TorrentInfo(t) => {
// TODO: this is lossy, as we don't store the bytes.
(*t, ByteBufOwned(Vec::new().into_boxed_slice()))
}
AddTorrent::TorrentInfo(t) => *t,
};
let trackers = torrent
@ -1015,6 +1031,7 @@ impl Session {
InternalAddResult {
info_hash: torrent.info_hash,
info: torrent.info,
torrent_bytes: bytes,
trackers,
peer_rx,
initial_peers: opts
@ -1070,6 +1087,7 @@ impl Session {
trackers,
peer_rx,
initial_peers,
torrent_bytes,
} = add_res;
debug!("Torrent info: {:#?}", &info);
@ -1106,6 +1124,7 @@ impl Session {
only_files,
output_folder,
seen_peers: initial_peers,
torrent_bytes,
}));
}