This commit is contained in:
Igor Katson 2021-07-04 14:38:44 +01:00
parent b4f6d8b93d
commit 44c760bcc2
5 changed files with 23 additions and 10 deletions

View file

@ -7,14 +7,15 @@ use warp::Filter;
use crate::torrent_state::TorrentState; use crate::torrent_state::TorrentState;
// This is just a stub for debugging, nothing useful here. // This is just a stub for debugging.
// A real http api would know about ALL torrents we are downloading, not just one.
pub async fn make_and_run_http_api( pub async fn make_and_run_http_api(
state: Arc<TorrentState>, state: Arc<TorrentState>,
estimator: Arc<SpeedEstimator>, estimator: Arc<SpeedEstimator>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let dump_haves = warp::path("haves").map({ let dump_haves = warp::path("haves").map({
let state = state.clone(); let state = state.clone();
move || format!("{:?}", state.locked.read().chunks.get_have_pieces()) move || format!("{:?}", state.lock_read().chunks.get_have_pieces())
}); });
let dump_stats = warp::path("stats").map({ let dump_stats = warp::path("stats").map({

View file

@ -1,4 +1,4 @@
use std::{collections::HashSet, net::SocketAddr, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use tokio::sync::{Notify, Semaphore}; use tokio::sync::{Notify, Semaphore};

View file

@ -4,7 +4,7 @@ use std::{
net::SocketAddr, net::SocketAddr,
ops::Deref, ops::Deref,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{atomic::Ordering, Arc}, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -117,6 +117,10 @@ impl TorrentManagerHandle {
pub fn add_peer(&self, addr: SocketAddr) -> bool { pub fn add_peer(&self, addr: SocketAddr) -> bool {
self.manager.state.add_peer_if_not_seen(addr) self.manager.state.add_peer_if_not_seen(addr)
} }
// Not sure why anyone would need that, but as this is a library...
pub fn torrent_state(&self) -> &TorrentState {
&self.manager.state
}
pub async fn cancel(&self) -> anyhow::Result<()> { pub async fn cancel(&self) -> anyhow::Result<()> {
todo!() todo!()
} }
@ -256,8 +260,8 @@ impl TorrentManager {
async fn stats_printer(&self) -> anyhow::Result<()> { async fn stats_printer(&self) -> anyhow::Result<()> {
loop { loop {
let live_peer_stats = self.state.locked.read().peers.stats(); let live_peer_stats = self.state.lock_read().peers.stats();
let seen_peers_count = self.state.locked.read().peers.seen().len(); let seen_peers_count = self.state.lock_read().peers.seen().len();
let stats = self.state.stats_snapshot(); let stats = self.state.stats_snapshot();
let needed = self.state.initially_needed(); let needed = self.state.initially_needed();
let downloaded_pct = if stats.remaining_bytes == 0 { let downloaded_pct = if stats.remaining_bytes == 0 {

View file

@ -19,7 +19,7 @@ use librqbit_core::{
torrent_metainfo::TorrentMetaV1Info, torrent_metainfo::TorrentMetaV1Info,
}; };
use log::{debug, info, trace, warn}; use log::{debug, info, trace, warn};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use peer_binary_protocol::{ use peer_binary_protocol::{
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request, extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
}; };
@ -215,7 +215,7 @@ impl StatsSnapshot {
pub struct TorrentState { pub struct TorrentState {
info: TorrentMetaV1Info<ByteString>, info: TorrentMetaV1Info<ByteString>,
pub locked: Arc<RwLock<TorrentStateLocked>>, locked: Arc<RwLock<TorrentStateLocked>>,
files: Vec<Arc<Mutex<File>>>, files: Vec<Arc<Mutex<File>>>,
info_hash: [u8; 20], info_hash: [u8; 20],
peer_id: [u8; 20], peer_id: [u8; 20],
@ -314,6 +314,9 @@ impl TorrentState {
pub fn initially_needed(&self) -> u64 { pub fn initially_needed(&self) -> u64 {
self.needed self.needed
} }
pub fn lock_read(&self) -> RwLockReadGuard<TorrentStateLocked> {
self.locked.read()
}
fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> { fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
let g = self.locked.read(); let g = self.locked.read();
@ -532,7 +535,7 @@ impl TorrentState {
downloaded_and_checked_bytes: downloaded, downloaded_and_checked_bytes: downloaded,
downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed), downloaded_and_checked_pieces: self.stats.downloaded_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed), fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.fetched_bytes.load(Relaxed), uploaded_bytes: self.stats.uploaded.load(Relaxed),
live_peers: peer_stats.live as u32, live_peers: peer_stats.live as u32,
seen_peers: g.peers.seen.len() as u32, seen_peers: g.peers.seen.len() as u32,
connecting_peers: peer_stats.connecting as u32, connecting_peers: peer_stats.connecting as u32,

View file

@ -518,7 +518,12 @@ impl<'a> Handshake<'a> {
expected_len, expected_len,
"handshake", "handshake",
))?; ))?;
Ok((Self::bopts().deserialize(&hbuf).unwrap(), expected_len)) Ok((
Self::bopts()
.deserialize(&hbuf)
.map_err(|e| MessageDeserializeError::Other(e.into()))?,
expected_len,
))
} }
pub fn serialize(&self, buf: &mut Vec<u8>) { pub fn serialize(&self, buf: &mut Vec<u8>) {
Self::bopts().serialize_into(buf, &self).unwrap() Self::bopts().serialize_into(buf, &self).unwrap()