Can unpause dead no-peer-source torrent just fine

This commit is contained in:
Igor Katson 2025-01-13 17:41:42 +00:00
parent 6fea795899
commit c84e3ad90d
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 19 additions and 38 deletions

View file

@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
let dht = DhtBuilder::new().await.context("error initializing DHT")?; let dht = DhtBuilder::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash, None)?; let mut stream = dht.get_peers(info_hash, None);
let stats_printer = async { let stats_printer = async {
loop { loop {

View file

@ -1197,17 +1197,12 @@ impl DhtState {
.boxed() .boxed()
} }
#[inline(never)]
pub fn get_peers( pub fn get_peers(
self: &Arc<Self>, self: &Arc<Self>,
info_hash: Id20, info_hash: Id20,
announce_port: Option<u16>, announce_port: Option<u16>,
) -> anyhow::Result<RequestPeersStream> { ) -> RequestPeersStream {
Ok(RequestPeersStream::new( RequestPeersStream::new(self.clone(), info_hash, announce_port)
self.clone(),
info_hash,
announce_port,
))
} }
pub fn listen_addr(&self) -> SocketAddr { pub fn listen_addr(&self) -> SocketAddr {

View file

@ -130,7 +130,7 @@ mod tests {
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap(); let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap(); let dht = DhtBuilder::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash, None).unwrap(); let peer_rx = dht.get_peers(info_hash, None);
let peer_id = generate_peer_id(); let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver( match read_metainfo_from_peer_receiver(
peer_id, peer_id,

View file

@ -214,22 +214,10 @@ fn merge_two_optional_streams<T>(
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>, s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> { ) -> Option<BoxStream<'static, T>> {
match (s1, s2) { match (s1, s2) {
(Some(s1), None) => { (Some(s1), None) => Some(Box::pin(s1)),
trace!("merge_two_optional_streams: using first"); (None, Some(s2)) => Some(Box::pin(s2)),
Some(Box::pin(s1)) (Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
} (None, None) => None,
(None, Some(s2)) => {
trace!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
trace!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
trace!("merge_two_optional_streams: using none");
None
}
} }
} }
@ -1019,7 +1007,6 @@ impl Session {
opts.initial_peers.clone().unwrap_or_default(), opts.initial_peers.clone().unwrap_or_default(),
private, private,
) )
.context("error creating peer stream")
}; };
let mut seen_peers = Vec::new(); let mut seen_peers = Vec::new();
@ -1029,12 +1016,12 @@ impl Session {
Some(metadata) => { Some(metadata) => {
let mut peer_rx = None; let mut peer_rx = None;
if !opts.paused && !opts.list_only { if !opts.paused && !opts.list_only {
peer_rx = make_peer_rx()?; peer_rx = make_peer_rx();
} }
(metadata, peer_rx) (metadata, peer_rx)
} }
None => { None => {
let peer_rx = make_peer_rx()?.context( let peer_rx = make_peer_rx().context(
"no known way to resolve peers (no DHT, no trackers, no initial_peers)", "no known way to resolve peers (no DHT, no trackers, no initial_peers)",
)?; )?;
let resolved_magnet = self let resolved_magnet = self
@ -1286,7 +1273,7 @@ impl Session {
self: &Arc<Self>, self: &Arc<Self>,
t: &Arc<ManagedTorrent>, t: &Arc<ManagedTorrent>,
announce: bool, announce: bool,
) -> anyhow::Result<PeerStream> { ) -> Option<PeerStream> {
let is_private = t.with_metadata(|m| m.info.private).unwrap_or(false); let is_private = t.with_metadata(|m| m.info.private).unwrap_or(false);
self.make_peer_rx( self.make_peer_rx(
t.info_hash(), t.info_hash(),
@ -1295,8 +1282,7 @@ impl Session {
t.shared().options.force_tracker_interval, t.shared().options.force_tracker_interval,
t.shared().options.initial_peers.clone(), t.shared().options.initial_peers.clone(),
is_private, is_private,
)? )
.context("no peer source")
} }
// Get a peer stream from both DHT and trackers. // Get a peer stream from both DHT and trackers.
@ -1308,7 +1294,7 @@ impl Session {
force_tracker_interval: Option<Duration>, force_tracker_interval: Option<Duration>,
initial_peers: Vec<SocketAddr>, initial_peers: Vec<SocketAddr>,
is_private: bool, is_private: bool,
) -> anyhow::Result<Option<PeerStream>> { ) -> Option<PeerStream> {
let announce_port = if announce { self.tcp_listen_port } else { None }; let announce_port = if announce { self.tcp_listen_port } else { None };
let dht_rx = if is_private { let dht_rx = if is_private {
None None
@ -1316,7 +1302,6 @@ impl Session {
self.dht self.dht
.as_ref() .as_ref()
.map(|dht| dht.get_peers(info_hash, announce_port)) .map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?
}; };
if is_private && trackers.len() > 1 { if is_private && trackers.len() > 1 {
@ -1343,9 +1328,10 @@ impl Session {
} else { } else {
Some(futures::stream::iter(initial_peers)) Some(futures::stream::iter(initial_peers))
}; };
let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx); merge_two_optional_streams(
let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx); merge_two_optional_streams(dht_rx, tracker_rx),
Ok(peer_rx) initial_peers_rx,
)
} }
async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) { async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) {
@ -1363,8 +1349,8 @@ impl Session {
} }
pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> { pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let peer_rx = self.make_peer_rx_managed_torrent(handle, true)?; let peer_rx = self.make_peer_rx_managed_torrent(handle, true);
handle.start(Some(peer_rx), false)?; handle.start(peer_rx, false)?;
self.try_update_persistence_metadata(handle).await; self.try_update_persistence_metadata(handle).await;
Ok(()) Ok(())
} }