This commit is contained in:
Igor Katson 2023-11-24 18:28:46 +00:00
parent 0b8580dacd
commit d7a37c1b48
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
11 changed files with 337 additions and 168 deletions

View file

@ -127,14 +127,28 @@ impl HttpApi {
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_pause(idx)
state.api_torrent_action_pause(idx).map(axum::Json)
}
async fn torrent_action_start(
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_start(idx)
state.api_torrent_action_start(idx).map(axum::Json)
}
async fn torrent_action_forget(
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_forget(idx).map(axum::Json)
}
async fn torrent_action_delete(
State(state): State<ApiState>,
Path(idx): Path<usize>,
) -> Result<impl IntoResponse> {
state.api_torrent_action_delete(idx).map(axum::Json)
}
#[allow(unused_mut)]
@ -149,7 +163,9 @@ impl HttpApi {
.route("/torrents/:id/stats/v1", get(torrent_stats_v1))
.route("/torrents/:id/peer_stats", get(peer_stats))
.route("/torrents/:id/pause", post(torrent_action_pause))
.route("/torrents/:id/start", post(torrent_action_start));
.route("/torrents/:id/start", post(torrent_action_start))
.route("/torrents/:id/forget", post(torrent_action_forget))
.route("/torrents/:id/delete", post(torrent_action_delete));
#[cfg(feature = "webui")]
{
@ -244,6 +260,9 @@ pub struct TorrentDetailsResponseFile {
pub included: bool,
}
#[derive(Default, Serialize)]
struct EmptyJsonResponse {}
#[derive(Serialize, Deserialize)]
pub struct TorrentDetailsResponse {
pub info_hash: String,
@ -409,20 +428,32 @@ impl ApiInternal {
.per_peer_stats_snapshot(filter))
}
fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<()> {
fn api_torrent_action_pause(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
handle
.pause()
.context("error pausing torrent")
.with_error_status_code(StatusCode::BAD_REQUEST)
.with_error_status_code(StatusCode::BAD_REQUEST)?;
Ok(Default::default())
}
fn api_torrent_action_start(&self, idx: TorrentId) -> Result<()> {
fn api_torrent_action_start(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
let handle = self.mgr_handle(idx)?;
self.session
.unpause(&handle)
.context("error unpausing torrent")
.with_error_status_code(StatusCode::BAD_REQUEST)
.with_error_status_code(StatusCode::BAD_REQUEST)?;
Ok(Default::default())
}
fn api_torrent_action_forget(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
Err(ApiError::not_implemented("forgetting not implemented yet"))
}
fn api_torrent_action_delete(&self, idx: TorrentId) -> Result<EmptyJsonResponse> {
Err(ApiError::not_implemented(
"deleting torrent not implemented yet",
))
}
pub async fn api_add_torrent(
@ -528,7 +559,7 @@ impl ApiInternal {
ManagedTorrentState::Paused(p) => {
resp.state = "paused";
resp.progress_bytes = p.have_bytes;
resp.finished = p.have_bytes == resp.progress_bytes;
resp.finished = p.have_bytes == resp.total_bytes;
}
ManagedTorrentState::Live(l) => {
resp.state = "live";

View file

@ -19,6 +19,14 @@ impl ApiError {
}
}
pub fn not_implemented(msg: &str) -> Self {
Self {
status: Some(StatusCode::INTERNAL_SERVER_ERROR),
kind: ApiErrorKind::Other(anyhow::anyhow!("{}", msg)),
plaintext: false,
}
}
pub const fn dht_disabled() -> Self {
Self {
status: Some(StatusCode::NOT_FOUND),

View file

@ -20,7 +20,7 @@ use crate::spawn_utils::BlockingSpawner;
pub trait PeerConnectionHandler {
fn on_connected(&self, _connection_time: Duration) {}
fn get_have_bytes(&self) -> u64;
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize>;
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize>;
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()>;
fn on_extended_handshake(
&self,
@ -204,15 +204,13 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
.unwrap_or_else(|| Duration::from_secs(120));
if self.handler.get_have_bytes() > 0 {
if let Some(len) = self
let len = self
.handler
.serialize_bitfield_message_to_buf(&mut write_buf)
{
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
.context("error writing bitfield to peer")?;
debug!("sent bitfield");
}
.serialize_bitfield_message_to_buf(&mut write_buf)?;
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
.await
.context("error writing bitfield to peer")?;
debug!("sent bitfield");
}
loop {

View file

@ -141,8 +141,8 @@ impl PeerConnectionHandler for Handler {
0
}
fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec<u8>) -> Option<usize> {
None
fn serialize_bitfield_message_to_buf(&self, _buf: &mut Vec<u8>) -> anyhow::Result<usize> {
Ok(0)
}
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {

View file

@ -11,7 +11,7 @@ use librqbit_core::{
use parking_lot::RwLock;
use reqwest::Url;
use tokio_stream::StreamExt;
use tracing::{debug, info, trace_span, warn};
use tracing::{debug, error_span, info, warn};
use crate::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
@ -412,13 +412,14 @@ impl Session {
{
return Ok(AddTorrentResponse::AlreadyManaged(id, handle.clone()));
}
let managed_torrent = builder.build()?;
let next_id = g.torrents.len();
let managed_torrent = builder.build(error_span!("torrent", id = next_id))?;
let id = g.add_torrent(managed_torrent.clone());
(managed_torrent, id)
};
{
let span = trace_span!("torrent", id = id);
let span = managed_torrent.info.span.clone();
let _ = span.enter();
managed_torrent
.start(initial_peers, dht_peer_rx)

View file

@ -1,9 +1,9 @@
use tracing::{debug, error, trace, Instrument};
use tracing::{debug, trace, Instrument};
pub fn spawn(
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
) -> tokio::task::JoinHandle<()> {
let fut = async move {
trace!("started");
match fut.await {
@ -11,12 +11,12 @@ pub fn spawn(
debug!("finished");
}
Err(e) => {
error!("{:#}", e)
debug!("finished with error: {:#}", e)
}
}
}
.instrument(span.or_current());
tokio::spawn(fut);
tokio::spawn(fut)
}
#[derive(Clone, Copy, Debug)]

View file

@ -35,6 +35,11 @@ impl TorrentStateInitializing {
}
}
pub fn get_checked_bytes(&self) -> u64 {
self.checked_bytes
.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
let (files, filenames) = {
let mut files =

View file

@ -61,6 +61,7 @@ use bencode::from_bytes;
use buffers::{ByteBuf, ByteString};
use clone_to_owned::CloneToOwned;
use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use librqbit_core::{
id20::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
@ -79,7 +80,7 @@ use tokio::{
},
time::timeout,
};
use tracing::{debug, error, info, span, trace, trace_span, warn, Level};
use tracing::{debug, error, error_span, info, trace, warn};
use url::Url;
use crate::{
@ -116,15 +117,42 @@ struct InflightPiece {
started: Instant,
}
fn dummy_file() -> anyhow::Result<std::fs::File> {
#[cfg(target_os = "windows")]
const DEVNULL: &str = "NUL";
#[cfg(not(target_os = "windows"))]
const DEVNULL: &str = "/dev/null";
std::fs::OpenOptions::new()
.read(true)
.open(DEVNULL)
.with_context(|| format!("error opening {}", DEVNULL))
}
pub(crate) struct TorrentStateLocked {
// What chunks we have and need.
pub(crate) chunks: ChunkTracker,
// If this is None, the torrent was paused, and this live state is useless, and needs to be dropped.
pub(crate) chunks: Option<ChunkTracker>,
// At a moment in time, we are expecting a piece from only one peer.
// inflight_pieces stores this information.
inflight_pieces: HashMap<ValidPieceIndex, InflightPiece>,
}
impl TorrentStateLocked {
pub(crate) fn get_chunks(&self) -> anyhow::Result<&ChunkTracker> {
self.chunks
.as_ref()
.context("chunk tracker empty, torrent was paused")
}
fn get_chunks_mut(&mut self) -> anyhow::Result<&mut ChunkTracker> {
self.chunks
.as_mut()
.context("chunk tracker empty, torrent was paused")
}
}
#[derive(Default)]
pub struct TorrentStateOptions {
pub peer_connect_timeout: Option<Duration>,
@ -134,7 +162,8 @@ pub struct TorrentStateOptions {
pub struct TorrentStateLive {
peers: PeerStates,
meta: Arc<ManagedTorrentInfo>,
locked: Arc<RwLock<TorrentStateLocked>>,
locked: RwLock<TorrentStateLocked>,
files: Vec<Arc<Mutex<File>>>,
filenames: Vec<PathBuf>,
@ -153,6 +182,9 @@ pub struct TorrentStateLive {
finished_notify: Notify,
cancel_tx: tokio::sync::watch::Sender<()>,
cancel_rx: tokio::sync::watch::Receiver<()>,
speed_estimator: SpeedEstimator,
}
@ -166,13 +198,15 @@ impl TorrentStateLive {
let needed_bytes = paused.info.lengths.total_length() - have_bytes;
let lengths = *paused.chunk_tracker.get_lengths();
let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(());
let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(),
peers: Default::default(),
locked: Arc::new(RwLock::new(TorrentStateLocked {
chunks: paused.chunk_tracker,
locked: RwLock::new(TorrentStateLocked {
chunks: Some(paused.chunk_tracker),
inflight_pieces: Default::default(),
})),
}),
files: paused.files,
filenames: paused.filenames,
stats: AtomicStats {
@ -182,46 +216,71 @@ impl TorrentStateLive {
needed_bytes,
have_plus_needed_bytes: needed_bytes + have_bytes,
lengths,
peer_semaphore: Semaphore::new(128),
peer_queue_tx,
finished_notify: Notify::new(),
speed_estimator,
cancel_rx,
cancel_tx,
});
for tracker in state.meta.trackers.iter() {
spawn(
trace_span!("tracker_monitor", url = tracker.to_string()),
state.spawn(
error_span!(parent: state.meta.span.clone(), "tracker_monitor", url = tracker.to_string()),
state.clone().task_single_tracker_monitor(tracker.clone()),
);
}
spawn(span!(Level::ERROR, "speed_estimator_updater"), {
let state = state.clone();
async move {
loop {
let stats = state.stats_snapshot();
let fetched = stats.fetched_bytes;
let needed = state.initially_needed();
// fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64.
let remaining = needed
.wrapping_sub(fetched)
.min(needed - stats.downloaded_and_checked_bytes);
state
.speed_estimator
.add_snapshot(fetched, remaining, Instant::now());
tokio::time::sleep(Duration::from_secs(1)).await;
state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
{
let state = Arc::downgrade(&state);
async move {
loop {
let state = match state.upgrade() {
Some(state) => state,
None => return Ok(()),
};
let stats = state.stats_snapshot();
let fetched = stats.fetched_bytes;
let needed = state.initially_needed();
// fetched can be too high in theory, so for safety make sure that it doesn't wrap around u64.
let remaining = needed
.wrapping_sub(fetched)
.min(needed - stats.downloaded_and_checked_bytes);
state
.speed_estimator
.add_snapshot(fetched, remaining, Instant::now());
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
});
},
);
spawn(
span!(Level::ERROR, "peer_adder"),
state.spawn(
error_span!(parent: state.meta.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx),
);
state
}
fn spawn(
&self,
span: tracing::Span,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
let mut cancel_rx = self.cancel_rx.clone();
spawn(span, async move {
tokio::select! {
r = fut => r,
_ = cancel_rx.changed() => {
error!("canceled");
bail!("canceled")
}
}
});
}
pub fn speed_estimator(&self) -> &SpeedEstimator {
&self.speed_estimator
}
@ -241,7 +300,7 @@ impl TorrentStateLive {
let response = from_bytes::<TrackerResponse>(&bytes)?;
for peer in response.peers.iter_sockaddrs() {
self.add_peer_if_not_seen(peer);
self.add_peer_if_not_seen(peer)?;
}
Ok(response.interval)
}
@ -345,11 +404,11 @@ impl TorrentStateLive {
match res {
// We disconnected the peer ourselves as we don't need it
Ok(()) => {
handler.on_peer_died(None);
handler.on_peer_died(None)?;
}
Err(e) => {
debug!("error managing peer: {:#}", e);
handler.on_peer_died(Some(e));
handler.on_peer_died(Some(e))?;
}
}
Ok::<_, anyhow::Error>(())
@ -361,17 +420,17 @@ impl TorrentStateLive {
) -> anyhow::Result<()> {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.unwrap();
let addr = peer_queue_rx.recv().await.context("torrent closed")?;
if state.is_finished() {
debug!("ignoring peer {} as we are finished", addr);
state.peers.mark_peer_not_needed(addr);
continue;
}
let permit = state.peer_semaphore.acquire().await.unwrap();
let permit = state.peer_semaphore.acquire().await?;
permit.forget();
spawn(
span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()),
state.spawn(
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
state.clone().task_manage_peer(addr),
);
}
@ -406,23 +465,28 @@ impl TorrentStateLive {
TimedExistence::new(timeit(reason, || self.locked.write()), reason)
}
fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
fn get_next_needed_piece(
&self,
peer_handle: PeerHandle,
) -> anyhow::Result<Option<ValidPieceIndex>> {
self.peers
.with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| {
let g = self.lock_read("g(get_next_needed_piece)");
let bf = &live.bitfield;
for n in g.chunks.iter_needed_pieces() {
for n in g.get_chunks()?.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return self.lengths.validate_piece_index(n as u32);
return Ok(self.lengths.validate_piece_index(n as u32));
}
}
None
})?
Ok(None)
})
.transpose()
.map(|r| r.flatten())
}
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
self.get_next_needed_piece(handle).is_some()
matches!(self.get_next_needed_piece(handle), Ok(Some(_)))
}
fn set_peer_live(&self, handle: PeerHandle, h: Handshake) {
@ -449,6 +513,10 @@ impl TorrentStateLive {
.load(Ordering::Acquire)
}
pub fn get_have_bytes(&self) -> u64 {
self.stats.have_bytes.load(Ordering::Relaxed)
}
pub fn is_finished(&self) -> bool {
self.get_left_to_download_bytes() == 0
}
@ -498,9 +566,11 @@ impl TorrentStateLive {
}
let mut unordered: FuturesUnordered<_> = futures.into_iter().collect();
spawn(
span!(
Level::ERROR,
// We don't want to remember this task as there may be too many.
self.spawn(
error_span!(
parent: self.meta.span.clone(),
"transmit_haves",
piece = index.get(),
count = unordered.len()
@ -512,14 +582,14 @@ impl TorrentStateLive {
);
}
pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> bool {
pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> anyhow::Result<bool> {
match self.peers.add_if_not_seen(addr) {
Some(handle) => handle,
None => return false,
None => return Ok(false),
};
self.peer_queue_tx.send(addr).unwrap();
true
self.peer_queue_tx.send(addr)?;
Ok(true)
}
pub fn stats_snapshot(&self) -> StatsSnapshot {
@ -560,7 +630,34 @@ impl TorrentStateLive {
}
pub fn pause(&self) -> anyhow::Result<TorrentStatePaused> {
bail!("pause not implemented yet")
let _ = self.cancel_tx.send(());
let mut g = self.locked.write();
let files = self
.files
.iter()
.map(|f| {
let mut f = f.lock();
let dummy = dummy_file()?;
let f = std::mem::replace(&mut *f, dummy);
Ok::<_, anyhow::Error>(Arc::new(Mutex::new(f)))
})
.try_collect()?;
let filenames = self.filenames.clone();
// g.chunks;
Ok(TorrentStatePaused {
info: self.meta.clone(),
files,
filenames,
chunk_tracker: g
.chunks
.take()
.context("bug: pausing already paused torrent")?,
have_bytes: self.get_have_bytes(),
})
}
}
@ -633,16 +730,12 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
Ok(())
}
fn get_have_bytes(&self) -> u64 {
self.state.stats.have_bytes.load(Ordering::Relaxed)
}
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize> {
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> anyhow::Result<usize> {
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice()));
let len = msg.serialize(buf, None).unwrap();
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice()));
let len = msg.serialize(buf, None)?;
debug!("sending: {:?}, length={}", &msg, len);
Some(len)
Ok(len)
}
fn on_handshake(&self, handshake: Handshake) -> anyhow::Result<()> {
@ -664,10 +757,14 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_extended_handshake(&self, _: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
Ok(())
}
fn get_have_bytes(&self) -> u64 {
self.state.get_have_bytes()
}
}
impl PeerHandler {
fn on_peer_died(self, error: Option<anyhow::Error>) {
fn on_peer_died(self, error: Option<anyhow::Error>) -> anyhow::Result<()> {
let peers = &self.state.peers;
let pstats = &peers.stats;
let handle = self.addr;
@ -675,7 +772,7 @@ impl PeerHandler {
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
None => {
warn!("bug: peer not found in table. Forgetting it forever");
return;
return Ok(());
}
};
let prev = pe.value_mut().state.take(pstats);
@ -690,20 +787,21 @@ impl PeerHandler {
req.piece.get(),
req.chunk
);
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
g.get_chunks_mut()?
.mark_chunk_request_cancelled(req.piece, req.chunk);
}
}
PeerState::NotNeeded => {
// Restore it as std::mem::take() replaced it above.
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return;
return Ok(());
}
s @ PeerState::Queued | s @ PeerState::Dead => {
warn!("bug: peer was in a wrong state {s:?}, ignoring it forever");
// Prevent deadlocks.
drop(pe);
self.state.peers.drop_peer(handle);
return;
return Ok(());
}
};
@ -712,7 +810,7 @@ impl PeerHandler {
None => {
debug!("peer died without errors, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return;
return Ok(());
}
};
@ -721,7 +819,7 @@ impl PeerHandler {
if self.state.is_finished() {
debug!("torrent finished, not re-queueing");
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
return;
return Ok(());
}
pe.value_mut().state.set(PeerState::Dead, pstats);
@ -732,10 +830,9 @@ impl PeerHandler {
drop(pe);
if let Some(dur) = backoff {
spawn(
span!(
parent: None,
Level::ERROR,
self.state.clone().spawn(
error_span!(
parent: self.state.meta.span.clone(),
"wait_for_peer",
peer = handle.to_string(),
duration = format!("{dur:?}")
@ -764,31 +861,35 @@ impl PeerHandler {
} else {
debug!("dropping peer, backoff exhausted");
self.state.peers.drop_peer(handle);
}
};
Ok(())
}
fn reserve_next_needed_piece(&self) -> Option<ValidPieceIndex> {
fn reserve_next_needed_piece(&self) -> anyhow::Result<Option<ValidPieceIndex>> {
// TODO: locking one inside the other in different order results in deadlocks.
self.state
.peers
.with_live_mut(self.addr, "reserve_next_needed_piece", |live| {
if self.locked.read().i_am_choked {
debug!("we are choked, can't reserve next piece");
return None;
return Ok(None);
}
let mut g = self.state.lock_write("reserve_next_needed_piece");
let n = {
let mut n_opt = None;
let bf = &live.bitfield;
for n in g.chunks.iter_needed_pieces() {
for n in g.get_chunks()?.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n);
break;
}
}
self.state.lengths.validate_piece_index(n_opt? as u32)?
self.state
.lengths
.validate_piece_index(n_opt.context("invalid n_opt")? as u32)
.context("invalid piece")?
};
g.inflight_pieces.insert(
n,
@ -797,10 +898,11 @@ impl PeerHandler {
started: Instant::now(),
},
);
g.chunks.reserve_needed_piece(n);
Some(n)
g.get_chunks_mut()?.reserve_needed_piece(n);
Ok(Some(n))
})
.flatten()
.transpose()
.map(|r| r.flatten())
}
fn try_steal_old_slow_piece(&self, threshold: f64) -> Option<ValidPieceIndex> {
@ -865,7 +967,7 @@ impl PeerHandler {
if !self
.state
.lock_read("is_chunk_ready_to_upload")
.chunks
.get_chunks()?
.is_chunk_ready_to_upload(&chunk_info)
{
anyhow::bail!(
@ -965,7 +1067,7 @@ impl PeerHandler {
// Afterwards means we are close to completion, try stealing more aggressively.
let next = match self
.try_steal_old_slow_piece(10.)
.or_else(|| self.reserve_next_needed_piece())
.or_else(|| self.reserve_next_needed_piece().ok().flatten())
.or_else(|| self.try_steal_old_slow_piece(2.))
{
Some(next) => next,
@ -1037,18 +1139,6 @@ impl PeerHandler {
}
fn reopen_read_only(&self) -> anyhow::Result<()> {
fn dummy_file() -> anyhow::Result<std::fs::File> {
#[cfg(target_os = "windows")]
const DEVNULL: &str = "NUL";
#[cfg(not(target_os = "windows"))]
const DEVNULL: &str = "/dev/null";
std::fs::OpenOptions::new()
.read(true)
.open(DEVNULL)
.with_context(|| format!("error opening {}", DEVNULL))
}
// Lock exclusive just in case to ensure in-flight operations finish.??
let _guard = self.state.lock_write("reopen_read_only");
@ -1140,7 +1230,7 @@ impl PeerHandler {
}
};
match g.chunks.mark_chunk_downloaded(&piece) {
match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => {
debug!("piece={} done, will write and checksum", piece.index,);
// This will prevent others from stealing it.
@ -1204,7 +1294,8 @@ impl PeerHandler {
true => {
{
let mut g = self.state.lock_write("mark_piece_downloaded");
g.chunks.mark_piece_downloaded(chunk_info.piece_index);
g.get_chunks_mut()?
.mark_piece_downloaded(chunk_info.piece_index);
}
// Global piece counters.
@ -1256,7 +1347,7 @@ impl PeerHandler {
warn!("checksum for piece={} did not validate", index,);
self.state
.lock_write("mark_piece_broken")
.chunks
.get_chunks_mut()?
.mark_piece_broken(chunk_info.piece_index);
}
};

View file

@ -21,7 +21,7 @@ pub use live::*;
use parking_lot::RwLock;
use tokio_stream::StreamExt;
use tracing::trace_span;
use tracing::error_span;
use url::Url;
use crate::chunk_tracker::ChunkTracker;
@ -75,6 +75,7 @@ pub struct ManagedTorrentInfo {
pub trackers: Vec<Url>,
pub peer_id: Id20,
pub lengths: Lengths,
pub span: tracing::Span,
pub(crate) options: ManagedTorrentOptions,
}
@ -89,6 +90,10 @@ impl ManagedTorrent {
&self.info
}
pub fn get_total_bytes(&self) -> u64 {
self.info.lengths.total_length()
}
pub fn info_hash(&self) -> Id20 {
self.info.info_hash
}
@ -105,7 +110,7 @@ impl ManagedTorrent {
let g = self.locked.read();
match &g.state {
ManagedTorrentState::Paused(p) => Ok(f(&p.chunk_tracker)),
ManagedTorrentState::Live(l) => Ok(f(&l.lock_read("chunk_tracker").chunks)),
ManagedTorrentState::Live(l) => Ok(f(l.lock_read("chunk_tracker").get_chunks()?)),
_ => bail!("no chunk tracker, torrent neither paused nor live"),
}
}
@ -131,42 +136,51 @@ impl ManagedTorrent {
ManagedTorrentState::Initializing(init) => {
let init = init.clone();
let t = self.clone();
spawn(trace_span!("initialize_and_start"), async move {
match init.check().await {
Ok(paused) => {
let live = TorrentStateLive::new(paused);
t.locked.write().state = ManagedTorrentState::Live(live.clone());
let span = self.info.span.clone();
spawn(
error_span!(parent: span.clone(), "initialize_and_start"),
async move {
match init.check().await {
Ok(paused) => {
let live = TorrentStateLive::new(paused);
t.locked.write().state = ManagedTorrentState::Live(live.clone());
let live = Arc::downgrade(&live);
spawn(trace_span!("peer_adder"), async move {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer);
}
}
let live = Arc::downgrade(&live);
spawn(
error_span!(parent: span.clone(), "external_peer_adder"),
async move {
{
let live: Arc<TorrentStateLive> =
live.upgrade().context("no longer live")?;
for peer in initial_peers {
live.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
}
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer);
}
}
if let Some(mut peer_rx) = peer_rx {
while let Some(peer) = peer_rx.next().await {
live.upgrade()
.context("no longer live")?
.add_peer_if_not_seen(peer)
.context("torrent closed")?;
}
}
Ok(())
},
);
Ok(())
});
Ok(())
}
Err(err) => {
let result = anyhow::anyhow!("{:?}", err);
t.locked.write().state = ManagedTorrentState::Error(err);
Err(result)
}
}
Err(err) => {
let result = anyhow::anyhow!("{:?}", err);
t.locked.write().state = ManagedTorrentState::Error(err);
Err(result)
}
}
});
},
);
Ok(())
}
ManagedTorrentState::Paused(_) => {
@ -288,9 +302,10 @@ impl ManagedTorrentBuilder {
self
}
pub(crate) fn build(self) -> anyhow::Result<ManagedTorrentHandle> {
pub(crate) fn build(self, span: tracing::Span) -> anyhow::Result<ManagedTorrentHandle> {
let lengths = Lengths::from_torrent(&self.info)?;
let info = Arc::new(ManagedTorrentInfo {
span,
info: self.info,
info_hash: self.info_hash,
out_dir: self.output_folder,

View file

@ -14,6 +14,7 @@ interface ContextType {
}
const AppContext = createContext<ContextType>(null);
const RefreshTorrentStatsContext = createContext<{ refresh: () => void }>(null);
const IconButton: React.FC<{
className: string,
@ -39,6 +40,8 @@ const DeleteTorrentModal = ({ id, show, onHide }) => {
const [error, setError] = useState<Error>(null);
const [deleting, setDeleting] = useState(false);
const refreshCtx = useContext(RefreshTorrentStatsContext);
const close = () => {
setDeleteFiles(false);
setError(null);
@ -52,6 +55,7 @@ const DeleteTorrentModal = ({ id, show, onHide }) => {
const call = deleteFiles ? API.delete : API.forget;
call(id).then(() => {
refreshCtx.refresh();
close();
}).catch((e) => {
setError({
@ -99,6 +103,8 @@ const TorrentActions: React.FC<{
let [disabled, setDisabled] = useState<boolean>(false);
let [deleting, setDeleting] = useState<boolean>(false);
let refreshCtx = useContext(RefreshTorrentStatsContext);
const canPause = state == 'live';
const canUnpause = state == 'paused';
@ -106,22 +112,22 @@ const TorrentActions: React.FC<{
const unpause = () => {
setDisabled(true);
API.start(id).finally(() => setDisabled(false)).catch((e) => {
API.start(id).then(() => { refreshCtx.refresh() }, (e) => {
ctx.setCloseableError({
text: `Error starting torrent id=${id}`,
details: e,
});
})
}).finally(() => setDisabled(false))
};
const pause = () => {
setDisabled(true);
API.pause(id).finally(() => setDisabled(false)).catch((e) => {
API.pause(id).then(() => { refreshCtx.refresh() }, (e) => {
ctx.setCloseableError({
text: `Error pausing torrent id=${id}`,
details: e,
});
})
}).finally(() => setDisabled(false))
};
const startDeleting = () => {
@ -241,21 +247,34 @@ const Torrent = ({ id, torrent }) => {
}
}, [detailsResponse]);
const refreshStats = () => API.getTorrentStats(torrent.id).then((stats) => {
updateStatsResponse(stats);
return stats;
});
// Update stats once then forever.
useEffect(() => customSetInterval((async () => {
const errorInterval = 10000;
const liveInterval = 500;
const finishedInterval = 5000;
const liveInterval = 1000;
const finishedInterval = 10000;
const nonLiveInterval = 10000;
return API.getTorrentStats(torrent.id).then((stats) => {
updateStatsResponse(stats);
return torrentIsDone(stats) ? finishedInterval : liveInterval;
return refreshStats().then((stats) => {
if (stats.finished) {
return finishedInterval;
}
if (stats.state == STATE_INITIALIZING || stats.state == STATE_LIVE) {
return liveInterval;
}
return nonLiveInterval;
}, (e) => {
return errorInterval;
});
}), 0), []);
return <TorrentRow id={id} detailsResponse={detailsResponse} statsResponse={statsResponse} />
return <RefreshTorrentStatsContext.Provider value={{ refresh: refreshStats }}>
<TorrentRow id={id} detailsResponse={detailsResponse} statsResponse={statsResponse} />
</RefreshTorrentStatsContext.Provider >
}
const TorrentsList = (props: { torrents: Array<TorrentId>, loading: boolean }) => {
@ -541,10 +560,6 @@ const RootContent = (props: { closeableError: ErrorDetails, otherError: ErrorDet
</Container>
};
function torrentIsDone(stats: TorrentStats): boolean {
return stats.finished;
}
function formatBytes(bytes: number): string {
if (bytes === 0) return '0 Bytes';

View file

@ -241,7 +241,12 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
for (idx, torrent) in torrents {
let live = torrent.with_state(|s| {
match s {
ManagedTorrentState::Initializing(_) => info!("[{}] initializing", idx),
ManagedTorrentState::Initializing(i) => {
let total = torrent.get_total_bytes();
let progress = i.get_checked_bytes();
let pct = (progress as f64 / total as f64) * 100f64;
info!("[{}] initializing {:.2}%", idx, pct)
},
ManagedTorrentState::Live(h) => return Some(h.clone()),
_ => {},
};