diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index d3ae147..6373e07 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -1382,23 +1382,22 @@ impl PeerHandler { // By this time we reach here, no other peer can for this piece. All others, even if they steal pieces would // have fallen off above in one of the defensive checks. - self.state - .meta - .spawner - .spawn_block_in_place(move || { + + let work = { + let state = self.state.clone(); + let addr = self.addr; + let counters = self.counters.clone(); + let piece = piece.clone_to_owned(); + move || { let index = piece.index; // Not being able to write to storage is a fatal error. You need to unpause the // torrent to recover from it. - match self - .state - .file_ops() - .write_chunk(self.addr, &piece, &chunk_info) - { + match state.file_ops().write_chunk(addr, &piece, &chunk_info) { Ok(()) => {} Err(e) => { error!("FATAL: error writing chunk to disk: {:?}", e); - return self.state.on_fatal_error(e); + return state.on_fatal_error(e); } } @@ -1407,61 +1406,58 @@ impl PeerHandler { None => return Ok(()), }; - match self - .state + match state .file_ops() - .check_piece(self.addr, chunk_info.piece_index, &chunk_info) + .check_piece(addr, chunk_info.piece_index, &chunk_info) .with_context(|| format!("error checking piece={index}"))? { true => { { - let mut g = self.state.lock_write("mark_piece_downloaded"); + let mut g = state.lock_write("mark_piece_downloaded"); g.get_chunks_mut()? .mark_piece_downloaded(chunk_info.piece_index); } // Global piece counters. - let piece_len = - self.state.lengths.piece_length(chunk_info.piece_index) as u64; - self.state + let piece_len = state.lengths.piece_length(chunk_info.piece_index) as u64; + state .stats .downloaded_and_checked_bytes // This counter is used to compute "is_finished", so using // stronger ordering. .fetch_add(piece_len, Ordering::Release); - self.state + state .stats .downloaded_and_checked_pieces // This counter is used to compute "is_finished", so using // stronger ordering. .fetch_add(1, Ordering::Release); - self.state + state .stats .have_bytes .fetch_add(piece_len, Ordering::Relaxed); #[allow(clippy::cast_possible_truncation)] - self.state.stats.total_piece_download_ms.fetch_add( + state.stats.total_piece_download_ms.fetch_add( full_piece_download_time.as_millis() as u64, Ordering::Relaxed, ); // Per-peer piece counters. - self.counters - .on_piece_downloaded(piece_len, full_piece_download_time); - self.state.peers.reset_peer_backoff(self.addr); + counters.on_piece_completed(piece_len, full_piece_download_time); + state.peers.reset_peer_backoff(addr); debug!("piece={} successfully downloaded and verified", index); - self.state.on_piece_completed(chunk_info.piece_index)?; + state.on_piece_completed(chunk_info.piece_index)?; - self.state.maybe_transmit_haves(chunk_info.piece_index); + state.maybe_transmit_haves(chunk_info.piece_index); } false => { warn!( "checksum for piece={} did not validate. disconecting peer.", index ); - self.state + state .lock_write("mark_piece_broken") .get_chunks_mut()? .mark_piece_broken_if_not_have(chunk_info.piece_index); @@ -1469,8 +1465,10 @@ impl PeerHandler { } }; Ok::<_, anyhow::Error>(()) - }) - .with_context(|| format!("error processing received chunk {chunk_info:?}"))?; + } + }; + + tokio::runtime::Handle::current().spawn_blocking(work); Ok(()) } } diff --git a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs index 6b284b4..4924391 100644 --- a/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs +++ b/crates/librqbit/src/torrent_state/live/peer/stats/atomic.rs @@ -25,7 +25,7 @@ pub(crate) struct PeerCountersAtomic { } impl PeerCountersAtomic { - pub(crate) fn on_piece_downloaded(&self, piece_len: u64, elapsed: Duration) { + pub(crate) fn on_piece_completed(&self, piece_len: u64, elapsed: Duration) { #[allow(clippy::cast_possible_truncation)] let elapsed = elapsed.as_millis() as u64; self.total_piece_download_ms diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index d3e5d50..1756d01 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -75,24 +75,36 @@ pub fn serialize_piece_preamble(chunk: &ChunkInfo, mut buf: &mut [u8]) -> usize } #[derive(Debug)] -pub struct Piece { +pub struct Piece { pub index: u32, pub begin: u32, - pub block: ByteBuf, + pub block: B, } -impl Piece +impl CloneToOwned for Piece { + type Target = Piece; + + fn clone_to_owned(&self) -> Self::Target { + Piece { + index: self.index, + begin: self.begin, + block: self.block.clone_to_owned(), + } + } +} + +impl Piece where - ByteBuf: AsRef<[u8]>, + B: AsRef<[u8]>, { - pub fn from_data(index: u32, begin: u32, block: T) -> Piece + pub fn from_data(index: u32, begin: u32, block: T) -> Piece where - ByteBuf: From, + B: From, { Piece { index, begin, - block: ByteBuf::from(block), + block: B::from(block), } } @@ -103,13 +115,13 @@ where buf.copy_from_slice(self.block.as_ref()); self.block.as_ref().len() + 8 } - pub fn deserialize<'a>(buf: &'a [u8]) -> Piece + pub fn deserialize<'a>(buf: &'a [u8]) -> Piece where - ByteBuf: From<&'a [u8]> + 'a, + B: From<&'a [u8]> + 'a, { let index = byteorder::BigEndian::read_u32(&buf[0..4]); let begin = byteorder::BigEndian::read_u32(&buf[4..8]); - let block = ByteBuf::from(&buf[8..]); + let block = B::from(&buf[8..]); Piece { index, begin,