diff --git a/TODO.md b/TODO.md index 017b3d6..b21b187 100644 --- a/TODO.md +++ b/TODO.md @@ -28,6 +28,7 @@ someday: - [x] favicons for Web UI refactor: +- [ ] session persistence: should add torrents even if we haven't resolved it yet - [x] where are peers stored - [x] http api pause/unpause etc - [x] when a live torrent fails writing to disk, it should transition to error state @@ -35,7 +36,6 @@ refactor: - [x] silence this: WARN torrent{id=0}:external_peer_adder: librqbit::spawn_utils: finished with error: no longer live - [x] start from error state should be possible from UI -- [ ] if the torrent was completed, not need to re-check it - [x] checking is very slow on raspberry checked. nothing much can be done here. Even if raspberry's own libssl.so is used it's still super slow (sha1) - [ ] .rqbit-session.json file has 0 bytes when disk full. I guess fs::rename does this when disk is full? at least on linux. Couldn't repro on MacOS \ No newline at end of file diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index fea416a..0456810 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -130,7 +130,7 @@ impl DhtState { fn spawn_request(self: &Arc, request: Request, addr: SocketAddr) { let this = self.clone(); spawn( - error_span!(parent: None, "dht_request", addr=addr.to_string(), request=format!("{:?}", request)), + error_span!(parent: None, "dht_spawn_request", addr=addr.to_string(), request=format!("{:?}", request)), async move { match this.send_request_and_handle_response(request, addr).await { Ok(_) => {} @@ -421,7 +421,13 @@ impl DhtState { spawn( error_span!("peers_requester", info_hash = format!("{:?}", info_hash)), async move { + let mut iteration = 0usize; loop { + if !this.get_peers_subscribers.contains_key(&info_hash) { + debug!("no more subscribers, closing peers_requester"); + return Ok(()); + } + trace!("iteration {iteration}"); // We don't need to allocate/collect here, but the borrow checker is not happy otherwise. let nodes_to_query = this .routing_table @@ -442,6 +448,7 @@ impl DhtState { } } tokio::time::sleep(REQUERY_INTERVAL).await; + iteration += 1; } }, ); @@ -635,9 +642,10 @@ impl DhtState { ) -> anyhow::Result<()> { self.routing_table_add_node(source, source_addr); - let bsender = match self.get_peers_subscribers.get(&info_hash) { - Some(s) => s, - None => { + use dashmap::mapref::entry::Entry; + let bsender = match self.get_peers_subscribers.entry(info_hash) { + Entry::Occupied(o) => o, + Entry::Vacant(_) => { warn!( "ignoring get_peers response, no subscribers for {:?}", info_hash @@ -647,7 +655,6 @@ impl DhtState { }; { - use dashmap::mapref::entry::Entry; let n = MaybeUsefulNode { id: source, addr: source_addr, @@ -682,9 +689,29 @@ impl DhtState { } let addr = SocketAddr::V4(peer.addr); if seen.insert(addr) { - bsender - .send(addr) - .context("error sending peers to subscribers")?; + match bsender.get().send(addr) { + Ok(_) => {} + Err(_) => { + debug!("no more subscribers for {:?}, cleaning up", info_hash); + // bsender.remove(); + + // let this = self.clone(); + // spawn( + // error_span!("cleanup", info_hash = format!("{info_hash:?}")), + // async move { + // tokio::time::sleep(Duration::from_secs(10)).await; + // if !this.get_peers_subscribers.contains_key(&info_hash) { + // debug!("no more subscribers for {:?}, removed it from seen peers", info_hash); + // this.seen_peers.remove(&info_hash); + // this.closest_responding_nodes_for_info_hash + // .remove(&info_hash); + // } + // Ok(()) + // }, + // ); + return Ok(()); + } + } } } };