Wrap reading files into block_in_place
This commit is contained in:
parent
f845eafca4
commit
bca55891c1
5 changed files with 40 additions and 15 deletions
|
|
@ -6,7 +6,9 @@ use futures::{stream::FuturesUnordered, Stream, StreamExt};
|
||||||
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
|
||||||
use log::debug;
|
use log::debug;
|
||||||
|
|
||||||
use crate::{peer_connection::PeerConnectionOptions, peer_info_reader};
|
use crate::{
|
||||||
|
peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner,
|
||||||
|
};
|
||||||
use librqbit_core::id20::Id20;
|
use librqbit_core::id20::Id20;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|
@ -45,6 +47,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
|
||||||
peer_id,
|
peer_id,
|
||||||
info_hash,
|
info_hash,
|
||||||
peer_connection_options,
|
peer_connection_options,
|
||||||
|
BlockingSpawner::new(true),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.with_context(|| format!("error reading metainfo from {}", addr));
|
.with_context(|| format!("error reading metainfo from {}", addr));
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,8 @@ use peer_binary_protocol::{
|
||||||
};
|
};
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
use crate::spawn_utils::BlockingSpawner;
|
||||||
|
|
||||||
pub trait PeerConnectionHandler {
|
pub trait PeerConnectionHandler {
|
||||||
fn get_have_bytes(&self) -> u64;
|
fn get_have_bytes(&self) -> u64;
|
||||||
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize>;
|
fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec<u8>) -> Option<usize>;
|
||||||
|
|
@ -43,6 +45,7 @@ pub struct PeerConnection<H> {
|
||||||
info_hash: Id20,
|
info_hash: Id20,
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
options: PeerConnectionOptions,
|
options: PeerConnectionOptions,
|
||||||
|
spawner: BlockingSpawner,
|
||||||
}
|
}
|
||||||
|
|
||||||
// async fn read_one<'a, R: AsyncReadExt + Unpin>(
|
// async fn read_one<'a, R: AsyncReadExt + Unpin>(
|
||||||
|
|
@ -107,12 +110,14 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
handler: H,
|
handler: H,
|
||||||
options: Option<PeerConnectionOptions>,
|
options: Option<PeerConnectionOptions>,
|
||||||
|
spawner: BlockingSpawner,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
PeerConnection {
|
PeerConnection {
|
||||||
handler,
|
handler,
|
||||||
addr,
|
addr,
|
||||||
info_hash,
|
info_hash,
|
||||||
peer_id,
|
peer_id,
|
||||||
|
spawner,
|
||||||
options: options.unwrap_or_default(),
|
options: options.unwrap_or_default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -243,12 +248,16 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
|
||||||
WriterRequest::ReadChunkRequest(chunk) => {
|
WriterRequest::ReadChunkRequest(chunk) => {
|
||||||
// this whole section is an optimization
|
// this whole section is an optimization
|
||||||
write_buf.resize(PIECE_MESSAGE_DEFAULT_LEN, 0);
|
write_buf.resize(PIECE_MESSAGE_DEFAULT_LEN, 0);
|
||||||
let preamble_len = serialize_piece_preamble(&chunk, &mut write_buf);
|
let preamble_len = serialize_piece_preamble(chunk, &mut write_buf);
|
||||||
let full_len = preamble_len + chunk.size as usize;
|
let full_len = preamble_len + chunk.size as usize;
|
||||||
write_buf.resize(full_len, 0);
|
write_buf.resize(full_len, 0);
|
||||||
self.handler
|
self.spawner
|
||||||
.read_chunk(chunk, &mut write_buf[preamble_len..])
|
.spawn_block_in_place(|| {
|
||||||
|
self.handler
|
||||||
|
.read_chunk(chunk, &mut write_buf[preamble_len..])
|
||||||
|
})
|
||||||
.with_context(|| format!("error reading chunk {:?}", chunk))?;
|
.with_context(|| format!("error reading chunk {:?}", chunk))?;
|
||||||
|
|
||||||
uploaded_add = Some(chunk.size);
|
uploaded_add = Some(chunk.size);
|
||||||
full_len
|
full_len
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,11 @@ use peer_binary_protocol::{
|
||||||
use sha1w::{ISha1, Sha1};
|
use sha1w::{ISha1, Sha1};
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
|
||||||
use crate::peer_connection::{
|
use crate::{
|
||||||
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
|
peer_connection::{
|
||||||
|
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
|
||||||
|
},
|
||||||
|
spawn_utils::BlockingSpawner,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn read_metainfo_from_peer(
|
pub async fn read_metainfo_from_peer(
|
||||||
|
|
@ -26,6 +29,7 @@ pub async fn read_metainfo_from_peer(
|
||||||
peer_id: Id20,
|
peer_id: Id20,
|
||||||
info_hash: Id20,
|
info_hash: Id20,
|
||||||
peer_connection_options: Option<PeerConnectionOptions>,
|
peer_connection_options: Option<PeerConnectionOptions>,
|
||||||
|
spawner: BlockingSpawner,
|
||||||
) -> anyhow::Result<TorrentMetaV1Info<ByteString>> {
|
) -> anyhow::Result<TorrentMetaV1Info<ByteString>> {
|
||||||
let (result_tx, result_rx) =
|
let (result_tx, result_rx) =
|
||||||
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteString>>>();
|
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteString>>>();
|
||||||
|
|
@ -37,8 +41,14 @@ pub async fn read_metainfo_from_peer(
|
||||||
result_tx: Mutex::new(Some(result_tx)),
|
result_tx: Mutex::new(Some(result_tx)),
|
||||||
locked: RwLock::new(None),
|
locked: RwLock::new(None),
|
||||||
};
|
};
|
||||||
let connection =
|
let connection = PeerConnection::new(
|
||||||
PeerConnection::new(addr, info_hash, peer_id, handler, peer_connection_options);
|
addr,
|
||||||
|
info_hash,
|
||||||
|
peer_id,
|
||||||
|
handler,
|
||||||
|
peer_connection_options,
|
||||||
|
spawner,
|
||||||
|
);
|
||||||
|
|
||||||
let result_reader = async move { result_rx.await? };
|
let result_reader = async move { result_rx.await? };
|
||||||
let connection_runner = async move { connection.manage_peer(writer_rx).await };
|
let connection_runner = async move { connection.manage_peer(writer_rx).await };
|
||||||
|
|
@ -219,6 +229,8 @@ mod tests {
|
||||||
use librqbit_core::id20::Id20;
|
use librqbit_core::id20::Id20;
|
||||||
use librqbit_core::peer_id::generate_peer_id;
|
use librqbit_core::peer_id::generate_peer_id;
|
||||||
|
|
||||||
|
use crate::spawn_utils::BlockingSpawner;
|
||||||
|
|
||||||
use super::read_metainfo_from_peer;
|
use super::read_metainfo_from_peer;
|
||||||
|
|
||||||
static LOG_INIT: Once = std::sync::Once::new();
|
static LOG_INIT: Once = std::sync::Once::new();
|
||||||
|
|
@ -234,8 +246,10 @@ mod tests {
|
||||||
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
|
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
|
||||||
let peer_id = generate_peer_id();
|
let peer_id = generate_peer_id();
|
||||||
let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
|
let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
|
||||||
dbg!(read_metainfo_from_peer(addr, peer_id, info_hash, None)
|
dbg!(
|
||||||
.await
|
read_metainfo_from_peer(addr, peer_id, info_hash, None, BlockingSpawner::new(true))
|
||||||
.unwrap());
|
.await
|
||||||
|
.unwrap()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -235,8 +235,6 @@ pub struct TorrentState {
|
||||||
lengths: Lengths,
|
lengths: Lengths,
|
||||||
needed: u64,
|
needed: u64,
|
||||||
stats: AtomicStats,
|
stats: AtomicStats,
|
||||||
spawner: BlockingSpawner,
|
|
||||||
|
|
||||||
options: TorrentStateOptions,
|
options: TorrentStateOptions,
|
||||||
|
|
||||||
peer_semaphore: Semaphore,
|
peer_semaphore: Semaphore,
|
||||||
|
|
@ -274,7 +272,6 @@ impl TorrentState {
|
||||||
},
|
},
|
||||||
needed: needed_bytes,
|
needed: needed_bytes,
|
||||||
lengths,
|
lengths,
|
||||||
spawner,
|
|
||||||
options,
|
options,
|
||||||
|
|
||||||
peer_semaphore: Semaphore::new(128),
|
peer_semaphore: Semaphore::new(128),
|
||||||
|
|
@ -299,7 +296,7 @@ impl TorrentState {
|
||||||
let handler = PeerHandler {
|
let handler = PeerHandler {
|
||||||
addr,
|
addr,
|
||||||
state: state.clone(),
|
state: state.clone(),
|
||||||
spawner: state.spawner,
|
spawner,
|
||||||
};
|
};
|
||||||
let options = PeerConnectionOptions {
|
let options = PeerConnectionOptions {
|
||||||
connect_timeout: state.options.peer_connect_timeout,
|
connect_timeout: state.options.peer_connect_timeout,
|
||||||
|
|
@ -311,6 +308,7 @@ impl TorrentState {
|
||||||
state.peer_id,
|
state.peer_id,
|
||||||
handler,
|
handler,
|
||||||
Some(options),
|
Some(options),
|
||||||
|
spawner,
|
||||||
);
|
);
|
||||||
spawn(format!("manage_peer({})", addr), async move {
|
spawn(format!("manage_peer({})", addr), async move {
|
||||||
if let Err(e) = peer_connection.manage_peer(out_rx).await {
|
if let Err(e) = peer_connection.manage_peer(out_rx).await {
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ pub struct DictPeer<'a> {
|
||||||
#[serde(deserialize_with = "deserialize_ip_string")]
|
#[serde(deserialize_with = "deserialize_ip_string")]
|
||||||
ip: IpAddr,
|
ip: IpAddr,
|
||||||
#[serde(borrow)]
|
#[serde(borrow)]
|
||||||
|
#[allow(dead_code)]
|
||||||
peer_id: Option<ByteBuf<'a>>,
|
peer_id: Option<ByteBuf<'a>>,
|
||||||
port: u16,
|
port: u16,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue