diff --git a/crates/librqbit/src/http_api.rs b/crates/librqbit/src/http_api.rs index 3c98768..ba0ce27 100644 --- a/crates/librqbit/src/http_api.rs +++ b/crates/librqbit/src/http_api.rs @@ -7,14 +7,15 @@ use warp::Filter; use crate::torrent_state::TorrentState; -// This is just a stub for debugging, nothing useful here. +// This is just a stub for debugging. +// A real http api would know about ALL torrents we are downloading, not just one. pub async fn make_and_run_http_api( state: Arc, estimator: Arc, ) -> anyhow::Result<()> { let dump_haves = warp::path("haves").map({ let state = state.clone(); - move || format!("{:?}", state.locked.read().chunks.get_have_pieces()) + move || format!("{:?}", state.lock_read().chunks.get_have_pieces()) }); let dump_stats = warp::path("stats").map({ diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index c5f23ff..b03e70f 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{collections::HashSet, sync::Arc}; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use tokio::sync::{Notify, Semaphore}; diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index 2e4f1fb..c54ae15 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -4,7 +4,7 @@ use std::{ net::SocketAddr, ops::Deref, path::{Path, PathBuf}, - sync::{atomic::Ordering, Arc}, + sync::Arc, time::{Duration, Instant}, }; @@ -117,6 +117,10 @@ impl TorrentManagerHandle { pub fn add_peer(&self, addr: SocketAddr) -> bool { self.manager.state.add_peer_if_not_seen(addr) } + // Not sure why anyone would need that, but as this is a library... + pub fn torrent_state(&self) -> &TorrentState { + &self.manager.state + } pub async fn cancel(&self) -> anyhow::Result<()> { todo!() } @@ -256,8 +260,8 @@ impl TorrentManager { async fn stats_printer(&self) -> anyhow::Result<()> { loop { - let live_peer_stats = self.state.locked.read().peers.stats(); - let seen_peers_count = self.state.locked.read().peers.seen().len(); + let live_peer_stats = self.state.lock_read().peers.stats(); + let seen_peers_count = self.state.lock_read().peers.seen().len(); let stats = self.state.stats_snapshot(); let needed = self.state.initially_needed(); let downloaded_pct = if stats.remaining_bytes == 0 { diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 4cc4ce9..5aad406 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -19,7 +19,7 @@ use librqbit_core::{ torrent_metainfo::TorrentMetaV1Info, }; use log::{debug, info, trace, warn}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::{Mutex, RwLock, RwLockReadGuard}; use peer_binary_protocol::{ extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, }; @@ -215,7 +215,7 @@ impl StatsSnapshot { pub struct TorrentState { info: TorrentMetaV1Info, - pub locked: Arc>, + locked: Arc>, files: Vec>>, info_hash: [u8; 20], peer_id: [u8; 20], @@ -314,6 +314,9 @@ impl TorrentState { pub fn initially_needed(&self) -> u64 { self.needed } + pub fn lock_read(&self) -> RwLockReadGuard { + self.locked.read() + } fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { let g = self.locked.read(); @@ -532,7 +535,7 @@ impl TorrentState { downloaded_and_checked_bytes: downloaded, downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed), fetched_bytes: self.stats.fetched_bytes.load(Relaxed), - uploaded_bytes: self.stats.fetched_bytes.load(Relaxed), + uploaded_bytes: self.stats.uploaded.load(Relaxed), live_peers: peer_stats.live as u32, seen_peers: g.peers.seen.len() as u32, connecting_peers: peer_stats.connecting as u32, diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index 9e93b1f..844ad8f 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -518,7 +518,12 @@ impl<'a> Handshake<'a> { expected_len, "handshake", ))?; - Ok((Self::bopts().deserialize(&hbuf).unwrap(), expected_len)) + Ok(( + Self::bopts() + .deserialize(&hbuf) + .map_err(|e| MessageDeserializeError::Other(e.into()))?, + expected_len, + )) } pub fn serialize(&self, buf: &mut Vec) { Self::bopts().serialize_into(buf, &self).unwrap()