Changed log to tracing
This commit is contained in:
parent
db12bba7a6
commit
48a14823fa
26 changed files with 321 additions and 689 deletions
|
|
@ -1,6 +1,6 @@
|
|||
use librqbit_core::lengths::{ChunkInfo, Lengths, ValidPieceIndex};
|
||||
use log::{debug, info};
|
||||
use peer_binary_protocol::Piece;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::type_aliases::BF;
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use anyhow::Context;
|
|||
use buffers::ByteString;
|
||||
use futures::{stream::FuturesUnordered, Stream, StreamExt};
|
||||
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
||||
use log::debug;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{
|
||||
peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner,
|
||||
|
|
@ -97,7 +97,7 @@ mod tests {
|
|||
fn init_logging() {
|
||||
#[allow(unused_must_use)]
|
||||
LOG_INIT.call_once(|| {
|
||||
pretty_env_logger::try_init();
|
||||
// pretty_env_logger::try_init();
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ use librqbit_core::{
|
|||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||
torrent_metainfo::{FileIteratorName, TorrentMetaV1Info},
|
||||
};
|
||||
use log::{debug, trace, warn};
|
||||
use tracing::{debug, trace, warn};
|
||||
use parking_lot::Mutex;
|
||||
use peer_binary_protocol::Piece;
|
||||
use sha1w::ISha1;
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ use dht::{Dht, DhtStats};
|
|||
use http::StatusCode;
|
||||
use librqbit_core::id20::Id20;
|
||||
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
||||
use log::warn;
|
||||
use tracing::{warn, info};
|
||||
use parking_lot::RwLock;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::SocketAddr;
|
||||
|
|
@ -110,7 +110,7 @@ impl HttpApi {
|
|||
.route("/torrents/:id/stats", get(torrent_stats))
|
||||
.with_state(state);
|
||||
|
||||
log::info!("starting HTTP server on {}", addr);
|
||||
info!("starting HTTP server on {}", addr);
|
||||
axum::Server::try_bind(&addr)
|
||||
.with_context(|| format!("error binding to {addr}"))?
|
||||
.serve(app.into_make_service())
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ pub mod http_api;
|
|||
pub mod http_api_client;
|
||||
mod http_api_error;
|
||||
pub mod peer_connection;
|
||||
pub mod peer_handler;
|
||||
pub mod peer_info_reader;
|
||||
pub mod peer_state;
|
||||
pub mod session;
|
||||
|
|
|
|||
|
|
@ -4,13 +4,13 @@ use anyhow::Context;
|
|||
use buffers::{ByteBuf, ByteString};
|
||||
use clone_to_owned::CloneToOwned;
|
||||
use librqbit_core::{id20::Id20, lengths::ChunkInfo, peer_id::try_decode_peer_id};
|
||||
use log::{debug, trace};
|
||||
use peer_binary_protocol::{
|
||||
extended::{handshake::ExtendedHandshake, ExtendedMessage},
|
||||
serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError,
|
||||
MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
|
||||
};
|
||||
use tokio::time::timeout;
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use crate::spawn_utils::BlockingSpawner;
|
||||
|
||||
|
|
@ -57,10 +57,10 @@ async fn with_timeout<T, E>(
|
|||
where
|
||||
E: Into<anyhow::Error>,
|
||||
{
|
||||
timeout(timeout_value, fut)
|
||||
.await
|
||||
.with_context(|| format!("timeout at {timeout_value:?}"))?
|
||||
.map_err(|e| e.into())
|
||||
match timeout(timeout_value, fut).await {
|
||||
Ok(v) => v.map_err(Into::into),
|
||||
Err(_) => anyhow::bail!("timeout at {timeout_value:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! read_one {
|
||||
|
|
@ -149,11 +149,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
|
||||
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
|
||||
|
||||
debug!(
|
||||
"connected peer {}: {:?}",
|
||||
self.addr,
|
||||
try_decode_peer_id(Id20(h.peer_id))
|
||||
);
|
||||
debug!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
|
||||
if h.info_hash != self.info_hash.0 {
|
||||
anyhow::bail!("info hash does not match");
|
||||
}
|
||||
|
|
@ -170,11 +166,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
if supports_extended {
|
||||
let my_extended =
|
||||
Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
|
||||
trace!(
|
||||
"sending extended handshake to {}: {:?}",
|
||||
self.addr,
|
||||
&my_extended
|
||||
);
|
||||
trace!("sending extended handshake: {:?}", &my_extended);
|
||||
my_extended.serialize(&mut write_buf, None).unwrap();
|
||||
with_timeout(rwtimeout, conn.write_all(&write_buf))
|
||||
.await
|
||||
|
|
@ -184,7 +176,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
let (extended, size) = read_one!(conn, read_buf, read_so_far, rwtimeout);
|
||||
match extended {
|
||||
Message::Extended(ExtendedMessage::Handshake(h)) => {
|
||||
trace!("received from {}: {:?}", self.addr, &h);
|
||||
trace!("received: {:?}", &h);
|
||||
self.handler.on_extended_handshake(&h)?;
|
||||
extended_handshake = Some(h.clone_to_owned())
|
||||
}
|
||||
|
|
@ -213,7 +205,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
|
||||
.await
|
||||
.context("error writing bitfield to peer")?;
|
||||
debug!("sent bitfield to {}", self.addr);
|
||||
debug!("sent bitfield");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -253,7 +245,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
}
|
||||
};
|
||||
|
||||
debug!("sending to {}: {:?}, length={}", self.addr, &req, len);
|
||||
debug!("sending: {:?}, length={}", &req, len);
|
||||
|
||||
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
|
||||
.await
|
||||
|
|
@ -273,7 +265,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
let reader = async move {
|
||||
loop {
|
||||
let (message, size) = read_one!(read_half, read_buf, read_so_far, rwtimeout);
|
||||
trace!("received from {}: {:?}", self.addr, &message);
|
||||
trace!("received: {:?}", &message);
|
||||
|
||||
self.handler
|
||||
.on_received_message(message)
|
||||
|
|
@ -294,7 +286,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
|||
r = reader => {r}
|
||||
r = writer => {r}
|
||||
};
|
||||
debug!("{}: either reader or writer are done, exiting", self.addr);
|
||||
debug!("either reader or writer are done, exiting");
|
||||
r
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1 +0,0 @@
|
|||
|
||||
|
|
@ -8,7 +8,6 @@ use librqbit_core::{
|
|||
lengths::{ceil_div_u64, last_element_size_u64, ChunkInfo},
|
||||
torrent_metainfo::TorrentMetaV1Info,
|
||||
};
|
||||
use log::debug;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use peer_binary_protocol::{
|
||||
extended::{handshake::ExtendedHandshake, ut_metadata::UtMetadata, ExtendedMessage},
|
||||
|
|
@ -16,6 +15,7 @@ use peer_binary_protocol::{
|
|||
};
|
||||
use sha1w::{ISha1, Sha1};
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{
|
||||
peer_connection::{
|
||||
|
|
@ -238,7 +238,7 @@ mod tests {
|
|||
fn init_logging() {
|
||||
#[allow(unused_must_use)]
|
||||
LOG_INIT.call_once(|| {
|
||||
pretty_env_logger::try_init();
|
||||
// pretty_env_logger::try_init();
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ use librqbit_core::{
|
|||
peer_id::generate_peer_id,
|
||||
torrent_metainfo::{torrent_from_bytes, TorrentMetaV1Info, TorrentMetaV1Owned},
|
||||
};
|
||||
use log::{debug, info, warn};
|
||||
use parking_lot::RwLock;
|
||||
use reqwest::Url;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::{debug, info, span, warn, Level};
|
||||
|
||||
use crate::{
|
||||
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
|
||||
|
|
@ -402,7 +402,7 @@ impl Session {
|
|||
}
|
||||
|
||||
if let Some(mut dht_peer_rx) = dht_peer_rx {
|
||||
spawn("DHT peer adder", {
|
||||
spawn(span!(Level::INFO, "dht_peer_adder"), {
|
||||
let handle = handle.clone();
|
||||
async move {
|
||||
while let Some(peer) = dht_peer_rx.next().await {
|
||||
|
|
|
|||
|
|
@ -1,22 +1,22 @@
|
|||
use std::fmt::Display;
|
||||
use tracing::{debug, error, trace, Instrument};
|
||||
|
||||
use log::{debug, error};
|
||||
|
||||
pub fn spawn<N: Display + 'static + Send>(
|
||||
name: N,
|
||||
pub fn spawn(
|
||||
span: tracing::Span,
|
||||
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
|
||||
) {
|
||||
debug!("starting task \"{}\"", &name);
|
||||
tokio::spawn(async move {
|
||||
let fut = async move {
|
||||
trace!("started");
|
||||
match fut.await {
|
||||
Ok(_) => {
|
||||
debug!("task \"{}\" finished", &name);
|
||||
debug!("finished");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("error in task \"{}\": {:#}", &name, e)
|
||||
error!("{:#}", e)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
.instrument(span.or_current());
|
||||
tokio::spawn(fut);
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
|
|
|
|||
|
|
@ -14,11 +14,11 @@ use librqbit_core::{
|
|||
id20::Id20, lengths::Lengths, peer_id::generate_peer_id, speed_estimator::SpeedEstimator,
|
||||
torrent_metainfo::TorrentMetaV1Info,
|
||||
};
|
||||
use log::{debug, info, warn};
|
||||
use parking_lot::Mutex;
|
||||
use reqwest::Url;
|
||||
use sha1w::Sha1;
|
||||
use size_format::SizeFormatterBinary as SF;
|
||||
use tracing::{debug, info, span, warn, Level};
|
||||
|
||||
use crate::{
|
||||
chunk_tracker::ChunkTracker,
|
||||
|
|
@ -116,9 +116,10 @@ impl TorrentManagerHandle {
|
|||
pub fn add_tracker(&self, url: Url) -> bool {
|
||||
let mgr = self.manager.clone();
|
||||
if mgr.trackers.lock().insert(url.clone()) {
|
||||
spawn(format!("tracker monitor {url}"), async move {
|
||||
mgr.single_tracker_monitor(url).await
|
||||
});
|
||||
spawn(
|
||||
span!(Level::ERROR, "tracker_monitor", url = url.to_string()),
|
||||
async move { mgr.single_tracker_monitor(url).await },
|
||||
);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
|
|
@ -289,7 +290,7 @@ impl TorrentManager {
|
|||
options,
|
||||
});
|
||||
|
||||
spawn("speed estimator updater", {
|
||||
spawn(span!(Level::ERROR, "speed_estimator_updater"), {
|
||||
let state = mgr.state.clone();
|
||||
async move {
|
||||
loop {
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ use librqbit_core::{
|
|||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||
torrent_metainfo::TorrentMetaV1Info,
|
||||
};
|
||||
use log::{debug, info, trace, warn};
|
||||
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
|
||||
use peer_binary_protocol::{
|
||||
extended::handshake::ExtendedHandshake, Handshake, Message, MessageOwned, Piece, Request,
|
||||
|
|
@ -37,6 +36,7 @@ use tokio::{
|
|||
},
|
||||
time::timeout,
|
||||
};
|
||||
use tracing::{debug, info, span, trace, warn, Level};
|
||||
|
||||
use crate::{
|
||||
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
|
||||
|
|
@ -311,7 +311,7 @@ impl TorrentState {
|
|||
peer_queue_tx,
|
||||
finished_notify: Notify::new(),
|
||||
});
|
||||
spawn("peer adder", {
|
||||
spawn(span!(Level::ERROR, "peer_adder"), {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
loop {
|
||||
|
|
@ -319,47 +319,50 @@ impl TorrentState {
|
|||
|
||||
let permit = state.peer_semaphore.acquire().await.unwrap();
|
||||
permit.forget();
|
||||
spawn(format!("manage_peer({addr})"), {
|
||||
let state = state.clone();
|
||||
async move {
|
||||
let rx = state.locked.write().peers.mark_peer_connecting(addr)?;
|
||||
spawn(
|
||||
span!(parent: None, Level::ERROR, "manage_peer", peer = addr.to_string()),
|
||||
{
|
||||
let state = state.clone();
|
||||
async move {
|
||||
let rx = state.locked.write().peers.mark_peer_connecting(addr)?;
|
||||
|
||||
let handler = PeerHandler {
|
||||
addr,
|
||||
state: state.clone(),
|
||||
spawner,
|
||||
};
|
||||
let options = PeerConnectionOptions {
|
||||
connect_timeout: state.options.peer_connect_timeout,
|
||||
read_write_timeout: state.options.peer_read_write_timeout,
|
||||
..Default::default()
|
||||
};
|
||||
let peer_connection = PeerConnection::new(
|
||||
addr,
|
||||
state.info_hash,
|
||||
state.peer_id,
|
||||
handler,
|
||||
Some(options),
|
||||
spawner,
|
||||
);
|
||||
let handler = PeerHandler {
|
||||
addr,
|
||||
state: state.clone(),
|
||||
spawner,
|
||||
};
|
||||
let options = PeerConnectionOptions {
|
||||
connect_timeout: state.options.peer_connect_timeout,
|
||||
read_write_timeout: state.options.peer_read_write_timeout,
|
||||
..Default::default()
|
||||
};
|
||||
let peer_connection = PeerConnection::new(
|
||||
addr,
|
||||
state.info_hash,
|
||||
state.peer_id,
|
||||
handler,
|
||||
Some(options),
|
||||
spawner,
|
||||
);
|
||||
|
||||
let res = peer_connection.manage_peer(rx).await;
|
||||
let state = peer_connection.into_handler().state;
|
||||
state.peer_semaphore.add_permits(1);
|
||||
let res = peer_connection.manage_peer(rx).await;
|
||||
let state = peer_connection.into_handler().state;
|
||||
state.peer_semaphore.add_permits(1);
|
||||
|
||||
match res {
|
||||
// We disconnected the peer ourselves as we don't need it
|
||||
Ok(()) => {
|
||||
state.on_peer_died(addr, None);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("error managing peer {}: {:#}", addr, e);
|
||||
state.on_peer_died(addr, Some(e));
|
||||
match res {
|
||||
// We disconnected the peer ourselves as we don't need it
|
||||
Ok(()) => {
|
||||
state.on_peer_died(addr, None);
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("error managing peer: {:#}", e);
|
||||
state.on_peer_died(addr, Some(e));
|
||||
}
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
}
|
||||
Ok::<_, anyhow::Error>(())
|
||||
}
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
@ -406,7 +409,7 @@ impl TorrentState {
|
|||
|
||||
fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option<ValidPieceIndex> {
|
||||
if self.am_i_choked(peer_handle)? {
|
||||
debug!("we are choked by {}, can't reserve next piece", peer_handle);
|
||||
debug!("we are choked, can't reserve next piece");
|
||||
return None;
|
||||
}
|
||||
let mut g = self.locked.write();
|
||||
|
|
@ -459,8 +462,8 @@ impl TorrentState {
|
|||
// heuristic for "too slow peer"
|
||||
if elapsed > avg_time * 10 {
|
||||
debug!(
|
||||
"{} will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}",
|
||||
handle, idx, piece_req.peer, elapsed, avg_time
|
||||
"will steal piece {} from {}: elapsed time {:?}, avg piece time: {:?}",
|
||||
idx, piece_req.peer, elapsed, avg_time
|
||||
);
|
||||
piece_req.peer = handle;
|
||||
piece_req.started = Instant::now();
|
||||
|
|
@ -487,7 +490,7 @@ impl TorrentState {
|
|||
let peer = match g.peers.states.get_mut(&handle) {
|
||||
Some(peer) => peer,
|
||||
None => {
|
||||
warn!("peer {} was in a wrong state", handle);
|
||||
warn!("peer was in a wrong state, can't set live");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
|
@ -499,6 +502,11 @@ impl TorrentState {
|
|||
match g.peers.mark_peer_dead(handle) {
|
||||
Some(Some(live)) => {
|
||||
for req in live.inflight_requests {
|
||||
debug!(
|
||||
"peer dead, marking chunk request cancelled, index={}, chunk={}",
|
||||
req.piece.get(),
|
||||
req.chunk
|
||||
);
|
||||
g.chunks.mark_chunk_request_cancelled(req.piece, req.chunk);
|
||||
}
|
||||
}
|
||||
|
|
@ -509,41 +517,54 @@ impl TorrentState {
|
|||
}
|
||||
|
||||
if error.is_none() {
|
||||
debug!("peer died without errors, not re-queueing");
|
||||
return;
|
||||
}
|
||||
|
||||
let backoff = g
|
||||
.peers
|
||||
.states
|
||||
.get_mut(&handle)
|
||||
.unwrap()
|
||||
.stats
|
||||
.backoff
|
||||
.next_backoff();
|
||||
let backoff = {
|
||||
let peer = match g.peers.states.get_mut(&handle) {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
warn!("bug: did not find peer in the list");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
peer.stats.backoff.next_backoff()
|
||||
};
|
||||
|
||||
if let Some(dur) = backoff {
|
||||
let state = self.clone();
|
||||
spawn(format!("wait_for_peer({handle}, {dur:?})"), async move {
|
||||
tokio::time::sleep(dur).await;
|
||||
{
|
||||
let mut g = state.locked.write();
|
||||
let peer = match g.peers.states.get_mut(&handle) {
|
||||
Some(p) => p,
|
||||
None => bail!("bug: peer {} disappeared", handle),
|
||||
};
|
||||
match &peer.state {
|
||||
PeerState::Dead => peer.state = PeerState::Queued,
|
||||
other => bail!(
|
||||
"peer {} in unexpected state: {}. Expected dead",
|
||||
handle,
|
||||
other.name()
|
||||
),
|
||||
spawn(
|
||||
span!(
|
||||
parent: None,
|
||||
Level::ERROR,
|
||||
"wait_for_peer",
|
||||
peer = handle.to_string(),
|
||||
duration = format!("{dur:?}")
|
||||
),
|
||||
async move {
|
||||
tokio::time::sleep(dur).await;
|
||||
{
|
||||
let mut g = state.locked.write();
|
||||
let peer = match g.peers.states.get_mut(&handle) {
|
||||
Some(p) => p,
|
||||
None => bail!("bug: peer disappeared"),
|
||||
};
|
||||
match &peer.state {
|
||||
PeerState::Dead => peer.state = PeerState::Queued,
|
||||
other => bail!(
|
||||
"peer is in unexpected state: {}. Expected dead",
|
||||
other.name()
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
state.peer_queue_tx.send(handle)?;
|
||||
Ok::<_, anyhow::Error>(())
|
||||
});
|
||||
state.peer_queue_tx.send(handle)?;
|
||||
Ok::<_, anyhow::Error>(())
|
||||
},
|
||||
);
|
||||
} else {
|
||||
debug!("dropping peer, backoff exhausted");
|
||||
g.peers.drop_peer(handle);
|
||||
}
|
||||
}
|
||||
|
|
@ -602,7 +623,12 @@ impl TorrentState {
|
|||
|
||||
let mut unordered: FuturesUnordered<_> = futures.into_iter().collect();
|
||||
spawn(
|
||||
format!("transmit_haves(piece={}, count={})", index, unordered.len()),
|
||||
span!(
|
||||
Level::ERROR,
|
||||
"transmit_haves",
|
||||
piece = index.get(),
|
||||
count = unordered.len()
|
||||
),
|
||||
async move {
|
||||
while unordered.next().await.is_some() {}
|
||||
Ok(())
|
||||
|
|
@ -668,30 +694,27 @@ impl PeerConnectionHandler for PeerHandler {
|
|||
match message {
|
||||
Message::Request(request) => {
|
||||
self.on_download_request(self.addr, request)
|
||||
.with_context(|| {
|
||||
format!("error handling download request from {}", self.addr)
|
||||
})?;
|
||||
.context("on_download_request")?;
|
||||
}
|
||||
Message::Bitfield(b) => self.on_bitfield(self.addr, b.clone_to_owned())?,
|
||||
Message::Bitfield(b) => self
|
||||
.on_bitfield(self.addr, b.clone_to_owned())
|
||||
.context("on_bitfield")?,
|
||||
Message::Choke => self.on_i_am_choked(self.addr),
|
||||
Message::Unchoke => self.on_i_am_unchoked(self.addr),
|
||||
Message::Interested => self.on_peer_interested(self.addr),
|
||||
Message::Piece(piece) => {
|
||||
self.on_received_piece(self.addr, piece)
|
||||
.context("error in on_received_piece()")?;
|
||||
.context("on_received_piece")?;
|
||||
}
|
||||
Message::KeepAlive => {
|
||||
debug!("keepalive received from {}", self.addr);
|
||||
debug!("keepalive received");
|
||||
}
|
||||
Message::Have(h) => self.on_have(self.addr, h),
|
||||
Message::NotInterested => {
|
||||
info!("received \"not interested\", but we don't care yet")
|
||||
}
|
||||
message => {
|
||||
warn!(
|
||||
"{}: received unsupported message {:?}, ignoring",
|
||||
self.addr, message
|
||||
);
|
||||
warn!("received unsupported message {:?}, ignoring", message);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
|
@ -705,7 +728,7 @@ impl PeerConnectionHandler for PeerHandler {
|
|||
let g = self.state.locked.read();
|
||||
let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice()));
|
||||
let len = msg.serialize(buf, None).unwrap();
|
||||
debug!("sending to {}: {:?}, length={}", self.addr, &msg, len);
|
||||
debug!("sending: {:?}, length={}", &msg, len);
|
||||
Some(len)
|
||||
}
|
||||
|
||||
|
|
@ -736,8 +759,8 @@ impl PeerHandler {
|
|||
Some(p) => p,
|
||||
None => {
|
||||
anyhow::bail!(
|
||||
"{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.",
|
||||
peer_handle, request
|
||||
"received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.",
|
||||
request
|
||||
);
|
||||
}
|
||||
};
|
||||
|
|
@ -749,8 +772,8 @@ impl PeerHandler {
|
|||
Some(d) => d,
|
||||
None => {
|
||||
anyhow::bail!(
|
||||
"{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.",
|
||||
peer_handle, request
|
||||
"received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.",
|
||||
request
|
||||
);
|
||||
}
|
||||
};
|
||||
|
|
@ -764,19 +787,16 @@ impl PeerHandler {
|
|||
);
|
||||
}
|
||||
|
||||
g.peers.clone_tx(peer_handle).ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"peer {} died, dropping chunk that it requested",
|
||||
peer_handle
|
||||
)
|
||||
})?
|
||||
g.peers
|
||||
.clone_tx(peer_handle)
|
||||
.context("peer died, dropping chunk that it requested")?
|
||||
};
|
||||
|
||||
// TODO: this is not super efficient as it does copying multiple times.
|
||||
// Theoretically, this could be done in the sending code, so that it reads straight into
|
||||
// the send buffer.
|
||||
let request = WriterRequest::ReadChunkRequest(chunk_info);
|
||||
debug!("sending to {}: {:?}", peer_handle, &request);
|
||||
debug!("sending {:?}", &request);
|
||||
Ok::<_, anyhow::Error>(tx.send(request)?)
|
||||
}
|
||||
|
||||
|
|
@ -789,7 +809,7 @@ impl PeerHandler {
|
|||
.get_live_mut(handle)
|
||||
.and_then(|l| l.bitfield.as_mut())
|
||||
{
|
||||
debug!("{}: updated bitfield with have={}", handle, have);
|
||||
debug!("updated bitfield with have={}", have);
|
||||
bitfield.set(have as usize, true)
|
||||
}
|
||||
}
|
||||
|
|
@ -797,8 +817,7 @@ impl PeerHandler {
|
|||
fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> {
|
||||
if bitfield.len() != self.state.lengths.piece_bitfield_bytes() {
|
||||
anyhow::bail!(
|
||||
"dropping {} as its bitfield has unexpected size. Got {}, expected {}",
|
||||
handle,
|
||||
"dropping peer as its bitfield has unexpected size. Got {}, expected {}",
|
||||
bitfield.len(),
|
||||
self.state.lengths.piece_bitfield_bytes(),
|
||||
);
|
||||
|
|
@ -816,7 +835,7 @@ impl PeerHandler {
|
|||
.read()
|
||||
.peers
|
||||
.clone_tx(handle)
|
||||
.ok_or_else(|| anyhow::anyhow!("peer closed"))?;
|
||||
.context("peer dropped")?;
|
||||
tx.send(WriterRequest::Message(MessageOwned::Unchoke))
|
||||
.context("peer dropped")?;
|
||||
tx.send(WriterRequest::Message(MessageOwned::NotInterested))
|
||||
|
|
@ -826,7 +845,11 @@ impl PeerHandler {
|
|||
|
||||
// Additional spawn per peer, not good.
|
||||
spawn(
|
||||
format!("peer_chunk_requester({handle})"),
|
||||
span!(
|
||||
Level::ERROR,
|
||||
"peer_chunk_requester",
|
||||
peer = handle.to_string()
|
||||
),
|
||||
self.clone().task_peer_chunk_requester(handle),
|
||||
);
|
||||
Ok(())
|
||||
|
|
@ -846,7 +869,7 @@ impl PeerHandler {
|
|||
}
|
||||
|
||||
fn on_i_am_choked(&self, handle: PeerHandle) {
|
||||
debug!("we are choked by {}", handle);
|
||||
debug!("we are choked");
|
||||
self.state
|
||||
.locked
|
||||
.write()
|
||||
|
|
@ -855,7 +878,7 @@ impl PeerHandler {
|
|||
}
|
||||
|
||||
fn on_peer_interested(&self, handle: PeerHandle) {
|
||||
debug!("peer {} is interested", handle);
|
||||
debug!("peer is interested");
|
||||
self.state
|
||||
.locked
|
||||
.write()
|
||||
|
|
@ -878,7 +901,7 @@ impl PeerHandler {
|
|||
loop {
|
||||
match self.state.am_i_choked(handle) {
|
||||
Some(true) => {
|
||||
debug!("we are choked by {}, can't reserve next piece", handle);
|
||||
debug!("we are choked, can't reserve next piece");
|
||||
#[allow(unused_must_use)]
|
||||
{
|
||||
timeout(Duration::from_secs(60), notify.notified()).await;
|
||||
|
|
@ -895,15 +918,15 @@ impl PeerHandler {
|
|||
Some(next) => next,
|
||||
None => {
|
||||
if self.state.get_left_to_download() == 0 {
|
||||
debug!("{}: nothing left to download, closing requester", handle);
|
||||
debug!("nothing left to download, closing requester");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(piece) = self.state.try_steal_piece(handle) {
|
||||
debug!("{}: stole a piece {}", handle, piece);
|
||||
debug!("stole a piece {}", piece);
|
||||
piece
|
||||
} else {
|
||||
debug!("no pieces to request from {}", handle);
|
||||
debug!("no pieces to request");
|
||||
#[allow(unused_must_use)]
|
||||
{
|
||||
timeout(Duration::from_secs(60), notify.notified()).await;
|
||||
|
|
@ -935,10 +958,7 @@ impl PeerHandler {
|
|||
.inflight_requests
|
||||
.insert(InflightRequest::from(&chunk))
|
||||
{
|
||||
warn!(
|
||||
"{}: probably a bug, we already requested {:?}",
|
||||
handle, chunk
|
||||
);
|
||||
warn!("probably a bug, we already requested {:?}", chunk);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -985,7 +1005,7 @@ impl PeerHandler {
|
|||
}
|
||||
|
||||
fn on_i_am_unchoked(&self, handle: PeerHandle) {
|
||||
debug!("we are unchoked by {}", handle);
|
||||
debug!("we are unchoked");
|
||||
let mut g = self.state.locked.write();
|
||||
let live = match g.peers.get_live_mut(handle) {
|
||||
Some(live) => live,
|
||||
|
|
@ -1004,11 +1024,7 @@ impl PeerHandler {
|
|||
) {
|
||||
Some(i) => i,
|
||||
None => {
|
||||
anyhow::bail!(
|
||||
"peer {} sent us a piece that is invalid {:?}",
|
||||
handle,
|
||||
&piece,
|
||||
);
|
||||
anyhow::bail!("peer sent us an invalid piece {:?}", &piece,);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
@ -1026,16 +1042,15 @@ impl PeerHandler {
|
|||
.remove(&InflightRequest::from(&chunk_info))
|
||||
{
|
||||
anyhow::bail!(
|
||||
"peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.inflight_requests, &piece,
|
||||
"peer sent us a piece we did not ask. Requested pieces: {:?}. Got: {:?}",
|
||||
&h.inflight_requests,
|
||||
&piece,
|
||||
);
|
||||
}
|
||||
|
||||
let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) {
|
||||
Some(ChunkMarkingResult::Completed) => {
|
||||
debug!(
|
||||
"piece={} done by {}, will write and checksum",
|
||||
piece.index, handle
|
||||
);
|
||||
debug!("piece={} done, will write and checksum", piece.index,);
|
||||
// This will prevent others from stealing it.
|
||||
g.peers
|
||||
.remove_inflight_piece(chunk_info.piece_index)
|
||||
|
|
@ -1043,17 +1058,13 @@ impl PeerHandler {
|
|||
}
|
||||
Some(ChunkMarkingResult::PreviouslyCompleted) => {
|
||||
// TODO: we might need to send cancellations here.
|
||||
debug!(
|
||||
"piece={} was done by someone else {}, ignoring",
|
||||
piece.index, handle
|
||||
);
|
||||
debug!("piece={} was done by someone else, ignoring", piece.index,);
|
||||
return Ok(());
|
||||
}
|
||||
Some(ChunkMarkingResult::NotCompleted) => None,
|
||||
None => {
|
||||
anyhow::bail!(
|
||||
"bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer",
|
||||
handle,
|
||||
"bogus data received: {:?}, cannot map this to a chunk, dropping peer",
|
||||
piece
|
||||
);
|
||||
}
|
||||
|
|
@ -1116,10 +1127,7 @@ impl PeerHandler {
|
|||
g.peers.reset_peer_backoff(handle);
|
||||
}
|
||||
|
||||
debug!(
|
||||
"piece={} successfully downloaded and verified from {}",
|
||||
index, handle
|
||||
);
|
||||
debug!("piece={} successfully downloaded and verified", index);
|
||||
|
||||
if self.state.get_left_to_download() == 0 {
|
||||
self.state.finished_notify.notify_waiters();
|
||||
|
|
@ -1130,10 +1138,7 @@ impl PeerHandler {
|
|||
self.state.maybe_transmit_haves(chunk_info.piece_index);
|
||||
}
|
||||
false => {
|
||||
warn!(
|
||||
"checksum for piece={} did not validate, came from {}",
|
||||
index, handle
|
||||
);
|
||||
warn!("checksum for piece={} did not validate", index,);
|
||||
self.state
|
||||
.locked
|
||||
.write()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue