This commit is contained in:
Igor Katson 2021-07-01 19:17:44 +01:00
parent a6981231c1
commit 5942e6a9d5
12 changed files with 186 additions and 148 deletions

View file

@ -19,7 +19,7 @@ use crate::{
},
peer_id::try_decode_peer_id,
peer_state::InflightRequest,
spawn_utils::{spawn, spawn_block_in_place},
spawn_utils::{spawn, BlockingSpawner},
torrent_state::TorrentState,
type_aliases::PeerHandle,
};
@ -33,11 +33,12 @@ pub enum WriterRequest {
#[derive(Clone)]
pub struct PeerConnection {
state: Arc<TorrentState>,
spawner: BlockingSpawner,
}
impl PeerConnection {
pub fn new(state: Arc<TorrentState>) -> Self {
PeerConnection { state }
pub fn new(state: Arc<TorrentState>, spawner: BlockingSpawner) -> Self {
PeerConnection { state, spawner }
}
pub fn into_state(self) -> Arc<TorrentState> {
self.state
@ -128,14 +129,15 @@ impl PeerConnection {
let preamble_len = serialize_piece_preamble(&chunk, &mut buf);
let full_len = preamble_len + chunk.size as usize;
buf.resize(full_len, 0);
spawn_block_in_place(|| {
this.state.file_ops().read_chunk(
handle,
&chunk,
&mut buf[preamble_len..],
)
})
.with_context(|| format!("error reading chunk {:?}", chunk))?;
this.spawner
.spawn_block_in_place(|| {
this.state.file_ops().read_chunk(
handle,
&chunk,
&mut buf[preamble_len..],
)
})
.with_context(|| format!("error reading chunk {:?}", chunk))?;
uploaded_add = Some(chunk.size);
full_len
}
@ -544,79 +546,81 @@ impl PeerConnection {
// to prevent deadlocks.
drop(g);
spawn_block_in_place(move || {
let index = piece.index;
self.spawner
.spawn_block_in_place(move || {
let index = piece.index;
// TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what
// should we really do? If we unmark it, it will get requested forever...
//
// So let's just unwrap and abort.
self.state
.file_ops()
.write_chunk(handle, &piece, &chunk_info)
.expect("expected to be able to write to disk");
// TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what
// should we really do? If we unmark it, it will get requested forever...
//
// So let's just unwrap and abort.
self.state
.file_ops()
.write_chunk(handle, &piece, &chunk_info)
.expect("expected to be able to write to disk");
let full_piece_download_time = match full_piece_download_time {
Some(t) => t,
None => return Ok(()),
};
let full_piece_download_time = match full_piece_download_time {
Some(t) => t,
None => return Ok(()),
};
match self
.state
.file_ops()
.check_piece(handle, chunk_info.piece_index, &chunk_info)
.with_context(|| format!("error checking piece={}", index))?
{
true => {
let piece_len = self.state.lengths.piece_length(chunk_info.piece_index) as u64;
self.state
.stats
.downloaded_and_checked
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.have
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
self.state
.locked
.write()
.chunks
.mark_piece_downloaded(chunk_info.piece_index);
match self
.state
.file_ops()
.check_piece(handle, chunk_info.piece_index, &chunk_info)
.with_context(|| format!("error checking piece={}", index))?
{
true => {
let piece_len =
self.state.lengths.piece_length(chunk_info.piece_index) as u64;
self.state
.stats
.downloaded_and_checked
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.have
.fetch_add(piece_len, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state
.stats
.downloaded_pieces
.fetch_add(1, Ordering::Relaxed);
self.state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
self.state
.locked
.write()
.chunks
.mark_piece_downloaded(chunk_info.piece_index);
debug!(
"piece={} successfully downloaded and verified from {}",
index, handle
);
debug!(
"piece={} successfully downloaded and verified from {}",
index, handle
);
self.state.maybe_transmit_haves(chunk_info.piece_index);
}
false => {
warn!(
"checksum for piece={} did not validate, came from {}",
index, handle
);
self.state
.locked
.write()
.chunks
.mark_piece_broken(chunk_info.piece_index);
}
};
Ok::<_, anyhow::Error>(())
})
.with_context(|| format!("error processing received chunk {:?}", chunk_info))?;
self.state.maybe_transmit_haves(chunk_info.piece_index);
}
false => {
warn!(
"checksum for piece={} did not validate, came from {}",
index, handle
);
self.state
.locked
.write()
.chunks
.mark_piece_broken(chunk_info.piece_index);
}
};
Ok::<_, anyhow::Error>(())
})
.with_context(|| format!("error processing received chunk {:?}", chunk_info))?;
Ok(())
}
}