It sort of works... Peers are still disconnecting somehow

This commit is contained in:
Igor Katson 2021-06-26 21:00:17 +01:00
parent 47c5e9e0c4
commit 0f431621b9
5 changed files with 99 additions and 22 deletions

View file

@ -1,11 +1,12 @@
- [ ] Selective file downloading (mostly done)
- [ ] Proper counting of how much is left, and how much is downloaded
- [ ] Refactor "needed pieces" into a bitfield
- [ ] Send bitfield at the start if I have something
- [ ] use the "update_hash" function in piece checking
- [x] Send bitfield at the start if I have something
- [x] use the "update_hash" function in piece checking
- [ ] signaling when file is done
- [ ] per-file stats
- [ ] per-peer stats
someday:
- [ ] cancellation

View file

@ -62,6 +62,9 @@ impl ChunkTracker {
pub fn get_needed_pieces(&self) -> &BF {
&self.needed_pieces
}
pub fn get_have_pieces(&self) -> &BF {
&self.have
}
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
self.needed_pieces.set(index.get() as usize, false)
}

View file

@ -233,10 +233,18 @@ where
.unwrap();
MSG_LEN
}
Message::Bitfield(_) => todo!(),
Message::Bitfield(b) => {
let block_len = b.as_ref().len();
let msg_len = PREAMBLE_LEN + block_len;
out.resize(msg_len, 0);
(&mut out[PREAMBLE_LEN..PREAMBLE_LEN + block_len]).copy_from_slice(b.as_ref());
msg_len
}
Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN,
Message::Piece(p) => {
let payload_len = 8 + p.block.as_ref().len();
// below code is wrong, need to serialize len_prefix
let block_len = p.block.as_ref().len();
let payload_len = 8 + block_len;
let msg_len = PREAMBLE_LEN + payload_len;
out.resize(msg_len, 0);
let tmp = &mut out[PREAMBLE_LEN..];

View file

@ -21,7 +21,7 @@ use size_format::SizeFormatterBinary as SF;
use tokio::sync::{mpsc::Sender, Notify, Semaphore};
use crate::{
buffers::ByteString,
buffers::{ByteBuf, ByteString},
chunk_tracker::ChunkTracker,
clone_to_owned::CloneToOwned,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
@ -115,7 +115,24 @@ struct PeerStates {
tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::Sender<MessageOwned>>>,
}
#[derive(Debug, Default)]
struct AggregatePeerStats {
connecting: usize,
live: usize,
}
impl PeerStates {
fn stats(&self) -> AggregatePeerStats {
self.states
.values()
.fold(AggregatePeerStats::default(), |mut s, p| {
match p {
PeerState::Connecting(_) => s.connecting += 1,
PeerState::Live(_) => s.live += 1,
};
s
})
}
fn add_if_not_seen(
&mut self,
addr: SocketAddr,
@ -350,7 +367,7 @@ fn initial_check(
for piece_info in lengths.iter_piece_infos() {
let mut computed_hash = sha1::Sha1::new();
let mut piece_remaining = piece_info.len as usize;
let mut piece_is_needed = false;
let mut some_files_broken = false;
let mut at_least_one_file_required = current_file.full_file_required;
while piece_remaining > 0 {
@ -372,11 +389,7 @@ fn initial_check(
current_file.mark_processed_bytes(to_read_in_file as u64);
if current_file.is_broken {
piece_is_needed = true;
continue;
}
if piece_is_needed {
// no need to read.
continue;
}
@ -393,16 +406,18 @@ fn initial_check(
"error reading from file {} ({:?}) at {}: {:#}",
current_file.index, current_file.name, pos, &err
);
piece_is_needed = true;
current_file.is_broken = true;
some_files_broken = true;
}
}
if piece_is_needed {
if at_least_one_file_required && some_files_broken {
trace!(
"piece {} had errors, marking as needed",
piece_info.piece_index
);
needed_bytes += piece_info.len as u64;
needed_pieces.set(piece_info.piece_index.get() as usize, true);
continue;
}
@ -539,25 +554,27 @@ impl TorrentManager {
async fn stats_printer(self) -> anyhow::Result<()> {
loop {
let live_peers = self.inner.locked.read().peers.states.len();
let live_peers = self.inner.locked.read().peers.stats();
let have = self.inner.have.load(Ordering::Relaxed);
let fetched = self.inner.fetched_bytes.load(Ordering::Relaxed);
let needed = self.inner.needed;
let downloaded = self.inner.downloaded_and_checked.load(Ordering::Relaxed);
let remaining = needed - downloaded;
let uploaded = self.inner.uploaded.load(Ordering::Relaxed);
let downloaded_pct = if downloaded == needed {
100f64
} else {
(downloaded as f64 / needed as f64) * 100f64
};
info!(
"Stats: downloaded {:.2}% ({}), live peers {}, fetched {}, remaining {} out of {}, total have {}",
"Stats: downloaded {:.2}% ({}), peers {:?}, fetched {}, remaining {} out of {}, uploaded {}, total have {}",
downloaded_pct,
SF::new(downloaded),
live_peers,
SF::new(fetched),
SF::new(remaining),
SF::new(needed),
SF::new(uploaded),
SF::new(have)
);
tokio::time::sleep(Duration::from_secs(1)).await;
@ -1295,11 +1312,18 @@ impl TorrentManager {
) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut conn = tokio::net::TcpStream::connect(addr).await?;
let mut conn = tokio::net::TcpStream::connect(addr)
.await
.context("error connecting")?;
let handshake = Handshake::new(self.inner.info_hash, self.inner.peer_id);
conn.write_all(&handshake.serialize()).await?;
conn.write_all(&handshake.serialize())
.await
.context("error writing handshake")?;
let mut read_buf = vec![0u8; 16384 * 2];
let read_bytes = conn.read(&mut read_buf).await?;
let read_bytes = conn
.read(&mut read_buf)
.await
.context("error reading handshake")?;
if read_bytes == 0 {
anyhow::bail!("bad handshake");
}
@ -1325,24 +1349,56 @@ impl TorrentManager {
let (mut read_half, mut write_half) = tokio::io::split(conn);
let this = self.clone();
let writer = async move {
let mut buf = vec![0u8; 1024];
let mut buf = Vec::<u8>::new();
let keep_alive_interval = Duration::from_secs(120);
if this.inner.have.load(Ordering::Relaxed) > 0 {
let len = {
let g = this.inner.locked.read();
let msg = Message::Bitfield(ByteBuf(g.chunks.get_have_pieces().as_raw_slice()));
let len = msg.serialize(&mut buf);
debug!("sending to {}: {:?}, length={}", handle, &msg, len);
len
};
write_half
.write_all(&buf[..len])
.await
.context("error writing bitfield to peer")?;
debug!("sent bitfield to {}", handle);
}
loop {
let msg =
match tokio::time::timeout(keep_alive_interval, outgoing_chan.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => return Err(anyhow::anyhow!("torrent manager closed")),
Ok(None) => {
// we were closed
return Ok(());
}
Err(_) => MessageOwned::KeepAlive,
};
let uploaded_add = match &msg {
Message::Piece(p) => Some(p.block.len()),
_ => None,
};
let len = msg.serialize(&mut buf);
debug!("sending to {}: {:?}, length={}", handle, &msg, len);
write_half
.write_all(&buf[..len])
.await
.context("error writing")?;
.context("error writing the message to peer")?;
if let Some(uploaded_add) = uploaded_add {
this.inner
.uploaded
.fetch_add(uploaded_add as u64, Ordering::Relaxed);
}
}
// For type inference.
@ -1374,6 +1430,8 @@ impl TorrentManager {
read_so_far += size;
};
trace!("received from {}: {:?}", handle, &message);
if read_so_far > size {
read_buf.copy_within(size..read_so_far, 0);
}

View file

@ -57,6 +57,10 @@ struct Opts {
/// Set if you are ok to write on top of existing files
#[clap(long)]
overwrite: bool,
/// Only list the torrent metadata contents, don't do anything else.
#[clap(short, long)]
list: bool,
}
fn compute_only_files(
@ -105,6 +109,9 @@ fn main() -> anyhow::Result<()> {
};
info!("Torrent metadata: {:#?}", &torrent);
if opts.list {
return Ok(());
}
let only_files = if let Some(filename_re) = opts.only_files_matching_regex {
Some(compute_only_files(&torrent, &filename_re)?)