Uploading chunks now reads straight into write buffer
This commit is contained in:
parent
2192842099
commit
fab43a8d23
5 changed files with 470 additions and 50 deletions
395
crates/librqbit/src/file_ops.rs
Normal file
395
crates/librqbit/src/file_ops.rs
Normal file
|
|
@ -0,0 +1,395 @@
|
|||
use std::{
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom, Write},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use log::{debug, trace, warn};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::{
|
||||
buffers::ByteString,
|
||||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||
peer_binary_protocol::Piece,
|
||||
torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned},
|
||||
type_aliases::{PeerHandle, BF},
|
||||
};
|
||||
|
||||
pub struct InitialCheckResults {
|
||||
pub needed_pieces: BF,
|
||||
pub have_pieces: BF,
|
||||
pub have_bytes: u64,
|
||||
pub needed_bytes: u64,
|
||||
}
|
||||
|
||||
pub fn update_hash_from_file(
|
||||
file: &mut File,
|
||||
hash: &mut sha1::Sha1,
|
||||
buf: &mut [u8],
|
||||
mut bytes_to_read: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut read = 0;
|
||||
while bytes_to_read > 0 {
|
||||
let chunk = std::cmp::min(buf.len(), bytes_to_read);
|
||||
file.read_exact(&mut buf[..chunk]).with_context(|| {
|
||||
format!(
|
||||
"failed reading chunk of size {}, read so far {}",
|
||||
chunk, read
|
||||
)
|
||||
})?;
|
||||
bytes_to_read -= chunk;
|
||||
read += chunk;
|
||||
hash.update(&buf[..chunk]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct FileOps<'a> {
|
||||
torrent: &'a TorrentMetaV1Owned,
|
||||
files: &'a [Arc<Mutex<File>>],
|
||||
lengths: &'a Lengths,
|
||||
}
|
||||
|
||||
impl<'a> FileOps<'a> {
|
||||
pub fn new(
|
||||
torrent: &'a TorrentMetaV1Owned,
|
||||
files: &'a [Arc<Mutex<File>>],
|
||||
lengths: &'a Lengths,
|
||||
) -> Self {
|
||||
Self {
|
||||
torrent,
|
||||
files,
|
||||
lengths,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn initial_check(
|
||||
&self,
|
||||
only_files: Option<&[usize]>,
|
||||
) -> anyhow::Result<InitialCheckResults> {
|
||||
let mut needed_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]);
|
||||
let mut have_pieces = BF::from_vec(vec![0u8; self.lengths.piece_bitfield_bytes()]);
|
||||
|
||||
let mut have_bytes = 0u64;
|
||||
let mut needed_bytes = 0u64;
|
||||
|
||||
struct CurrentFile<'a> {
|
||||
index: usize,
|
||||
fd: &'a Arc<Mutex<File>>,
|
||||
len: u64,
|
||||
name: FileIteratorName<'a, ByteString>,
|
||||
full_file_required: bool,
|
||||
processed_bytes: u64,
|
||||
is_broken: bool,
|
||||
}
|
||||
impl<'a> CurrentFile<'a> {
|
||||
fn remaining(&self) -> u64 {
|
||||
self.len - self.processed_bytes
|
||||
}
|
||||
fn mark_processed_bytes(&mut self, bytes: u64) {
|
||||
self.processed_bytes += bytes as u64
|
||||
}
|
||||
}
|
||||
let mut file_iterator = self
|
||||
.files
|
||||
.iter()
|
||||
.zip(self.torrent.info.iter_filenames_and_lengths())
|
||||
.enumerate()
|
||||
.map(|(idx, (fd, (name, len)))| {
|
||||
let full_file_required = if let Some(only_files) = only_files {
|
||||
only_files.contains(&idx)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
CurrentFile {
|
||||
index: idx,
|
||||
fd,
|
||||
len,
|
||||
name,
|
||||
full_file_required,
|
||||
processed_bytes: 0,
|
||||
is_broken: false,
|
||||
}
|
||||
});
|
||||
|
||||
let mut current_file = file_iterator
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("empty input file list"))?;
|
||||
|
||||
let mut read_buffer = vec![0u8; 65536];
|
||||
|
||||
for piece_info in self.lengths.iter_piece_infos() {
|
||||
let mut computed_hash = sha1::Sha1::new();
|
||||
let mut piece_remaining = piece_info.len as usize;
|
||||
let mut some_files_broken = false;
|
||||
let mut at_least_one_file_required = current_file.full_file_required;
|
||||
|
||||
while piece_remaining > 0 {
|
||||
let mut to_read_in_file =
|
||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
||||
while to_read_in_file == 0 {
|
||||
current_file = file_iterator
|
||||
.next()
|
||||
.ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?;
|
||||
|
||||
at_least_one_file_required |= current_file.full_file_required;
|
||||
|
||||
to_read_in_file =
|
||||
std::cmp::min(current_file.remaining(), piece_remaining as u64) as usize;
|
||||
}
|
||||
|
||||
let pos = current_file.processed_bytes;
|
||||
piece_remaining -= to_read_in_file;
|
||||
current_file.mark_processed_bytes(to_read_in_file as u64);
|
||||
|
||||
if current_file.is_broken {
|
||||
// no need to read.
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut fd = current_file.fd.lock();
|
||||
|
||||
fd.seek(SeekFrom::Start(pos)).unwrap();
|
||||
if let Err(err) = update_hash_from_file(
|
||||
&mut fd,
|
||||
&mut computed_hash,
|
||||
&mut read_buffer,
|
||||
to_read_in_file,
|
||||
) {
|
||||
debug!(
|
||||
"error reading from file {} ({:?}) at {}: {:#}",
|
||||
current_file.index, current_file.name, pos, &err
|
||||
);
|
||||
current_file.is_broken = true;
|
||||
some_files_broken = true;
|
||||
}
|
||||
}
|
||||
|
||||
if at_least_one_file_required && some_files_broken {
|
||||
trace!(
|
||||
"piece {} had errors, marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
|
||||
needed_bytes += piece_info.len as u64;
|
||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
continue;
|
||||
}
|
||||
|
||||
if self
|
||||
.torrent
|
||||
.info
|
||||
.compare_hash(piece_info.piece_index.get(), &computed_hash)
|
||||
.unwrap()
|
||||
{
|
||||
trace!(
|
||||
"piece {} is fine, not marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
have_bytes += piece_info.len as u64;
|
||||
have_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
} else {
|
||||
if at_least_one_file_required {
|
||||
trace!(
|
||||
"piece {} hash does not match, marking as needed",
|
||||
piece_info.piece_index
|
||||
);
|
||||
needed_bytes += piece_info.len as u64;
|
||||
needed_pieces.set(piece_info.piece_index.get() as usize, true);
|
||||
} else {
|
||||
trace!(
|
||||
"piece {} hash does not match, but it is not required by any of the requested files, ignoring",
|
||||
piece_info.piece_index
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(InitialCheckResults {
|
||||
needed_pieces,
|
||||
have_pieces,
|
||||
have_bytes,
|
||||
needed_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn check_piece(
|
||||
&self,
|
||||
who_sent: PeerHandle,
|
||||
piece_index: ValidPieceIndex,
|
||||
last_received_chunk: &ChunkInfo,
|
||||
) -> anyhow::Result<bool> {
|
||||
let mut h = sha1::Sha1::new();
|
||||
let piece_length = self.lengths.piece_length(piece_index);
|
||||
let mut absolute_offset = self.lengths.piece_offset(piece_index);
|
||||
let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)];
|
||||
|
||||
let mut piece_remaining_bytes = piece_length as usize;
|
||||
|
||||
for (file_idx, (name, file_len)) in
|
||||
self.torrent.info.iter_filenames_and_lengths().enumerate()
|
||||
{
|
||||
if absolute_offset > file_len {
|
||||
absolute_offset -= file_len;
|
||||
continue;
|
||||
}
|
||||
let file_remaining_len = file_len - absolute_offset;
|
||||
|
||||
let to_read_in_file =
|
||||
std::cmp::min(file_remaining_len, piece_remaining_bytes as u64) as usize;
|
||||
let mut file_g = self.files[file_idx].lock();
|
||||
debug!(
|
||||
"piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}",
|
||||
piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk
|
||||
);
|
||||
file_g
|
||||
.seek(SeekFrom::Start(absolute_offset))
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"error seeking to {}, file id: {}",
|
||||
absolute_offset, file_idx
|
||||
)
|
||||
})?;
|
||||
update_hash_from_file(&mut file_g, &mut h, &mut buf, to_read_in_file).with_context(
|
||||
|| {
|
||||
format!(
|
||||
"error reading {} bytes, file_id: {} (\"{:?}\")",
|
||||
to_read_in_file, file_idx, name
|
||||
)
|
||||
},
|
||||
)?;
|
||||
|
||||
piece_remaining_bytes -= to_read_in_file;
|
||||
|
||||
if piece_remaining_bytes == 0 {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
absolute_offset = 0;
|
||||
}
|
||||
|
||||
match self.torrent.info.compare_hash(piece_index.get(), &h) {
|
||||
Some(true) => {
|
||||
debug!("piece={} hash matches", piece_index);
|
||||
Ok(true)
|
||||
}
|
||||
Some(false) => {
|
||||
warn!("the piece={} hash does not match", piece_index);
|
||||
Ok(false)
|
||||
}
|
||||
None => {
|
||||
// this is probably a bug?
|
||||
warn!("compare_hash() did not find the piece");
|
||||
anyhow::bail!("compare_hash() did not find the piece");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_chunk(
|
||||
&self,
|
||||
who_sent: PeerHandle,
|
||||
chunk_info: &ChunkInfo,
|
||||
result_buf: &mut [u8],
|
||||
) -> anyhow::Result<()> {
|
||||
if result_buf.len() < chunk_info.size as usize {
|
||||
anyhow::bail!("read_chunk(): not enough capacity in the provided buffer")
|
||||
}
|
||||
let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info);
|
||||
let mut buf = &mut result_buf[..];
|
||||
|
||||
for (file_idx, file_len) in self.torrent.info.iter_file_lengths().enumerate() {
|
||||
if absolute_offset > file_len {
|
||||
absolute_offset -= file_len;
|
||||
continue;
|
||||
}
|
||||
let file_remaining_len = file_len - absolute_offset;
|
||||
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
|
||||
|
||||
let mut file_g = self.files[file_idx].lock();
|
||||
debug!(
|
||||
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
|
||||
chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info
|
||||
);
|
||||
file_g
|
||||
.seek(SeekFrom::Start(absolute_offset))
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"error seeking to {}, file id: {}",
|
||||
absolute_offset, file_idx
|
||||
)
|
||||
})?;
|
||||
file_g
|
||||
.read_exact(&mut buf[..to_read_in_file])
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"error reading {} bytes, file_id: {}",
|
||||
file_idx, to_read_in_file
|
||||
)
|
||||
})?;
|
||||
|
||||
buf = &mut buf[to_read_in_file..];
|
||||
|
||||
if buf.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
absolute_offset = 0;
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
pub fn write_chunk(
|
||||
&self,
|
||||
who_sent: PeerHandle,
|
||||
data: &Piece<ByteString>,
|
||||
chunk_info: &ChunkInfo,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut buf = data.block.as_ref();
|
||||
let mut absolute_offset = self.lengths.chunk_absolute_offset(&chunk_info);
|
||||
|
||||
for (file_idx, (name, file_len)) in
|
||||
self.torrent.info.iter_filenames_and_lengths().enumerate()
|
||||
{
|
||||
if absolute_offset > file_len {
|
||||
absolute_offset -= file_len;
|
||||
continue;
|
||||
}
|
||||
|
||||
let remaining_len = file_len - absolute_offset;
|
||||
let to_write = std::cmp::min(buf.len(), remaining_len as usize);
|
||||
|
||||
let mut file_g = self.files[file_idx].lock();
|
||||
debug!(
|
||||
"piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
|
||||
chunk_info.piece_index,
|
||||
chunk_info,
|
||||
who_sent,
|
||||
chunk_info.offset,
|
||||
file_idx,
|
||||
to_write,
|
||||
absolute_offset
|
||||
);
|
||||
file_g
|
||||
.seek(SeekFrom::Start(absolute_offset))
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"error seeking to {} in file {} (\"{:?}\")",
|
||||
absolute_offset, file_idx, name
|
||||
)
|
||||
})?;
|
||||
file_g
|
||||
.write_all(&buf[..to_write])
|
||||
.with_context(|| format!("error writing to file {} (\"{:?}\")", file_idx, name))?;
|
||||
buf = &buf[to_write..];
|
||||
if buf.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
absolute_offset = 0;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
@ -1,10 +1,11 @@
|
|||
use bincode::Options;
|
||||
use byteorder::ByteOrder;
|
||||
use byteorder::{ByteOrder, BE};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
buffers::{ByteBuf, ByteString},
|
||||
clone_to_owned::CloneToOwned,
|
||||
lengths::ChunkInfo,
|
||||
};
|
||||
|
||||
const PREAMBLE_LEN: usize = 5;
|
||||
|
|
@ -18,6 +19,7 @@ const LEN_PREFIX_UNCHOKE: u32 = 1;
|
|||
const LEN_PREFIX_INTERESTED: u32 = 1;
|
||||
const LEN_PREFIX_NOT_INTERESTED: u32 = 1;
|
||||
const LEN_PREFIX_HAVE: u32 = 5;
|
||||
const LEN_PREFIX_PIECE: u32 = 9;
|
||||
const LEN_PREFIX_REQUEST: u32 = 13;
|
||||
|
||||
const MSGID_CHOKE: u8 = 0;
|
||||
|
|
@ -46,6 +48,17 @@ pub enum MessageDeserializeError {
|
|||
},
|
||||
}
|
||||
|
||||
pub fn serialize_piece_preamble(chunk: &ChunkInfo, mut buf: &mut [u8]) -> usize {
|
||||
BE::write_u32(&mut buf[0..4], LEN_PREFIX_PIECE + chunk.size);
|
||||
buf[4] = MSGID_PIECE;
|
||||
|
||||
buf = &mut buf[PREAMBLE_LEN..];
|
||||
BE::write_u32(&mut buf[0..4], chunk.piece_index.get());
|
||||
BE::write_u32(&mut buf[4..8], chunk.offset);
|
||||
|
||||
PREAMBLE_LEN + 8
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Piece<ByteBuf> {
|
||||
pub index: u32,
|
||||
|
|
@ -209,7 +222,10 @@ where
|
|||
Message::Unchoke => (LEN_PREFIX_UNCHOKE, MSGID_UNCHOKE),
|
||||
Message::Interested => (LEN_PREFIX_INTERESTED, MSGID_INTERESTED),
|
||||
Message::NotInterested => (LEN_PREFIX_NOT_INTERESTED, MSGID_NOT_INTERESTED),
|
||||
Message::Piece(p) => (9 + p.block.as_ref().len() as u32, MSGID_PIECE),
|
||||
Message::Piece(p) => (
|
||||
LEN_PREFIX_PIECE + p.block.as_ref().len() as u32,
|
||||
MSGID_PIECE,
|
||||
),
|
||||
Message::KeepAlive => (LEN_PREFIX_KEEPALIVE, 0),
|
||||
Message::Have(_) => (LEN_PREFIX_HAVE, MSGID_HAVE),
|
||||
}
|
||||
|
|
@ -260,7 +276,7 @@ where
|
|||
Message::Have(v) => {
|
||||
let msg_len = PREAMBLE_LEN + 4;
|
||||
out.resize(msg_len, 0);
|
||||
byteorder::BE::write_u32(&mut out[PREAMBLE_LEN..], *v);
|
||||
BE::write_u32(&mut out[PREAMBLE_LEN..], *v);
|
||||
msg_len
|
||||
}
|
||||
}
|
||||
|
|
@ -332,10 +348,7 @@ where
|
|||
MSGID_HAVE => {
|
||||
let expected_len = 4;
|
||||
match rest.get(..expected_len as usize) {
|
||||
Some(h) => Ok((
|
||||
Message::Have(byteorder::BE::read_u32(&h)),
|
||||
PREAMBLE_LEN + expected_len,
|
||||
)),
|
||||
Some(h) => Ok((Message::Have(BE::read_u32(&h)), PREAMBLE_LEN + expected_len)),
|
||||
None => {
|
||||
let missing = expected_len - rest.len();
|
||||
Err(MessageDeserializeError::NotEnoughData(missing, "have"))
|
||||
|
|
|
|||
|
|
@ -12,8 +12,10 @@ use crate::{
|
|||
buffers::{ByteBuf, ByteString},
|
||||
chunk_tracker::ChunkMarkingResult,
|
||||
clone_to_owned::CloneToOwned,
|
||||
lengths::ChunkInfo,
|
||||
peer_binary_protocol::{
|
||||
Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request,
|
||||
serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError,
|
||||
MessageOwned, Piece, Request,
|
||||
},
|
||||
peer_id::try_decode_peer_id,
|
||||
spawn_utils::{spawn, spawn_blocking},
|
||||
|
|
@ -21,6 +23,12 @@ use crate::{
|
|||
type_aliases::PeerHandle,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum WriterRequest {
|
||||
Message(MessageOwned),
|
||||
ReadChunkRequest(ChunkInfo),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PeerConnection {
|
||||
state: Arc<TorrentState>,
|
||||
|
|
@ -38,7 +46,7 @@ impl PeerConnection {
|
|||
addr: SocketAddr,
|
||||
handle: PeerHandle,
|
||||
// outgoing_chan_tx: tokio::sync::mpsc::Sender<MessageOwned>,
|
||||
mut outgoing_chan: tokio::sync::mpsc::Receiver<MessageOwned>,
|
||||
mut outgoing_chan: tokio::sync::mpsc::Receiver<WriterRequest>,
|
||||
) -> anyhow::Result<()> {
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
|
@ -101,21 +109,40 @@ impl PeerConnection {
|
|||
}
|
||||
|
||||
loop {
|
||||
let msg = match timeout(keep_alive_interval, outgoing_chan.recv()).await {
|
||||
let req = match timeout(keep_alive_interval, outgoing_chan.recv()).await {
|
||||
Ok(Some(msg)) => msg,
|
||||
Ok(None) => {
|
||||
anyhow::bail!("closing writer, channel closed")
|
||||
}
|
||||
Err(_) => MessageOwned::KeepAlive,
|
||||
Err(_) => WriterRequest::Message(MessageOwned::KeepAlive),
|
||||
};
|
||||
|
||||
let uploaded_add = match &msg {
|
||||
Message::Piece(p) => Some(p.block.len()),
|
||||
let uploaded_add = match &req {
|
||||
WriterRequest::Message(Message::Piece(p)) => Some(p.block.len()),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let len = msg.serialize(&mut buf);
|
||||
debug!("sending to {}: {:?}, length={}", handle, &msg, len);
|
||||
let len = match &req {
|
||||
WriterRequest::Message(msg) => msg.serialize(&mut buf),
|
||||
WriterRequest::ReadChunkRequest(chunk) => {
|
||||
// this whole section is an optimization
|
||||
|
||||
let preamble_len = serialize_piece_preamble(&chunk, &mut buf);
|
||||
let full_len = preamble_len + chunk.size as usize;
|
||||
buf.resize(full_len, 0);
|
||||
tokio::task::block_in_place(|| {
|
||||
this.state.file_ops().read_chunk(
|
||||
handle,
|
||||
&chunk,
|
||||
&mut buf[preamble_len..],
|
||||
)
|
||||
})
|
||||
.with_context(|| format!("error reading chunk {:?}", chunk))?;
|
||||
full_len
|
||||
}
|
||||
};
|
||||
|
||||
debug!("sending to {}: {:?}, length={}", handle, &req, len);
|
||||
|
||||
write_half
|
||||
.write_all(&buf[..len])
|
||||
|
|
@ -243,22 +270,6 @@ impl PeerConnection {
|
|||
}
|
||||
};
|
||||
|
||||
let state = self.state.clone();
|
||||
let chunk = spawn_blocking(
|
||||
format!(
|
||||
"read_chunk_blocking(peer={}, chunk_info={:?}",
|
||||
peer_handle, &chunk_info
|
||||
),
|
||||
move || {
|
||||
let mut buf = Vec::new();
|
||||
state
|
||||
.file_ops()
|
||||
.read_chunk(peer_handle, chunk_info, &mut buf)?;
|
||||
Ok(buf)
|
||||
},
|
||||
)
|
||||
.await??;
|
||||
|
||||
let tx = self
|
||||
.state
|
||||
.locked
|
||||
|
|
@ -275,13 +286,9 @@ impl PeerConnection {
|
|||
// TODO: this is not super efficient as it does copying multiple times.
|
||||
// Theoretically, this could be done in the sending code, so that it reads straight into
|
||||
// the send buffer.
|
||||
let message = Message::Piece(Piece::from_data(
|
||||
chunk_info.piece_index.get(),
|
||||
chunk_info.offset,
|
||||
chunk,
|
||||
));
|
||||
info!("sending to {}: {:?}", peer_handle, &message);
|
||||
Ok::<_, anyhow::Error>(tx.send(message).await?)
|
||||
let request = WriterRequest::ReadChunkRequest(chunk_info);
|
||||
info!("sending to {}: {:?}", peer_handle, &request);
|
||||
Ok::<_, anyhow::Error>(tx.send(request).await?)
|
||||
}
|
||||
|
||||
fn on_have(&self, handle: PeerHandle, have: u32) {
|
||||
|
|
@ -321,10 +328,10 @@ impl PeerConnection {
|
|||
.peers
|
||||
.clone_tx(handle)
|
||||
.ok_or_else(|| anyhow::anyhow!("peer closed"))?;
|
||||
tx.send(MessageOwned::Unchoke)
|
||||
tx.send(WriterRequest::Message(MessageOwned::Unchoke))
|
||||
.await
|
||||
.context("peer dropped")?;
|
||||
tx.send(MessageOwned::NotInterested)
|
||||
tx.send(WriterRequest::Message(MessageOwned::NotInterested))
|
||||
.await
|
||||
.context("peer dropped")?;
|
||||
return Ok(());
|
||||
|
|
@ -343,10 +350,10 @@ impl PeerConnection {
|
|||
Some(tx) => tx,
|
||||
None => return Ok(()),
|
||||
};
|
||||
tx.send(MessageOwned::Unchoke)
|
||||
tx.send(WriterRequest::Message(MessageOwned::Unchoke))
|
||||
.await
|
||||
.context("peer dropped")?;
|
||||
tx.send(MessageOwned::Interested)
|
||||
tx.send(WriterRequest::Message(MessageOwned::Interested))
|
||||
.await
|
||||
.context("peer dropped")?;
|
||||
|
||||
|
|
@ -445,7 +452,7 @@ impl PeerConnection {
|
|||
};
|
||||
sem.acquire().await?.forget();
|
||||
|
||||
tx.send(MessageOwned::Request(request))
|
||||
tx.send(WriterRequest::Message(MessageOwned::Request(request)))
|
||||
.await
|
||||
.context("peer dropped")?;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ use crate::{
|
|||
file_ops::FileOps,
|
||||
lengths::Lengths,
|
||||
peer_binary_protocol::MessageOwned,
|
||||
peer_connection::PeerConnection,
|
||||
peer_connection::{PeerConnection, WriterRequest},
|
||||
spawn_utils::spawn,
|
||||
torrent_metainfo::TorrentMetaV1Owned,
|
||||
torrent_state::{AtomicStats, TorrentState, TorrentStateLocked},
|
||||
|
|
@ -304,7 +304,7 @@ impl TorrentManager {
|
|||
}
|
||||
|
||||
fn add_peer(&self, addr: SocketAddr) {
|
||||
let (out_tx, out_rx) = tokio::sync::mpsc::channel::<MessageOwned>(1);
|
||||
let (out_tx, out_rx) = tokio::sync::mpsc::channel::<WriterRequest>(1);
|
||||
let handle = match self
|
||||
.inner
|
||||
.locked
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ use crate::{
|
|||
file_ops::FileOps,
|
||||
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
|
||||
peer_binary_protocol::{Handshake, Message, MessageOwned},
|
||||
peer_connection::WriterRequest,
|
||||
peer_state::{LivePeerState, PeerState},
|
||||
torrent_metainfo::TorrentMetaV1Owned,
|
||||
type_aliases::{PeerHandle, BF},
|
||||
|
|
@ -43,7 +44,7 @@ pub struct PeerStates {
|
|||
states: HashMap<PeerHandle, PeerState>,
|
||||
seen_peers: HashSet<SocketAddr>,
|
||||
inflight_pieces: HashSet<ValidPieceIndex>,
|
||||
tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::Sender<MessageOwned>>>,
|
||||
tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::Sender<WriterRequest>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
|
@ -67,7 +68,7 @@ impl PeerStates {
|
|||
pub fn add_if_not_seen(
|
||||
&mut self,
|
||||
addr: SocketAddr,
|
||||
tx: tokio::sync::mpsc::Sender<MessageOwned>,
|
||||
tx: tokio::sync::mpsc::Sender<WriterRequest>,
|
||||
) -> Option<PeerHandle> {
|
||||
if self.seen_peers.contains(&addr) {
|
||||
return None;
|
||||
|
|
@ -95,7 +96,7 @@ impl PeerStates {
|
|||
pub fn add(
|
||||
&mut self,
|
||||
addr: SocketAddr,
|
||||
tx: tokio::sync::mpsc::Sender<MessageOwned>,
|
||||
tx: tokio::sync::mpsc::Sender<WriterRequest>,
|
||||
) -> Option<PeerHandle> {
|
||||
let handle = addr;
|
||||
if self.states.contains_key(&addr) {
|
||||
|
|
@ -127,7 +128,7 @@ impl PeerStates {
|
|||
live.bitfield = Some(bitfield);
|
||||
Some(prev)
|
||||
}
|
||||
pub fn clone_tx(&self, handle: PeerHandle) -> Option<Arc<Sender<MessageOwned>>> {
|
||||
pub fn clone_tx(&self, handle: PeerHandle) -> Option<Arc<Sender<WriterRequest>>> {
|
||||
Some(self.tx.get(&handle)?.clone())
|
||||
}
|
||||
pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> bool {
|
||||
|
|
@ -277,7 +278,11 @@ impl TorrentState {
|
|||
{
|
||||
unordered.push(async move {
|
||||
if let Some(tx) = weak.upgrade() {
|
||||
if tx.send(Message::Have(index)).await.is_err() {
|
||||
if tx
|
||||
.send(WriterRequest::Message(Message::Have(index)))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
// whatever
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue