Box<[u8]> instead of Vec<u8> for ByteBufOwned

This commit is contained in:
Igor Katson 2024-03-29 11:00:58 +00:00
parent d9ec702f59
commit deee41cd93
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
23 changed files with 106 additions and 124 deletions

View file

@ -4,7 +4,7 @@ use std::{
net::{Ipv4Addr, SocketAddrV4},
};
use bencode::{ByteBuf, ByteString};
use bencode::{ByteBuf, ByteBufOwned};
use clone_to_owned::CloneToOwned;
use librqbit_core::hash_id::Id20;
use serde::{
@ -356,7 +356,7 @@ pub struct Message<BufT> {
pub ip: Option<SocketAddrV4>,
}
impl Message<ByteString> {
impl Message<ByteBufOwned> {
// This implies that the transaction id was generated by us.
pub fn get_our_transaction_id(&self) -> Option<u16> {
if self.transaction_id.len() != 2 {

View file

@ -21,7 +21,7 @@ use crate::{
};
use anyhow::{bail, Context};
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use bencode::ByteString;
use bencode::ByteBufOwned;
use dashmap::DashMap;
use futures::{
future::BoxFuture, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt,
@ -59,7 +59,7 @@ struct OutstandingRequest {
pub struct WorkerSendRequest {
// If this is set, we are tracking the response in inflight_by_transaction_id
our_tid: Option<u16>,
message: Message<ByteString>,
message: Message<ByteBufOwned>,
addr: SocketAddr,
}
@ -607,13 +607,13 @@ impl DhtState {
}
}
fn create_request(&self, request: Request) -> (u16, Message<ByteString>) {
fn create_request(&self, request: Request) -> (u16, Message<ByteBufOwned>) {
let transaction_id = self.next_transaction_id.fetch_add(1, Ordering::Relaxed);
let transaction_id_buf = [(transaction_id >> 8) as u8, (transaction_id & 0xff) as u8];
let message = match request {
Request::GetPeers(info_hash) => Message {
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
transaction_id: ByteBufOwned::from(transaction_id_buf.as_ref()),
version: None,
ip: None,
kind: MessageKind::GetPeersRequest(GetPeersRequest {
@ -622,7 +622,7 @@ impl DhtState {
}),
},
Request::FindNode(target) => Message {
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
transaction_id: ByteBufOwned::from(transaction_id_buf.as_ref()),
version: None,
ip: None,
kind: MessageKind::FindNodeRequest(FindNodeRequest {
@ -631,7 +631,7 @@ impl DhtState {
}),
},
Request::Ping => Message {
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
transaction_id: ByteBufOwned::from(transaction_id_buf.as_ref()),
version: None,
ip: None,
kind: MessageKind::PingRequest(PingRequest { id: self.id }),
@ -648,7 +648,7 @@ impl DhtState {
port,
token,
}),
transaction_id: ByteString::from(transaction_id_buf.as_ref()),
transaction_id: ByteBufOwned::from(transaction_id_buf.as_ref()),
version: None,
ip: None,
},
@ -658,7 +658,7 @@ impl DhtState {
fn on_received_message(
self: &Arc<Self>,
msg: Message<ByteString>,
msg: Message<ByteBufOwned>,
addr: SocketAddr,
) -> anyhow::Result<()> {
let generate_compact_nodes = |target| {
@ -770,8 +770,8 @@ impl DhtState {
id: self.id,
nodes: Some(compact_node_info),
values: Some(compact_peer_info),
token: Some(ByteString(
self.peer_store.gen_token_for(req.id, addr).to_vec(),
token: Some(ByteBufOwned::from(
&self.peer_store.gen_token_for(req.id, addr)[..],
)),
}),
};
@ -821,15 +821,15 @@ enum Request {
FindNode(Id20),
Announce {
info_hash: Id20,
token: ByteString,
token: ByteBufOwned,
port: u16,
},
Ping,
}
enum ResponseOrError {
Response(Response<ByteString>),
Error(ErrorDescription<ByteString>),
Response(Response<ByteBufOwned>),
Error(ErrorDescription<ByteBufOwned>),
}
impl core::fmt::Debug for ResponseOrError {
@ -1010,7 +1010,7 @@ impl DhtWorker {
&self,
socket: &UdpSocket,
mut input_rx: UnboundedReceiver<WorkerSendRequest>,
output_tx: Sender<(Message<ByteString>, SocketAddr)>,
output_tx: Sender<(Message<ByteBufOwned>, SocketAddr)>,
) -> anyhow::Result<()> {
let writer = async {
let mut buf = Vec::new();
@ -1050,7 +1050,7 @@ impl DhtWorker {
.recv_from(&mut buf)
.await
.context("error reading from UDP socket")?;
match bprotocol::deserialize_message::<ByteString>(&buf[..size]) {
match bprotocol::deserialize_message::<ByteBufOwned>(&buf[..size]) {
Ok(msg) => match output_tx.send((msg, addr)).await {
Ok(_) => {}
Err(_) => break,

View file

@ -5,7 +5,7 @@ use std::{
sync::atomic::AtomicU32,
};
use bencode::ByteString;
use bencode::ByteBufOwned;
use chrono::{DateTime, Utc};
use librqbit_core::hash_id::Id20;
use parking_lot::RwLock;
@ -134,7 +134,7 @@ impl PeerStore {
token
}
pub fn store_peer(&self, announce: &AnnouncePeer<ByteString>, addr: SocketAddr) -> bool {
pub fn store_peer(&self, announce: &AnnouncePeer<ByteBufOwned>, addr: SocketAddr) -> bool {
// If the info_hash in announce is too far away from us, don't store it.
// If the token doesn't match, don't store it.
// If we are out of capacity, don't store it.