Merge pull request #300 from ikatson/unpause-zero-peers

Can unpause torrents even if there's no peers
This commit is contained in:
Igor Katson 2025-01-13 20:55:44 +00:00 committed by GitHub
commit bbae577ee7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 19 additions and 38 deletions

View file

@ -20,7 +20,7 @@ async fn main() -> anyhow::Result<()> {
let dht = DhtBuilder::new().await.context("error initializing DHT")?;
let mut stream = dht.get_peers(info_hash, None)?;
let mut stream = dht.get_peers(info_hash, None);
let stats_printer = async {
loop {

View file

@ -1197,17 +1197,12 @@ impl DhtState {
.boxed()
}
#[inline(never)]
pub fn get_peers(
self: &Arc<Self>,
info_hash: Id20,
announce_port: Option<u16>,
) -> anyhow::Result<RequestPeersStream> {
Ok(RequestPeersStream::new(
self.clone(),
info_hash,
announce_port,
))
) -> RequestPeersStream {
RequestPeersStream::new(self.clone(), info_hash, announce_port)
}
pub fn listen_addr(&self) -> SocketAddr {

View file

@ -130,7 +130,7 @@ mod tests {
let info_hash = Id20::from_str("cab507494d02ebb1178b38f2e9d7be299c86b862").unwrap();
let dht = DhtBuilder::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_rx = dht.get_peers(info_hash, None);
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(
peer_id,

View file

@ -214,22 +214,10 @@ fn merge_two_optional_streams<T>(
s2: Option<impl Stream<Item = T> + Unpin + Send + 'static>,
) -> Option<BoxStream<'static, T>> {
match (s1, s2) {
(Some(s1), None) => {
trace!("merge_two_optional_streams: using first");
Some(Box::pin(s1))
}
(None, Some(s2)) => {
trace!("merge_two_optional_streams: using second");
Some(Box::pin(s2))
}
(Some(s1), Some(s2)) => {
trace!("merge_two_optional_streams: using both");
Some(Box::pin(merge_streams(s1, s2)))
}
(None, None) => {
trace!("merge_two_optional_streams: using none");
None
}
(Some(s1), None) => Some(Box::pin(s1)),
(None, Some(s2)) => Some(Box::pin(s2)),
(Some(s1), Some(s2)) => Some(Box::pin(merge_streams(s1, s2))),
(None, None) => None,
}
}
@ -1019,7 +1007,6 @@ impl Session {
opts.initial_peers.clone().unwrap_or_default(),
private,
)
.context("error creating peer stream")
};
let mut seen_peers = Vec::new();
@ -1029,12 +1016,12 @@ impl Session {
Some(metadata) => {
let mut peer_rx = None;
if !opts.paused && !opts.list_only {
peer_rx = make_peer_rx()?;
peer_rx = make_peer_rx();
}
(metadata, peer_rx)
}
None => {
let peer_rx = make_peer_rx()?.context(
let peer_rx = make_peer_rx().context(
"no known way to resolve peers (no DHT, no trackers, no initial_peers)",
)?;
let resolved_magnet = self
@ -1286,7 +1273,7 @@ impl Session {
self: &Arc<Self>,
t: &Arc<ManagedTorrent>,
announce: bool,
) -> anyhow::Result<PeerStream> {
) -> Option<PeerStream> {
let is_private = t.with_metadata(|m| m.info.private).unwrap_or(false);
self.make_peer_rx(
t.info_hash(),
@ -1295,8 +1282,7 @@ impl Session {
t.shared().options.force_tracker_interval,
t.shared().options.initial_peers.clone(),
is_private,
)?
.context("no peer source")
)
}
// Get a peer stream from both DHT and trackers.
@ -1308,7 +1294,7 @@ impl Session {
force_tracker_interval: Option<Duration>,
initial_peers: Vec<SocketAddr>,
is_private: bool,
) -> anyhow::Result<Option<PeerStream>> {
) -> Option<PeerStream> {
let announce_port = if announce { self.tcp_listen_port } else { None };
let dht_rx = if is_private {
None
@ -1316,7 +1302,6 @@ impl Session {
self.dht
.as_ref()
.map(|dht| dht.get_peers(info_hash, announce_port))
.transpose()?
};
if is_private && trackers.len() > 1 {
@ -1343,9 +1328,10 @@ impl Session {
} else {
Some(futures::stream::iter(initial_peers))
};
let peer_rx = merge_two_optional_streams(dht_rx, tracker_rx);
let peer_rx = merge_two_optional_streams(peer_rx, initial_peers_rx);
Ok(peer_rx)
merge_two_optional_streams(
merge_two_optional_streams(dht_rx, tracker_rx),
initial_peers_rx,
)
}
async fn try_update_persistence_metadata(&self, handle: &ManagedTorrentHandle) {
@ -1363,8 +1349,8 @@ impl Session {
}
pub async fn unpause(self: &Arc<Self>, handle: &ManagedTorrentHandle) -> anyhow::Result<()> {
let peer_rx = self.make_peer_rx_managed_torrent(handle, true)?;
handle.start(Some(peer_rx), false)?;
let peer_rx = self.make_peer_rx_managed_torrent(handle, true);
handle.start(peer_rx, false)?;
self.try_update_persistence_metadata(handle).await;
Ok(())
}