From 1f299247d25132e03cb890f89e8336eb8a395acb Mon Sep 17 00:00:00 2001 From: Igor Katson Date: Fri, 2 Jul 2021 10:12:48 +0100 Subject: [PATCH] Abstract away peer handler --- crates/librqbit/src/lib.rs | 1 + crates/librqbit/src/peer_connection.rs | 536 +++---------------------- crates/librqbit/src/peer_handler.rs | 1 + crates/librqbit/src/torrent_state.rs | 489 +++++++++++++++++++++- 4 files changed, 534 insertions(+), 493 deletions(-) create mode 100644 crates/librqbit/src/peer_handler.rs diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index e477c79..54453ac 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -8,6 +8,7 @@ pub mod http_api; pub mod lengths; pub mod peer_binary_protocol; pub mod peer_connection; +pub mod peer_handler; pub mod peer_id; pub mod peer_state; pub mod serde_bencode_de; diff --git a/crates/librqbit/src/peer_connection.rs b/crates/librqbit/src/peer_connection.rs index 6b47004..b852a4b 100644 --- a/crates/librqbit/src/peer_connection.rs +++ b/crates/librqbit/src/peer_connection.rs @@ -1,61 +1,63 @@ -use std::{ - net::SocketAddr, - sync::{atomic::Ordering, Arc}, - time::Duration, -}; +use std::{net::SocketAddr, time::Duration}; use anyhow::Context; -use log::{debug, info, trace, warn}; +use log::{debug, trace}; use tokio::time::timeout; use crate::{ - buffers::{ByteBuf, ByteString}, - chunk_tracker::ChunkMarkingResult, - clone_to_owned::CloneToOwned, + buffers::ByteBuf, lengths::ChunkInfo, peer_binary_protocol::{ serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError, - MessageOwned, Piece, Request, PIECE_MESSAGE_DEFAULT_LEN, + MessageOwned, PIECE_MESSAGE_DEFAULT_LEN, }, peer_id::try_decode_peer_id, - peer_state::InflightRequest, - spawn_utils::{spawn, BlockingSpawner}, - torrent_state::TorrentState, - type_aliases::PeerHandle, }; +pub trait PeerConnectionHandler { + fn get_have_bytes(&self) -> u64; + fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option; + fn on_handshake(&self, handshake: Handshake); + fn on_received_message(&self, msg: Message>) -> anyhow::Result<()>; + fn on_uploaded_bytes(&self, bytes: u32); + fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()>; +} + #[derive(Debug)] pub enum WriterRequest { Message(MessageOwned), ReadChunkRequest(ChunkInfo), } -#[derive(Clone)] -pub struct PeerConnection { - state: Arc, - spawner: BlockingSpawner, +pub struct PeerConnection { + handler: H, + addr: SocketAddr, + info_hash: [u8; 20], + peer_id: [u8; 20], } -impl PeerConnection { - pub fn new(state: Arc, spawner: BlockingSpawner) -> Self { - PeerConnection { state, spawner } +impl PeerConnection { + pub fn new(addr: SocketAddr, info_hash: [u8; 20], peer_id: [u8; 20], handler: H) -> Self { + PeerConnection { + addr, + handler, + info_hash, + peer_id, + } } - pub fn into_state(self) -> Arc { - self.state + pub fn into_handler(self) -> H { + self.handler } pub async fn manage_peer( &self, - addr: SocketAddr, - handle: PeerHandle, - // outgoing_chan_tx: tokio::sync::mpsc::Sender, - mut outgoing_chan: tokio::sync::mpsc::Receiver, + mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver, ) -> anyhow::Result<()> { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut conn = tokio::net::TcpStream::connect(addr) + let mut conn = tokio::net::TcpStream::connect(self.addr) .await .context("error connecting")?; - let handshake = Handshake::new(self.state.info_hash, self.state.peer_id); + let handshake = Handshake::new(self.info_hash, self.peer_id); conn.write_all(&handshake.serialize()) .await .context("error writing handshake")?; @@ -73,14 +75,14 @@ impl PeerConnection { let mut read_so_far = 0usize; debug!( "connected peer {}: {:?}", - addr, + self.addr, try_decode_peer_id(h.peer_id) ); - if h.info_hash != self.state.info_hash { + if h.info_hash != self.info_hash { anyhow::bail!("info hash does not match"); } - self.state.set_peer_live(handle, h); + self.handler.on_handshake(h); if read_bytes > hlen { read_buf.copy_within(hlen..read_bytes, 0); @@ -89,25 +91,25 @@ impl PeerConnection { let (mut read_half, mut write_half) = tokio::io::split(conn); - let this = self.clone(); let writer = async move { let mut buf = Vec::::with_capacity(PIECE_MESSAGE_DEFAULT_LEN); let keep_alive_interval = Duration::from_secs(120); - if this.state.stats.have.load(Ordering::Relaxed) > 0 { - let len = { - let g = this.state.locked.read(); - let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice())); - let len = msg.serialize(&mut buf); - debug!("sending to {}: {:?}, length={}", handle, &msg, len); - len - }; - - write_half - .write_all(&buf[..len]) - .await - .context("error writing bitfield to peer")?; - debug!("sent bitfield to {}", handle); + if self.handler.get_have_bytes() > 0 { + if let Some(len) = self.handler.serialize_bitfield_message_to_buf(&mut buf) { + write_half + .write_all(&buf[..len]) + .await + .context("error writing bitfield to peer")?; + debug!("sent bitfield to {}", self.addr); + } + // let len = { + // let bitfield = self.handler.get_have_bitfield(); + // let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice())); + // let len = msg.serialize(&mut buf); + // debug!("sending to {}: {:?}, length={}", self.addr, &msg, len); + // len + // }; } loop { @@ -129,21 +131,15 @@ impl PeerConnection { let preamble_len = serialize_piece_preamble(&chunk, &mut buf); let full_len = preamble_len + chunk.size as usize; buf.resize(full_len, 0); - this.spawner - .spawn_block_in_place(|| { - this.state.file_ops().read_chunk( - handle, - &chunk, - &mut buf[preamble_len..], - ) - }) + self.handler + .read_chunk(chunk, &mut buf[preamble_len..]) .with_context(|| format!("error reading chunk {:?}", chunk))?; uploaded_add = Some(chunk.size); full_len } }; - debug!("sending to {}: {:?}, length={}", handle, &req, len); + debug!("sending to {}: {:?}, length={}", self.addr, &req, len); write_half .write_all(&buf[..len]) @@ -151,10 +147,7 @@ impl PeerConnection { .context("error writing the message to peer")?; if let Some(uploaded_add) = uploaded_add { - this.state - .stats - .uploaded - .fetch_add(uploaded_add as u64, Ordering::Relaxed); + self.handler.on_uploaded_bytes(uploaded_add) } } @@ -192,38 +185,11 @@ impl PeerConnection { } }; - trace!("received from {}: {:?}", handle, &message); + trace!("received from {}: {:?}", self.addr, &message); - match message { - Message::Request(request) => { - self.on_download_request(handle, request) - .await - .with_context(|| { - format!("error handling download request from {}", handle) - })?; - } - Message::Bitfield(b) => self.on_bitfield(handle, b.clone_to_owned()).await?, - Message::Choke => self.on_i_am_choked(handle), - Message::Unchoke => self.on_i_am_unchoked(handle), - Message::Interested => self.on_peer_interested(handle), - Message::Piece(piece) => { - self.on_received_piece(handle, piece) - .context("error in on_received_piece()")?; - } - Message::KeepAlive => { - debug!("keepalive received from {}", handle); - } - Message::Have(h) => self.on_have(handle, h), - Message::NotInterested => { - info!("received \"not interested\", but we don't care yet") - } - message => { - warn!( - "{}: received unsupported message {:?}, ignoring", - handle, message - ); - } - } + self.handler + .on_received_message(message) + .context("error in handler.on_received_message()")?; if read_so_far > size { read_buf.copy_within(size..read_so_far, 0); @@ -240,393 +206,7 @@ impl PeerConnection { r = reader => {r} r = writer => {r} }; - debug!("{}: either reader or writer are done, exiting", handle); + debug!("{}: either reader or writer are done, exiting", self.addr); r } - - async fn on_download_request( - &self, - peer_handle: PeerHandle, - request: Request, - ) -> anyhow::Result<()> { - let piece_index = match self.state.lengths.validate_piece_index(request.index) { - Some(p) => p, - None => { - anyhow::bail!( - "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", - peer_handle, request - ); - } - }; - let chunk_info = match self.state.lengths.chunk_info_from_received_data( - piece_index, - request.begin, - request.length, - ) { - Some(d) => d, - None => { - anyhow::bail!( - "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", - peer_handle, request - ); - } - }; - - let tx = { - let g = self.state.locked.read(); - if !g.chunks.is_chunk_ready_to_upload(&chunk_info) { - anyhow::bail!( - "got request for a chunk that is not ready to upload. chunk {:?}", - &chunk_info - ); - } - - g.peers.clone_tx(peer_handle).ok_or_else(|| { - anyhow::anyhow!( - "peer {} died, dropping chunk that it requested", - peer_handle - ) - })? - }; - - // TODO: this is not super efficient as it does copying multiple times. - // Theoretically, this could be done in the sending code, so that it reads straight into - // the send buffer. - let request = WriterRequest::ReadChunkRequest(chunk_info); - debug!("sending to {}: {:?}", peer_handle, &request); - Ok::<_, anyhow::Error>(tx.send(request).await?) - } - - fn on_have(&self, handle: PeerHandle, have: u32) { - if let Some(bitfield) = self - .state - .locked - .write() - .peers - .get_live_mut(handle) - .and_then(|l| l.bitfield.as_mut()) - { - debug!("{}: updated bitfield with have={}", handle, have); - bitfield.set(have as usize, true) - } - } - - async fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> { - if bitfield.len() != self.state.lengths.piece_bitfield_bytes() as usize { - anyhow::bail!( - "dropping {} as its bitfield has unexpected size. Got {}, expected {}", - handle, - bitfield.len(), - self.state.lengths.piece_bitfield_bytes(), - ); - } - self.state - .locked - .write() - .peers - .update_bitfield_from_vec(handle, bitfield.0); - - if !self.state.am_i_interested_in_peer(handle) { - let tx = self - .state - .locked - .read() - .peers - .clone_tx(handle) - .ok_or_else(|| anyhow::anyhow!("peer closed"))?; - tx.send(WriterRequest::Message(MessageOwned::Unchoke)) - .await - .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::NotInterested)) - .await - .context("peer dropped")?; - return Ok(()); - } - - // Additional spawn per peer, not good. - spawn( - format!("peer_chunk_requester({})", handle), - self.clone().task_peer_chunk_requester(handle), - ); - Ok(()) - } - - async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { - let tx = match self.state.locked.read().peers.clone_tx(handle) { - Some(tx) => tx, - None => return Ok(()), - }; - tx.send(WriterRequest::Message(MessageOwned::Unchoke)) - .await - .context("peer dropped")?; - tx.send(WriterRequest::Message(MessageOwned::Interested)) - .await - .context("peer dropped")?; - - self.requester(handle).await?; - Ok::<_, anyhow::Error>(()) - } - - fn on_i_am_choked(&self, handle: PeerHandle) { - warn!("we are choked by {}", handle); - self.state - .locked - .write() - .peers - .mark_i_am_choked(handle, true); - } - - fn on_peer_interested(&self, handle: PeerHandle) { - debug!("peer {} is interested", handle); - self.state - .locked - .write() - .peers - .mark_peer_interested(handle, true); - } - - async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { - let notify = match self.state.locked.read().peers.get_live(handle) { - Some(l) => l.have_notify.clone(), - None => return Ok(()), - }; - - // TODO: this might dangle, same below. - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), notify.notified()).await; - } - - loop { - match self.state.am_i_choked(handle) { - Some(true) => { - warn!("we are choked by {}, can't reserve next piece", handle); - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), notify.notified()).await; - } - continue; - } - Some(false) => {} - None => return Ok(()), - } - - let next = match self.state.try_steal_old_slow_piece(handle) { - Some(next) => next, - None => match self.state.reserve_next_needed_piece(handle) { - Some(next) => next, - None => { - if self.state.get_left_to_download() == 0 { - debug!("{}: nothing left to download, closing requester", handle); - return Ok(()); - } - - if let Some(piece) = self.state.try_steal_piece(handle) { - debug!("{}: stole a piece {}", handle, piece); - piece - } else { - debug!("no pieces to request from {}", handle); - #[allow(unused_must_use)] - { - timeout(Duration::from_secs(60), notify.notified()).await; - } - continue; - } - } - }, - }; - - let tx = match self.state.locked.read().peers.clone_tx(handle) { - Some(tx) => tx, - None => return Ok(()), - }; - let sem = match self.state.locked.read().peers.get_live(handle) { - Some(live) => live.requests_sem.clone(), - None => return Ok(()), - }; - for chunk in self.state.lengths.iter_chunk_infos(next) { - if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) { - continue; - } - if !self - .state - .locked - .write() - .peers - .try_get_live_mut(handle)? - .inflight_requests - .insert(InflightRequest::from(&chunk)) - { - warn!( - "{}: probably a bug, we already requested {:?}", - handle, chunk - ); - continue; - } - - let request = Request { - index: next.get(), - begin: chunk.offset, - length: chunk.size, - }; - sem.acquire().await?.forget(); - - tx.send(WriterRequest::Message(MessageOwned::Request(request))) - .await - .context("peer dropped")?; - } - } - } - - fn on_i_am_unchoked(&self, handle: PeerHandle) { - debug!("we are unchoked by {}", handle); - let mut g = self.state.locked.write(); - let live = match g.peers.get_live_mut(handle) { - Some(live) => live, - None => return, - }; - live.i_am_choked = false; - live.have_notify.notify_waiters(); - live.requests_sem.add_permits(16); - } - - fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> anyhow::Result<()> { - let chunk_info = match self.state.lengths.chunk_info_from_received_piece(&piece) { - Some(i) => i, - None => { - anyhow::bail!( - "peer {} sent us a piece that is invalid {:?}", - handle, - &piece, - ); - } - }; - - let mut g = self.state.locked.write(); - let h = g.peers.try_get_live_mut(handle)?; - h.requests_sem.add_permits(1); - - self.state - .stats - .fetched_bytes - .fetch_add(piece.block.len() as u64, Ordering::Relaxed); - - if !h - .inflight_requests - .remove(&InflightRequest::from(&chunk_info)) - { - anyhow::bail!( - "peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.inflight_requests, &piece, - ); - } - - let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) { - Some(ChunkMarkingResult::Completed) => { - debug!( - "piece={} done by {}, will write and checksum", - piece.index, handle - ); - // This will prevent others from stealing it. - g.peers - .remove_inflight_piece(chunk_info.piece_index) - .map(|t| t.started.elapsed()) - } - Some(ChunkMarkingResult::PreviouslyCompleted) => { - // TODO: we might need to send cancellations here. - debug!( - "piece={} was done by someone else {}, ignoring", - piece.index, handle - ); - return Ok(()); - } - Some(ChunkMarkingResult::NotCompleted) => None, - None => { - anyhow::bail!( - "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", - handle, - piece - ); - } - }; - - // to prevent deadlocks. - drop(g); - - self.spawner - .spawn_block_in_place(move || { - let index = piece.index; - - // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what - // should we really do? If we unmark it, it will get requested forever... - // - // So let's just unwrap and abort. - self.state - .file_ops() - .write_chunk(handle, &piece, &chunk_info) - .expect("expected to be able to write to disk"); - - let full_piece_download_time = match full_piece_download_time { - Some(t) => t, - None => return Ok(()), - }; - - match self - .state - .file_ops() - .check_piece(handle, chunk_info.piece_index, &chunk_info) - .with_context(|| format!("error checking piece={}", index))? - { - true => { - let piece_len = - self.state.lengths.piece_length(chunk_info.piece_index) as u64; - self.state - .stats - .downloaded_and_checked - .fetch_add(piece_len, Ordering::Relaxed); - self.state - .stats - .have - .fetch_add(piece_len, Ordering::Relaxed); - self.state - .stats - .downloaded_pieces - .fetch_add(1, Ordering::Relaxed); - self.state - .stats - .downloaded_pieces - .fetch_add(1, Ordering::Relaxed); - self.state.stats.total_piece_download_ms.fetch_add( - full_piece_download_time.as_millis() as u64, - Ordering::Relaxed, - ); - self.state - .locked - .write() - .chunks - .mark_piece_downloaded(chunk_info.piece_index); - - debug!( - "piece={} successfully downloaded and verified from {}", - index, handle - ); - - self.state.maybe_transmit_haves(chunk_info.piece_index); - } - false => { - warn!( - "checksum for piece={} did not validate, came from {}", - index, handle - ); - self.state - .locked - .write() - .chunks - .mark_piece_broken(chunk_info.piece_index); - } - }; - Ok::<_, anyhow::Error>(()) - }) - .with_context(|| format!("error processing received chunk {:?}", chunk_info))?; - Ok(()) - } } diff --git a/crates/librqbit/src/peer_handler.rs b/crates/librqbit/src/peer_handler.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/librqbit/src/peer_handler.rs @@ -0,0 +1 @@ + diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 905e7f0..e5c9c3c 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -9,18 +9,24 @@ use std::{ time::{Duration, Instant}, }; +use anyhow::Context; use futures::{stream::FuturesUnordered, StreamExt}; -use log::{debug, trace, warn}; +use log::{debug, info, trace, warn}; use parking_lot::{Mutex, RwLock}; -use tokio::sync::mpsc::{channel, Sender}; +use tokio::{ + sync::mpsc::{channel, Sender, UnboundedSender}, + time::timeout, +}; use crate::{ - chunk_tracker::ChunkTracker, + buffers::{ByteBuf, ByteString}, + chunk_tracker::{ChunkMarkingResult, ChunkTracker}, + clone_to_owned::CloneToOwned, file_ops::FileOps, lengths::{Lengths, ValidPieceIndex}, - peer_binary_protocol::{Handshake, Message}, - peer_connection::{PeerConnection, WriterRequest}, - peer_state::{LivePeerState, PeerState}, + peer_binary_protocol::{Handshake, Message, MessageOwned, Piece, Request}, + peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest}, + peer_state::{InflightRequest, LivePeerState, PeerState}, spawn_utils::{spawn, BlockingSpawner}, torrent_metainfo::TorrentMetaV1Owned, type_aliases::{PeerHandle, Sha1, BF}, @@ -36,7 +42,7 @@ pub struct PeerStates { states: HashMap, seen: HashSet, inflight_pieces: HashMap, - tx: HashMap>>, + tx: HashMap>>, } #[derive(Debug, Default)] @@ -60,7 +66,7 @@ impl PeerStates { pub fn add_if_not_seen( &mut self, addr: SocketAddr, - tx: tokio::sync::mpsc::Sender, + tx: UnboundedSender, ) -> Option { if self.seen.contains(&addr) { return None; @@ -91,7 +97,7 @@ impl PeerStates { pub fn add( &mut self, addr: SocketAddr, - tx: tokio::sync::mpsc::Sender, + tx: UnboundedSender, ) -> Option { let handle = addr; if self.states.contains_key(&addr) { @@ -133,7 +139,7 @@ impl PeerStates { live.bitfield = Some(bitfield); Some(prev) } - pub fn clone_tx(&self, handle: PeerHandle) -> Option>> { + pub fn clone_tx(&self, handle: PeerHandle) -> Option>> { Some(self.tx.get(&handle)?.clone()) } pub fn remove_inflight_piece(&mut self, piece: ValidPieceIndex) -> Option { @@ -368,7 +374,6 @@ impl TorrentState { if let Some(tx) = tx.upgrade() { if tx .send(WriterRequest::Message(Message::Have(index.get()))) - .await .is_err() { // whatever @@ -396,18 +401,24 @@ impl TorrentState { } pub fn add_peer_if_not_seen(self: &Arc, addr: SocketAddr) -> bool { - let (out_tx, out_rx) = channel::(1); + let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::(); let handle = match self.locked.write().peers.add_if_not_seen(addr, out_tx) { Some(handle) => handle, None => return false, }; - let peer_connection = PeerConnection::new(self.clone(), self.spawner.clone()); + let handler = PeerHandler { + addr, + state: self.clone(), + spawner: self.spawner, + }; + let peer_connection = + PeerConnection::new(addr, self.torrent.info_hash, self.peer_id, handler); spawn(format!("manage_peer({})", handle), async move { - if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await { + if let Err(e) = peer_connection.manage_peer(out_rx).await { debug!("error managing peer {}: {:#}", handle, e) }; - peer_connection.into_state().drop_peer(handle); + peer_connection.into_handler().state.drop_peer(handle); Ok::<_, anyhow::Error>(()) }); true @@ -441,3 +452,451 @@ impl TorrentState { } } } + +#[derive(Clone)] +struct PeerHandler { + state: Arc, + addr: SocketAddr, + spawner: BlockingSpawner, +} + +impl PeerConnectionHandler for PeerHandler { + fn on_received_message(&self, message: Message>) -> anyhow::Result<()> { + match message { + Message::Request(request) => { + self.on_download_request(self.addr, request) + .with_context(|| { + format!("error handling download request from {}", self.addr) + })?; + } + Message::Bitfield(b) => self.on_bitfield(self.addr, b.clone_to_owned())?, + Message::Choke => self.on_i_am_choked(self.addr), + Message::Unchoke => self.on_i_am_unchoked(self.addr), + Message::Interested => self.on_peer_interested(self.addr), + Message::Piece(piece) => { + self.on_received_piece(self.addr, piece) + .context("error in on_received_piece()")?; + } + Message::KeepAlive => { + debug!("keepalive received from {}", self.addr); + } + Message::Have(h) => self.on_have(self.addr, h), + Message::NotInterested => { + info!("received \"not interested\", but we don't care yet") + } + message => { + warn!( + "{}: received unsupported message {:?}, ignoring", + self.addr, message + ); + } + } + Ok(()) + } + + fn get_have_bytes(&self) -> u64 { + self.state.stats.have.load(Ordering::Relaxed) + } + + fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> Option { + let g = self.state.locked.read(); + let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice())); + let len = msg.serialize(buf); + debug!("sending to {}: {:?}, length={}", self.addr, &msg, len); + Some(len) + } + + fn on_handshake(&self, handshake: Handshake) { + self.state.set_peer_live(self.addr, handshake) + } + + fn on_uploaded_bytes(&self, bytes: u32) { + self.state + .stats + .uploaded + .fetch_add(bytes as u64, Ordering::Relaxed); + } + + fn read_chunk(&self, chunk: &crate::lengths::ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> { + self.state.file_ops().read_chunk(self.addr, chunk, buf) + } +} + +impl PeerHandler { + fn on_download_request(&self, peer_handle: PeerHandle, request: Request) -> anyhow::Result<()> { + let piece_index = match self.state.lengths.validate_piece_index(request.index) { + Some(p) => p, + None => { + anyhow::bail!( + "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", + peer_handle, request + ); + } + }; + let chunk_info = match self.state.lengths.chunk_info_from_received_data( + piece_index, + request.begin, + request.length, + ) { + Some(d) => d, + None => { + anyhow::bail!( + "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", + peer_handle, request + ); + } + }; + + let tx = { + let g = self.state.locked.read(); + if !g.chunks.is_chunk_ready_to_upload(&chunk_info) { + anyhow::bail!( + "got request for a chunk that is not ready to upload. chunk {:?}", + &chunk_info + ); + } + + g.peers.clone_tx(peer_handle).ok_or_else(|| { + anyhow::anyhow!( + "peer {} died, dropping chunk that it requested", + peer_handle + ) + })? + }; + + // TODO: this is not super efficient as it does copying multiple times. + // Theoretically, this could be done in the sending code, so that it reads straight into + // the send buffer. + let request = WriterRequest::ReadChunkRequest(chunk_info); + debug!("sending to {}: {:?}", peer_handle, &request); + Ok::<_, anyhow::Error>(tx.send(request)?) + } + + fn on_have(&self, handle: PeerHandle, have: u32) { + if let Some(bitfield) = self + .state + .locked + .write() + .peers + .get_live_mut(handle) + .and_then(|l| l.bitfield.as_mut()) + { + debug!("{}: updated bitfield with have={}", handle, have); + bitfield.set(have as usize, true) + } + } + + fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> { + if bitfield.len() != self.state.lengths.piece_bitfield_bytes() as usize { + anyhow::bail!( + "dropping {} as its bitfield has unexpected size. Got {}, expected {}", + handle, + bitfield.len(), + self.state.lengths.piece_bitfield_bytes(), + ); + } + self.state + .locked + .write() + .peers + .update_bitfield_from_vec(handle, bitfield.0); + + if !self.state.am_i_interested_in_peer(handle) { + let tx = self + .state + .locked + .read() + .peers + .clone_tx(handle) + .ok_or_else(|| anyhow::anyhow!("peer closed"))?; + tx.send(WriterRequest::Message(MessageOwned::Unchoke)) + .context("peer dropped")?; + tx.send(WriterRequest::Message(MessageOwned::NotInterested)) + .context("peer dropped")?; + return Ok(()); + } + + // Additional spawn per peer, not good. + spawn( + format!("peer_chunk_requester({})", handle), + self.clone().task_peer_chunk_requester(handle), + ); + Ok(()) + } + + async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { + let tx = match self.state.locked.read().peers.clone_tx(handle) { + Some(tx) => tx, + None => return Ok(()), + }; + tx.send(WriterRequest::Message(MessageOwned::Unchoke)) + .context("peer dropped")?; + tx.send(WriterRequest::Message(MessageOwned::Interested)) + .context("peer dropped")?; + + self.requester(handle).await?; + Ok::<_, anyhow::Error>(()) + } + + fn on_i_am_choked(&self, handle: PeerHandle) { + warn!("we are choked by {}", handle); + self.state + .locked + .write() + .peers + .mark_i_am_choked(handle, true); + } + + fn on_peer_interested(&self, handle: PeerHandle) { + debug!("peer {} is interested", handle); + self.state + .locked + .write() + .peers + .mark_peer_interested(handle, true); + } + + async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { + let notify = match self.state.locked.read().peers.get_live(handle) { + Some(l) => l.have_notify.clone(), + None => return Ok(()), + }; + + // TODO: this might dangle, same below. + #[allow(unused_must_use)] + { + timeout(Duration::from_secs(60), notify.notified()).await; + } + + loop { + match self.state.am_i_choked(handle) { + Some(true) => { + warn!("we are choked by {}, can't reserve next piece", handle); + #[allow(unused_must_use)] + { + timeout(Duration::from_secs(60), notify.notified()).await; + } + continue; + } + Some(false) => {} + None => return Ok(()), + } + + let next = match self.state.try_steal_old_slow_piece(handle) { + Some(next) => next, + None => match self.state.reserve_next_needed_piece(handle) { + Some(next) => next, + None => { + if self.state.get_left_to_download() == 0 { + debug!("{}: nothing left to download, closing requester", handle); + return Ok(()); + } + + if let Some(piece) = self.state.try_steal_piece(handle) { + debug!("{}: stole a piece {}", handle, piece); + piece + } else { + debug!("no pieces to request from {}", handle); + #[allow(unused_must_use)] + { + timeout(Duration::from_secs(60), notify.notified()).await; + } + continue; + } + } + }, + }; + + let tx = match self.state.locked.read().peers.clone_tx(handle) { + Some(tx) => tx, + None => return Ok(()), + }; + let sem = match self.state.locked.read().peers.get_live(handle) { + Some(live) => live.requests_sem.clone(), + None => return Ok(()), + }; + for chunk in self.state.lengths.iter_chunk_infos(next) { + if self.state.locked.read().chunks.is_chunk_downloaded(&chunk) { + continue; + } + if !self + .state + .locked + .write() + .peers + .try_get_live_mut(handle)? + .inflight_requests + .insert(InflightRequest::from(&chunk)) + { + warn!( + "{}: probably a bug, we already requested {:?}", + handle, chunk + ); + continue; + } + + let request = Request { + index: next.get(), + begin: chunk.offset, + length: chunk.size, + }; + sem.acquire().await?.forget(); + + tx.send(WriterRequest::Message(MessageOwned::Request(request))) + .context("peer dropped")?; + } + } + } + + fn on_i_am_unchoked(&self, handle: PeerHandle) { + debug!("we are unchoked by {}", handle); + let mut g = self.state.locked.write(); + let live = match g.peers.get_live_mut(handle) { + Some(live) => live, + None => return, + }; + live.i_am_choked = false; + live.have_notify.notify_waiters(); + live.requests_sem.add_permits(16); + } + + fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> anyhow::Result<()> { + let chunk_info = match self.state.lengths.chunk_info_from_received_piece(&piece) { + Some(i) => i, + None => { + anyhow::bail!( + "peer {} sent us a piece that is invalid {:?}", + handle, + &piece, + ); + } + }; + + let mut g = self.state.locked.write(); + let h = g.peers.try_get_live_mut(handle)?; + h.requests_sem.add_permits(1); + + self.state + .stats + .fetched_bytes + .fetch_add(piece.block.len() as u64, Ordering::Relaxed); + + if !h + .inflight_requests + .remove(&InflightRequest::from(&chunk_info)) + { + anyhow::bail!( + "peer {} sent us a piece that we did not ask it for. Requested pieces: {:?}. Got: {:?}", handle, &h.inflight_requests, &piece, + ); + } + + let full_piece_download_time = match g.chunks.mark_chunk_downloaded(&piece) { + Some(ChunkMarkingResult::Completed) => { + debug!( + "piece={} done by {}, will write and checksum", + piece.index, handle + ); + // This will prevent others from stealing it. + g.peers + .remove_inflight_piece(chunk_info.piece_index) + .map(|t| t.started.elapsed()) + } + Some(ChunkMarkingResult::PreviouslyCompleted) => { + // TODO: we might need to send cancellations here. + debug!( + "piece={} was done by someone else {}, ignoring", + piece.index, handle + ); + return Ok(()); + } + Some(ChunkMarkingResult::NotCompleted) => None, + None => { + anyhow::bail!( + "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", + handle, + piece + ); + } + }; + + // to prevent deadlocks. + drop(g); + + self.spawner + .spawn_block_in_place(move || { + let index = piece.index; + + // TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what + // should we really do? If we unmark it, it will get requested forever... + // + // So let's just unwrap and abort. + self.state + .file_ops() + .write_chunk(handle, &piece, &chunk_info) + .expect("expected to be able to write to disk"); + + let full_piece_download_time = match full_piece_download_time { + Some(t) => t, + None => return Ok(()), + }; + + match self + .state + .file_ops() + .check_piece(handle, chunk_info.piece_index, &chunk_info) + .with_context(|| format!("error checking piece={}", index))? + { + true => { + let piece_len = + self.state.lengths.piece_length(chunk_info.piece_index) as u64; + self.state + .stats + .downloaded_and_checked + .fetch_add(piece_len, Ordering::Relaxed); + self.state + .stats + .have + .fetch_add(piece_len, Ordering::Relaxed); + self.state + .stats + .downloaded_pieces + .fetch_add(1, Ordering::Relaxed); + self.state + .stats + .downloaded_pieces + .fetch_add(1, Ordering::Relaxed); + self.state.stats.total_piece_download_ms.fetch_add( + full_piece_download_time.as_millis() as u64, + Ordering::Relaxed, + ); + self.state + .locked + .write() + .chunks + .mark_piece_downloaded(chunk_info.piece_index); + + debug!( + "piece={} successfully downloaded and verified from {}", + index, handle + ); + + self.state.maybe_transmit_haves(chunk_info.piece_index); + } + false => { + warn!( + "checksum for piece={} did not validate, came from {}", + index, handle + ); + self.state + .locked + .write() + .chunks + .mark_piece_broken(chunk_info.piece_index); + } + }; + Ok::<_, anyhow::Error>(()) + }) + .with_context(|| format!("error processing received chunk {:?}", chunk_info))?; + Ok(()) + } +}