Fixed a bug when unpausing torrents

This commit is contained in:
Igor Katson 2023-11-24 21:03:34 +00:00
parent d7a37c1b48
commit 4927850ff9
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 37 additions and 26 deletions

View file

@ -274,7 +274,6 @@ impl TorrentStateLive {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
error!("canceled");
bail!("canceled")
}
}

View file

@ -7,6 +7,7 @@ use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use anyhow::bail;
@ -21,6 +22,7 @@ pub use live::*;
use parking_lot::RwLock;
use tokio_stream::StreamExt;
use tracing::error;
use tracing::error_span;
use url::Url;
@ -110,7 +112,10 @@ impl ManagedTorrent {
let g = self.locked.read();
match &g.state {
ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)),
ManagedTorrentState::Live(l) => Ok(f(l.lock_read("chunk_tracker").get_chunks()?)),
ManagedTorrentState::Live(l) => Ok(f(l
.lock_read("chunk_tracker")
.get_chunks()
.context("error getting chunks")?)),
_ => bail!("no chunk tracker, torrent neither paused nor live"),
}
}
@ -129,6 +134,31 @@ impl ManagedTorrent {
peer_rx: Option<impl StreamExt<Item = SocketAddr> + Unpin + Send + Sync + 'static>,
) -> anyhow::Result<()> {
let mut g = self.locked.write();
let peer_adder = |live: Weak<TorrentStateLive>| async move {
{
let live: Arc<TorrentStateLive> = live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer).context("torrent closed")?;
}
}
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
} else {
error!("peer rx is not set");
}
Ok(())
};
let span = self.info.span.clone();
match &g.state {
ManagedTorrentState::Live(_) => {
bail!("torrent is already live");
@ -136,7 +166,6 @@ impl ManagedTorrent {
ManagedTorrentState::Initializing(init) => {
let init = init.clone();
let t = self.clone();
let span = self.info.span.clone();
spawn(
error_span!(parent: span.clone(), "initialize_and_start"),
async move {
@ -145,30 +174,9 @@ impl ManagedTorrent {
let live = TorrentStateLive::new(paused);
t.locked.write().state = ManagedTorrentState::Live(live.clone());
let live = Arc::downgrade(&live);
spawn(
error_span!(parent: span.clone(), "external_peer_adder"),
async move {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
}
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
}
Ok(())
},
peer_adder(Arc::downgrade(&live)),
);
Ok(())
@ -186,7 +194,11 @@ impl ManagedTorrent {
ManagedTorrentState::Paused(_) => {
let paused = g.state.take().assert_paused();
let live = TorrentStateLive::new(paused);
g.state = ManagedTorrentState::Live(live);
g.state = ManagedTorrentState::Live(live.clone());
spawn(
error_span!(parent: span.clone(), "external_peer_adder"),
peer_adder(Arc::downgrade(&live)),
);
Ok(())
}
ManagedTorrentState::Error(_) => {