diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index ae04a48..6ad0b82 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -218,8 +218,6 @@ struct TorrentManagerInner { files: Vec>>, info_hash: [u8; 20], peer_id: [u8; 20], - incoming_tx: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, - have: AtomicU64, downloaded_and_checked: AtomicU64, needed: u64, @@ -520,9 +518,6 @@ impl TorrentManager { lengths, ); - let (incoming_tx, incoming_rx) = - tokio::sync::mpsc::channel::<(PeerHandle, MessageOwned)>(1); - let mgr = Self { inner: Arc::new(TorrentManagerInner { info_hash: torrent.info_hash, @@ -533,7 +528,6 @@ impl TorrentManager { chunks: chunk_tracker, })), files, - incoming_tx, have: AtomicU64::new(initial_check_results.have_bytes), needed: initial_check_results.needed_bytes, downloaded_and_checked: Default::default(), @@ -543,12 +537,8 @@ impl TorrentManager { }), }; - spawn("tracker_monitor", mgr.clone().task_tracker_monitor()); - spawn( - "incoming_rx_handler", - mgr.clone().task_incoming_rx_handler(incoming_rx), - ); - spawn("Stats printer", mgr.clone().stats_printer()); + spawn("tracker monitor", mgr.clone().task_tracker_monitor()); + spawn("stats printer", mgr.clone().stats_printer()); Ok(mgr.into_handle()) } @@ -607,52 +597,19 @@ impl TorrentManager { while tracker_futures.next().await.is_some() {} Ok(()) } - async fn task_incoming_rx_handler( - self, - mut incoming_tx: tokio::sync::mpsc::Receiver<(PeerHandle, MessageOwned)>, - ) -> anyhow::Result<()> { - loop { - let (peer_handle, message): (PeerHandle, MessageOwned) = match incoming_tx.recv().await - { - Some(msg) => msg, - None => { - return Ok(()); - } - }; - match message { - Message::Request(request) => self.on_download_request(peer_handle, request), - Message::Bitfield(b) => self.on_bitfield(peer_handle, b), - Message::Choke => self.on_i_am_choked(peer_handle), - Message::Unchoke => self.on_i_am_unchoked(peer_handle), - Message::Interested => { - warn!( - "{} is interested, but support for interested messages not implemented", - peer_handle - ) - } - Message::Piece(piece) => { - self.on_received_piece(peer_handle, piece); - } - Message::KeepAlive => { - debug!("keepalive received from {}", peer_handle); - } - Message::Have(h) => self.on_have(peer_handle, h), - Message::NotInterested => { - info!("received \"not interested\", but we don't care yet") - } - } - } - } - fn on_download_request(&self, peer_handle: PeerHandle, request: Request) { + async fn on_download_request( + &self, + peer_handle: PeerHandle, + request: Request, + ) -> anyhow::Result<()> { let piece_index = match self.inner.lengths.validate_piece_index(request.index) { Some(p) => p, None => { - warn!( + anyhow::bail!( "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", peer_handle, request ); - return; } }; let chunk_info = match self.inner.lengths.chunk_info_from_received_data( @@ -662,49 +619,42 @@ impl TorrentManager { ) { Some(d) => d, None => { - warn!( + anyhow::bail!( "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", peer_handle, request ); - return; } }; let this = self.clone(); - let task_name = format!( - "download_request(peer={}, chunk_info={:?})", - peer_handle, &chunk_info - ); - spawn(task_name, async move { - let clone = this.clone(); - let chunk = spawn_blocking( - format!( - "read_chunk_blocking(peer={}, chunk_info={:?}", - peer_handle, &chunk_info - ), - move || clone.read_chunk_blocking(peer_handle, chunk_info), - ) - .await??; - let tx = this - .inner - .locked - .read() - .peers - .clone_tx(peer_handle) - .ok_or_else(|| { - anyhow::anyhow!( - "peer {} died, dropping chunk that it requested", - peer_handle - ) - })?; - let message = Message::Piece(Piece::from_vec( - chunk_info.piece_index.get(), - chunk_info.offset, - chunk, - )); - info!("sending to {}: {:?}", peer_handle, &message); - Ok::<_, anyhow::Error>(tx.send(message).await?) - }); + let clone = this.clone(); + let chunk = spawn_blocking( + format!( + "read_chunk_blocking(peer={}, chunk_info={:?}", + peer_handle, &chunk_info + ), + move || clone.read_chunk_blocking(peer_handle, chunk_info), + ) + .await??; + let tx = this + .inner + .locked + .read() + .peers + .clone_tx(peer_handle) + .ok_or_else(|| { + anyhow::anyhow!( + "peer {} died, dropping chunk that it requested", + peer_handle + ) + })?; + let message = Message::Piece(Piece::from_vec( + chunk_info.piece_index.get(), + chunk_info.offset, + chunk, + )); + info!("sending to {}: {:?}", peer_handle, &message); + Ok::<_, anyhow::Error>(tx.send(message).await?) } fn read_chunk_blocking( self, @@ -1295,7 +1245,6 @@ impl TorrentManager { &self, addr: SocketAddr, handle: PeerHandle, - incoming_chan: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, // outgoing_chan_tx: tokio::sync::mpsc::Sender, mut outgoing_chan: tokio::sync::mpsc::Receiver, ) -> anyhow::Result<()> { @@ -1396,39 +1345,68 @@ impl TorrentManager { let reader = async move { loop { - let (message, size) = loop { + let message = loop { match MessageBorrowed::deserialize(&read_buf[..read_so_far]) { - Ok((msg, size)) => break (msg.clone_to_owned(), size), + Ok((msg, size)) => { + let msg = msg.clone_to_owned(); + if read_so_far > size { + read_buf.copy_within(size..read_so_far, 0); + } + read_so_far -= size; + break msg; + } Err(MessageDeserializeError::NotEnoughData(d, _)) => { if read_buf.len() < read_so_far + d { read_buf.reserve(d); read_buf.resize(read_buf.capacity(), 0); } + + let size = read_half + .read(&mut read_buf[read_so_far..]) + .await + .context("error reading from peer")?; + if size == 0 { + anyhow::bail!( + "disconnected while reading, read so far: {}", + read_so_far + ) + } + read_so_far += size; } Err(e) => return Err(e.into()), } - - let size = read_half - .read(&mut read_buf[read_so_far..]) - .await - .context("error reading from peer")?; - if size == 0 { - anyhow::bail!("disconnected while reading, read so far: {}", read_so_far) - } - read_so_far += size; }; trace!("received from {}: {:?}", handle, &message); - if read_so_far > size { - read_buf.copy_within(size..read_so_far, 0); + match message { + Message::Request(request) => { + self.on_download_request(handle, request) + .await + .with_context(|| { + format!("error handling download request from {}", handle) + })?; + } + Message::Bitfield(b) => self.on_bitfield(handle, b), + Message::Choke => self.on_i_am_choked(handle), + Message::Unchoke => self.on_i_am_unchoked(handle), + Message::Interested => { + warn!( + "{} is interested, but support for interested messages not implemented", + handle + ) + } + Message::Piece(piece) => { + self.on_received_piece(handle, piece); + } + Message::KeepAlive => { + debug!("keepalive received from {}", handle); + } + Message::Have(h) => self.on_have(handle, h), + Message::NotInterested => { + info!("received \"not interested\", but we don't care yet") + } } - read_so_far -= size; - - incoming_chan - .send((handle, message)) - .await - .context("error sending received message")?; } // For type inference. @@ -1474,10 +1452,7 @@ impl TorrentManager { let this = self.clone(); spawn(format!("manage_peer({})", handle), async move { - if let Err(e) = this - .manage_peer(addr, handle, this.inner.incoming_tx.clone(), out_rx) - .await - { + if let Err(e) = this.manage_peer(addr, handle, out_rx).await { error!("error managing peer, will drop {}: {:#}", handle, e) }; this.drop_peer(handle); diff --git a/src/main.rs b/src/main.rs index a2a1136..101861e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,16 +130,3 @@ fn main() -> anyhow::Result<()> { Ok(()) }) } - -#[cfg(test)] -mod tests { - use std::{fs::File, io::Read}; - - #[test] - fn test_bullshit() { - let mut buf = vec![0u8; 65536]; - let mut f = - File::open("/tmp/torrent-download/08.Comedy.Club.S17.WEB-DL.1080p.7turza.mkv").unwrap(); - f.read_exact(&mut buf[..]).unwrap(); - } -}