Use concrete type for DHT peers

This commit is contained in:
Igor Katson 2023-11-30 08:06:55 +00:00
parent 6243c5f02f
commit 210a3d5d3e
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 15 additions and 14 deletions

View file

@ -145,7 +145,7 @@ struct RecursiveRequest<C: RecursiveRequestCallbacks> {
callbacks: C, callbacks: C,
} }
struct RequestPeersStream { pub struct RequestPeersStream {
rx: tokio::sync::mpsc::UnboundedReceiver<SocketAddr>, rx: tokio::sync::mpsc::UnboundedReceiver<SocketAddr>,
cancel_join_handle: tokio::task::JoinHandle<()>, cancel_join_handle: tokio::task::JoinHandle<()>,
} }
@ -959,10 +959,7 @@ impl DhtState {
Ok(state) Ok(state)
} }
pub fn get_peers( pub fn get_peers(self: &Arc<Self>, info_hash: Id20) -> anyhow::Result<RequestPeersStream> {
self: &Arc<Self>,
info_hash: Id20,
) -> anyhow::Result<impl Stream<Item = SocketAddr> + Unpin> {
Ok(RequestPeersStream::new(self.clone(), info_hash)) Ok(RequestPeersStream::new(self.clone(), info_hash))
} }

View file

@ -8,7 +8,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
pub use crate::dht::DhtStats; pub use crate::dht::DhtStats;
pub use crate::dht::{DhtConfig, DhtState}; pub use crate::dht::{DhtConfig, DhtState, RequestPeersStream};
pub use librqbit_core::id20::Id20; pub use librqbit_core::id20::Id20;
pub use persistence::{PersistentDht, PersistentDhtConfig}; pub use persistence::{PersistentDht, PersistentDhtConfig};

View file

@ -12,7 +12,7 @@ use std::{
use anyhow::{bail, Context}; use anyhow::{bail, Context};
use bencode::{bencode_serialize_to_writer, BencodeDeserializer}; use bencode::{bencode_serialize_to_writer, BencodeDeserializer};
use buffers::ByteString; use buffers::ByteString;
use dht::{Dht, DhtBuilder, Id20, PersistentDht, PersistentDhtConfig}; use dht::{Dht, DhtBuilder, Id20, PersistentDht, PersistentDhtConfig, RequestPeersStream};
use librqbit_core::{ use librqbit_core::{
magnet::Magnet, magnet::Magnet,
peer_id::generate_peer_id, peer_id::generate_peer_id,
@ -21,7 +21,6 @@ use librqbit_core::{
use parking_lot::RwLock; use parking_lot::RwLock;
use reqwest::Url; use reqwest::Url;
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
use tokio_stream::StreamExt;
use tracing::{debug, error, error_span, info, warn}; use tracing::{debug, error, error_span, info, warn};
use crate::{ use crate::{
@ -451,7 +450,7 @@ impl Session {
pub async fn add_torrent( pub async fn add_torrent(
&self, &self,
add: impl Into<AddTorrent<'_>>, add: AddTorrent<'_>,
opts: Option<AddTorrentOptions>, opts: Option<AddTorrentOptions>,
) -> anyhow::Result<AddTorrentResponse> { ) -> anyhow::Result<AddTorrentResponse> {
// Magnet links are different in that we first need to discover the metadata. // Magnet links are different in that we first need to discover the metadata.
@ -502,7 +501,11 @@ impl Session {
( (
info_hash, info_hash,
info, info,
if opts.paused { None } else { Some(dht_rx) }, if opts.paused || opts.list_only {
None
} else {
Some(dht_rx)
},
trackers, trackers,
initial_peers, initial_peers,
) )
@ -527,7 +530,7 @@ impl Session {
}; };
let dht_rx = match self.dht.as_ref() { let dht_rx = match self.dht.as_ref() {
Some(dht) if !opts.paused => { Some(dht) if !opts.paused && !opts.list_only => {
debug!("reading peers for {:?} from DHT", torrent.info_hash); debug!("reading peers for {:?} from DHT", torrent.info_hash);
Some(dht.get_peers(torrent.info_hash)?) Some(dht.get_peers(torrent.info_hash)?)
} }
@ -578,7 +581,7 @@ impl Session {
&self, &self,
info_hash: Id20, info_hash: Id20,
info: TorrentMetaV1Info<ByteString>, info: TorrentMetaV1Info<ByteString>,
dht_peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>, dht_peer_rx: Option<RequestPeersStream>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
trackers: Vec<reqwest::Url>, trackers: Vec<reqwest::Url>,
opts: AddTorrentOptions, opts: AddTorrentOptions,

View file

@ -15,6 +15,7 @@ use std::time::Duration;
use anyhow::bail; use anyhow::bail;
use anyhow::Context; use anyhow::Context;
use buffers::ByteString; use buffers::ByteString;
use dht::RequestPeersStream;
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use librqbit_core::lengths::Lengths; use librqbit_core::lengths::Lengths;
use librqbit_core::peer_id::generate_peer_id; use librqbit_core::peer_id::generate_peer_id;
@ -165,7 +166,7 @@ impl ManagedTorrent {
pub fn start( pub fn start(
self: &Arc<Self>, self: &Arc<Self>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>, peer_rx: Option<RequestPeersStream>,
start_paused: bool, start_paused: bool,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut g = self.locked.write(); let mut g = self.locked.write();
@ -195,7 +196,7 @@ impl ManagedTorrent {
fn spawn_peer_adder( fn spawn_peer_adder(
live: &Arc<TorrentStateLive>, live: &Arc<TorrentStateLive>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>, peer_rx: Option<RequestPeersStream>,
) { ) {
let span = live.meta().span.clone(); let span = live.meta().span.clone();
let live = Arc::downgrade(live); let live = Arc::downgrade(live);