diff --git a/crates/dht/src/dht.rs b/crates/dht/src/dht.rs index ae30b02..a4fda61 100644 --- a/crates/dht/src/dht.rs +++ b/crates/dht/src/dht.rs @@ -18,7 +18,7 @@ use futures::{stream::FuturesUnordered, Stream, StreamExt}; use indexmap::IndexSet; use librqbit_core::{id20::Id20, peer_id::generate_peer_id}; use log::{debug, info, trace, warn}; -use parking_lot::Mutex; +use parking_lot::RwLock; use rand::Rng; use serde::Serialize; use tokio::{ @@ -466,18 +466,18 @@ enum Request { #[derive(Clone)] pub struct Dht { - state: Arc>, + state: Arc>, } struct DhtWorker { socket: UdpSocket, peer_id: Id20, - state: Arc>, + state: Arc>, } impl DhtWorker { fn on_response(&self, msg: Message, addr: SocketAddr) -> anyhow::Result<()> { - self.state.lock().on_incoming_from_remote(msg, addr) + self.state.write().on_incoming_from_remote(msg, addr) } async fn start( @@ -502,7 +502,7 @@ impl DhtWorker { for addr in addrs { let request = this .state - .lock() + .write() .create_request(Request::FindNode(this.peer_id), addr); in_tx.send((request, addr))?; } @@ -561,7 +561,7 @@ impl DhtWorker { struct PeerStream { info_hash: Id20, - state: Arc>, + state: Arc>, absolute_stream_pos: usize, initial_peers_pos: Option<(usize, usize)>, broadcast_rx: BroadcastStream, @@ -578,7 +578,7 @@ impl Stream for PeerStream { if let Some((pos, end)) = self.initial_peers_pos.take() { let addr = *self .state - .lock() + .read() .seen_peers .get(&self.info_hash) .unwrap() @@ -626,7 +626,7 @@ impl Dht { let socket = match config.listen_addr { Some(addr) => UdpSocket::bind(addr) .await - .with_context(|| format!("error binding socket, address {}", addr)), + .with_context(|| format!("error binding socket, address {addr}")), None => UdpSocket::bind("0.0.0.0:0") .await .context("error binding socket, address 0.0.0.0:0"), @@ -644,7 +644,7 @@ impl Dht { .unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect()); let (in_tx, in_rx) = unbounded_channel(); - let state = Arc::new(Mutex::new(DhtState::new( + let state = Arc::new(RwLock::new(DhtState::new( peer_id, in_tx.clone(), config.routing_table, @@ -669,7 +669,7 @@ impl Dht { &self, info_hash: Id20, ) -> anyhow::Result + Unpin> { - let (pos, rx) = self.state.lock().get_peers(info_hash)?; + let (pos, rx) = self.state.write().get_peers(info_hash)?; Ok(PeerStream { info_hash, state: self.state.clone(), @@ -680,18 +680,18 @@ impl Dht { } pub fn listen_addr(&self) -> SocketAddr { - self.state.lock().listen_addr + self.state.read().listen_addr } pub fn stats(&self) -> DhtStats { - self.state.lock().get_stats() + self.state.read().get_stats() } pub fn with_routing_table R>(&self, f: F) -> R { - f(&self.state.lock().routing_table) + f(&self.state.read().routing_table) } pub fn clone_routing_table(&self) -> RoutingTable { - self.state.lock().routing_table.clone() + self.state.read().routing_table.clone() } }