diff --git a/TODO.md b/TODO.md index af2ee34..2acdd5f 100644 --- a/TODO.md +++ b/TODO.md @@ -1,11 +1,12 @@ - [ ] Selective file downloading (mostly done) - [ ] Proper counting of how much is left, and how much is downloaded -- [ ] Refactor "needed pieces" into a bitfield -- [ ] Send bitfield at the start if I have something -- [ ] use the "update_hash" function in piece checking +- [x] Send bitfield at the start if I have something +- [x] use the "update_hash" function in piece checking - [ ] signaling when file is done +- [ ] per-file stats +- [ ] per-peer stats someday: - [ ] cancellation \ No newline at end of file diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index 2358592..6186ef4 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -62,6 +62,9 @@ impl ChunkTracker { pub fn get_needed_pieces(&self) -> &BF { &self.needed_pieces } + pub fn get_have_pieces(&self) -> &BF { + &self.have + } pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { self.needed_pieces.set(index.get() as usize, false) } diff --git a/crates/librqbit/src/peer_comms.rs b/crates/librqbit/src/peer_comms.rs index aab31ee..7ab719b 100644 --- a/crates/librqbit/src/peer_comms.rs +++ b/crates/librqbit/src/peer_comms.rs @@ -233,10 +233,18 @@ where .unwrap(); MSG_LEN } - Message::Bitfield(_) => todo!(), + Message::Bitfield(b) => { + let block_len = b.as_ref().len(); + let msg_len = PREAMBLE_LEN + block_len; + out.resize(msg_len, 0); + (&mut out[PREAMBLE_LEN..PREAMBLE_LEN + block_len]).copy_from_slice(b.as_ref()); + msg_len + } Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN, Message::Piece(p) => { - let payload_len = 8 + p.block.as_ref().len(); + // below code is wrong, need to serialize len_prefix + let block_len = p.block.as_ref().len(); + let payload_len = 8 + block_len; let msg_len = PREAMBLE_LEN + payload_len; out.resize(msg_len, 0); let tmp = &mut out[PREAMBLE_LEN..]; diff --git a/crates/librqbit/src/torrent_manager.rs b/crates/librqbit/src/torrent_manager.rs index eea20c1..1c7cba2 100644 --- a/crates/librqbit/src/torrent_manager.rs +++ b/crates/librqbit/src/torrent_manager.rs @@ -21,7 +21,7 @@ use size_format::SizeFormatterBinary as SF; use tokio::sync::{mpsc::Sender, Notify, Semaphore}; use crate::{ - buffers::ByteString, + buffers::{ByteBuf, ByteString}, chunk_tracker::ChunkTracker, clone_to_owned::CloneToOwned, lengths::{ChunkInfo, Lengths, ValidPieceIndex}, @@ -115,7 +115,24 @@ struct PeerStates { tx: HashMap>>, } +#[derive(Debug, Default)] +struct AggregatePeerStats { + connecting: usize, + live: usize, +} + impl PeerStates { + fn stats(&self) -> AggregatePeerStats { + self.states + .values() + .fold(AggregatePeerStats::default(), |mut s, p| { + match p { + PeerState::Connecting(_) => s.connecting += 1, + PeerState::Live(_) => s.live += 1, + }; + s + }) + } fn add_if_not_seen( &mut self, addr: SocketAddr, @@ -350,7 +367,7 @@ fn initial_check( for piece_info in lengths.iter_piece_infos() { let mut computed_hash = sha1::Sha1::new(); let mut piece_remaining = piece_info.len as usize; - let mut piece_is_needed = false; + let mut some_files_broken = false; let mut at_least_one_file_required = current_file.full_file_required; while piece_remaining > 0 { @@ -372,11 +389,7 @@ fn initial_check( current_file.mark_processed_bytes(to_read_in_file as u64); if current_file.is_broken { - piece_is_needed = true; - continue; - } - - if piece_is_needed { + // no need to read. continue; } @@ -393,16 +406,18 @@ fn initial_check( "error reading from file {} ({:?}) at {}: {:#}", current_file.index, current_file.name, pos, &err ); - piece_is_needed = true; current_file.is_broken = true; + some_files_broken = true; } } - if piece_is_needed { + if at_least_one_file_required && some_files_broken { trace!( "piece {} had errors, marking as needed", piece_info.piece_index ); + + needed_bytes += piece_info.len as u64; needed_pieces.set(piece_info.piece_index.get() as usize, true); continue; } @@ -539,25 +554,27 @@ impl TorrentManager { async fn stats_printer(self) -> anyhow::Result<()> { loop { - let live_peers = self.inner.locked.read().peers.states.len(); + let live_peers = self.inner.locked.read().peers.stats(); let have = self.inner.have.load(Ordering::Relaxed); let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed); let needed = self.inner.needed; let downloaded = self.inner.downloaded_and_checked.load(Ordering::Relaxed); let remaining = needed - downloaded; + let uploaded = self.inner.uploaded.load(Ordering::Relaxed); let downloaded_pct = if downloaded == needed { 100f64 } else { (downloaded as f64 / needed as f64) * 100f64 }; info!( - "Stats: downloaded {:.2}% ({}), live peers {}, fetched {}, remaining {} out of {}, total have {}", + "Stats: downloaded {:.2}% ({}), peers {:?}, fetched {}, remaining {} out of {}, uploaded {}, total have {}", downloaded_pct, SF::new(downloaded), live_peers, SF::new(fetched), SF::new(remaining), SF::new(needed), + SF::new(uploaded), SF::new(have) ); tokio::time::sleep(Duration::from_secs(1)).await; @@ -1295,11 +1312,18 @@ impl TorrentManager { ) -> anyhow::Result<()> { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut conn = tokio::net::TcpStream::connect(addr).await?; + let mut conn = tokio::net::TcpStream::connect(addr) + .await + .context("error connecting")?; let handshake = Handshake::new(self.inner.info_hash, self.inner.peer_id); - conn.write_all(&handshake.serialize()).await?; + conn.write_all(&handshake.serialize()) + .await + .context("error writing handshake")?; let mut read_buf = vec![0u8; 16384 * 2]; - let read_bytes = conn.read(&mut read_buf).await?; + let read_bytes = conn + .read(&mut read_buf) + .await + .context("error reading handshake")?; if read_bytes == 0 { anyhow::bail!("bad handshake"); } @@ -1325,24 +1349,56 @@ impl TorrentManager { let (mut read_half, mut write_half) = tokio::io::split(conn); + let this = self.clone(); let writer = async move { - let mut buf = vec![0u8; 1024]; + let mut buf = Vec::::new(); let keep_alive_interval = Duration::from_secs(120); + + if this.inner.have.load(Ordering::Relaxed) > 0 { + let len = { + let g = this.inner.locked.read(); + let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice())); + let len = msg.serialize(&mut buf); + debug!("sending to {}: {:?}, length={}", handle, &msg, len); + len + }; + + write_half + .write_all(&buf[..len]) + .await + .context("error writing bitfield to peer")?; + debug!("sent bitfield to {}", handle); + } + loop { let msg = match tokio::time::timeout(keep_alive_interval, outgoing_chan.recv()).await { Ok(Some(msg)) => msg, - Ok(None) => return Err(anyhow::anyhow!("torrent manager closed")), + Ok(None) => { + // we were closed + return Ok(()); + } Err(_) => MessageOwned::KeepAlive, }; + let uploaded_add = match &msg { + Message::Piece(p) => Some(p.block.len()), + _ => None, + }; + let len = msg.serialize(&mut buf); debug!("sending to {}: {:?}, length={}", handle, &msg, len); write_half .write_all(&buf[..len]) .await - .context("error writing")?; + .context("error writing the message to peer")?; + + if let Some(uploaded_add) = uploaded_add { + this.inner + .uploaded + .fetch_add(uploaded_add as u64, Ordering::Relaxed); + } } // For type inference. @@ -1374,6 +1430,8 @@ impl TorrentManager { read_so_far += size; }; + trace!("received from {}: {:?}", handle, &message); + if read_so_far > size { read_buf.copy_within(size..read_so_far, 0); } diff --git a/src/main.rs b/src/main.rs index 3bfaecf..a2a1136 100644 --- a/src/main.rs +++ b/src/main.rs @@ -57,6 +57,10 @@ struct Opts { /// Set if you are ok to write on top of existing files #[clap(long)] overwrite: bool, + + /// Only list the torrent metadata contents, don't do anything else. + #[clap(short, long)] + list: bool, } fn compute_only_files( @@ -105,6 +109,9 @@ fn main() -> anyhow::Result<()> { }; info!("Torrent metadata: {:#?}", &torrent); + if opts.list { + return Ok(()); + } let only_files = if let Some(filename_re) = opts.only_files_matching_regex { Some(compute_only_files(&torrent, &filename_re)?)