Pretty stable now

This commit is contained in:
Igor Katson 2021-06-27 14:49:41 +01:00
parent 0ac1116aba
commit 06136cc170
2 changed files with 49 additions and 16 deletions

View file

@ -240,7 +240,9 @@ where
(&mut out[PREAMBLE_LEN..PREAMBLE_LEN + block_len]).copy_from_slice(b.as_ref()); (&mut out[PREAMBLE_LEN..PREAMBLE_LEN + block_len]).copy_from_slice(b.as_ref());
msg_len msg_len
} }
Message::Choke | Message::Unchoke | Message::Interested => PREAMBLE_LEN, Message::Choke | Message::Unchoke | Message::Interested | Message::NotInterested => {
PREAMBLE_LEN
}
Message::Piece(p) => { Message::Piece(p) => {
// below code is wrong, need to serialize len_prefix // below code is wrong, need to serialize len_prefix
let block_len = p.block.as_ref().len(); let block_len = p.block.as_ref().len();
@ -261,7 +263,6 @@ where
byteorder::BE::write_u32(&mut out[PREAMBLE_LEN..], *v); byteorder::BE::write_u32(&mut out[PREAMBLE_LEN..], *v);
msg_len msg_len
} }
Message::NotInterested => todo!(),
} }
} }
pub fn deserialize<'a>( pub fn deserialize<'a>(

View file

@ -747,25 +747,36 @@ impl TorrentManager {
} }
} }
fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) { async fn on_bitfield(&self, handle: PeerHandle, bitfield: ByteString) -> anyhow::Result<()> {
if bitfield.len() != self.inner.lengths.piece_bitfield_bytes() as usize { if bitfield.len() != self.inner.lengths.piece_bitfield_bytes() as usize {
warn!( anyhow::bail!(
"dropping {} as its bitfield has unexpected size. Got {}, expected {}", "dropping {} as its bitfield has unexpected size. Got {}, expected {}",
handle, handle,
bitfield.len(), bitfield.len(),
self.inner.lengths.piece_bitfield_bytes(), self.inner.lengths.piece_bitfield_bytes(),
); );
self.inner.locked.write().peers.drop_peer(handle);
return;
} }
self.inner self.inner
.locked .locked
.write() .write()
.peers .peers
.update_bitfield_from_vec(handle, bitfield.0); .update_bitfield_from_vec(handle, bitfield.0);
if !self.am_i_interested_in_peer(handle) { if !self.am_i_interested_in_peer(handle) {
// self.inner.locked.write().peers.drop_peer(handle); let tx = self
return; .inner
.locked
.read()
.peers
.clone_tx(handle)
.ok_or_else(|| anyhow::anyhow!("peer closed"))?;
tx.send(MessageOwned::Unchoke)
.await
.context("peer dropped")?;
tx.send(MessageOwned::NotInterested)
.await
.context("peer dropped")?;
return Ok(());
} }
// Additional spawn per peer, not good. // Additional spawn per peer, not good.
@ -773,6 +784,7 @@ impl TorrentManager {
format!("peer_chunk_requester({})", handle), format!("peer_chunk_requester({})", handle),
self.clone().task_peer_chunk_requester(handle), self.clone().task_peer_chunk_requester(handle),
); );
Ok(())
} }
async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> { async fn task_peer_chunk_requester(self, handle: PeerHandle) -> anyhow::Result<()> {
@ -1073,7 +1085,13 @@ impl TorrentManager {
let mut buf = data.block.as_ref(); let mut buf = data.block.as_ref();
let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info); let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info);
for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() { for (file_idx, (name, file_len)) in self
.inner
.torrent
.info
.iter_filenames_and_lengths()
.enumerate()
{
if absolute_offset > file_len { if absolute_offset > file_len {
absolute_offset -= file_len; absolute_offset -= file_len;
continue; continue;
@ -1084,17 +1102,26 @@ impl TorrentManager {
let mut file_g = self.inner.files[file_idx].lock(); let mut file_g = self.inner.files[file_idx].lock();
debug!( debug!(
"piece={}, chunk={}, handle={}, begin={}, file={}, writing {} bytes at {}", "piece={}, chunk={:?}, handle={}, begin={}, file={}, writing {} bytes at {}",
chunk_info.piece_index, chunk_info.piece_index,
chunk_info.chunk_index, chunk_info,
who_sent, who_sent,
chunk_info.offset, chunk_info.offset,
file_idx, file_idx,
to_write, to_write,
absolute_offset absolute_offset
); );
file_g.seek(SeekFrom::Start(absolute_offset))?; file_g
file_g.write_all(&buf[..to_write])?; .seek(SeekFrom::Start(absolute_offset))
.with_context(|| {
format!(
"error seeking to {} in file {} (\"{:?}\")",
absolute_offset, file_idx, name
)
})?;
file_g
.write_all(&buf[..to_write])
.with_context(|| format!("error writing to file {} (\"{:?}\")", file_idx, name))?;
buf = &buf[to_write..]; buf = &buf[to_write..];
if buf.is_empty() { if buf.is_empty() {
break; break;
@ -1139,7 +1166,7 @@ impl TorrentManager {
); );
} }
match g.chunks.mark_chunk_downloaded(&piece) { let should_checksum = match g.chunks.mark_chunk_downloaded(&piece) {
Some(ChunkMarkingResult::Completed) => { Some(ChunkMarkingResult::Completed) => {
debug!( debug!(
"piece={} done by {}, will write and checksum", "piece={} done by {}, will write and checksum",
@ -1147,6 +1174,7 @@ impl TorrentManager {
); );
// This will prevent others from stealing it. // This will prevent others from stealing it.
g.peers.inflight_pieces.remove(&chunk_info.piece_index); g.peers.inflight_pieces.remove(&chunk_info.piece_index);
true
} }
Some(ChunkMarkingResult::PreviouslyCompleted) => { Some(ChunkMarkingResult::PreviouslyCompleted) => {
// TODO: we might need to send cancellations here. // TODO: we might need to send cancellations here.
@ -1156,7 +1184,7 @@ impl TorrentManager {
); );
return Ok(()); return Ok(());
} }
Some(ChunkMarkingResult::NotCompleted) => return Ok(()), Some(ChunkMarkingResult::NotCompleted) => false,
None => { None => {
anyhow::bail!( anyhow::bail!(
"bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer",
@ -1180,6 +1208,10 @@ impl TorrentManager {
// should we really do? If we unmark it, it will get requested forever... // should we really do? If we unmark it, it will get requested forever...
this.write_chunk_blocking(handle, &piece, &chunk_info)?; this.write_chunk_blocking(handle, &piece, &chunk_info)?;
if !should_checksum {
return Ok(());
}
let clone = this.clone(); let clone = this.clone();
match clone match clone
.check_piece_blocking(handle, chunk_info.piece_index, &chunk_info) .check_piece_blocking(handle, chunk_info.piece_index, &chunk_info)
@ -1454,7 +1486,7 @@ impl TorrentManager {
format!("error handling download request from {}", handle) format!("error handling download request from {}", handle)
})?; })?;
} }
Message::Bitfield(b) => self.on_bitfield(handle, b), Message::Bitfield(b) => self.on_bitfield(handle, b).await?,
Message::Choke => self.on_i_am_choked(handle), Message::Choke => self.on_i_am_choked(handle),
Message::Unchoke => self.on_i_am_unchoked(handle), Message::Unchoke => self.on_i_am_unchoked(handle),
Message::Interested => { Message::Interested => {