fixing bugs

This commit is contained in:
Igor Katson 2024-08-14 10:53:25 +01:00
parent c196c11860
commit 53d61d0428
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 87 additions and 55 deletions

View file

@ -37,6 +37,12 @@ pub trait PeerConnectionHandler {
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool; fn should_transmit_have(&self, id: ValidPieceIndex) -> bool;
fn on_uploaded_bytes(&self, bytes: u32); fn on_uploaded_bytes(&self, bytes: u32);
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>; fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
fn update_my_extended_handshake(
&self,
_handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
Ok(())
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -239,8 +245,10 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let supports_extended = handshake_supports_extended; let supports_extended = handshake_supports_extended;
if supports_extended { if supports_extended {
let my_extended = let mut my_extended = ExtendedHandshake::new();
Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); self.handler
.update_my_extended_handshake(&mut my_extended)?;
let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended));
trace!("sending extended handshake: {:?}", &my_extended); trace!("sending extended handshake: {:?}", &my_extended);
my_extended.serialize(&mut write_buf, &|| None).unwrap(); my_extended.serialize(&mut write_buf, &|| None).unwrap();
with_timeout(rwtimeout, conn.write_all(&write_buf)) with_timeout(rwtimeout, conn.write_all(&write_buf))

View file

@ -44,10 +44,7 @@ use librqbit_core::{
magnet::Magnet, magnet::Magnet,
peer_id::generate_peer_id, peer_id::generate_peer_id,
spawn_utils::spawn_with_cancel, spawn_utils::spawn_with_cancel,
torrent_metainfo::{ torrent_metainfo::{TorrentMetaV1Info, TorrentMetaV1Owned},
torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Borrowed, TorrentMetaV1Info,
TorrentMetaV1Owned,
},
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
use peer_binary_protocol::Handshake; use peer_binary_protocol::Handshake;
@ -62,12 +59,23 @@ pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
pub type TorrentId = usize; pub type TorrentId = usize;
fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result<TorrentMetaV1Borrowed> { struct ParsedTorrentFile {
info: TorrentMetaV1Owned,
info_bytes: Bytes,
torrent_bytes: Bytes,
}
fn torrent_from_bytes(bytes: Bytes) -> anyhow::Result<ParsedTorrentFile> {
debug!( debug!(
"all fields in torrent: {:#?}", "all fields in torrent: {:#?}",
bencode::dyn_from_bytes::<ByteBuf>(bytes) bencode::dyn_from_bytes::<ByteBuf>(&bytes)
); );
bencode_torrent_from_bytes(bytes) let parsed = librqbit_core::torrent_metainfo::torrent_from_bytes_ext::<ByteBuf>(&bytes)?;
Ok(ParsedTorrentFile {
info: parsed.meta.clone_to_owned(Some(&bytes)),
info_bytes: parsed.info_bytes.clone_to_owned(Some(&bytes)).0,
torrent_bytes: bytes,
})
} }
#[derive(Default)] #[derive(Default)]
@ -242,7 +250,7 @@ pub struct Session {
async fn torrent_from_url( async fn torrent_from_url(
reqwest_client: &reqwest::Client, reqwest_client: &reqwest::Client,
url: &str, url: &str,
) -> anyhow::Result<(TorrentMetaV1Owned, ByteBufOwned)> { ) -> anyhow::Result<ParsedTorrentFile> {
let response = reqwest_client let response = reqwest_client
.get(url) .get(url)
.send() .send()
@ -255,12 +263,7 @@ async fn torrent_from_url(
.bytes() .bytes()
.await .await
.with_context(|| format!("error reading response body from {url}"))?; .with_context(|| format!("error reading response body from {url}"))?;
Ok(( torrent_from_bytes(b).context("error decoding torrent")
torrent_from_bytes(&b)
.context("error decoding torrent")?
.clone_to_owned(Some(&b)),
b.into(),
))
} }
fn compute_only_files_regex<ByteBuf: AsRef<[u8]>>( fn compute_only_files_regex<ByteBuf: AsRef<[u8]>>(
@ -422,7 +425,7 @@ pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>
pub enum AddTorrent<'a> { pub enum AddTorrent<'a> {
Url(Cow<'a, str>), Url(Cow<'a, str>),
TorrentFileBytes(Bytes), TorrentFileBytes(Bytes),
TorrentInfo(Box<TorrentMetaV1Owned>, Bytes), TorrentInfo(Box<TorrentMetaV1Owned>),
} }
impl<'a> AddTorrent<'a> { impl<'a> AddTorrent<'a> {
@ -539,6 +542,7 @@ struct InternalAddResult {
info_hash: Id20, info_hash: Id20,
info: TorrentMetaV1Info<ByteBufOwned>, info: TorrentMetaV1Info<ByteBufOwned>,
torrent_bytes: Bytes, torrent_bytes: Bytes,
info_bytes: Bytes,
trackers: Vec<String>, trackers: Vec<String>,
peer_rx: Option<PeerStream>, peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
@ -887,31 +891,24 @@ impl Session {
let torrent_bytes = storrent.torrent_bytes; let torrent_bytes = storrent.torrent_bytes;
let info = if !torrent_bytes.is_empty() { let add_torrent = if !torrent_bytes.is_empty() {
torrent_from_bytes(&torrent_bytes) AddTorrent::TorrentFileBytes(torrent_bytes)
.map(|t| t.clone_to_owned(Some(&torrent_bytes)))
.ok()
} else { } else {
None let info_hash = Id20::from_str(&storrent.info_hash)?;
}; debug!(?info_hash, "torrent added before 6.1.0, need to readd");
let info = match info { let info = TorrentMetaV1Owned {
Some(info) => info, announce: trackers.first().cloned(),
None => { announce_list: vec![trackers],
let info_hash = Id20::from_str(&storrent.info_hash)?; info: storrent.info,
debug!(?info_hash, "torrent added before 6.1.0, need to readd"); comment: None,
TorrentMetaV1Owned { created_by: None,
announce: trackers.first().cloned(), encoding: None,
announce_list: vec![trackers], publisher: None,
info: storrent.info, publisher_url: None,
comment: None, creation_date: None,
created_by: None, info_hash,
encoding: None, };
publisher: None, AddTorrent::TorrentInfo(Box::new(info))
publisher_url: None,
creation_date: None,
info_hash,
}
}
}; };
futures.push({ futures.push({
@ -919,7 +916,7 @@ impl Session {
async move { async move {
session session
.add_torrent( .add_torrent(
AddTorrent::TorrentInfo(Box::new(info), torrent_bytes), add_torrent,
Some(AddTorrentOptions { Some(AddTorrentOptions {
paused: storrent.is_paused, paused: storrent.is_paused,
output_folder: Some( output_folder: Some(
@ -1048,6 +1045,7 @@ impl Session {
&info_bytes, &info_bytes,
&trackers, &trackers,
)?, )?,
info_bytes: info_bytes.0,
info, info,
trackers, trackers,
peer_rx: Some(rx), peer_rx: Some(rx),
@ -1060,7 +1058,7 @@ impl Session {
} }
} }
other => { other => {
let (torrent, bytes) = match other { let torrent = match other {
AddTorrent::Url(url) AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") => if url.starts_with("http://") || url.starts_with("https://") =>
{ {
@ -1072,16 +1070,21 @@ impl Session {
url url
) )
} }
AddTorrent::TorrentFileBytes(bytes) => ( AddTorrent::TorrentFileBytes(bytes) =>
torrent_from_bytes(&bytes) torrent_from_bytes(bytes)
.map(|t| t.clone_to_owned(Some(&bytes))) .context("error decoding torrent")?
.context("error decoding torrent")?, ,
ByteBufOwned(bytes), AddTorrent::TorrentInfo(t) => {
), // TODO: remove this branch entirely
AddTorrent::TorrentInfo(t, bytes) => (*t, bytes.into()), ParsedTorrentFile{
info: *t,
info_bytes: Default::default(),
torrent_bytes: Default::default(),
}
},
}; };
let trackers = torrent let trackers = torrent.info
.iter_announce() .iter_announce()
.unique() .unique()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
@ -1097,7 +1100,7 @@ impl Session {
None None
} else { } else {
self.make_peer_rx( self.make_peer_rx(
torrent.info_hash, torrent.info.info_hash,
if opts.disable_trackers { if opts.disable_trackers {
Default::default() Default::default()
} else { } else {
@ -1109,9 +1112,10 @@ impl Session {
}; };
InternalAddResult { InternalAddResult {
info_hash: torrent.info_hash, info_hash: torrent.info.info_hash,
info: torrent.info, info: torrent.info.info,
torrent_bytes: bytes.0, torrent_bytes: torrent.torrent_bytes,
info_bytes: torrent.info_bytes,
trackers, trackers,
peer_rx, peer_rx,
initial_peers: opts initial_peers: opts
@ -1169,6 +1173,7 @@ impl Session {
peer_rx, peer_rx,
initial_peers, initial_peers,
torrent_bytes, torrent_bytes,
info_bytes,
} = add_res; } = add_res;
debug!("Torrent info: {:#?}", &info); debug!("Torrent info: {:#?}", &info);
@ -1213,6 +1218,7 @@ impl Session {
info, info,
info_hash, info_hash,
torrent_bytes, torrent_bytes,
info_bytes,
output_folder, output_folder,
storage_factory, storage_factory,
); );

View file

@ -857,6 +857,19 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
.unwrap_or(true); .unwrap_or(true);
!have !have
} }
fn update_my_extended_handshake(
&self,
handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
let info_bytes = &self.state.meta().info_bytes;
if !info_bytes.is_empty() {
if let Ok(len) = info_bytes.len().try_into() {
handshake.metadata_size = Some(len);
}
}
Ok(())
}
} }
impl PeerHandler { impl PeerHandler {

View file

@ -101,6 +101,7 @@ pub(crate) struct ManagedTorrentOptions {
pub struct ManagedTorrentInfo { pub struct ManagedTorrentInfo {
pub info: TorrentMetaV1Info<ByteBufOwned>, pub info: TorrentMetaV1Info<ByteBufOwned>,
pub torrent_bytes: Bytes, pub torrent_bytes: Bytes,
pub info_bytes: Bytes,
pub info_hash: Id20, pub info_hash: Id20,
pub(crate) spawner: BlockingSpawner, pub(crate) spawner: BlockingSpawner,
pub trackers: HashSet<String>, pub trackers: HashSet<String>,
@ -504,6 +505,7 @@ pub(crate) struct ManagedTorrentBuilder {
output_folder: PathBuf, output_folder: PathBuf,
info_hash: Id20, info_hash: Id20,
torrent_bytes: Bytes, torrent_bytes: Bytes,
info_bytes: Bytes,
force_tracker_interval: Option<Duration>, force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>, peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>, peer_read_write_timeout: Option<Duration>,
@ -522,6 +524,7 @@ impl ManagedTorrentBuilder {
info: TorrentMetaV1Info<ByteBufOwned>, info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20, info_hash: Id20,
torrent_bytes: Bytes, torrent_bytes: Bytes,
info_bytes: Bytes,
output_folder: PathBuf, output_folder: PathBuf,
storage_factory: BoxStorageFactory, storage_factory: BoxStorageFactory,
) -> Self { ) -> Self {
@ -529,6 +532,7 @@ impl ManagedTorrentBuilder {
info, info,
info_hash, info_hash,
torrent_bytes, torrent_bytes,
info_bytes,
spawner: None, spawner: None,
force_tracker_interval: None, force_tracker_interval: None,
peer_connect_timeout: None, peer_connect_timeout: None,
@ -614,6 +618,7 @@ impl ManagedTorrentBuilder {
file_infos, file_infos,
info: self.info, info: self.info,
torrent_bytes: self.torrent_bytes, torrent_bytes: self.torrent_bytes,
info_bytes: self.info_bytes,
info_hash: self.info_hash, info_hash: self.info_hash,
trackers: self.trackers.into_iter().collect(), trackers: self.trackers.into_iter().collect(),
spawner: self.spawner.unwrap_or_default(), spawner: self.spawner.unwrap_or_default(),