DHT: use rwlock, not mutex

This commit is contained in:
Igor Katson 2022-12-07 22:01:29 +00:00
parent 813ef92123
commit 818e300935
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5

View file

@ -18,7 +18,7 @@ use futures::{stream::FuturesUnordered, Stream, StreamExt};
use indexmap::IndexSet;
use librqbit_core::{id20::Id20, peer_id::generate_peer_id};
use log::{debug, info, trace, warn};
use parking_lot::Mutex;
use parking_lot::RwLock;
use rand::Rng;
use serde::Serialize;
use tokio::{
@ -466,18 +466,18 @@ enum Request {
#[derive(Clone)]
pub struct Dht {
state: Arc<Mutex<DhtState>>,
state: Arc<RwLock<DhtState>>,
}
struct DhtWorker {
socket: UdpSocket,
peer_id: Id20,
state: Arc<Mutex<DhtState>>,
state: Arc<RwLock<DhtState>>,
}
impl DhtWorker {
fn on_response(&self, msg: Message<ByteString>, addr: SocketAddr) -> anyhow::Result<()> {
self.state.lock().on_incoming_from_remote(msg, addr)
self.state.write().on_incoming_from_remote(msg, addr)
}
async fn start(
@ -502,7 +502,7 @@ impl DhtWorker {
for addr in addrs {
let request = this
.state
.lock()
.write()
.create_request(Request::FindNode(this.peer_id), addr);
in_tx.send((request, addr))?;
}
@ -561,7 +561,7 @@ impl DhtWorker {
struct PeerStream {
info_hash: Id20,
state: Arc<Mutex<DhtState>>,
state: Arc<RwLock<DhtState>>,
absolute_stream_pos: usize,
initial_peers_pos: Option<(usize, usize)>,
broadcast_rx: BroadcastStream<SocketAddr>,
@ -578,7 +578,7 @@ impl Stream for PeerStream {
if let Some((pos, end)) = self.initial_peers_pos.take() {
let addr = *self
.state
.lock()
.read()
.seen_peers
.get(&self.info_hash)
.unwrap()
@ -626,7 +626,7 @@ impl Dht {
let socket = match config.listen_addr {
Some(addr) => UdpSocket::bind(addr)
.await
.with_context(|| format!("error binding socket, address {}", addr)),
.with_context(|| format!("error binding socket, address {addr}")),
None => UdpSocket::bind("0.0.0.0:0")
.await
.context("error binding socket, address 0.0.0.0:0"),
@ -644,7 +644,7 @@ impl Dht {
.unwrap_or_else(|| crate::DHT_BOOTSTRAP.iter().map(|v| v.to_string()).collect());
let (in_tx, in_rx) = unbounded_channel();
let state = Arc::new(Mutex::new(DhtState::new(
let state = Arc::new(RwLock::new(DhtState::new(
peer_id,
in_tx.clone(),
config.routing_table,
@ -669,7 +669,7 @@ impl Dht {
&self,
info_hash: Id20,
) -> anyhow::Result<impl Stream<Item = SocketAddr> + Unpin> {
let (pos, rx) = self.state.lock().get_peers(info_hash)?;
let (pos, rx) = self.state.write().get_peers(info_hash)?;
Ok(PeerStream {
info_hash,
state: self.state.clone(),
@ -680,18 +680,18 @@ impl Dht {
}
pub fn listen_addr(&self) -> SocketAddr {
self.state.lock().listen_addr
self.state.read().listen_addr
}
pub fn stats(&self) -> DhtStats {
self.state.lock().get_stats()
self.state.read().get_stats()
}
pub fn with_routing_table<R, F: FnOnce(&RoutingTable) -> R>(&self, f: F) -> R {
f(&self.state.lock().routing_table)
f(&self.state.read().routing_table)
}
pub fn clone_routing_table(&self) -> RoutingTable {
self.state.lock().routing_table.clone()
self.state.read().routing_table.clone()
}
}