From d7a37c1b48dbd00f578fd32435096d1627df04e9 Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 24 Nov 2023 18:28:46 +0000 Subject: [PATCH] Saving --- crates/librqbit/src/http_api.rs | 47 ++- crates/librqbit/src/http_api_error.rs | 8 + crates/librqbit/src/peer_connection.rs | 16 +- crates/librqbit/src/peer_info_reader/mod.rs | 4 +- crates/librqbit/src/session.rs | 7 +- crates/librqbit/src/spawn_utils.rs | 8 +- .../src/torrent_state/initializing.rs | 5 + crates/librqbit/src/torrent_state/live/mod.rs | 277 ++++++++++++------ crates/librqbit/src/torrent_state/mod.rs | 83 +++--- crates/librqbit/webui/src/index.tsx | 43 ++- crates/rqbit/src/main.rs | 7 +- 11 files changed, 337 insertions(+), 168 deletions(-) diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 2d4edc1..3c227f0 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -127,14 +127,28 @@ impl HttpApi { State(state): State, Path(idx): Path, ) -> Result { - state.api_torrent_action_pause(idx) + state.api_torrent_action_pause(idx).map(axum::Json) } async fn torrent_action_start( State(state): State, Path(idx): Path, ) -> Result { - state.api_torrent_action_start(idx) + state.api_torrent_action_start(idx).map(axum::Json) + } + + async fn torrent_action_forget( + State(state): State, + Path(idx): Path, + ) -> Result { + state.api_torrent_action_forget(idx).map(axum::Json) + } + + async fn torrent_action_delete( + State(state): State, + Path(idx): Path, + ) -> Result { + state.api_torrent_action_delete(idx).map(axum::Json) } #[allow(unused_mut)] @@ -149,7 +163,9 @@ impl HttpApi { .route("/torrents/:id/stats/v1", get(torrent_stats_v1)) .route("/torrents/:id/peer_stats", get(peer_stats)) .route("/torrents/:id/pause", post(torrent_action_pause)) - .route("/torrents/:id/start", post(torrent_action_start)); + .route("/torrents/:id/start", post(torrent_action_start)) + .route("/torrents/:id/forget", post(torrent_action_forget)) + .route("/torrents/:id/delete", post(torrent_action_delete)); #[cfg(feature = "webui")] { @@ -244,6 +260,9 @@ pub struct TorrentDetailsResponseFile { pub included: bool, } +#[derive(Default, Serialize)] +struct EmptyJsonResponse {} + #[derive(Serialize, Deserialize)] pub struct TorrentDetailsResponse { pub info_hash: String, @@ -409,20 +428,32 @@ impl ApiInternal { .per_peer_stats_snapshot(filter)) } - fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<()> { + fn api_torrent_action_pause(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; handle .pause() .context("error pausing torrent") - .with_error_status_code(StatusCode::BAD_REQUEST) + .with_error_status_code(StatusCode::BAD_REQUEST)?; + Ok(Default::default()) } - fn api_torrent_action_start(&self, idx: TorrentId) -> Result<()> { + fn api_torrent_action_start(&self, idx: TorrentId) -> Result { let handle = self.mgr_handle(idx)?; self.session .unpause(&handle) .context("error unpausing torrent") - .with_error_status_code(StatusCode::BAD_REQUEST) + .with_error_status_code(StatusCode::BAD_REQUEST)?; + Ok(Default::default()) + } + + fn api_torrent_action_forget(&self, idx: TorrentId) -> Result { + Err(ApiError::not_implemented("forgetting not implemented yet")) + } + + fn api_torrent_action_delete(&self, idx: TorrentId) -> Result { + Err(ApiError::not_implemented( + "deleting torrent not implemented yet", + )) } pub async fn api_add_torrent( @@ -528,7 +559,7 @@ impl ApiInternal { ManagedTorrentState::Paused(p) => { resp.state = "paused"; resp.progress_bytes = p.have_bytes; - resp.finished = p.have_bytes == resp.progress_bytes; + resp.finished = p.have_bytes == resp.total_bytes; } ManagedTorrentState::Live(l) => { resp.state = "live"; diff --git a/crates/librqbit/src/http_api_error.rs b/crates/librqbit/src/http_api_error.rs index dfa68db..628eb34 100644 --- a/crates/librqbit/src/http_api_error.rs +++ b/crates/librqbit/src/http_api_error.rs @@ -19,6 +19,14 @@ impl ApiError { } } + pub fn not_implemented(msg: &str) -> Self { + Self { + status: Some(StatusCode::INTERNAL_SERVER_ERROR), + kind: ApiErrorKind::Other(anyhow::anyhow!("{}", msg)), + plaintext: false, + } + } + pub const fn dht_disabled() -> Self { Self { status: Some(StatusCode::NOT_FOUND), diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index cd8ca38..289e061 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -20,7 +20,7 @@ use crate::spawn_utils::BlockingSpawner; pub trait PeerConnectionHandler { fn on_connected(&self, _connection_time: Duration) {} fn get_have_bytes(&self) -> u64; - fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option; + fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result; fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>; fn on_extended_handshake( &self, @@ -204,15 +204,13 @@ impl PeerConnection { .unwrap_or_else(|| Duration::from_secs(120)); if self.handler.get_have_bytes() > 0 { - if let Some(len) = self + let len = self .handler - .serialize_bitfield_message_to_buf(&mut write_buf) - { - with_timeout(rwtimeout, write_half.write_all(&write_buf[..len])) - .await - .context("error writing bitfield to peer")?; - debug!("sent bitfield"); - } + .serialize_bitfield_message_to_buf(&mut write_buf)?; + with_timeout(rwtimeout, write_half.write_all(&write_buf[..len])) + .await + .context("error writing bitfield to peer")?; + debug!("sent bitfield"); } loop { diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index fca366c..a205a0d 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -141,8 +141,8 @@ impl PeerConnectionHandler for Handler { 0 } - fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec) -> Option { - None + fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec) -> anyhow::Result { + Ok(0) } fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 2d1d656..e073a84 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -11,7 +11,7 @@ use librqbit_core::{ use parking_lot::RwLock; use reqwest::Url; use tokio_stream::StreamExt; -use tracing::{debug, info, trace_span, warn}; +use tracing::{debug, error_span, info, warn}; use crate::{ dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, @@ -412,13 +412,14 @@ impl Session { { return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone())); } - let managed_torrent = builder.build()?; + let next_id = g.torrents.len(); + let managed_torrent = builder.build(error_span!("torrent", id = next_id))?; let id = g.add_torrent(managed_torrent.clone()); (managed_torrent, id) }; { - let span = trace_span!("torrent", id = id); + let span = managed_torrent.info.span.clone(); let _ = span.enter(); managed_torrent .start(initial_peers, dht_peer_rx) diff --git a/crates/librqbit/src/spawn_utils.rs b/crates/librqbit/src/spawn_utils.rs index e108254..3b54967 100644 --- a/crates/librqbit/src/spawn_utils.rs +++ b/crates/librqbit/src/spawn_utils.rs @@ -1,9 +1,9 @@ -use tracing::{debug, error, trace, Instrument}; +use tracing::{debug, trace, Instrument}; pub fn spawn( span: tracing::Span, fut: impl std::future::Future> + Send + 'static, -) { +) -> tokio::task::JoinHandle<()> { let fut = async move { trace!("started"); match fut.await { @@ -11,12 +11,12 @@ pub fn spawn( debug!("finished"); } Err(e) => { - error!("{:#}", e) + debug!("finished with error: {:#}", e) } } } .instrument(span.or_current()); - tokio::spawn(fut); + tokio::spawn(fut) } #[derive(Clone, Copy, Debug)] diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index c40d2db..d9d2ae0 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -35,6 +35,11 @@ impl TorrentStateInitializing { } } + pub fn get_checked_bytes(&self) -> u64 { + self.checked_bytes + .load(std::sync::atomic::Ordering::Relaxed) + } + pub async fn check(&self) -> anyhow::Result { let (files, filenames) = { let mut files = diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 68a96c8..ac9f295 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -61,6 +61,7 @@ use bencode::from_bytes; use buffers::{ByteBuf, ByteString}; use clone_to_owned::CloneToOwned; use futures::{stream::FuturesUnordered, StreamExt}; +use itertools::Itertools; use librqbit_core::{ id20::Id20, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, @@ -79,7 +80,7 @@ use tokio::{ }, time::timeout, }; -use tracing::{debug, error, info, span, trace, trace_span, warn, Level}; +use tracing::{debug, error, error_span, info, trace, warn}; use url::Url; use crate::{ @@ -116,15 +117,42 @@ struct InflightPiece { started: Instant, } +fn dummy_file() -> anyhow::Result { + #[cfg(target_os = "windows")] + const DEVNULL: &str = "NUL"; + #[cfg(not(target_os = "windows"))] + const DEVNULL: &str = "/dev/null"; + + std::fs::OpenOptions::new() + .read(true) + .open(DEVNULL) + .with_context(|| format!("error opening {}", DEVNULL)) +} + pub(crate) struct TorrentStateLocked { // What chunks we have and need. - pub(crate) chunks: ChunkTracker, + // If this is None, the torrent was paused, and this live state is useless, and needs to be dropped. + pub(crate) chunks: Option, // At a moment in time, we are expecting a piece from only one peer. // inflight_pieces stores this information. inflight_pieces: HashMap, } +impl TorrentStateLocked { + pub(crate) fn get_chunks(&self) -> anyhow::Result<&ChunkTracker> { + self.chunks + .as_ref() + .context("chunk tracker empty, torrent was paused") + } + + fn get_chunks_mut(&mut self) -> anyhow::Result<&mut ChunkTracker> { + self.chunks + .as_mut() + .context("chunk tracker empty, torrent was paused") + } +} + #[derive(Default)] pub struct TorrentStateOptions { pub peer_connect_timeout: Option, @@ -134,7 +162,8 @@ pub struct TorrentStateOptions { pub struct TorrentStateLive { peers: PeerStates, meta: Arc, - locked: Arc>, + locked: RwLock, + files: Vec>>, filenames: Vec, @@ -153,6 +182,9 @@ pub struct TorrentStateLive { finished_notify: Notify, + cancel_tx: tokio::sync::watch::Sender<()>, + cancel_rx: tokio::sync::watch::Receiver<()>, + speed_estimator: SpeedEstimator, } @@ -166,13 +198,15 @@ impl TorrentStateLive { let needed_bytes = paused.info.lengths.total_length() - have_bytes; let lengths = *paused.chunk_tracker.get_lengths(); + let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(()); + let state = Arc::new(TorrentStateLive { meta: paused.info.clone(), peers: Default::default(), - locked: Arc::new(RwLock::new(TorrentStateLocked { - chunks: paused.chunk_tracker, + locked: RwLock::new(TorrentStateLocked { + chunks: Some(paused.chunk_tracker), inflight_pieces: Default::default(), - })), + }), files: paused.files, filenames: paused.filenames, stats: AtomicStats { @@ -182,46 +216,71 @@ impl TorrentStateLive { needed_bytes, have_plus_needed_bytes: needed_bytes + have_bytes, lengths, - peer_semaphore: Semaphore::new(128), peer_queue_tx, finished_notify: Notify::new(), speed_estimator, + cancel_rx, + cancel_tx, }); for tracker in state.meta.trackers.iter() { - spawn( - trace_span!("tracker_monitor", url = tracker.to_string()), + state.spawn( + error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()), state.clone().task_single_tracker_monitor(tracker.clone()), ); } - spawn(span!(Level::ERROR, "speed_estimator_updater"), { - let state = state.clone(); - async move { - loop { - let stats = state.stats_snapshot(); - let fetched = stats.fetched_bytes; - let needed = state.initially_needed(); - // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. - let remaining = needed - .wrapping_sub(fetched) - .min(needed - stats.downloaded_and_checked_bytes); - state - .speed_estimator - .add_snapshot(fetched, remaining, Instant::now()); - tokio::time::sleep(Duration::from_secs(1)).await; + state.spawn( + error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"), + { + let state = Arc::downgrade(&state); + async move { + loop { + let state = match state.upgrade() { + Some(state) => state, + None => return Ok(()), + }; + let stats = state.stats_snapshot(); + let fetched = stats.fetched_bytes; + let needed = state.initially_needed(); + // fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64. + let remaining = needed + .wrapping_sub(fetched) + .min(needed - stats.downloaded_and_checked_bytes); + state + .speed_estimator + .add_snapshot(fetched, remaining, Instant::now()); + tokio::time::sleep(Duration::from_secs(1)).await; + } } - } - }); + }, + ); - spawn( - span!(Level::ERROR, "peer_adder"), + state.spawn( + error_span!(parent: state.meta.span.clone(), "peer_adder"), state.clone().task_peer_adder(peer_queue_rx), ); state } + fn spawn( + &self, + span: tracing::Span, + fut: impl std::future::Future> + Send + 'static, + ) { + let mut cancel_rx = self.cancel_rx.clone(); + spawn(span, async move { + tokio::select! { + r = fut => r, + _ = cancel_rx.changed() => { + error!("canceled"); + bail!("canceled") + } + } + }); + } + pub fn speed_estimator(&self) -> &SpeedEstimator { &self.speed_estimator } @@ -241,7 +300,7 @@ impl TorrentStateLive { let response = from_bytes::(&bytes)?; for peer in response.peers.iter_sockaddrs() { - self.add_peer_if_not_seen(peer); + self.add_peer_if_not_seen(peer)?; } Ok(response.interval) } @@ -345,11 +404,11 @@ impl TorrentStateLive { match res { // We disconnected the peer ourselves as we don't need it Ok(()) => { - handler.on_peer_died(None); + handler.on_peer_died(None)?; } Err(e) => { debug!("error managing peer: {:#}", e); - handler.on_peer_died(Some(e)); + handler.on_peer_died(Some(e))?; } } Ok::<_, anyhow::Error>(()) @@ -361,17 +420,17 @@ impl TorrentStateLive { ) -> anyhow::Result<()> { let state = self; loop { - let addr = peer_queue_rx.recv().await.unwrap(); + let addr = peer_queue_rx.recv().await.context("torrent closed")?; if state.is_finished() { debug!("ignoring peer {} as we are finished", addr); state.peers.mark_peer_not_needed(addr); continue; } - let permit = state.peer_semaphore.acquire().await.unwrap(); + let permit = state.peer_semaphore.acquire().await?; permit.forget(); - spawn( - span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()), + state.spawn( + error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()), state.clone().task_manage_peer(addr), ); } @@ -406,23 +465,28 @@ impl TorrentStateLive { TimedExistence::new(timeit(reason, || self.locked.write()), reason) } - fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { + fn get_next_needed_piece( + &self, + peer_handle: PeerHandle, + ) -> anyhow::Result> { self.peers .with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| { let g = self.lock_read("g(get_next_needed_piece)"); let bf = &live.bitfield; - for n in g.chunks.iter_needed_pieces() { + for n in g.get_chunks()?.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { // in theory it should be safe without validation, but whatever. - return self.lengths.validate_piece_index(n as u32); + return Ok(self.lengths.validate_piece_index(n as u32)); } } - None - })? + Ok(None) + }) + .transpose() + .map(|r| r.flatten()) } fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { - self.get_next_needed_piece(handle).is_some() + matches!(self.get_next_needed_piece(handle), Ok(Some(_))) } fn set_peer_live(&self, handle: PeerHandle, h: Handshake) { @@ -449,6 +513,10 @@ impl TorrentStateLive { .load(Ordering::Acquire) } + pub fn get_have_bytes(&self) -> u64 { + self.stats.have_bytes.load(Ordering::Relaxed) + } + pub fn is_finished(&self) -> bool { self.get_left_to_download_bytes() == 0 } @@ -498,9 +566,11 @@ impl TorrentStateLive { } let mut unordered: FuturesUnordered<_> = futures.into_iter().collect(); - spawn( - span!( - Level::ERROR, + + // We don't want to remember this task as there may be too many. + self.spawn( + error_span!( + parent: self.meta.span.clone(), "transmit_haves", piece = index.get(), count = unordered.len() @@ -512,14 +582,14 @@ impl TorrentStateLive { ); } - pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> bool { + pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> anyhow::Result { match self.peers.add_if_not_seen(addr) { Some(handle) => handle, - None => return false, + None => return Ok(false), }; - self.peer_queue_tx.send(addr).unwrap(); - true + self.peer_queue_tx.send(addr)?; + Ok(true) } pub fn stats_snapshot(&self) -> StatsSnapshot { @@ -560,7 +630,34 @@ impl TorrentStateLive { } pub fn pause(&self) -> anyhow::Result { - bail!("pause not implemented yet") + let _ = self.cancel_tx.send(()); + + let mut g = self.locked.write(); + + let files = self + .files + .iter() + .map(|f| { + let mut f = f.lock(); + let dummy = dummy_file()?; + let f = std::mem::replace(&mut *f, dummy); + Ok::<_, anyhow::Error>(Arc::new(Mutex::new(f))) + }) + .try_collect()?; + + let filenames = self.filenames.clone(); + + // g.chunks; + Ok(TorrentStatePaused { + info: self.meta.clone(), + files, + filenames, + chunk_tracker: g + .chunks + .take() + .context("bug: pausing already paused torrent")?, + have_bytes: self.get_have_bytes(), + }) } } @@ -633,16 +730,12 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { Ok(()) } - fn get_have_bytes(&self) -> u64 { - self.state.stats.have_bytes.load(Ordering::Relaxed) - } - - fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option { + fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); - let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice())); - let len = msg.serialize(buf, None).unwrap(); + let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice())); + let len = msg.serialize(buf, None)?; debug!("sending: {:?}, length={}", &msg, len); - Some(len) + Ok(len) } fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> { @@ -664,10 +757,14 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn on_extended_handshake(&self, _: &ExtendedHandshake) -> anyhow::Result<()> { Ok(()) } + + fn get_have_bytes(&self) -> u64 { + self.state.get_have_bytes() + } } impl PeerHandler { - fn on_peer_died(self, error: Option) { + fn on_peer_died(self, error: Option) -> anyhow::Result<()> { let peers = &self.state.peers; let pstats = &peers.stats; let handle = self.addr; @@ -675,7 +772,7 @@ impl PeerHandler { Some(peer) => TimedExistence::new(peer, "on_peer_died"), None => { warn!("bug: peer not found in table. Forgetting it forever"); - return; + return Ok(()); } }; let prev = pe.value_mut().state.take(pstats); @@ -690,20 +787,21 @@ impl PeerHandler { req.piece.get(), req.chunk ); - g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk); + g.get_chunks_mut()? + .mark_chunk_request_cancelled(req.piece, req.chunk); } } PeerState::NotNeeded => { // Restore it as std::mem::take() replaced it above. pe.value_mut().state.set(PeerState::NotNeeded, pstats); - return; + return Ok(()); } s @ PeerState::Queued | s @ PeerState::Dead => { warn!("bug: peer was in a wrong state {s:?}, ignoring it forever"); // Prevent deadlocks. drop(pe); self.state.peers.drop_peer(handle); - return; + return Ok(()); } }; @@ -712,7 +810,7 @@ impl PeerHandler { None => { debug!("peer died without errors, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); - return; + return Ok(()); } }; @@ -721,7 +819,7 @@ impl PeerHandler { if self.state.is_finished() { debug!("torrent finished, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); - return; + return Ok(()); } pe.value_mut().state.set(PeerState::Dead, pstats); @@ -732,10 +830,9 @@ impl PeerHandler { drop(pe); if let Some(dur) = backoff { - spawn( - span!( - parent: None, - Level::ERROR, + self.state.clone().spawn( + error_span!( + parent: self.state.meta.span.clone(), "wait_for_peer", peer = handle.to_string(), duration = format!("{dur:?}") @@ -764,31 +861,35 @@ impl PeerHandler { } else { debug!("dropping peer, backoff exhausted"); self.state.peers.drop_peer(handle); - } + }; + Ok(()) } - fn reserve_next_needed_piece(&self) -> Option { + fn reserve_next_needed_piece(&self) -> anyhow::Result> { // TODO: locking one inside the other in different order results in deadlocks. self.state .peers .with_live_mut(self.addr, "reserve_next_needed_piece", |live| { if self.locked.read().i_am_choked { debug!("we are choked, can't reserve next piece"); - return None; + return Ok(None); } let mut g = self.state.lock_write("reserve_next_needed_piece"); let n = { let mut n_opt = None; let bf = &live.bitfield; - for n in g.chunks.iter_needed_pieces() { + for n in g.get_chunks()?.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { n_opt = Some(n); break; } } - self.state.lengths.validate_piece_index(n_opt? as u32)? + self.state + .lengths + .validate_piece_index(n_opt.context("invalid n_opt")? as u32) + .context("invalid piece")? }; g.inflight_pieces.insert( n, @@ -797,10 +898,11 @@ impl PeerHandler { started: Instant::now(), }, ); - g.chunks.reserve_needed_piece(n); - Some(n) + g.get_chunks_mut()?.reserve_needed_piece(n); + Ok(Some(n)) }) - .flatten() + .transpose() + .map(|r| r.flatten()) } fn try_steal_old_slow_piece(&self, threshold: f64) -> Option { @@ -865,7 +967,7 @@ impl PeerHandler { if !self .state .lock_read("is_chunk_ready_to_upload") - .chunks + .get_chunks()? .is_chunk_ready_to_upload(&chunk_info) { anyhow::bail!( @@ -965,7 +1067,7 @@ impl PeerHandler { // Afterwards means we are close to completion, try stealing more aggressively. let next = match self .try_steal_old_slow_piece(10.) - .or_else(|| self.reserve_next_needed_piece()) + .or_else(|| self.reserve_next_needed_piece().ok().flatten()) .or_else(|| self.try_steal_old_slow_piece(2.)) { Some(next) => next, @@ -1037,18 +1139,6 @@ impl PeerHandler { } fn reopen_read_only(&self) -> anyhow::Result<()> { - fn dummy_file() -> anyhow::Result { - #[cfg(target_os = "windows")] - const DEVNULL: &str = "NUL"; - #[cfg(not(target_os = "windows"))] - const DEVNULL: &str = "/dev/null"; - - std::fs::OpenOptions::new() - .read(true) - .open(DEVNULL) - .with_context(|| format!("error opening {}", DEVNULL)) - } - // Lock exclusive just in case to ensure in-flight operations finish.?? let _guard = self.state.lock_write("reopen_read_only"); @@ -1140,7 +1230,7 @@ impl PeerHandler { } }; - match g.chunks.mark_chunk_downloaded(&piece) { + match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { debug!("piece={} done, will write and checksum", piece.index,); // This will prevent others from stealing it. @@ -1204,7 +1294,8 @@ impl PeerHandler { true => { { let mut g = self.state.lock_write("mark_piece_downloaded"); - g.chunks.mark_piece_downloaded(chunk_info.piece_index); + g.get_chunks_mut()? + .mark_piece_downloaded(chunk_info.piece_index); } // Global piece counters. @@ -1256,7 +1347,7 @@ impl PeerHandler { warn!("checksum for piece={} did not validate", index,); self.state .lock_write("mark_piece_broken") - .chunks + .get_chunks_mut()? .mark_piece_broken(chunk_info.piece_index); } }; diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index a81a59f..e2637e1 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -21,7 +21,7 @@ pub use live::*; use parking_lot::RwLock; use tokio_stream::StreamExt; -use tracing::trace_span; +use tracing::error_span; use url::Url; use crate::chunk_tracker::ChunkTracker; @@ -75,6 +75,7 @@ pub struct ManagedTorrentInfo { pub trackers: Vec, pub peer_id: Id20, pub lengths: Lengths, + pub span: tracing::Span, pub(crate) options: ManagedTorrentOptions, } @@ -89,6 +90,10 @@ impl ManagedTorrent { &self.info } + pub fn get_total_bytes(&self) -> u64 { + self.info.lengths.total_length() + } + pub fn info_hash(&self) -> Id20 { self.info.info_hash } @@ -105,7 +110,7 @@ impl ManagedTorrent { let g = self.locked.read(); match &g.state { ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)), - ManagedTorrentState::Live(l) => Ok(f(&l.lock_read("chunk_tracker").chunks)), + ManagedTorrentState::Live(l) => Ok(f(l.lock_read("chunk_tracker").get_chunks()?)), _ => bail!("no chunk tracker, torrent neither paused nor live"), } } @@ -131,42 +136,51 @@ impl ManagedTorrent { ManagedTorrentState::Initializing(init) => { let init = init.clone(); let t = self.clone(); - spawn(trace_span!("initialize_and_start"), async move { - match init.check().await { - Ok(paused) => { - let live = TorrentStateLive::new(paused); - t.locked.write().state = ManagedTorrentState::Live(live.clone()); + let span = self.info.span.clone(); + spawn( + error_span!(parent: span.clone(), "initialize_and_start"), + async move { + match init.check().await { + Ok(paused) => { + let live = TorrentStateLive::new(paused); + t.locked.write().state = ManagedTorrentState::Live(live.clone()); - let live = Arc::downgrade(&live); - spawn(trace_span!("peer_adder"), async move { - { - let live: Arc = - live.upgrade().context("no longer live")?; - for peer in initial_peers { - live.add_peer_if_not_seen(peer); - } - } + let live = Arc::downgrade(&live); + spawn( + error_span!(parent: span.clone(), "external_peer_adder"), + async move { + { + let live: Arc = + live.upgrade().context("no longer live")?; + for peer in initial_peers { + live.add_peer_if_not_seen(peer) + .context("torrent closed")?; + } + } - if let Some(mut peer_rx) = peer_rx { - while let Some(peer) = peer_rx.next().await { - live.upgrade() - .context("no longer live")? - .add_peer_if_not_seen(peer); - } - } + if let Some(mut peer_rx) = peer_rx { + while let Some(peer) = peer_rx.next().await { + live.upgrade() + .context("no longer live")? + .add_peer_if_not_seen(peer) + .context("torrent closed")?; + } + } + + Ok(()) + }, + ); Ok(()) - }); - - Ok(()) + } + Err(err) => { + let result = anyhow::anyhow!("{:?}", err); + t.locked.write().state = ManagedTorrentState::Error(err); + Err(result) + } } - Err(err) => { - let result = anyhow::anyhow!("{:?}", err); - t.locked.write().state = ManagedTorrentState::Error(err); - Err(result) - } - } - }); + }, + ); Ok(()) } ManagedTorrentState::Paused(_) => { @@ -288,9 +302,10 @@ impl ManagedTorrentBuilder { self } - pub(crate) fn build(self) -> anyhow::Result { + pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result { let lengths = Lengths::from_torrent(&self.info)?; let info = Arc::new(ManagedTorrentInfo { + span, info: self.info, info_hash: self.info_hash, out_dir: self.output_folder, diff --git a/crates/librqbit/webui/src/index.tsx b/crates/librqbit/webui/src/index.tsx index efdb1b8..57d2829 100644 --- a/crates/librqbit/webui/src/index.tsx +++ b/crates/librqbit/webui/src/index.tsx @@ -14,6 +14,7 @@ interface ContextType { } const AppContext = createContext(null); +const RefreshTorrentStatsContext = createContext<{ refresh: () => void }>(null); const IconButton: React.FC<{ className: string, @@ -39,6 +40,8 @@ const DeleteTorrentModal = ({ id, show, onHide }) => { const [error, setError] = useState(null); const [deleting, setDeleting] = useState(false); + const refreshCtx = useContext(RefreshTorrentStatsContext); + const close = () => { setDeleteFiles(false); setError(null); @@ -52,6 +55,7 @@ const DeleteTorrentModal = ({ id, show, onHide }) => { const call = deleteFiles ? API.delete : API.forget; call(id).then(() => { + refreshCtx.refresh(); close(); }).catch((e) => { setError({ @@ -99,6 +103,8 @@ const TorrentActions: React.FC<{ let [disabled, setDisabled] = useState(false); let [deleting, setDeleting] = useState(false); + let refreshCtx = useContext(RefreshTorrentStatsContext); + const canPause = state == 'live'; const canUnpause = state == 'paused'; @@ -106,22 +112,22 @@ const TorrentActions: React.FC<{ const unpause = () => { setDisabled(true); - API.start(id).finally(() => setDisabled(false)).catch((e) => { + API.start(id).then(() => { refreshCtx.refresh() }, (e) => { ctx.setCloseableError({ text: `Error starting torrent id=${id}`, details: e, }); - }) + }).finally(() => setDisabled(false)) }; const pause = () => { setDisabled(true); - API.pause(id).finally(() => setDisabled(false)).catch((e) => { + API.pause(id).then(() => { refreshCtx.refresh() }, (e) => { ctx.setCloseableError({ text: `Error pausing torrent id=${id}`, details: e, }); - }) + }).finally(() => setDisabled(false)) }; const startDeleting = () => { @@ -241,21 +247,34 @@ const Torrent = ({ id, torrent }) => { } }, [detailsResponse]); + const refreshStats = () => API.getTorrentStats(torrent.id).then((stats) => { + updateStatsResponse(stats); + return stats; + }); + // Update stats once then forever. useEffect(() => customSetInterval((async () => { const errorInterval = 10000; - const liveInterval = 500; - const finishedInterval = 5000; + const liveInterval = 1000; + const finishedInterval = 10000; + const nonLiveInterval = 10000; - return API.getTorrentStats(torrent.id).then((stats) => { - updateStatsResponse(stats); - return torrentIsDone(stats) ? finishedInterval : liveInterval; + return refreshStats().then((stats) => { + if (stats.finished) { + return finishedInterval; + } + if (stats.state == STATE_INITIALIZING || stats.state == STATE_LIVE) { + return liveInterval; + } + return nonLiveInterval; }, (e) => { return errorInterval; }); }), 0), []); - return + return + + } const TorrentsList = (props: { torrents: Array, loading: boolean }) => { @@ -541,10 +560,6 @@ const RootContent = (props: { closeableError: ErrorDetails, otherError: ErrorDet }; -function torrentIsDone(stats: TorrentStats): boolean { - return stats.finished; -} - function formatBytes(bytes: number): string { if (bytes === 0) return '0 Bytes'; diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index e1b923b..be85289 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -241,7 +241,12 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()> for (idx, torrent) in torrents { let live = torrent.with_state(|s| { match s { - ManagedTorrentState::Initializing(_) => info!("[{}] initializing", idx), + ManagedTorrentState::Initializing(i) => { + let total = torrent.get_total_bytes(); + let progress = i.get_checked_bytes(); + let pct = (progress as f64 / total as f64) * 100f64; + info!("[{}] initializing {:.2}%", idx, pct) + }, ManagedTorrentState::Live(h) => return Some(h.clone()), _ => {}, };