This commit is contained in:
Igor Katson 2024-02-18 20:16:18 +00:00
parent f5ccb8632b
commit 76b7d23149
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 17 additions and 24 deletions

View file

@ -13,10 +13,8 @@ use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::{ByteBuf, ByteBufT, ByteString}; use buffers::{ByteBuf, ByteBufT, ByteString};
use clone_to_owned::CloneToOwned; use clone_to_owned::CloneToOwned;
use dht::{ use dht::{Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig};
Dht, DhtBuilder, DhtConfig, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream, use futures::{stream::FuturesUnordered, TryFutureExt};
};
use futures::{stream::FuturesUnordered, Stream, TryFutureExt};
use librqbit_core::{ use librqbit_core::{
directories::get_configuration_directory, directories::get_configuration_directory,
magnet::Magnet, magnet::Magnet,
@ -43,7 +41,7 @@ use crate::{
torrent_state::{ torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive, ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
}, },
tracker_comms::{self, TorrentStatsForTrackerDummy, TrackerComms}, tracker_comms::{self, TrackerComms},
type_aliases::PeerStream, type_aliases::PeerStream,
}; };
@ -368,18 +366,6 @@ async fn create_tcp_listener(
bail!("no free TCP ports in range {port_range:?}"); bail!("no free TCP ports in range {port_range:?}");
} }
fn merge_peer_rx(
dht_rx: Option<RequestPeersStream>,
peer_rx: Option<impl Stream<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
) -> Option<PeerStream> {
match (dht_rx, peer_rx) {
(Some(dht_rx), None) => Some(Box::new(dht_rx)),
(None, Some(peer_rx)) => Some(Box::new(peer_rx)),
(None, None) => None,
(Some(dht_rx), Some(peer_rx)) => Some(Box::new(dht_rx.merge(peer_rx))),
}
}
pub(crate) struct CheckedIncomingConnection { pub(crate) struct CheckedIncomingConnection {
pub addr: SocketAddr, pub addr: SocketAddr,
pub stream: tokio::net::TcpStream, pub stream: tokio::net::TcpStream,
@ -1073,6 +1059,7 @@ impl Session {
} }
} }
// Get a peer stream from both DHT and trackers.
fn make_peer_rx( fn make_peer_rx(
&self, &self,
info_hash: Id20, info_hash: Id20,
@ -1095,8 +1082,14 @@ impl Session {
cancel, cancel,
announce_port, announce_port,
); );
let peer_rx = merge_peer_rx(dht_rx, peer_rx);
Ok(peer_rx) // Merge DHT rx and tracker comms peer rx.
match (dht_rx, peer_rx) {
(Some(dht_rx), None) => Ok(Some(Box::new(dht_rx))),
(None, Some(peer_rx)) => Ok(Some(Box::new(peer_rx))),
(None, None) => Ok(None),
(Some(dht_rx), Some(peer_rx)) => Ok(Some(Box::new(dht_rx.merge(peer_rx)))),
}
} }
pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { pub fn unpause(&self, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {

View file

@ -4,7 +4,6 @@ use anyhow::Context;
use crate::hash_id::{Id20, Id32}; use crate::hash_id::{Id20, Id32};
/// A parsed magnet link. /// A parsed magnet link.
pub struct Magnet { pub struct Magnet {
id20: Option<Id20>, id20: Option<Id20>,
@ -45,7 +44,7 @@ impl Magnet {
} else { } else {
anyhow::bail!("expected xt to start with btih or btmh"); anyhow::bail!("expected xt to start with btih or btmh");
} }
}, }
"tr" => trackers.push(value.into()), "tr" => trackers.push(value.into()),
_ => {} _ => {}
} }
@ -93,7 +92,6 @@ impl std::fmt::Display for Magnet {
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[test] #[test]
@ -109,8 +107,10 @@ mod tests {
use std::str::FromStr; use std::str::FromStr;
let magnet = "magnet:?xt=urn:btmh:1220caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e&dn=bittorrent-v2-test let magnet = "magnet:?xt=urn:btmh:1220caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e&dn=bittorrent-v2-test
"; ";
let info_hash = Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e").unwrap(); let info_hash =
let m = Magnet::parse(&magnet).unwrap(); Id32::from_str("caf1e1c30e81cb361b9ee167c4aa64228a7fa4fa9f6105232b28ad099f3a302e")
.unwrap();
let m = Magnet::parse(magnet).unwrap();
assert!(m.as_id32() == Some(info_hash)); assert!(m.as_id32() == Some(info_hash));
} }
} }