use std::{ collections::{HashMap, HashSet}, fmt::Display, fs::{File, OpenOptions}, future::Future, io::{Read, Seek, Write}, net::SocketAddr, path::{Path, PathBuf}, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, time::Duration, }; use anyhow::Context; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use log::{debug, error, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use reqwest::Url; use tokio::{ sync::{mpsc::Sender, Notify, Semaphore}, task::JoinHandle, }; use crate::{ buffers::ByteString, chunk_tracker::ChunkTracker, clone_to_owned::CloneToOwned, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, peer_comms::{ Handshake, Message, MessageBorrowed, MessageDeserializeError, MessageOwned, Piece, Request, }, peer_id::try_decode_peer_id, torrent_metainfo::TorrentMetaV1Owned, tracker_comms::{CompactTrackerResponse, TrackerRequest, TrackerRequestEvent}, }; pub struct TorrentManagerBuilder { torrent: TorrentMetaV1Owned, overwrite: bool, output_folder: PathBuf, } impl TorrentManagerBuilder { pub fn new>(torrent: TorrentMetaV1Owned, output_folder: P) -> Self { Self { torrent, overwrite: false, output_folder: output_folder.as_ref().into(), } } pub fn overwrite(mut self, overwrite: bool) -> Self { self.overwrite = overwrite; self } pub async fn start_manager(self) -> anyhow::Result { TorrentManager::start(self.torrent, self.output_folder, self.overwrite) } } #[derive(Clone)] pub struct TorrentManagerHandle { manager: TorrentManager, } impl TorrentManagerHandle { pub async fn cancel(&self) -> anyhow::Result<()> { todo!() } pub async fn wait_until_completed(&self) -> anyhow::Result<()> { loop { tokio::time::sleep(Duration::from_secs(60)).await; } } } type PeerHandle = SocketAddr; enum PeerState { Connecting(SocketAddr), Live(LivePeerState), } type BF = bitvec::vec::BitVec; struct LivePeerState { #[allow(unused)] peer_id: [u8; 20], i_am_choked: bool, #[allow(unused)] peer_choked: bool, #[allow(unused)] peer_interested: bool, outstanding_requests: Arc, have_notify: Arc, bitfield: Option, requested_pieces: HashSet, } #[derive(Default)] struct PeerStates { states: HashMap, seen_peers: HashSet, tx: HashMap>>, } impl PeerStates { fn add_if_not_seen( &mut self, addr: SocketAddr, tx: tokio::sync::mpsc::Sender, ) -> Option { if self.seen_peers.contains(&addr) { return None; } let handle = self.add(addr, tx)?; self.seen_peers.insert(addr); Some(handle) } fn get_live(&self, handle: PeerHandle) -> Option<&LivePeerState> { if let PeerState::Live(ref l) = self.states.get(&handle)? { return Some(l); } None } fn get_live_mut(&mut self, handle: PeerHandle) -> Option<&mut LivePeerState> { if let PeerState::Live(ref mut l) = self.states.get_mut(&handle)? { return Some(l); } None } fn add( &mut self, addr: SocketAddr, tx: tokio::sync::mpsc::Sender, ) -> Option { let handle = addr; if self.states.contains_key(&addr) { return None; } self.states.insert(handle, PeerState::Connecting(addr)); self.tx.insert(handle, Arc::new(tx)); Some(handle) } fn drop_peer(&mut self, handle: PeerHandle) -> Option { let result = self.states.remove(&handle); self.tx.remove(&handle); result } fn mark_i_am_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { match self.states.get_mut(&handle) { Some(PeerState::Live(live)) => { let prev = live.i_am_choked; live.i_am_choked = is_choked; return Some(prev); } _ => return None, } } fn mark_peer_choked(&mut self, handle: PeerHandle, is_choked: bool) -> Option { match self.states.get_mut(&handle) { Some(PeerState::Live(live)) => { let prev = live.peer_choked; live.peer_choked = is_choked; return Some(prev); } _ => return None, } } fn update_bitfield_from_vec( &mut self, handle: PeerHandle, bitfield: Vec, ) -> Option> { match self.states.get_mut(&handle) { Some(PeerState::Live(live)) => { let bitfield = BF::from_vec(bitfield); let prev = live.bitfield.take(); live.bitfield = Some(bitfield); Some(prev) } _ => None, } } fn get_tx(&self, handle: PeerHandle) -> Option<&Sender> { self.tx.get(&handle).map(|v| v.as_ref()) } fn clone_tx(&self, handle: PeerHandle) -> Option>> { Some(self.tx.get(&handle)?.clone()) } } struct TorrentManagerInnerLocked { peers: PeerStates, chunks: ChunkTracker, } impl TorrentManagerInnerLocked {} struct TorrentManagerInner { torrent: TorrentMetaV1Owned, locked: Arc>, files: Vec>>, info_hash: [u8; 20], peer_id: [u8; 20], incoming_tx: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, downloaded: AtomicU64, uploaded: AtomicU64, fetched_bytes: AtomicU64, lengths: Lengths, } #[derive(Clone)] struct TorrentManager { inner: Arc, } fn generate_peer_id() -> [u8; 20] { let mut peer_id = [0u8; 20]; let u = uuid::Uuid::new_v4(); (&mut peer_id[..16]).copy_from_slice(&u.as_bytes()[..]); peer_id } fn spawn( name: N, fut: impl std::future::Future> + Send + 'static, ) { debug!("starting task \"{}\"", &name); tokio::spawn(async move { match fut.await { Ok(_) => { debug!("task \"{}\" finished", &name); } Err(e) => { error!("error in task \"{}\": {:#}", &name, e) } } }); } fn spawn_blocking( name: N, f: impl FnOnce() -> anyhow::Result + Send + 'static, ) -> impl Future> { debug!("starting blocking task \"{}\"", name); tokio::task::spawn_blocking(move || match f() { Ok(v) => { debug!("blocking task \"{}\" finished", name); Ok(v) } Err(e) => { error!("error in blocking task \"{}\": {:#}", name, &e); Err(e) } }) .map(|j| j.unwrap()) } fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result { let total_length = torrent.info.iter_file_lengths().sum(); Lengths::new(total_length, torrent.info.piece_length, None) } fn compute_needed_pieces( torrent: &TorrentMetaV1Owned, files: &mut [Arc>], lengths: &Lengths, ) -> anyhow::Result { let needed_pieces = vec![u8::MAX; lengths.piece_bitfield_bytes()]; let needed_pieces = BF::from_vec(needed_pieces); // TODO: read and validate existing files Ok(needed_pieces) } impl TorrentManager { pub fn start>( torrent: TorrentMetaV1Owned, out: P, overwrite: bool, ) -> anyhow::Result { let mut files = { let mut files = Vec::>>::with_capacity(torrent.info.iter_file_lengths().count()); for (path_bits, _) in torrent.info.iter_filenames_and_lengths() { let mut full_path = out.as_ref().to_owned(); for bit in path_bits.iter_components() { full_path.push( bit.as_ref() .map(|b| std::str::from_utf8(b.as_ref())) .unwrap_or(Ok("output"))?, ); } std::fs::create_dir_all(full_path.parent().unwrap())?; let file = if overwrite { OpenOptions::new() .create(true) .read(true) .write(true) .open(&full_path)? } else { // TODO: create_new does not seem to work with read(true), so calling this twice. OpenOptions::new() .create_new(true) .write(true) .open(&full_path) .with_context(|| format!("error creating {:?}", &full_path))?; OpenOptions::new().read(true).write(true).open(&full_path)? }; files.push(Arc::new(Mutex::new(file))) } files }; let peer_id = generate_peer_id(); let lengths = make_lengths(&torrent).context("unable to compute Lengths from torrent")?; let needed_pieces = compute_needed_pieces(&torrent, &mut files, &lengths)?; debug!("computed lengths: {:?}", &lengths); let chunk_tracker = ChunkTracker::new(needed_pieces, lengths); let (incoming_tx, incoming_rx) = tokio::sync::mpsc::channel::<(PeerHandle, MessageOwned)>(1); let mgr = Self { inner: Arc::new(TorrentManagerInner { info_hash: torrent.info_hash, torrent, peer_id, locked: Arc::new(RwLock::new(TorrentManagerInnerLocked { peers: Default::default(), chunks: chunk_tracker, })), files, incoming_tx, downloaded: Default::default(), fetched_bytes: Default::default(), uploaded: Default::default(), lengths, }), }; spawn("tracker_monitor", mgr.clone().task_tracker_monitor()); spawn( "incoming_rx_handler", mgr.clone().task_incoming_rx_handler(incoming_rx), ); spawn("Stats printer", mgr.clone().stats_printer()); Ok(mgr.into_handle()) } async fn stats_printer(self) -> anyhow::Result<()> { loop { let live_peers = self.inner.locked.read().peers.states.len(); let downloaded_bytes = self.inner.downloaded.load(Ordering::Relaxed); let downloaded = self.inner.downloaded.load(Ordering::Relaxed) / 1024 / 1024; let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed) / 1024 / 1024; let total_length = self.inner.lengths.total_length(); let pct = if total_length == downloaded { 100f64 } else { (downloaded_bytes as f64 / self.inner.lengths.total_length() as f64) * 100f64 }; info!( "Total downloaded and checked {}MiB ({:.2}%), fetched {}MiB, live peers={}", downloaded, pct, fetched, live_peers ); tokio::time::sleep(Duration::from_secs(1)).await; } } async fn task_tracker_monitor(self) -> anyhow::Result<()> { let mut seen_trackers = HashSet::new(); let mut tracker_futures = FuturesUnordered::new(); let parse_url = |url: &[u8]| -> anyhow::Result { let url = std::str::from_utf8(url).context("error parsing tracker URL")?; let url = Url::parse(url).context("error parsing tracker URL")?; Ok(url) }; for tracker in self.inner.torrent.iter_announce() { if seen_trackers.contains(&tracker) { continue; } seen_trackers.insert(tracker); let tracker_url = match parse_url(tracker) { Ok(url) => url, Err(e) => { warn!("ignoring tracker: {:#}", e); continue; } }; tracker_futures.push(self.clone().single_tracker_monitor(tracker_url)); } while tracker_futures.next().await.is_some() {} Ok(()) } async fn task_incoming_rx_handler( self, mut incoming_tx: tokio::sync::mpsc::Receiver<(PeerHandle, MessageOwned)>, ) -> anyhow::Result<()> { loop { let (peer_handle, message): (PeerHandle, MessageOwned) = match incoming_tx.recv().await { Some(msg) => msg, None => { return Ok(()); } }; match message { Message::Request(request) => self.on_download_request(peer_handle, request), Message::Bitfield(b) => self.on_bitfield(peer_handle, b), Message::Choke => self.on_i_am_choked(peer_handle), Message::Unchoke => self.on_i_am_unchoked(peer_handle), Message::Interested => { warn!( "{} is interested, but support for interested messages not implemented", peer_handle ) } Message::Piece(piece) => { self.on_received_piece(peer_handle, piece); } Message::KeepAlive => { debug!("keepalive received from {}", peer_handle); } Message::Have(h) => self.on_have(peer_handle, h), Message::NotInterested => { info!("received \"not interested\", but we don't care yet") } } } } fn on_download_request(&self, peer_handle: PeerHandle, request: Request) { let piece_index = match self.inner.lengths.validate_piece_index(request.index) { Some(p) => p, None => { warn!( "{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.", peer_handle, request ); return; } }; let chunk_info = match self.inner.lengths.chunk_info_from_received_data( piece_index, request.begin, request.length, ) { Some(d) => d, None => { warn!( "{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.", peer_handle, request ); return; } }; let this = self.clone(); let task_name = format!( "download_request(peer={}, chunk_info={:?})", peer_handle, &chunk_info ); spawn(task_name, async move { let clone = this.clone(); let chunk = spawn_blocking( format!( "read_chunk_blocking(peer={}, chunk_info={:?}", peer_handle, &chunk_info ), move || clone.read_chunk_blocking(peer_handle, chunk_info), ) .await?; let tx = this .inner .locked .read() .peers .clone_tx(peer_handle) .ok_or_else(|| { anyhow::anyhow!( "peer {} died, dropping chunk that it requested", peer_handle ) })?; let message = Message::Piece(Piece::from_vec( chunk_info.piece_index.get(), chunk_info.offset, chunk, )); info!("sending to {}: {:?}", peer_handle, &message); Ok::<_, anyhow::Error>(tx.send(message).await?) }); } fn read_chunk_blocking( self, who_sent: PeerHandle, chunk_info: ChunkInfo, ) -> anyhow::Result> { let mut h = sha1::Sha1::new(); let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); let mut result_buf = vec![0u8; chunk_info.size as usize]; let mut buf = &mut result_buf[..]; for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { if absolute_offset > file_len { absolute_offset -= file_len; continue; } let file_remaining_len = file_len - absolute_offset; let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize; let mut file_g = self.inner.files[file_idx].lock(); debug!( "piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}", chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info ); file_g .seek(std::io::SeekFrom::Start(absolute_offset)) .with_context(|| { format!( "error seeking to {}, file id: {}", absolute_offset, file_idx ) })?; file_g .read_exact(&mut buf[..to_read_in_file]) .with_context(|| { format!( "error reading {} bytes, file_id: {}", file_idx, to_read_in_file ) })?; buf = &mut buf[to_read_in_file..]; if buf.is_empty() { break; } absolute_offset = 0; } return Ok(result_buf); } fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool { self.get_next_needed_piece(handle).is_some() } fn on_have(&self, handle: PeerHandle, have: u32) { if let Some(bitfield) = self .inner .locked .write() .peers .get_live_mut(handle) .and_then(|l| l.bitfield.as_mut()) { debug!("{}: updated bitfield with have={}", handle, have); bitfield.set(have as usize, true) } } fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) { if bitfield.len() != self.inner.lengths.piece_bitfield_bytes() as usize { warn!( "dropping {} as its bitfield has unexpected size. Got {}, expected {}", handle, bitfield.len(), self.inner.lengths.piece_bitfield_bytes(), ); self.inner.locked.write().peers.drop_peer(handle); return; } self.inner .locked .write() .peers .update_bitfield_from_vec(handle, bitfield.0); if !self.am_i_interested_in_peer(handle) { self.inner.locked.write().peers.drop_peer(handle); return; } // Additional spawn per peer. spawn( format!("peer_chunk_requester({})", handle), self.clone().task_peer_chunk_requester(handle), ); } async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { let tx = match self.inner.locked.read().peers.clone_tx(handle) { Some(tx) => tx, None => return Ok(()), }; tx.send(MessageOwned::Unchoke) .await .context("peer dropped")?; tx.send(MessageOwned::Interested) .await .context("peer dropped")?; self.requester(handle).await?; Ok::<_, anyhow::Error>(()) } fn on_i_am_choked(&self, handle: PeerHandle) { warn!("we are choked by {}", handle); self.inner .locked .write() .peers .mark_i_am_choked(handle, true); } fn am_i_choked(&self, peer_handle: PeerHandle) -> Option { self.inner .locked .read() .peers .states .get(&peer_handle) .and_then(|s| match s { PeerState::Live(l) => Some(l.i_am_choked), _ => None, }) } async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { let notify = match self.inner.locked.read().peers.get_live(handle) { Some(l) => l.have_notify.clone(), None => return Ok(()), }; // TODO: this might dangle tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; loop { let next = match self.reserve_next_needed_piece(handle) { Some(next) => next, None => { info!("no pieces to request from {}", handle); let notify = match self.inner.locked.read().peers.get_live(handle) { Some(l) => l.have_notify.clone(), None => return Ok(()), }; // TODO: this might dangle tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; continue; } }; let tx = match self.inner.locked.read().peers.clone_tx(handle) { Some(tx) => tx, None => return Ok(()), }; let sem = match self.inner.locked.read().peers.get_live(handle) { Some(live) => live.outstanding_requests.clone(), None => return Ok(()), }; for chunk in self.inner.lengths.iter_chunk_infos(next) { let request = Request { index: next.get(), begin: chunk.offset, length: chunk.size, }; sem.acquire().await?.forget(); tx.send(MessageOwned::Request(request)) .await .context("peer dropped")?; } } } fn on_i_am_unchoked(&self, handle: PeerHandle) { debug!("we are unchoked by {}", handle); let mut g = self.inner.locked.write(); let live = match g.peers.get_live_mut(handle) { Some(live) => live, None => return, }; live.i_am_choked = false; live.have_notify.notify_waiters(); live.outstanding_requests.add_permits(16); } fn get_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { let g = self.inner.locked.read(); let bf = match g.peers.states.get(&peer_handle)? { PeerState::Live(l) => l.bitfield.as_ref()?, _ => return None, }; for n in g.chunks.get_needed_pieces().iter_ones() { if bf.get(n).map(|v| *v) == Some(true) { // in theory it should be safe without validation, but whatever. return self.inner.lengths.validate_piece_index(n as u32); } } None } fn reserve_next_needed_piece(&self, peer_handle: PeerHandle) -> Option { if self.am_i_choked(peer_handle)? { warn!("we are choked by {}, can't reserve next piece", peer_handle); return None; } let mut g = self.inner.locked.write(); let n = { let mut n_opt = None; let bf = g.peers.get_live(peer_handle)?.bitfield.as_ref()?; for n in g.chunks.get_needed_pieces().iter_ones() { if bf.get(n).map(|v| *v) == Some(true) { n_opt = Some(n); break; } } self.inner.lengths.validate_piece_index(n_opt? as u32)? }; g.peers .get_live_mut(peer_handle)? .requested_pieces .insert(n); g.chunks.reserve_needed_piece(n); Some(n) } fn check_piece_blocking( &self, who_sent: PeerHandle, piece_index: ValidPieceIndex, last_received_chunk: &ChunkInfo, ) -> anyhow::Result { let mut h = sha1::Sha1::new(); let piece_length = self.inner.lengths.piece_length(piece_index); let mut absolute_offset = self.inner.lengths.piece_offset(piece_index); let mut buf = vec![0u8; std::cmp::min(8192, piece_length as usize)]; let mut left_to_read = piece_length as usize; for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { if absolute_offset > file_len { absolute_offset -= file_len; continue; } let file_remaining_len = file_len - absolute_offset; let to_read_in_file = std::cmp::min(file_remaining_len, left_to_read as u64) as usize; let mut left_to_read_in_file = to_read_in_file; let mut file_g = self.inner.files[file_idx].lock(); debug!( "piece={}, handle={}, file_idx={}, seeking to {}. Last received chunk: {:?}", piece_index, who_sent, file_idx, absolute_offset, &last_received_chunk ); file_g .seek(std::io::SeekFrom::Start(absolute_offset)) .with_context(|| { format!( "error seeking to {}, file id: {}", absolute_offset, file_idx ) })?; while left_to_read_in_file > 0 { let chunk_length = std::cmp::min(buf.len(), left_to_read_in_file); file_g .read_exact(&mut buf[..chunk_length]) .with_context(|| { format!( "error reading {} bytes, file_id: {}, left_to_read_in_file: {}", chunk_length, file_idx, left_to_read_in_file ) })?; h.update(&buf[..chunk_length]); left_to_read_in_file -= chunk_length; } left_to_read -= to_read_in_file; if left_to_read == 0 { return Ok(true); } absolute_offset = 0; } match self.inner.torrent.info.compare_hash(piece_index.get(), &h) { Some(true) => { debug!("piece={} hash matches", piece_index); Ok(true) } Some(false) => { warn!("the piece={} hash does not match", piece_index); Ok(false) } None => { // this is probably a bug? warn!("compare_hash() did not find the piece"); anyhow::bail!("compare_hash() did not find the piece"); } } } // TODO: this is a task per chunk, not good async fn task_transmit_haves(self, index: u32) -> anyhow::Result<()> { let mut unordered = FuturesUnordered::new(); for weak in self .inner .locked .read() .peers .tx .values() .map(|v| Arc::downgrade(v)) { unordered.push(async move { if let Some(tx) = weak.upgrade() { if tx.send(Message::Have(index)).await.is_err() { // whatever } } }); } while unordered.next().await.is_some() {} Ok(()) } fn write_chunk_blocking( &self, who_sent: PeerHandle, data: &Piece, chunk_info: &ChunkInfo, ) -> anyhow::Result<()> { let mut buf = data.block.as_ref(); let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { if absolute_offset > file_len { absolute_offset -= file_len; continue; } let remaining_len = file_len - absolute_offset; let to_write = std::cmp::min(buf.len(), remaining_len as usize); let mut file_g = self.inner.files[file_idx].lock(); debug!( "piece={}, chunk={}, handle={}, begin={}, file={}, writing {} bytes at {}", chunk_info.piece_index, chunk_info.chunk_index, who_sent, chunk_info.offset, file_idx, to_write, absolute_offset ); file_g.seek(std::io::SeekFrom::Start(absolute_offset))?; file_g.write_all(&buf[..to_write])?; buf = &buf[to_write..]; if buf.is_empty() { break; } absolute_offset = 0; } Ok(()) } fn on_received_piece(&self, handle: PeerHandle, piece: Piece) -> Option<()> { let chunk_info = match self.inner.lengths.chunk_info_from_received_piece(&piece) { Some(i) => i, None => { warn!( "peer {} sent us a piece that is invalid {:?}, dropping", handle, &piece, ); self.drop_peer(handle); return None; } }; let mut g = self.inner.locked.write(); let h = g.peers.get_live_mut(handle)?; h.outstanding_requests.add_permits(1); self.inner .fetched_bytes .fetch_add(piece.block.len() as u64, Ordering::Relaxed); if !h.requested_pieces.contains(&chunk_info.piece_index) { warn!( "peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, ); self.drop_peer(handle); return None; } let this = self.clone(); spawn_blocking( format!( "write_and_check(piece={}, peer={}, block={:?})", piece.index, handle, &piece ), move || { let index = piece.index; this.write_chunk_blocking(handle, &piece, &chunk_info)?; let piece_done = match this .inner .locked .write() .chunks .mark_chunk_downloaded(&piece) { Some(true) => { debug!( "piece={} done, requesting a piece from {}", piece.index, handle ); true } Some(false) => false, None => { warn!( "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", handle, piece ); this.drop_peer(handle); return Ok(()); } }; if !piece_done { return Ok(()); } // Ignore responses about this piece from now on. this.inner .locked .write() .peers .get_live_mut(handle) .map(|l| l.requested_pieces.remove(&chunk_info.piece_index)); let clone = this.clone(); match clone .check_piece_blocking(handle, chunk_info.piece_index, &chunk_info) .with_context(|| format!("error checking piece={}", index))? { true => { this.inner.downloaded.fetch_add( this.inner.lengths.piece_length(chunk_info.piece_index) as u64, Ordering::Relaxed, ); debug!( "piece={} successfully downloaded and verified from {}", index, handle ); spawn( "transmit haves", this.clone().task_transmit_haves(piece.index), ); } false => { warn!( "checksum for piece={} did not validate, came from {}", index, handle ); this.inner .locked .write() .chunks .mark_piece_needed(chunk_info.piece_index); // this.drop_peer(handle); } }; Ok::<_, anyhow::Error>(()) }, ); Some(()) } fn into_handle(self) -> TorrentManagerHandle { TorrentManagerHandle { manager: self } } fn get_uploaded(&self) -> u64 { self.inner.uploaded.load(Ordering::Relaxed) } fn get_downloaded(&self) -> u64 { self.inner.downloaded.load(Ordering::Relaxed) } async fn tracker_one_request(&self, tracker_url: Url) -> anyhow::Result { let response: reqwest::Response = reqwest::get(tracker_url).await?; let bytes = response.bytes().await?; let response = crate::serde_bencode::from_bytes::(&bytes)?; for peer in response.peers.iter_sockaddrs() { self.add_peer(peer); } Ok(response.interval) } fn get_total(&self) -> u64 { if let Some(length) = self.inner.torrent.info.length { return length; } self.inner .torrent .info .files .as_ref() .map(|files| files.iter().map(|f| f.length).sum()) .unwrap_or_default() } fn get_left_to_download(&self) -> u64 { self.get_total() - self.get_downloaded() } async fn single_tracker_monitor(self, mut tracker_url: Url) -> anyhow::Result<()> { let mut event = Some(TrackerRequestEvent::Started); loop { let request = TrackerRequest { info_hash: self.inner.torrent.info_hash, peer_id: self.inner.peer_id, port: 6778, uploaded: self.get_uploaded(), downloaded: self.get_downloaded(), left: self.get_left_to_download(), compact: true, no_peer_id: false, event, ip: None, numwant: None, key: None, trackerid: None, }; let request_query = request.as_querystring(); tracker_url.set_query(Some(&request_query)); let this = self.clone(); match this.tracker_one_request(tracker_url.clone()).await { Ok(interval) => { event = None; let duration = Duration::from_secs(interval); debug!( "sleeping for {:?} after calling tracker {}", duration, tracker_url.host().unwrap() ); tokio::time::sleep(duration).await; } Err(e) => { error!("error calling the tracker {}: {:#}", tracker_url, e); tokio::time::sleep(Duration::from_secs(60)).await; } }; } } fn set_peer_live(&self, handle: PeerHandle, addr: SocketAddr, h: Handshake) { let mut g = self.inner.locked.write(); match g.peers.states.get_mut(&handle) { Some(s @ &mut PeerState::Connecting(_)) => { *s = PeerState::Live(LivePeerState { peer_id: h.peer_id, i_am_choked: true, peer_choked: true, peer_interested: false, bitfield: None, have_notify: Arc::new(Notify::new()), outstanding_requests: Arc::new(Semaphore::new(0)), requested_pieces: Default::default(), }); } _ => { warn!("peer {} was in wrong state", handle); } } } async fn manage_peer( &self, addr: SocketAddr, handle: PeerHandle, incoming_chan: tokio::sync::mpsc::Sender<(PeerHandle, MessageOwned)>, // outgoing_chan_tx: tokio::sync::mpsc::Sender, mut outgoing_chan: tokio::sync::mpsc::Receiver, ) -> anyhow::Result<()> { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; let mut conn = tokio::net::TcpStream::connect(addr).await?; let handshake = Handshake::new(self.inner.info_hash, self.inner.peer_id); conn.write_all(&handshake.serialize()).await?; let mut read_buf = vec![0u8; 16384 * 2]; let read_bytes = conn.read(&mut read_buf).await?; if read_bytes == 0 { anyhow::bail!("bad handshake"); } let (h, hlen) = Handshake::deserialize(&read_buf[..read_bytes]) .map_err(|e| anyhow::anyhow!("error deserializing handshake: {:?}", e))?; let mut read_so_far = 0usize; debug!( "connected peer {}: {:?}", addr, try_decode_peer_id(h.peer_id) ); if h.info_hash != self.inner.info_hash { anyhow::bail!("info hash does not match"); } self.set_peer_live(handle, addr, h); if read_bytes > hlen { read_buf.copy_within(hlen..read_bytes, 0); read_so_far = read_bytes - hlen; } let (mut read_half, mut write_half) = tokio::io::split(conn); let writer = async move { let mut buf = vec![0u8; 1024]; let keep_alive_interval = Duration::from_secs(120); loop { let msg = match tokio::time::timeout(keep_alive_interval, outgoing_chan.recv()).await { Ok(Some(msg)) => msg, Ok(None) => return Err(anyhow::anyhow!("torrent manager closed")), Err(_) => MessageOwned::KeepAlive, }; let len = msg.serialize(&mut buf); debug!("sending to {}: {:?}, length={}", handle, &msg, len); write_half .write_all(&buf[..len]) .await .context("error writing")?; } // For type inference. #[allow(unreachable_code)] Ok::<_, anyhow::Error>(()) }; let reader = async move { loop { let (message, size) = loop { match MessageBorrowed::deserialize(&read_buf[..read_so_far]) { Ok((msg, size)) => break (msg.clone_to_owned(), size), Err(MessageDeserializeError::NotEnoughData(d, _)) => { if read_buf.len() < read_so_far + d { read_buf.reserve(d); read_buf.resize(read_buf.capacity(), 0); } } Err(e) => return Err(e.into()), } let size = read_half .read(&mut read_buf[read_so_far..]) .await .context("error reading from peer")?; if size == 0 { anyhow::bail!("disconnected while reading, read so far: {}", read_so_far) } read_so_far += size; }; if read_so_far > size { read_buf.copy_within(size..read_so_far, 0); } read_so_far -= size; incoming_chan .send((handle, message)) .await .context("error sending received message")?; } // For type inference. #[allow(unreachable_code)] Ok::<_, anyhow::Error>(()) }; let r = tokio::select! { r = reader => {r} r = writer => {r} }; debug!("{}: either reader or writer are done, exiting", handle); r } fn drop_peer(&self, handle: PeerHandle) -> bool { let mut g = self.inner.locked.write(); let peer = match g.peers.drop_peer(handle) { Some(peer) => peer, None => return false, }; match peer { PeerState::Connecting(_) => {} PeerState::Live(l) => { for piece in l.requested_pieces { g.chunks.mark_piece_needed(piece); } } } true } fn add_peer(&self, addr: SocketAddr) { let (out_tx, out_rx) = tokio::sync::mpsc::channel::(1); let handle = match self .inner .locked .write() .peers .add_if_not_seen(addr, out_tx) { Some(handle) => handle, None => return, }; let this = self.clone(); spawn(format!("manage_peer({})", handle), async move { if let Err(e) = this .manage_peer(addr, handle, this.inner.incoming_tx.clone(), out_rx) .await { error!("error managing peer, will drop {}: {:#}", handle, e) }; this.drop_peer(handle); Ok::<_, anyhow::Error>(()) }); } }