Merge pull request #184 from ikatson/share-ext-metadata

[Feature] support sending metadata to peers who request it (via extended request)
This commit is contained in:
Igor Katson 2024-08-14 15:31:31 +01:00 committed by GitHub
commit c4fc107c4e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 246 additions and 118 deletions

View file

@ -44,6 +44,10 @@ install: build-release
$(MAKE) sign-release $(MAKE) sign-release
install target/release/rqbit "$(HOME)/bin/" install target/release/rqbit "$(HOME)/bin/"
@PHONY: test
test:
ulimit -n unlimited && cargo test
@PHONY: release-macos-universal @PHONY: release-macos-universal
release-macos-universal: release-macos-universal:
cargo build --target aarch64-apple-darwin --profile release-github cargo build --target aarch64-apple-darwin --profile release-github

View file

@ -1,3 +1,4 @@
use bytes::Bytes;
use librqbit::{ use librqbit::{
storage::{StorageFactory, StorageFactoryExt, TorrentStorage}, storage::{StorageFactory, StorageFactoryExt, TorrentStorage},
SessionOptions, SessionOptions,
@ -79,9 +80,9 @@ async fn main() -> anyhow::Result<()> {
.await?; .await?;
let handle = s let handle = s
.add_torrent( .add_torrent(
librqbit::AddTorrent::TorrentFileBytes( librqbit::AddTorrent::TorrentFileBytes(Bytes::from_static(include_bytes!(
include_bytes!("../resources/ubuntu-21.04-live-server-amd64.iso.torrent").into(), "../resources/ubuntu-21.04-live-server-amd64.iso.torrent"
), ))),
Some(librqbit::AddTorrentOptions { Some(librqbit::AddTorrentOptions {
storage_factory: Some(CustomStorageFactory::default().boxed()), storage_factory: Some(CustomStorageFactory::default().boxed()),
paused: false, paused: false,

View file

@ -6,6 +6,7 @@ use std::path::Path;
use anyhow::Context; use anyhow::Context;
use bencode::bencode_serialize_to_writer; use bencode::bencode_serialize_to_writer;
use buffers::ByteBufOwned; use buffers::ByteBufOwned;
use bytes::Bytes;
use librqbit_core::torrent_metainfo::{TorrentMetaV1File, TorrentMetaV1Info, TorrentMetaV1Owned}; use librqbit_core::torrent_metainfo::{TorrentMetaV1File, TorrentMetaV1Info, TorrentMetaV1Owned};
use librqbit_core::Id20; use librqbit_core::Id20;
use sha1w::{ISha1, Sha1}; use sha1w::{ISha1, Sha1};
@ -185,10 +186,10 @@ impl CreateTorrentResult {
self.meta.info_hash self.meta.info_hash
} }
pub fn as_bytes(&self) -> anyhow::Result<Vec<u8>> { pub fn as_bytes(&self) -> anyhow::Result<Bytes> {
let mut b = Vec::new(); let mut b = Vec::new();
bencode_serialize_to_writer(&self.meta, &mut b).context("error serializing torrent")?; bencode_serialize_to_writer(&self.meta, &mut b).context("error serializing torrent")?;
Ok(b) Ok(b.into())
} }
} }

View file

@ -67,18 +67,14 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
unordered.push(read_info_guarded(a)); unordered.push(read_info_guarded(a));
} }
let mut addrs_completed = false;
loop { loop {
if addrs_completed && unordered.is_empty() {
return ReadMetainfoResult::ChannelClosed { seen };
}
tokio::select! { tokio::select! {
next_addr = addrs.next() => {
match next_addr {
Some(addr) => {
if seen.insert(addr) {
unordered.push(read_info_guarded(addr));
}
},
None => return ReadMetainfoResult::ChannelClosed { seen },
}
},
done = unordered.next(), if !unordered.is_empty() => { done = unordered.next(), if !unordered.is_empty() => {
match done { match done {
Some(Ok((info, info_bytes))) => return ReadMetainfoResult::Found { info, info_bytes, seen, rx: addrs }, Some(Ok((info, info_bytes))) => return ReadMetainfoResult::Found { info, info_bytes, seen, rx: addrs },
@ -88,6 +84,20 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
None => unreachable!() None => unreachable!()
} }
} }
next_addr = addrs.next(), if !addrs_completed => {
match next_addr {
Some(addr) => {
if seen.insert(addr) {
unordered.push(read_info_guarded(addr));
}
continue;
},
None => {
addrs_completed = true;
},
}
}
}; };
} }
} }

View file

@ -37,6 +37,12 @@ pub trait PeerConnectionHandler {
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool; fn should_transmit_have(&self, id: ValidPieceIndex) -> bool;
fn on_uploaded_bytes(&self, bytes: u32); fn on_uploaded_bytes(&self, bytes: u32);
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>; fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>;
fn update_my_extended_handshake(
&self,
_handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
Ok(())
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -239,8 +245,10 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let supports_extended = handshake_supports_extended; let supports_extended = handshake_supports_extended;
if supports_extended { if supports_extended {
let my_extended = let mut my_extended = ExtendedHandshake::new();
Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new())); self.handler
.update_my_extended_handshake(&mut my_extended)?;
let my_extended = Message::Extended(ExtendedMessage::Handshake(my_extended));
trace!("sending extended handshake: {:?}", &my_extended); trace!("sending extended handshake: {:?}", &my_extended);
my_extended.serialize(&mut write_buf, &|| None).unwrap(); my_extended.serialize(&mut write_buf, &|| None).unwrap();
with_timeout(rwtimeout, conn.write_all(&write_buf)) with_timeout(rwtimeout, conn.write_all(&write_buf))

View file

@ -44,10 +44,7 @@ use librqbit_core::{
magnet::Magnet, magnet::Magnet,
peer_id::generate_peer_id, peer_id::generate_peer_id,
spawn_utils::spawn_with_cancel, spawn_utils::spawn_with_cancel,
torrent_metainfo::{ torrent_metainfo::{TorrentMetaV1Info, TorrentMetaV1Owned},
torrent_from_bytes as bencode_torrent_from_bytes, TorrentMetaV1Borrowed, TorrentMetaV1Info,
TorrentMetaV1Owned,
},
}; };
use parking_lot::RwLock; use parking_lot::RwLock;
use peer_binary_protocol::Handshake; use peer_binary_protocol::Handshake;
@ -62,12 +59,23 @@ pub const SUPPORTED_SCHEMES: [&str; 3] = ["http:", "https:", "magnet:"];
pub type TorrentId = usize; pub type TorrentId = usize;
fn torrent_from_bytes(bytes: &[u8]) -> anyhow::Result<TorrentMetaV1Borrowed> { struct ParsedTorrentFile {
info: TorrentMetaV1Owned,
info_bytes: Bytes,
torrent_bytes: Bytes,
}
fn torrent_from_bytes(bytes: Bytes) -> anyhow::Result<ParsedTorrentFile> {
debug!( debug!(
"all fields in torrent: {:#?}", "all fields in torrent: {:#?}",
bencode::dyn_from_bytes::<ByteBuf>(bytes) bencode::dyn_from_bytes::<ByteBuf>(&bytes)
); );
bencode_torrent_from_bytes(bytes) let parsed = librqbit_core::torrent_metainfo::torrent_from_bytes_ext::<ByteBuf>(&bytes)?;
Ok(ParsedTorrentFile {
info: parsed.meta.clone_to_owned(Some(&bytes)),
info_bytes: parsed.info_bytes.clone_to_owned(Some(&bytes)).0,
torrent_bytes: bytes,
})
} }
#[derive(Default)] #[derive(Default)]
@ -242,7 +250,7 @@ pub struct Session {
async fn torrent_from_url( async fn torrent_from_url(
reqwest_client: &reqwest::Client, reqwest_client: &reqwest::Client,
url: &str, url: &str,
) -> anyhow::Result<(TorrentMetaV1Owned, ByteBufOwned)> { ) -> anyhow::Result<ParsedTorrentFile> {
let response = reqwest_client let response = reqwest_client
.get(url) .get(url)
.send() .send()
@ -255,12 +263,7 @@ async fn torrent_from_url(
.bytes() .bytes()
.await .await
.with_context(|| format!("error reading response body from {url}"))?; .with_context(|| format!("error reading response body from {url}"))?;
Ok(( torrent_from_bytes(b).context("error decoding torrent")
torrent_from_bytes(&b)
.context("error decoding torrent")?
.clone_to_owned(Some(&b)),
b.into(),
))
} }
fn compute_only_files_regex<ByteBuf: AsRef<[u8]>>( fn compute_only_files_regex<ByteBuf: AsRef<[u8]>>(
@ -421,8 +424,8 @@ pub fn read_local_file_including_stdin(filename: &str) -> anyhow::Result<Vec<u8>
pub enum AddTorrent<'a> { pub enum AddTorrent<'a> {
Url(Cow<'a, str>), Url(Cow<'a, str>),
TorrentFileBytes(Cow<'a, [u8]>), TorrentFileBytes(Bytes),
TorrentInfo(Box<TorrentMetaV1Owned>, Bytes), TorrentInfo(Box<TorrentMetaV1Owned>),
} }
impl<'a> AddTorrent<'a> { impl<'a> AddTorrent<'a> {
@ -439,7 +442,7 @@ impl<'a> AddTorrent<'a> {
Self::Url(url.into()) Self::Url(url.into())
} }
pub fn from_bytes(bytes: impl Into<Cow<'a, [u8]>>) -> Self { pub fn from_bytes(bytes: impl Into<Bytes>) -> Self {
Self::TorrentFileBytes(bytes.into()) Self::TorrentFileBytes(bytes.into())
} }
@ -448,13 +451,13 @@ impl<'a> AddTorrent<'a> {
pub fn from_local_filename(filename: &str) -> anyhow::Result<Self> { pub fn from_local_filename(filename: &str) -> anyhow::Result<Self> {
let file = read_local_file_including_stdin(filename) let file = read_local_file_including_stdin(filename)
.with_context(|| format!("error reading local file {filename:?}"))?; .with_context(|| format!("error reading local file {filename:?}"))?;
Ok(Self::TorrentFileBytes(Cow::Owned(file))) Ok(Self::TorrentFileBytes(file.into()))
} }
pub fn into_bytes(self) -> Vec<u8> { pub fn into_bytes(self) -> Bytes {
match self { match self {
Self::Url(s) => s.into_owned().into_bytes(), Self::Url(s) => s.into_owned().into_bytes().into(),
Self::TorrentFileBytes(b) => b.into_owned(), Self::TorrentFileBytes(b) => b,
Self::TorrentInfo(..) => unimplemented!(), Self::TorrentInfo(..) => unimplemented!(),
} }
} }
@ -539,6 +542,7 @@ struct InternalAddResult {
info_hash: Id20, info_hash: Id20,
info: TorrentMetaV1Info<ByteBufOwned>, info: TorrentMetaV1Info<ByteBufOwned>,
torrent_bytes: Bytes, torrent_bytes: Bytes,
info_bytes: Bytes,
trackers: Vec<String>, trackers: Vec<String>,
peer_rx: Option<PeerStream>, peer_rx: Option<PeerStream>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
@ -887,31 +891,24 @@ impl Session {
let torrent_bytes = storrent.torrent_bytes; let torrent_bytes = storrent.torrent_bytes;
let info = if !torrent_bytes.is_empty() { let add_torrent = if !torrent_bytes.is_empty() {
torrent_from_bytes(&torrent_bytes) AddTorrent::TorrentFileBytes(torrent_bytes)
.map(|t| t.clone_to_owned(Some(&torrent_bytes)))
.ok()
} else { } else {
None let info_hash = Id20::from_str(&storrent.info_hash)?;
}; debug!(?info_hash, "torrent added before 6.1.0, need to readd");
let info = match info { let info = TorrentMetaV1Owned {
Some(info) => info, announce: trackers.first().cloned(),
None => { announce_list: vec![trackers],
let info_hash = Id20::from_str(&storrent.info_hash)?; info: storrent.info,
debug!(?info_hash, "torrent added before 6.1.0, need to readd"); comment: None,
TorrentMetaV1Owned { created_by: None,
announce: trackers.first().cloned(), encoding: None,
announce_list: vec![trackers], publisher: None,
info: storrent.info, publisher_url: None,
comment: None, creation_date: None,
created_by: None, info_hash,
encoding: None, };
publisher: None, AddTorrent::TorrentInfo(Box::new(info))
publisher_url: None,
creation_date: None,
info_hash,
}
}
}; };
futures.push({ futures.push({
@ -919,7 +916,7 @@ impl Session {
async move { async move {
session session
.add_torrent( .add_torrent(
AddTorrent::TorrentInfo(Box::new(info), torrent_bytes), add_torrent,
Some(AddTorrentOptions { Some(AddTorrentOptions {
paused: storrent.is_paused, paused: storrent.is_paused,
output_folder: Some( output_folder: Some(
@ -1012,16 +1009,22 @@ impl Session {
announce_port, announce_port,
opts.force_tracker_interval, opts.force_tracker_interval,
)?; )?;
let initial_peers_stream = opts
.initial_peers
.clone()
.and_then(|v| if v.is_empty() { None } else { Some(v) })
.map(futures::stream::iter);
let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_stream);
let peer_rx = match peer_rx { let peer_rx = match peer_rx {
Some(peer_rx) => peer_rx, Some(peer_rx) => peer_rx,
None => bail!("can't find peers: DHT disabled and no trackers in magnet"), None => bail!("can't find peers: DHT is disabled, no trackers in magnet, and no initial peers provided"),
}; };
debug!(?info_hash, "querying DHT"); debug!(?info_hash, "querying DHT");
match read_metainfo_from_peer_receiver( match read_metainfo_from_peer_receiver(
self.peer_id, self.peer_id,
info_hash, info_hash,
opts.initial_peers.clone().unwrap_or_default(), Default::default(),
peer_rx, peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)), Some(self.merge_peer_opts(opts.peer_opts)),
self.connector.clone(), self.connector.clone(),
@ -1042,6 +1045,7 @@ impl Session {
&info_bytes, &info_bytes,
&trackers, &trackers,
)?, )?,
info_bytes: info_bytes.0,
info, info,
trackers, trackers,
peer_rx: Some(rx), peer_rx: Some(rx),
@ -1049,12 +1053,12 @@ impl Session {
} }
} }
ReadMetainfoResult::ChannelClosed { .. } => { ReadMetainfoResult::ChannelClosed { .. } => {
bail!("DHT died, no way to discover torrent metainfo") bail!("input address stream exhausted, no way to discover torrent metainfo")
} }
} }
} }
other => { other => {
let (torrent, bytes) = match other { let torrent = match other {
AddTorrent::Url(url) AddTorrent::Url(url)
if url.starts_with("http://") || url.starts_with("https://") => if url.starts_with("http://") || url.starts_with("https://") =>
{ {
@ -1066,22 +1070,21 @@ impl Session {
url url
) )
} }
AddTorrent::TorrentFileBytes(bytes) => { AddTorrent::TorrentFileBytes(bytes) =>
let bytes = match bytes { torrent_from_bytes(bytes)
Cow::Borrowed(b) => ::bytes::Bytes::copy_from_slice(b), .context("error decoding torrent")?
Cow::Owned(v) => ::bytes::Bytes::from(v), ,
}; AddTorrent::TorrentInfo(t) => {
( // TODO: remove this branch entirely
torrent_from_bytes(&bytes) ParsedTorrentFile{
.map(|t| t.clone_to_owned(Some(&bytes))) info: *t,
.context("error decoding torrent")?, info_bytes: Default::default(),
ByteBufOwned(bytes), torrent_bytes: Default::default(),
) }
} },
AddTorrent::TorrentInfo(t, bytes) => (*t, bytes.into()),
}; };
let trackers = torrent let trackers = torrent.info
.iter_announce() .iter_announce()
.unique() .unique()
.filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) { .filter_map(|tracker| match std::str::from_utf8(tracker.as_ref()) {
@ -1097,7 +1100,7 @@ impl Session {
None None
} else { } else {
self.make_peer_rx( self.make_peer_rx(
torrent.info_hash, torrent.info.info_hash,
if opts.disable_trackers { if opts.disable_trackers {
Default::default() Default::default()
} else { } else {
@ -1109,9 +1112,10 @@ impl Session {
}; };
InternalAddResult { InternalAddResult {
info_hash: torrent.info_hash, info_hash: torrent.info.info_hash,
info: torrent.info, info: torrent.info.info,
torrent_bytes: bytes.0, torrent_bytes: torrent.torrent_bytes,
info_bytes: torrent.info_bytes,
trackers, trackers,
peer_rx, peer_rx,
initial_peers: opts initial_peers: opts
@ -1169,6 +1173,7 @@ impl Session {
peer_rx, peer_rx,
initial_peers, initial_peers,
torrent_bytes, torrent_bytes,
info_bytes,
} = add_res; } = add_res;
debug!("Torrent info: {:#?}", &info); debug!("Torrent info: {:#?}", &info);
@ -1213,6 +1218,7 @@ impl Session {
info, info,
info_hash, info_hash,
torrent_bytes, torrent_bytes,
info_bytes,
output_folder, output_folder,
storage_factory, storage_factory,
); );

View file

@ -1,11 +1,11 @@
use std::{ use std::{
borrow::Cow,
net::{Ipv4Addr, SocketAddr}, net::{Ipv4Addr, SocketAddr},
time::Duration, time::Duration,
}; };
use anyhow::bail; use anyhow::bail;
use futures::{stream::FuturesUnordered, StreamExt}; use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::magnet::Magnet;
use rand::Rng; use rand::Rng;
use tokio::{ use tokio::{
spawn, spawn,
@ -84,7 +84,7 @@ async fn test_e2e_download() {
let handle = session let handle = session
.add_torrent( .add_torrent(
crate::AddTorrent::TorrentFileBytes(Cow::Owned(torrent_file_bytes)), crate::AddTorrent::TorrentFileBytes(torrent_file_bytes),
Some(AddTorrentOptions { Some(AddTorrentOptions {
overwrite: true, overwrite: true,
output_folder: Some(tempdir.to_str().unwrap().to_owned()), output_folder: Some(tempdir.to_str().unwrap().to_owned()),
@ -139,6 +139,8 @@ async fn test_e2e_download() {
.and_then(|v| v.parse().ok()) .and_then(|v| v.parse().ok())
.unwrap_or(1usize); .unwrap_or(1usize);
let magnet = Magnet::from_id20(torrent_file.info_hash(), Vec::new()).to_string();
// 3. Start a client with the initial peers, and download the file. // 3. Start a client with the initial peers, and download the file.
for _ in 0..client_iters { for _ in 0..client_iters {
let outdir = tempfile::TempDir::with_prefix("rqbit_e2e_client").unwrap(); let outdir = tempfile::TempDir::with_prefix("rqbit_e2e_client").unwrap();
@ -163,7 +165,7 @@ async fn test_e2e_download() {
let (id, handle) = { let (id, handle) = {
let r = session let r = session
.add_torrent( .add_torrent(
crate::AddTorrent::TorrentFileBytes(Cow::Owned(torrent_file_bytes.clone())), crate::AddTorrent::Url((&magnet).into()),
Some(AddTorrentOptions { Some(AddTorrentOptions {
initial_peers: Some(peers.clone()), initial_peers: Some(peers.clone()),
// only_files: Some(vec![0]), // only_files: Some(vec![0]),
@ -235,7 +237,7 @@ async fn test_e2e_download() {
// 4. After downloading, recheck its integrity. // 4. After downloading, recheck its integrity.
let handle = session let handle = session
.add_torrent( .add_torrent(
crate::AddTorrent::TorrentFileBytes(Cow::Owned(torrent_file_bytes.clone())), crate::AddTorrent::TorrentFileBytes(torrent_file_bytes.clone()),
Some(AddTorrentOptions { Some(AddTorrentOptions {
paused: true, paused: true,
overwrite: true, overwrite: true,

View file

@ -5,7 +5,9 @@ use tempfile::TempDir;
use tokio::{io::AsyncReadExt, time::timeout}; use tokio::{io::AsyncReadExt, time::timeout};
use tracing::info; use tracing::info;
use crate::{create_torrent, AddTorrent, CreateTorrentOptions, Session}; use crate::{
create_torrent, tests::test_util::TestPeerMetadata, AddTorrent, CreateTorrentOptions, Session,
};
use super::test_util::create_default_random_dir_with_torrents; use super::test_util::create_default_random_dir_with_torrents;
@ -21,11 +23,11 @@ async fn e2e_stream() -> anyhow::Result<()> {
.await?; .await?;
let orig_content = std::fs::read(files.path().join("0.data")).unwrap(); let orig_content = std::fs::read(files.path().join("0.data")).unwrap();
let server_session = Session::new_with_opts( let server_session = Session::new_with_opts(
files.path().into(), files.path().into(),
crate::SessionOptions { crate::SessionOptions {
disable_dht: true, disable_dht: true,
peer_id: Some(TestPeerMetadata::good().as_peer_id()),
persistence: false, persistence: false,
listen_port_range: Some(16001..16100), listen_port_range: Some(16001..16100),
enable_upnp_port_forwarding: false, enable_upnp_port_forwarding: false,
@ -71,6 +73,7 @@ async fn e2e_stream() -> anyhow::Result<()> {
crate::SessionOptions { crate::SessionOptions {
disable_dht: true, disable_dht: true,
persistence: false, persistence: false,
peer_id: Some(TestPeerMetadata::good().as_peer_id()),
listen_port_range: None, listen_port_range: None,
enable_upnp_port_forwarding: false, enable_upnp_port_forwarding: false,
..Default::default() ..Default::default()

View file

@ -1,7 +1,7 @@
use std::{io::Write, path::Path}; use std::{io::Write, path::Path};
use librqbit_core::Id20; use librqbit_core::Id20;
use rand::{RngCore, SeedableRng}; use rand::{thread_rng, Rng, RngCore, SeedableRng};
use tempfile::TempDir; use tempfile::TempDir;
pub fn create_new_file_with_random_content(path: &Path, mut size: usize) { pub fn create_new_file_with_random_content(path: &Path, mut size: usize) {
@ -43,8 +43,16 @@ pub struct TestPeerMetadata {
} }
impl TestPeerMetadata { impl TestPeerMetadata {
pub fn good() -> Self {
Self {
server_id: 0,
max_random_sleep_ms: 0,
}
}
pub fn as_peer_id(&self) -> Id20 { pub fn as_peer_id(&self) -> Id20 {
let mut peer_id = Id20::default(); let mut peer_id = Id20::default();
thread_rng().fill(&mut peer_id.0);
peer_id.0[0] = self.server_id; peer_id.0[0] = self.server_id;
peer_id.0[1] = self.max_random_sleep_ms; peer_id.0[1] = self.max_random_sleep_ms;
peer_id peer_id

View file

@ -58,6 +58,7 @@ use backoff::backoff::Backoff;
use buffers::{ByteBuf, ByteBufOwned}; use buffers::{ByteBuf, ByteBufOwned};
use clone_to_owned::CloneToOwned; use clone_to_owned::CloneToOwned;
use librqbit_core::{ use librqbit_core::{
constants::CHUNK_SIZE,
hash_id::Id20, hash_id::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex}, lengths::{ChunkInfo, Lengths, ValidPieceIndex},
spawn_utils::spawn_with_cancel, spawn_utils::spawn_with_cancel,
@ -66,7 +67,8 @@ use librqbit_core::{
}; };
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{ use peer_binary_protocol::{
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
Handshake, Message, MessageOwned, Piece, Request,
}; };
use tokio::{ use tokio::{
sync::{ sync::{
@ -798,6 +800,12 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
Message::Cancel(_) => { Message::Cancel(_) => {
trace!("received \"cancel\", but we don't process it yet") trace!("received \"cancel\", but we don't process it yet")
} }
Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Request(
metadata_piece_id,
))) => {
self.send_metadata_piece(metadata_piece_id)
.with_context(|| format!("error sending metadata piece {metadata_piece_id}"))?;
}
message => { message => {
warn!("received unsupported message {:?}, ignoring", message); warn!("received unsupported message {:?}, ignoring", message);
} }
@ -849,6 +857,19 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
.unwrap_or(true); .unwrap_or(true);
!have !have
} }
fn update_my_extended_handshake(
&self,
handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
let info_bytes = &self.state.meta().info_bytes;
if !info_bytes.is_empty() {
if let Ok(len) = info_bytes.len().try_into() {
handshake.metadata_size = Some(len);
}
}
Ok(())
}
} }
impl PeerHandler { impl PeerHandler {
@ -1504,4 +1525,34 @@ impl PeerHandler {
Ok(()) Ok(())
} }
fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> {
let data = &self.state.meta().info_bytes;
let metadata_size = data.len();
if metadata_size == 0 {
anyhow::bail!("peer requested for info metadata but we don't have it")
}
let total_pieces: usize = (metadata_size as u64)
.div_ceil(CHUNK_SIZE as u64)
.try_into()?;
if piece_id as usize > total_pieces {
bail!("piece out of bounds")
}
let offset = piece_id * CHUNK_SIZE;
let end = (offset + CHUNK_SIZE).min(data.len().try_into()?);
let data = data.slice(offset as usize..end as usize);
self.tx
.send(WriterRequest::Message(Message::Extended(
ExtendedMessage::UtMetadata(UtMetadata::Data {
piece: piece_id,
total_size: end - offset,
data: data.into(),
}),
)))
.context("error sending UtMetadata: channel closed")?;
Ok(())
}
} }

View file

@ -101,6 +101,7 @@ pub(crate) struct ManagedTorrentOptions {
pub struct ManagedTorrentInfo { pub struct ManagedTorrentInfo {
pub info: TorrentMetaV1Info<ByteBufOwned>, pub info: TorrentMetaV1Info<ByteBufOwned>,
pub torrent_bytes: Bytes, pub torrent_bytes: Bytes,
pub info_bytes: Bytes,
pub info_hash: Id20, pub info_hash: Id20,
pub(crate) spawner: BlockingSpawner, pub(crate) spawner: BlockingSpawner,
pub trackers: HashSet<String>, pub trackers: HashSet<String>,
@ -504,6 +505,7 @@ pub(crate) struct ManagedTorrentBuilder {
output_folder: PathBuf, output_folder: PathBuf,
info_hash: Id20, info_hash: Id20,
torrent_bytes: Bytes, torrent_bytes: Bytes,
info_bytes: Bytes,
force_tracker_interval: Option<Duration>, force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>, peer_connect_timeout: Option<Duration>,
peer_read_write_timeout: Option<Duration>, peer_read_write_timeout: Option<Duration>,
@ -522,6 +524,7 @@ impl ManagedTorrentBuilder {
info: TorrentMetaV1Info<ByteBufOwned>, info: TorrentMetaV1Info<ByteBufOwned>,
info_hash: Id20, info_hash: Id20,
torrent_bytes: Bytes, torrent_bytes: Bytes,
info_bytes: Bytes,
output_folder: PathBuf, output_folder: PathBuf,
storage_factory: BoxStorageFactory, storage_factory: BoxStorageFactory,
) -> Self { ) -> Self {
@ -529,6 +532,7 @@ impl ManagedTorrentBuilder {
info, info,
info_hash, info_hash,
torrent_bytes, torrent_bytes,
info_bytes,
spawner: None, spawner: None,
force_tracker_interval: None, force_tracker_interval: None,
peer_connect_timeout: None, peer_connect_timeout: None,
@ -614,6 +618,7 @@ impl ManagedTorrentBuilder {
file_infos, file_infos,
info: self.info, info: self.info,
torrent_bytes: self.torrent_bytes, torrent_bytes: self.torrent_bytes,
info_bytes: self.info_bytes,
info_hash: self.info_hash, info_hash: self.info_hash,
trackers: self.trackers.into_iter().collect(), trackers: self.trackers.into_iter().collect(),
spawner: self.spawner.unwrap_or_default(), spawner: self.spawner.unwrap_or_default(),

View file

@ -20,6 +20,14 @@ impl Magnet {
self.id32 self.id32
} }
pub fn from_id20(id20: Id20, trackers: Vec<String>) -> Self {
Self {
id20: Some(id20),
id32: None,
trackers,
}
}
/// Parse a magnet link. /// Parse a magnet link.
pub fn parse(url: &str) -> anyhow::Result<Magnet> { pub fn parse(url: &str) -> anyhow::Result<Magnet> {
let url = url::Url::parse(url).context("magnet link must be a valid URL")?; let url = url::Url::parse(url).context("magnet link must be a valid URL")?;
@ -63,37 +71,44 @@ impl Magnet {
} }
impl std::fmt::Display for Magnet { impl std::fmt::Display for Magnet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> core::fmt::Result {
if let (Some(id20), Some(id32)) = (self.id20, self.id32) { write!(f, "magnet:")?;
write!( let mut write_ampersand = {
f, let mut written_so_far = 0;
"magnet:?xt=urn:btih:{}?xt=urn:btmh:1220{}&tr={}", move |f: &mut std::fmt::Formatter<'_>| -> core::fmt::Result {
id20.as_string(), if written_so_far == 0 {
id32.as_string(), write!(f, "?")?;
self.trackers.join("&tr=") } else {
) write!(f, "&")?;
} else if let Some(id20) = self.id20 { }
write!( written_so_far += 1;
f, Ok(())
"magnet:?xt=urn:btih:{}&tr={}", }
id20.as_string(), };
self.trackers.join("&tr=") if let Some(id20) = self.id20 {
) write_ampersand(f)?;
} else if let Some(id32) = self.id32 { write!(f, "xt=urn:btih:{}", id20.as_string(),)?;
write!(
f,
"magnet:?xt=urn:btmh:1220{}&tr={}",
id32.as_string(),
self.trackers.join("&tr=")
)
} else {
panic!("no infohash")
} }
if let Some(id32) = self.id32 {
write_ampersand(f)?;
write!(f, "xt=xt=urn:btmh:1220{}", id32.as_string(),)?;
}
for tracker in self.trackers.iter() {
write_ampersand(f)?;
write!(f, "tr={tracker}")?;
}
Ok(())
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::str::FromStr;
use crate::Id20;
use super::Magnet;
#[test] #[test]
fn test_parse_magnet_as_url() { fn test_parse_magnet_as_url() {
let magnet = "magnet:?xt=urn:btih:a621779b5e3d486e127c3efbca9b6f8d135f52e5&dn=rutor.info_%D0%92%D0%BE%D0%B9%D0%BD%D0%B0+%D0%B1%D1%83%D0%B4%D1%83%D1%89%D0%B5%D0%B3%D0%BE+%2F+The+Tomorrow+War+%282021%29+WEB-DLRip+%D0%BE%D1%82+MegaPeer+%7C+P+%7C+NewComers&tr=udp://opentor.org:2710&tr=udp://opentor.org:2710&tr=http://retracker.local/announce"; let magnet = "magnet:?xt=urn:btih:a621779b5e3d486e127c3efbca9b6f8d135f52e5&dn=rutor.info_%D0%92%D0%BE%D0%B9%D0%BD%D0%B0+%D0%B1%D1%83%D0%B4%D1%83%D1%89%D0%B5%D0%B3%D0%BE+%2F+The+Tomorrow+War+%282021%29+WEB-DLRip+%D0%BE%D1%82+MegaPeer+%7C+P+%7C+NewComers&tr=udp://opentor.org:2710&tr=udp://opentor.org:2710&tr=http://retracker.local/announce";
@ -113,4 +128,18 @@ mod tests {
let m = Magnet::parse(magnet).unwrap(); let m = Magnet::parse(magnet).unwrap();
assert!(m.as_id32() == Some(info_hash)); assert!(m.as_id32() == Some(info_hash));
} }
#[test]
fn test_magnet_to_string() {
let id20 = Id20::from_str("a621779b5e3d486e127c3efbca9b6f8d135f52e5").unwrap();
assert_eq!(
&Magnet::from_id20(id20, Default::default()).to_string(),
"magnet:?xt=urn:btih:a621779b5e3d486e127c3efbca9b6f8d135f52e5"
);
assert_eq!(
&Magnet::from_id20(id20, vec!["foo".to_string(), "bar".to_string()]).to_string(),
"magnet:?xt=urn:btih:a621779b5e3d486e127c3efbca9b6f8d135f52e5&tr=foo&tr=bar"
);
}
} }