Restructured a bit

This commit is contained in:
Igor Katson 2021-06-27 09:09:55 +01:00
parent 7fc41fd953
commit 0bd3f95891
2 changed files with 86 additions and 124 deletions

View file

@ -218,8 +218,6 @@ struct TorrentManagerInner {
files: Vec<Arc<Mutex<File>>>, files: Vec<Arc<Mutex<File>>>,
info_hash: [u8; 20], info_hash: [u8; 20],
peer_id: [u8; 20], peer_id: [u8; 20],
incoming_tx: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>,
have: AtomicU64, have: AtomicU64,
downloaded_and_checked: AtomicU64, downloaded_and_checked: AtomicU64,
needed: u64, needed: u64,
@ -520,9 +518,6 @@ impl TorrentManager {
lengths, lengths,
); );
let (incoming_tx, incoming_rx) =
tokio::sync::mpsc::channel::<(PeerHandle, MessageOwned)>(1);
let mgr = Self { let mgr = Self {
inner: Arc::new(TorrentManagerInner { inner: Arc::new(TorrentManagerInner {
info_hash: torrent.info_hash, info_hash: torrent.info_hash,
@ -533,7 +528,6 @@ impl TorrentManager {
chunks: chunk_tracker, chunks: chunk_tracker,
})), })),
files, files,
incoming_tx,
have: AtomicU64::new(initial_check_results.have_bytes), have: AtomicU64::new(initial_check_results.have_bytes),
needed: initial_check_results.needed_bytes, needed: initial_check_results.needed_bytes,
downloaded_and_checked: Default::default(), downloaded_and_checked: Default::default(),
@ -543,12 +537,8 @@ impl TorrentManager {
}), }),
}; };
spawn("tracker_monitor", mgr.clone().task_tracker_monitor()); spawn("tracker monitor", mgr.clone().task_tracker_monitor());
spawn( spawn("stats printer", mgr.clone().stats_printer());
"incoming_rx_handler",
mgr.clone().task_incoming_rx_handler(incoming_rx),
);
spawn("Stats printer", mgr.clone().stats_printer());
Ok(mgr.into_handle()) Ok(mgr.into_handle())
} }
@ -607,52 +597,19 @@ impl TorrentManager {
while tracker_futures.next().await.is_some() {} while tracker_futures.next().await.is_some() {}
Ok(()) Ok(())
} }
async fn task_incoming_rx_handler(
self,
mut incoming_tx: tokio::sync::mpsc::Receiver<(PeerHandle, MessageOwned)>,
) -> anyhow::Result<()> {
loop {
let (peer_handle, message): (PeerHandle, MessageOwned) = match incoming_tx.recv().await
{
Some(msg) => msg,
None => {
return Ok(());
}
};
match message { async fn on_download_request(
Message::Request(request) => self.on_download_request(peer_handle, request), &self,
Message::Bitfield(b) => self.on_bitfield(peer_handle, b), peer_handle: PeerHandle,
Message::Choke => self.on_i_am_choked(peer_handle), request: Request,
Message::Unchoke => self.on_i_am_unchoked(peer_handle), ) -> anyhow::Result<()> {
Message::Interested => {
warn!(
"{} is interested, but support for interested messages not implemented",
peer_handle
)
}
Message::Piece(piece) => {
self.on_received_piece(peer_handle, piece);
}
Message::KeepAlive => {
debug!("keepalive received from {}", peer_handle);
}
Message::Have(h) => self.on_have(peer_handle, h),
Message::NotInterested => {
info!("received \"not interested\", but we don't care yet")
}
}
}
}
fn on_download_request(&self, peer_handle: PeerHandle, request: Request) {
let piece_index = match self.inner.lengths.validate_piece_index(request.index) { let piece_index = match self.inner.lengths.validate_piece_index(request.index) {
Some(p) => p, Some(p) => p,
None => { None => {
warn!( anyhow::bail!(
"{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.",
peer_handle, request peer_handle, request
); );
return;
} }
}; };
let chunk_info = match self.inner.lengths.chunk_info_from_received_data( let chunk_info = match self.inner.lengths.chunk_info_from_received_data(
@ -662,49 +619,42 @@ impl TorrentManager {
) { ) {
Some(d) => d, Some(d) => d,
None => { None => {
warn!( anyhow::bail!(
"{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.",
peer_handle, request peer_handle, request
); );
return;
} }
}; };
let this = self.clone(); let this = self.clone();
let task_name = format!( let clone = this.clone();
"download_request(peer={}, chunk_info={:?})", let chunk = spawn_blocking(
peer_handle, &chunk_info format!(
); "read_chunk_blocking(peer={}, chunk_info={:?}",
spawn(task_name, async move { peer_handle, &chunk_info
let clone = this.clone(); ),
let chunk = spawn_blocking( move || clone.read_chunk_blocking(peer_handle, chunk_info),
format!( )
"read_chunk_blocking(peer={}, chunk_info={:?}", .await??;
peer_handle, &chunk_info let tx = this
), .inner
move || clone.read_chunk_blocking(peer_handle, chunk_info), .locked
) .read()
.await??; .peers
let tx = this .clone_tx(peer_handle)
.inner .ok_or_else(|| {
.locked anyhow::anyhow!(
.read() "peer {} died, dropping chunk that it requested",
.peers peer_handle
.clone_tx(peer_handle) )
.ok_or_else(|| { })?;
anyhow::anyhow!( let message = Message::Piece(Piece::from_vec(
"peer {} died, dropping chunk that it requested", chunk_info.piece_index.get(),
peer_handle chunk_info.offset,
) chunk,
})?; ));
let message = Message::Piece(Piece::from_vec( info!("sending to {}: {:?}", peer_handle, &message);
chunk_info.piece_index.get(), Ok::<_, anyhow::Error>(tx.send(message).await?)
chunk_info.offset,
chunk,
));
info!("sending to {}: {:?}", peer_handle, &message);
Ok::<_, anyhow::Error>(tx.send(message).await?)
});
} }
fn read_chunk_blocking( fn read_chunk_blocking(
self, self,
@ -1295,7 +1245,6 @@ impl TorrentManager {
&self, &self,
addr: SocketAddr, addr: SocketAddr,
handle: PeerHandle, handle: PeerHandle,
incoming_chan: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>,
// outgoing_chan_tx: tokio::sync::mpsc::Sender<MessageOwned>, // outgoing_chan_tx: tokio::sync::mpsc::Sender<MessageOwned>,
mut outgoing_chan: tokio::sync::mpsc::Receiver<MessageOwned>, mut outgoing_chan: tokio::sync::mpsc::Receiver<MessageOwned>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -1396,39 +1345,68 @@ impl TorrentManager {
let reader = async move { let reader = async move {
loop { loop {
let (message, size) = loop { let message = loop {
match MessageBorrowed::deserialize(&read_buf[..read_so_far]) { match MessageBorrowed::deserialize(&read_buf[..read_so_far]) {
Ok((msg, size)) => break (msg.clone_to_owned(), size), Ok((msg, size)) => {
let msg = msg.clone_to_owned();
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}
read_so_far -= size;
break msg;
}
Err(MessageDeserializeError::NotEnoughData(d, _)) => { Err(MessageDeserializeError::NotEnoughData(d, _)) => {
if read_buf.len() < read_so_far + d { if read_buf.len() < read_so_far + d {
read_buf.reserve(d); read_buf.reserve(d);
read_buf.resize(read_buf.capacity(), 0); read_buf.resize(read_buf.capacity(), 0);
} }
let size = read_half
.read(&mut read_buf[read_so_far..])
.await
.context("error reading from peer")?;
if size == 0 {
anyhow::bail!(
"disconnected while reading, read so far: {}",
read_so_far
)
}
read_so_far += size;
} }
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
} }
let size = read_half
.read(&mut read_buf[read_so_far..])
.await
.context("error reading from peer")?;
if size == 0 {
anyhow::bail!("disconnected while reading, read so far: {}", read_so_far)
}
read_so_far += size;
}; };
trace!("received from {}: {:?}", handle, &message); trace!("received from {}: {:?}", handle, &message);
if read_so_far > size { match message {
read_buf.copy_within(size..read_so_far, 0); Message::Request(request) => {
self.on_download_request(handle, request)
.await
.with_context(|| {
format!("error handling download request from {}", handle)
})?;
}
Message::Bitfield(b) => self.on_bitfield(handle, b),
Message::Choke => self.on_i_am_choked(handle),
Message::Unchoke => self.on_i_am_unchoked(handle),
Message::Interested => {
warn!(
"{} is interested, but support for interested messages not implemented",
handle
)
}
Message::Piece(piece) => {
self.on_received_piece(handle, piece);
}
Message::KeepAlive => {
debug!("keepalive received from {}", handle);
}
Message::Have(h) => self.on_have(handle, h),
Message::NotInterested => {
info!("received \"not interested\", but we don't care yet")
}
} }
read_so_far -= size;
incoming_chan
.send((handle, message))
.await
.context("error sending received message")?;
} }
// For type inference. // For type inference.
@ -1474,10 +1452,7 @@ impl TorrentManager {
let this = self.clone(); let this = self.clone();
spawn(format!("manage_peer({})", handle), async move { spawn(format!("manage_peer({})", handle), async move {
if let Err(e) = this if let Err(e) = this.manage_peer(addr, handle, out_rx).await {
.manage_peer(addr, handle, this.inner.incoming_tx.clone(), out_rx)
.await
{
error!("error managing peer, will drop {}: {:#}", handle, e) error!("error managing peer, will drop {}: {:#}", handle, e)
}; };
this.drop_peer(handle); this.drop_peer(handle);

View file

@ -130,16 +130,3 @@ fn main() -> anyhow::Result<()> {
Ok(()) Ok(())
}) })
} }
#[cfg(test)]
mod tests {
use std::{fs::File, io::Read};
#[test]
fn test_bullshit() {
let mut buf = vec![0u8; 65536];
let mut f =
File::open("/tmp/torrent-download/08.Comedy.Club.S17.WEB-DL.1080p.7turza.mkv").unwrap();
f.read_exact(&mut buf[..]).unwrap();
}
}