Something horribly broken. But added downloading

This commit is contained in:
Igor Katson 2021-06-26 00:32:52 +01:00
parent 0a640daba4
commit e1354e8a85
5 changed files with 160 additions and 28 deletions

View file

@ -110,3 +110,9 @@ impl<'a> From<&'a [u8]> for ByteString {
Self(b.into())
}
}
impl From<Vec<u8>> for ByteString {
fn from(b: Vec<u8>) -> Self {
Self(b)
}
}

View file

@ -55,7 +55,7 @@ impl ChunkTracker {
// return true if the whole piece is marked downloaded
pub fn mark_chunk_downloaded(&mut self, piece: &Piece<ByteString>) -> Option<bool> {
let chunk_info = self.lengths.chunk_info_from_received_piece_data(piece)?;
let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?;
self.chunk_status
.set(chunk_info.absolute_index as usize, true);
let chunk_range = self.lengths.chunk_range(chunk_info.piece_index);

View file

@ -141,18 +141,19 @@ impl Lengths {
})
}
pub fn chunk_info_from_received_piece_data(
pub fn chunk_info_from_received_data(
&self,
piece: &Piece<ByteString>,
piece_index: ValidPieceIndex,
begin: u32,
chunk_size: u32,
) -> Option<ChunkInfo> {
let piece_index = self.validate_piece_index(piece.index)?;
let index = piece.begin / self.chunk_length;
let chunk_size = self.chunk_size(piece_index, index)?;
let index = begin / self.chunk_length;
let expected_chunk_size = self.chunk_size(piece_index, index)?;
let offset = self.chunk_offset_in_piece(piece_index, index)?;
if offset != piece.begin {
if offset != begin {
return None;
}
if chunk_size as usize != piece.block.len() {
if expected_chunk_size != chunk_size {
return None;
}
let absolute_index = self.chunks_per_piece * piece_index.get() + index;
@ -164,6 +165,14 @@ impl Lengths {
absolute_index,
})
}
pub fn chunk_info_from_received_piece(&self, piece: &Piece<ByteString>) -> Option<ChunkInfo> {
self.chunk_info_from_received_data(
self.validate_piece_index(piece.index)?,
piece.begin,
piece.block.len() as u32,
)
}
pub const fn chunk_range(&self, index: ValidPieceIndex) -> std::ops::Range<usize> {
let start = index.0 * self.chunks_per_piece;
let end = start + self.chunks_per_piece(index);

View file

@ -57,6 +57,17 @@ impl<ByteBuf> Piece<ByteBuf>
where
ByteBuf: AsRef<[u8]>,
{
pub fn from_vec(index: u32, begin: u32, block: Vec<u8>) -> Piece<ByteBuf>
where
ByteBuf: From<Vec<u8>>,
{
Piece {
index,
begin,
block: ByteBuf::from(block),
}
}
pub fn serialize(&self, buf: &mut [u8]) -> usize {
byteorder::BigEndian::write_u32(&mut buf[0..4], self.index);
byteorder::BigEndian::write_u32(&mut buf[4..8], self.begin);

View file

@ -224,32 +224,35 @@ fn spawn<N: Display + 'static + Send>(
name: N,
fut: impl std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
) {
debug!("starting task \"{}\"", name);
debug!("starting task \"{}\"", &name);
tokio::spawn(async move {
match fut.await {
Ok(_) => {
debug!("task \"{}\" finished", name);
debug!("task \"{}\" finished", &name);
}
Err(e) => {
error!("error in task \"{}\": {:#}", name, e)
error!("error in task \"{}\": {:#}", &name, e)
}
}
});
}
fn spawn_blocking<N: Display + 'static + Send>(
async fn spawn_blocking<T: Send + Sync + 'static, N: Display + 'static + Send>(
name: N,
f: impl FnOnce() -> anyhow::Result<()> + Send + 'static,
) {
f: impl FnOnce() -> anyhow::Result<T> + Send + 'static,
) -> anyhow::Result<T> {
debug!("starting blocking task \"{}\"", name);
tokio::task::spawn_blocking(move || match f() {
Ok(_) => {
Ok(v) => {
debug!("blocking task \"{}\" finished", name);
Ok(v)
}
Err(e) => {
error!("error in blocking task \"{}\": {:#}", name, e)
error!("error in blocking task \"{}\": {:#}", name, &e);
Err(e)
}
});
})
.await?
}
fn make_lengths(torrent: &TorrentMetaV1Owned) -> anyhow::Result<Lengths> {
@ -406,12 +409,7 @@ impl TorrentManager {
};
match message {
Message::Request(request) => {
warn!(
"{}: received {:?} , but download requests not implemented",
peer_handle, request
)
}
Message::Request(request) => self.on_download_request(peer_handle, request),
Message::Bitfield(b) => self.on_bitfield(peer_handle, b),
Message::Choke => self.on_i_am_choked(peer_handle),
Message::Unchoke => self.on_i_am_unchoked(peer_handle),
@ -434,6 +432,118 @@ impl TorrentManager {
}
}
}
fn on_download_request(&self, peer_handle: PeerHandle, request: Request) {
let piece_index = match self.inner.lengths.validate_piece_index(request.index) {
Some(p) => p,
None => {
warn!(
"{}: received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.",
peer_handle, request
);
return;
}
};
let chunk_info = match self.inner.lengths.chunk_info_from_received_data(
piece_index,
request.begin,
request.length,
) {
Some(d) => d,
None => {
warn!(
"{}: received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.",
peer_handle, request
);
return;
}
};
let this = self.clone();
let task_name = format!(
"download_request(peer={}, chunk_info={:?})",
peer_handle, &chunk_info
);
spawn(task_name, async move {
let clone = this.clone();
let chunk = spawn_blocking(
format!(
"read_chunk_blocking(peer={}, chunk_info={:?}",
peer_handle, &chunk_info
),
move || clone.read_chunk_blocking(peer_handle, chunk_info),
)
.await?;
let tx = this
.inner
.locked
.read()
.peers
.clone_tx(peer_handle)
.ok_or_else(|| {
anyhow::anyhow!(
"peer {} died, dropping chunk that it requested",
peer_handle
)
})?;
let message = Message::Piece(Piece::from_vec(
chunk_info.piece_index.get(),
chunk_info.offset,
chunk,
));
Ok::<_, anyhow::Error>(tx.send(message).await?)
});
}
fn read_chunk_blocking(
self,
who_sent: PeerHandle,
chunk_info: ChunkInfo,
) -> anyhow::Result<Vec<u8>> {
let mut h = sha1::Sha1::new();
let mut absolute_offset = self.inner.lengths.chunk_absolute_offset(&chunk_info);
let mut result_buf = vec![0u8; chunk_info.size as usize];
let mut buf = &mut result_buf[..];
for (file_idx, file_len) in self.inner.torrent.info.iter_file_lengths().enumerate() {
if absolute_offset > file_len {
absolute_offset -= file_len;
continue;
}
let file_remaining_len = file_len - absolute_offset;
let to_read_in_file = std::cmp::min(file_remaining_len, buf.len() as u64) as usize;
let mut file_g = self.inner.files[file_idx].lock();
debug!(
"piece={}, handle={}, file_idx={}, seeking to {}. To read chunk: {:?}",
chunk_info.piece_index, who_sent, file_idx, absolute_offset, &chunk_info
);
file_g
.seek(std::io::SeekFrom::Start(absolute_offset))
.with_context(|| {
format!(
"error seeking to {}, file id: {}",
absolute_offset, file_idx
)
})?;
file_g
.read_exact(&mut buf[..to_read_in_file])
.with_context(|| {
format!(
"error reading {} bytes, file_id: {}",
file_idx, to_read_in_file
)
})?;
buf = &mut buf[to_read_in_file..];
if buf.is_empty() {
break;
}
absolute_offset = 0;
}
return Ok(result_buf);
}
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
self.get_next_needed_piece(handle).is_some()
}
@ -756,11 +866,7 @@ impl TorrentManager {
}
fn on_received_piece(&self, handle: PeerHandle, piece: Piece<ByteString>) -> Option<()> {
let chunk_info = match self
.inner
.lengths
.chunk_info_from_received_piece_data(&piece)
{
let chunk_info = match self.inner.lengths.chunk_info_from_received_piece(&piece) {
Some(i) => i,
None => {
warn!(