diff --git a/TODO.md b/TODO.md index 7214b9f..fed1d9b 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,4 @@ -- [ ] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them. +- [x] when we have the whole torrent, there's no point talking to peers that also have the whole torrent and keep reconnecting to them. - [ ] per-file stats - [x (partial)] per-peer stats - [x] use some concurrent hashmap e.g. flurry or dashmap @@ -8,13 +8,13 @@ - [x] initializing/checking - [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while - [x] checking torrents should be visible right away -- [ ] server persistence - - [ ] it would be nice to restart the server and keep the state +- [x] server persistence + - [x] it would be nice to restart the server and keep the state - [x] torrent actions - [x] pause/unpause - [x] remove including from disk - [ ] DHT - - [ ] bootstrapping is lame + - [x] bootstrapping is lame - [x] many nodes in "Unknown" status, do smth about it - [x] for torrents with a few seeds might be cool to re-query DHT once in a while. - [x] don't leak memory when deleting torrents (i.e. remove torrent information (seen peers etc) once the torrent is deleted) diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index cb7a756..97a2101 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -277,7 +277,7 @@ impl RecursiveRequest { async move { let mut iteration = 0; loop { - debug!("iteration {}", iteration); + trace!("iteration {}", iteration); let sleep = match this.get_peers_root() { Ok(0) => Duration::from_secs(1), Ok(n) if n < 8 => REQUERY_INTERVAL / 2, diff --git a/crates/dht/src/persistence.rs b/crates/dht/src/persistence.rs index bf91903..f74b89f 100644 --- a/crates/dht/src/persistence.rs +++ b/crates/dht/src/persistence.rs @@ -44,7 +44,7 @@ fn dump_dht(dht: &Dht, filename: &Path, tempfile_name: &Path) -> anyhow::Result< .with_routing_table(|r| serde_json::to_writer(&mut file, &DhtSerialize { addr, table: r })) { Ok(_) => { - debug!("dumped DHT to {:?}", &tempfile_name); + trace!("dumped DHT to {:?}", &tempfile_name); } Err(e) => { return Err(e).with_context(|| { diff --git a/crates/dht/src/routing_table.rs b/crates/dht/src/routing_table.rs index 6da6ce7..1e21512 100644 --- a/crates/dht/src/routing_table.rs +++ b/crates/dht/src/routing_table.rs @@ -7,7 +7,7 @@ use serde::{ }; use tracing::debug; -use crate::{INACTIVITY_TIMEOUT}; +use crate::INACTIVITY_TIMEOUT; #[derive(Debug, Clone, Serialize, Deserialize)] enum BucketTreeNodeData { diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index aee6625..80cb07c 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -241,9 +241,13 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let to_read_in_file = std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize; let mut file_g = self.files[file_idx].lock(); - debug!( + trace!( "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", - piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk + piece_index, + who_sent, + file_idx, + absolute_offset, + &last_received_chunk ); file_g .seek(SeekFrom::Start(absolute_offset)) @@ -269,7 +273,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { match self.torrent.compare_hash(piece_index.get(), h.finish()) { Some(true) => { - debug!("piece={} hash matches", piece_index); + trace!("piece={} hash matches", piece_index); Ok(true) } Some(false) => { @@ -305,9 +309,13 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize; let mut file_g = self.files[file_idx].lock(); - debug!( + trace!( "piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}", - chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info + chunk_info.piece_index, + who_sent, + file_idx, + absolute_offset, + &chunk_info ); file_g .seek(SeekFrom::Start(absolute_offset)) @@ -354,7 +362,7 @@ impl<'a, Sha1Impl: ISha1> FileOps<'a, Sha1Impl> { let to_write = std::cmp::min(buf.len(), remaining_len as usize); let mut file_g = self.files[file_idx].lock(); - debug!( + trace!( "piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}", chunk_info.piece_index, chunk_info, diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 289e061..84b3b89 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -13,7 +13,7 @@ use peer_binary_protocol::{ MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }; use tokio::time::timeout; -use tracing::{debug, trace}; +use tracing::trace; use crate::spawn_utils::BlockingSpawner; @@ -155,7 +155,7 @@ impl PeerConnection { let (h, size) = Handshake::deserialize(&read_buf[..read_so_far]) .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?; - debug!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id))); + trace!("connected: id={:?}", try_decode_peer_id(Id20(h.peer_id))); if h.info_hash != self.info_hash.0 { anyhow::bail!("info hash does not match"); } @@ -210,7 +210,7 @@ impl PeerConnection { with_timeout(rwtimeout, write_half.write_all(&write_buf[..len])) .await .context("error writing bitfield to peer")?; - debug!("sent bitfield"); + trace!("sent bitfield"); } loop { @@ -249,7 +249,7 @@ impl PeerConnection { } }; - debug!("sending: {:?}, length={}", &req, len); + trace!("sending: {:?}, length={}", &req, len); with_timeout(rwtimeout, write_half.write_all(&write_buf[..len])) .await @@ -290,7 +290,7 @@ impl PeerConnection { r = reader => {r} r = writer => {r} }; - debug!("either reader or writer are done, exiting"); + trace!("either reader or writer are done, exiting"); r } } diff --git a/crates/librqbit/src/peer_info_reader/mod.rs b/crates/librqbit/src/peer_info_reader/mod.rs index a205a0d..6955aab 100644 --- a/crates/librqbit/src/peer_info_reader/mod.rs +++ b/crates/librqbit/src/peer_info_reader/mod.rs @@ -15,7 +15,7 @@ use peer_binary_protocol::{ }; use sha1w::{ISha1, Sha1}; use tokio::sync::mpsc::UnboundedSender; -use tracing::debug; +use tracing::trace; use crate::{ peer_connection::{ @@ -153,7 +153,7 @@ impl PeerConnectionHandler for Handler { } fn on_received_message(&self, msg: Message>) -> anyhow::Result<()> { - debug!("{}: received message: {:?}", self.addr, msg); + trace!("{}: received message: {:?}", self.addr, msg); if let Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Data { piece, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 5aba636..47eb680 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -510,7 +510,7 @@ impl TorrentStateLive { }); match result { Some(true) => { - debug!("set peer to live") + trace!("set peer to live") } Some(false) => debug!("can't set peer live, it was in wrong state"), None => debug!("can't set peer live, it disappeared"), @@ -750,7 +750,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { Message::Interested => self.on_peer_interested(), Message::Piece(piece) => self.on_received_piece(piece).context("on_received_piece")?, Message::KeepAlive => { - debug!("keepalive received"); + trace!("keepalive received"); } Message::Have(h) => self.on_have(h), Message::NotInterested => { @@ -767,7 +767,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice())); let len = msg.serialize(buf, None)?; - debug!("sending: {:?}, length={}", &msg, len); + trace!("sending: {:?}, length={}", &msg, len); Ok(len) } @@ -841,7 +841,7 @@ impl PeerHandler { let _error = match error { Some(e) => e, None => { - debug!("peer died without errors, not re-queueing"); + trace!("peer died without errors, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } @@ -850,7 +850,7 @@ impl PeerHandler { self.counters.errors.fetch_add(1, Ordering::Relaxed); if self.state.is_finished() { - debug!("torrent finished, not re-queueing"); + trace!("torrent finished, not re-queueing"); pe.value_mut().state.set(PeerState::NotNeeded, pstats); return Ok(()); } @@ -1014,7 +1014,7 @@ impl PeerHandler { // Theoretically, this could be done in the sending code, so that it reads straight into // the send buffer. let request = WriterRequest::ReadChunkRequest(chunk_info); - debug!("sending {:?}", &request); + trace!("sending {:?}", &request); Ok::<_, anyhow::Error>(self.tx.send(request)?) } @@ -1034,7 +1034,7 @@ impl PeerHandler { return; } }; - debug!("updated bitfield with have={}", have); + trace!("updated bitfield with have={}", have); }); } @@ -1168,7 +1168,7 @@ impl PeerHandler { } fn on_peer_interested(&self) { - debug!("peer is interested"); + trace!("peer is interested"); self.state.peers.mark_peer_interested(self.addr, true); } @@ -1266,7 +1266,7 @@ impl PeerHandler { match g.get_chunks_mut()?.mark_chunk_downloaded(&piece) { Some(ChunkMarkingResult::Completed) => { - debug!("piece={} done, will write and checksum", piece.index,); + trace!("piece={} done, will write and checksum", piece.index,); // This will prevent others from stealing it. { let piece = chunk_info.piece_index; diff --git a/crates/librqbit_core/src/spawn_utils.rs b/crates/librqbit_core/src/spawn_utils.rs index 81e9b00..6893e19 100644 --- a/crates/librqbit_core/src/spawn_utils.rs +++ b/crates/librqbit_core/src/spawn_utils.rs @@ -1,4 +1,4 @@ -use tracing::{debug, error, trace, Instrument}; +use tracing::{error, trace, Instrument}; pub fn spawn( span: tracing::Span, @@ -8,7 +8,7 @@ pub fn spawn( trace!("started"); match fut.await { Ok(_) => { - debug!("finished"); + trace!("finished"); } Err(e) => { error!("finished with error: {:#}", e)