diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 554514c..f85e3f9 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -24,12 +24,12 @@ use librqbit_core::torrent_metainfo::TorrentMetaV1Info; pub use live::*; use parking_lot::RwLock; +use tokio::time::timeout; use tokio_stream::StreamExt; use tracing::debug; -use tracing::error; use tracing::error_span; -use tracing::warn; use tracing::trace; +use tracing::warn; use url::Url; use crate::chunk_tracker::ChunkTracker; @@ -214,19 +214,27 @@ impl ManagedTorrent { } } - if let Some(mut peer_rx) = peer_rx { - while let Some(peer) = peer_rx.next().await { - let live = match live.upgrade() { - Some(live) => live, - None => return Ok(()), - }; - live.add_peer_if_not_seen(peer).context("torrent closed")?; - } + let mut peer_rx = if let Some(peer_rx) = peer_rx { + peer_rx } else { - error!("peer rx is not set"); - } + return Ok(()); + }; - Ok(()) + loop { + match timeout(Duration::from_secs(5), peer_rx.next()).await { + Ok(Some(peer)) => { + let live = match live.upgrade() { + Some(live) => live, + None => return Ok(()), + }; + live.add_peer_if_not_seen(peer).context("torrent closed")?; + } + Ok(None) => return Ok(()), + // If timeout, check if the torrent is live. + Err(_) if live.strong_count() == 0 => return Ok(()), + Err(_) => continue, + } + } }, ); }