Downgraded a bunch of messages from debug to trace
This commit is contained in:
parent
aa2a41a53c
commit
a5ae2988b8
9 changed files with 39 additions and 31 deletions
8
TODO.md
8
TODO.md
|
|
@ -1,4 +1,4 @@
|
||||||
- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them.
|
- [x] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them.
|
||||||
- [ ] per-file stats
|
- [ ] per-file stats
|
||||||
- [x (partial)] per-peer stats
|
- [x (partial)] per-peer stats
|
||||||
- [x] use some concurrent hashmap e.g. flurry or dashmap
|
- [x] use some concurrent hashmap e.g. flurry or dashmap
|
||||||
|
|
@ -8,13 +8,13 @@
|
||||||
- [x] initializing/checking
|
- [x] initializing/checking
|
||||||
- [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while
|
- [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while
|
||||||
- [x] checking torrents should be visible right away
|
- [x] checking torrents should be visible right away
|
||||||
- [ ] server persistence
|
- [x] server persistence
|
||||||
- [ ] it would be nice to restart the server and keep the state
|
- [x] it would be nice to restart the server and keep the state
|
||||||
- [x] torrent actions
|
- [x] torrent actions
|
||||||
- [x] pause/unpause
|
- [x] pause/unpause
|
||||||
- [x] remove including from disk
|
- [x] remove including from disk
|
||||||
- [ ] DHT
|
- [ ] DHT
|
||||||
- [ ] bootstrapping is lame
|
- [x] bootstrapping is lame
|
||||||
- [x] many nodes in "Unknown" status, do smth about it
|
- [x] many nodes in "Unknown" status, do smth about it
|
||||||
- [x] for torrents with a few seeds might be cool to re-query DHT once in a while.
|
- [x] for torrents with a few seeds might be cool to re-query DHT once in a while.
|
||||||
- [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted)
|
- [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted)
|
||||||
|
|
|
||||||
|
|
@ -277,7 +277,7 @@ impl RecursiveRequest<RecursiveRequestCallbacksGetPeers> {
|
||||||
async move {
|
async move {
|
||||||
let mut iteration = 0;
|
let mut iteration = 0;
|
||||||
loop {
|
loop {
|
||||||
debug!("iteration {}", iteration);
|
trace!("iteration {}", iteration);
|
||||||
let sleep = match this.get_peers_root() {
|
let sleep = match this.get_peers_root() {
|
||||||
Ok(0) => Duration::from_secs(1),
|
Ok(0) => Duration::from_secs(1),
|
||||||
Ok(n) if n < 8 => REQUERY_INTERVAL / 2,
|
Ok(n) if n < 8 => REQUERY_INTERVAL / 2,
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result<
|
||||||
.with_routing_table(|r| serde_json::to_writer(&mut file, &DhtSerialize { addr, table: r }))
|
.with_routing_table(|r| serde_json::to_writer(&mut file, &DhtSerialize { addr, table: r }))
|
||||||
{
|
{
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
debug!("dumped DHT to {:?}", &tempfile_name);
|
trace!("dumped DHT to {:?}", &tempfile_name);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Err(e).with_context(|| {
|
return Err(e).with_context(|| {
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ use serde::{
|
||||||
};
|
};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::{INACTIVITY_TIMEOUT};
|
use crate::INACTIVITY_TIMEOUT;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
enum BucketTreeNodeData {
|
enum BucketTreeNodeData {
|
||||||
|
|
|
||||||
|
|
@ -241,9 +241,13 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
|
||||||
let to_read_in_file =
|
let to_read_in_file =
|
||||||
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize;
|
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize;
|
||||||
let mut file_g = self.files[file_idx].lock();
|
let mut file_g = self.files[file_idx].lock();
|
||||||
debug!(
|
trace!(
|
||||||
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
|
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
|
||||||
piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk
|
piece_index,
|
||||||
|
who_sent,
|
||||||
|
file_idx,
|
||||||
|
absolute_offset,
|
||||||
|
&last_received_chunk
|
||||||
);
|
);
|
||||||
file_g
|
file_g
|
||||||
.seek(SeekFrom::Start(absolute_offset))
|
.seek(SeekFrom::Start(absolute_offset))
|
||||||
|
|
@ -269,7 +273,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
|
||||||
|
|
||||||
match self.torrent.compare_hash(piece_index.get(), h.finish()) {
|
match self.torrent.compare_hash(piece_index.get(), h.finish()) {
|
||||||
Some(true) => {
|
Some(true) => {
|
||||||
debug!("piece={} hash matches", piece_index);
|
trace!("piece={} hash matches", piece_index);
|
||||||
Ok(true)
|
Ok(true)
|
||||||
}
|
}
|
||||||
Some(false) => {
|
Some(false) => {
|
||||||
|
|
@ -305,9 +309,13 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
|
||||||
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
|
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
|
||||||
|
|
||||||
let mut file_g = self.files[file_idx].lock();
|
let mut file_g = self.files[file_idx].lock();
|
||||||
debug!(
|
trace!(
|
||||||
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
|
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
|
||||||
chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info
|
chunk_info.piece_index,
|
||||||
|
who_sent,
|
||||||
|
file_idx,
|
||||||
|
absolute_offset,
|
||||||
|
&chunk_info
|
||||||
);
|
);
|
||||||
file_g
|
file_g
|
||||||
.seek(SeekFrom::Start(absolute_offset))
|
.seek(SeekFrom::Start(absolute_offset))
|
||||||
|
|
@ -354,7 +362,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> {
|
||||||
let to_write = std::cmp::min(buf.len(), remaining_len as usize);
|
let to_write = std::cmp::min(buf.len(), remaining_len as usize);
|
||||||
|
|
||||||
let mut file_g = self.files[file_idx].lock();
|
let mut file_g = self.files[file_idx].lock();
|
||||||
debug!(
|
trace!(
|
||||||
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
|
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
|
||||||
chunk_info.piece_index,
|
chunk_info.piece_index,
|
||||||
chunk_info,
|
chunk_info,
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ use peer_binary_protocol::{
|
||||||
MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
|
MessageOwned, PIECE_MESSAGE_DEFAULT_LEN,
|
||||||
};
|
};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use tracing::{debug, trace};
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::spawn_utils::BlockingSpawner;
|
use crate::spawn_utils::BlockingSpawner;
|
||||||
|
|
||||||
|
|
@ -155,7 +155,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
|
let (h, size) = Handshake::deserialize(&read_buf[..read_so_far])
|
||||||
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
|
.map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?;
|
||||||
|
|
||||||
debug!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
|
trace!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id)));
|
||||||
if h.info_hash != self.info_hash.0 {
|
if h.info_hash != self.info_hash.0 {
|
||||||
anyhow::bail!("info hash does not match");
|
anyhow::bail!("info hash does not match");
|
||||||
}
|
}
|
||||||
|
|
@ -210,7 +210,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
|
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
|
||||||
.await
|
.await
|
||||||
.context("error writing bitfield to peer")?;
|
.context("error writing bitfield to peer")?;
|
||||||
debug!("sent bitfield");
|
trace!("sent bitfield");
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
@ -249,7 +249,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("sending: {:?}, length={}", &req, len);
|
trace!("sending: {:?}, length={}", &req, len);
|
||||||
|
|
||||||
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
|
with_timeout(rwtimeout, write_half.write_all(&write_buf[..len]))
|
||||||
.await
|
.await
|
||||||
|
|
@ -290,7 +290,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
r = reader => {r}
|
r = reader => {r}
|
||||||
r = writer => {r}
|
r = writer => {r}
|
||||||
};
|
};
|
||||||
debug!("either reader or writer are done, exiting");
|
trace!("either reader or writer are done, exiting");
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ use peer_binary_protocol::{
|
||||||
};
|
};
|
||||||
use sha1w::{ISha1, Sha1};
|
use sha1w::{ISha1, Sha1};
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tracing::debug;
|
use tracing::trace;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
peer_connection::{
|
peer_connection::{
|
||||||
|
|
@ -153,7 +153,7 @@ impl PeerConnectionHandler for Handler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
|
fn on_received_message(&self, msg: Message<ByteBuf<'_>>) -> anyhow::Result<()> {
|
||||||
debug!("{}: received message: {:?}", self.addr, msg);
|
trace!("{}: received message: {:?}", self.addr, msg);
|
||||||
|
|
||||||
if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data {
|
if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data {
|
||||||
piece,
|
piece,
|
||||||
|
|
|
||||||
|
|
@ -510,7 +510,7 @@ impl TorrentStateLive {
|
||||||
});
|
});
|
||||||
match result {
|
match result {
|
||||||
Some(true) => {
|
Some(true) => {
|
||||||
debug!("set peer to live")
|
trace!("set peer to live")
|
||||||
}
|
}
|
||||||
Some(false) => debug!("can't set peer live, it was in wrong state"),
|
Some(false) => debug!("can't set peer live, it was in wrong state"),
|
||||||
None => debug!("can't set peer live, it disappeared"),
|
None => debug!("can't set peer live, it disappeared"),
|
||||||
|
|
@ -750,7 +750,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
Message::Interested => self.on_peer_interested(),
|
Message::Interested => self.on_peer_interested(),
|
||||||
Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?,
|
Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?,
|
||||||
Message::KeepAlive => {
|
Message::KeepAlive => {
|
||||||
debug!("keepalive received");
|
trace!("keepalive received");
|
||||||
}
|
}
|
||||||
Message::Have(h) => self.on_have(h),
|
Message::Have(h) => self.on_have(h),
|
||||||
Message::NotInterested => {
|
Message::NotInterested => {
|
||||||
|
|
@ -767,7 +767,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
|
||||||
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
|
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
|
||||||
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice()));
|
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice()));
|
||||||
let len = msg.serialize(buf, None)?;
|
let len = msg.serialize(buf, None)?;
|
||||||
debug!("sending: {:?}, length={}", &msg, len);
|
trace!("sending: {:?}, length={}", &msg, len);
|
||||||
Ok(len)
|
Ok(len)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -841,7 +841,7 @@ impl PeerHandler {
|
||||||
let _error = match error {
|
let _error = match error {
|
||||||
Some(e) => e,
|
Some(e) => e,
|
||||||
None => {
|
None => {
|
||||||
debug!("peer died without errors, not re-queueing");
|
trace!("peer died without errors, not re-queueing");
|
||||||
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
@ -850,7 +850,7 @@ impl PeerHandler {
|
||||||
self.counters.errors.fetch_add(1, Ordering::Relaxed);
|
self.counters.errors.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
if self.state.is_finished() {
|
if self.state.is_finished() {
|
||||||
debug!("torrent finished, not re-queueing");
|
trace!("torrent finished, not re-queueing");
|
||||||
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
pe.value_mut().state.set(PeerState::NotNeeded, pstats);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
@ -1014,7 +1014,7 @@ impl PeerHandler {
|
||||||
// Theoretically, this could be done in the sending code, so that it reads straight into
|
// Theoretically, this could be done in the sending code, so that it reads straight into
|
||||||
// the send buffer.
|
// the send buffer.
|
||||||
let request = WriterRequest::ReadChunkRequest(chunk_info);
|
let request = WriterRequest::ReadChunkRequest(chunk_info);
|
||||||
debug!("sending {:?}", &request);
|
trace!("sending {:?}", &request);
|
||||||
Ok::<_, anyhow::Error>(self.tx.send(request)?)
|
Ok::<_, anyhow::Error>(self.tx.send(request)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1034,7 +1034,7 @@ impl PeerHandler {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
debug!("updated bitfield with have={}", have);
|
trace!("updated bitfield with have={}", have);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1168,7 +1168,7 @@ impl PeerHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn on_peer_interested(&self) {
|
fn on_peer_interested(&self) {
|
||||||
debug!("peer is interested");
|
trace!("peer is interested");
|
||||||
self.state.peers.mark_peer_interested(self.addr, true);
|
self.state.peers.mark_peer_interested(self.addr, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1266,7 +1266,7 @@ impl PeerHandler {
|
||||||
|
|
||||||
match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) {
|
match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) {
|
||||||
Some(ChunkMarkingResult::Completed) => {
|
Some(ChunkMarkingResult::Completed) => {
|
||||||
debug!("piece={} done, will write and checksum", piece.index,);
|
trace!("piece={} done, will write and checksum", piece.index,);
|
||||||
// This will prevent others from stealing it.
|
// This will prevent others from stealing it.
|
||||||
{
|
{
|
||||||
let piece = chunk_info.piece_index;
|
let piece = chunk_info.piece_index;
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use tracing::{debug, error, trace, Instrument};
|
use tracing::{error, trace, Instrument};
|
||||||
|
|
||||||
pub fn spawn(
|
pub fn spawn(
|
||||||
span: tracing::Span,
|
span: tracing::Span,
|
||||||
|
|
@ -8,7 +8,7 @@ pub fn spawn(
|
||||||
trace!("started");
|
trace!("started");
|
||||||
match fut.await {
|
match fut.await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
debug!("finished");
|
trace!("finished");
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("finished with error: {:#}", e)
|
error!("finished with error: {:#}", e)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue