This commit is contained in:
Igor Katson 2021-06-27 10:10:59 +01:00
parent 0bd3f95891
commit 5c092389f9
4 changed files with 112 additions and 45 deletions

1
Cargo.lock generated
View file

@ -515,6 +515,7 @@ dependencies = [
"futures", "futures",
"log", "log",
"parking_lot", "parking_lot",
"rand",
"reqwest", "reqwest",
"serde", "serde",
"sha1", "sha1",

View file

@ -19,6 +19,7 @@ bitvec = "0.22"
parking_lot = "0.11" parking_lot = "0.11"
log = "0.4" log = "0.4"
size_format = "1" size_format = "1"
rand = "0.8"
uuid = {version = "0.8", features = ["v4"]} uuid = {version = "0.8", features = ["v4"]}
futures = "0.3" futures = "0.3"

View file

@ -2,7 +2,7 @@ use log::{debug, info};
use crate::{ use crate::{
buffers::ByteString, buffers::ByteString,
lengths::{Lengths, ValidPieceIndex}, lengths::{ChunkInfo, Lengths, ValidPieceIndex},
peer_comms::Piece, peer_comms::Piece,
type_aliases::BF, type_aliases::BF,
}; };
@ -50,6 +50,12 @@ fn compute_chunk_status(lengths: &Lengths, needed_pieces: &BF) -> BF {
chunk_bf chunk_bf
} }
pub enum ChunkMarkingResult {
PreviouslyCompleted,
NotCompleted,
Completed,
}
impl ChunkTracker { impl ChunkTracker {
pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self { pub fn new(needed_pieces: BF, have_pieces: BF, lengths: Lengths) -> Self {
Self { Self {
@ -84,19 +90,33 @@ impl ChunkTracker {
self.have.set(idx.get() as usize, true) self.have.set(idx.get() as usize, true)
} }
// return true if the whole piece is marked downloaded pub fn is_chunk_downloaded(&self, chunk: &ChunkInfo) -> bool {
pub fn mark_chunk_downloaded(&mut self, piece: &Piece<ByteString>) -> Option<bool> { *self
let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?; .chunk_status
self.chunk_status .get(chunk.absolute_index as usize)
.set(chunk_info.absolute_index as usize, true); .unwrap()
let chunk_range = self.lengths.chunk_range(chunk_info.piece_index); }
let chunk_range = self.chunk_status.get(chunk_range).unwrap();
let all = chunk_range.all();
// return true if the whole piece is marked downloaded
pub fn mark_chunk_downloaded(
&mut self,
piece: &Piece<ByteString>,
) -> Option<ChunkMarkingResult> {
let chunk_info = self.lengths.chunk_info_from_received_piece(piece)?;
let chunk_range = self.lengths.chunk_range(chunk_info.piece_index);
let chunk_range = self.chunk_status.get_mut(chunk_range).unwrap();
if chunk_range.all() {
return Some(ChunkMarkingResult::PreviouslyCompleted);
}
chunk_range.set(chunk_info.chunk_index as usize, true);
debug!( debug!(
"piece={}, chunk_info={:?}, bits={:?}", "piece={}, chunk_info={:?}, bits={:?}",
piece.index, chunk_info, chunk_range, piece.index, chunk_info, chunk_range,
); );
Some(all)
if chunk_range.all() {
return Some(ChunkMarkingResult::Completed);
}
return Some(ChunkMarkingResult::NotCompleted);
} }
} }

View file

@ -18,11 +18,14 @@ use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use reqwest::Url; use reqwest::Url;
use size_format::SizeFormatterBinary as SF; use size_format::SizeFormatterBinary as SF;
use tokio::sync::{mpsc::Sender, Notify, Semaphore}; use tokio::{
sync::{mpsc::Sender, Notify, Semaphore},
time::timeout,
};
use crate::{ use crate::{
buffers::{ByteBuf, ByteString}, buffers::{ByteBuf, ByteString},
chunk_tracker::ChunkTracker, chunk_tracker::{ChunkMarkingResult, ChunkTracker},
clone_to_owned::CloneToOwned, clone_to_owned::CloneToOwned,
lengths::{ChunkInfo, Lengths, ValidPieceIndex}, lengths::{ChunkInfo, Lengths, ValidPieceIndex},
peer_comms::{ peer_comms::{
@ -112,6 +115,7 @@ struct LivePeerState {
struct PeerStates { struct PeerStates {
states: HashMap<PeerHandle, PeerState>, states: HashMap<PeerHandle, PeerState>,
seen_peers: HashSet<SocketAddr>, seen_peers: HashSet<SocketAddr>,
requested_pieces: HashSet<ValidPieceIndex>,
tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::Sender<MessageOwned>>>, tx: HashMap<PeerHandle, Arc<tokio::sync::mpsc::Sender<MessageOwned>>>,
} }
@ -789,6 +793,19 @@ impl TorrentManager {
}) })
} }
fn try_steal_piece(&self) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom;
self.inner
.locked
.read()
.peers
.requested_pieces
.iter()
.choose(&mut rng)
.copied()
}
async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> { async fn requester(self, handle: PeerHandle) -> anyhow::Result<()> {
let notify = match self.inner.locked.read().peers.get_live(handle) { let notify = match self.inner.locked.read().peers.get_live(handle) {
Some(l) => l.have_notify.clone(), Some(l) => l.have_notify.clone(),
@ -798,25 +815,43 @@ impl TorrentManager {
// TODO: this might dangle, same below. // TODO: this might dangle, same below.
#[allow(unused_must_use)] #[allow(unused_must_use)]
{ {
tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; timeout(Duration::from_secs(60), notify.notified()).await;
} }
loop { loop {
let next = match self.reserve_next_needed_piece(handle) { match self.am_i_choked(handle) {
Some(next) => next, Some(true) => {
None => { warn!("we are choked by {}, can't reserve next piece", handle);
info!("no pieces to request from {}", handle);
let notify = match self.inner.locked.read().peers.get_live(handle) {
Some(l) => l.have_notify.clone(),
None => return Ok(()),
};
#[allow(unused_must_use)] #[allow(unused_must_use)]
{ {
tokio::time::timeout(Duration::from_secs(60), notify.notified()).await; timeout(Duration::from_secs(60), notify.notified()).await;
} }
continue; continue;
} }
Some(false) => {}
None => return Ok(()),
}
let (next, is_stolen) = match self.reserve_next_needed_piece(handle) {
Some(next) => (next, false),
None => {
if self.get_left_to_download() == 0 {
info!("{}: nothing left to download, closing requester", handle);
return Ok(());
}
if let Some(piece) = self.try_steal_piece() {
info!("{}: stole a piece {}", handle, piece);
(piece, true)
} else {
info!("no pieces to request from {}", handle);
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
}
continue;
}
}
}; };
let tx = match self.inner.locked.read().peers.clone_tx(handle) { let tx = match self.inner.locked.read().peers.clone_tx(handle) {
Some(tx) => tx, Some(tx) => tx,
@ -827,6 +862,9 @@ impl TorrentManager {
None => return Ok(()), None => return Ok(()),
}; };
for chunk in self.inner.lengths.iter_chunk_infos(next) { for chunk in self.inner.lengths.iter_chunk_infos(next) {
if is_stolen && self.inner.locked.read().chunks.is_chunk_downloaded(&chunk) {
continue;
}
let request = Request { let request = Request {
index: next.get(), index: next.get(),
begin: chunk.offset, begin: chunk.offset,
@ -880,6 +918,7 @@ impl TorrentManager {
break; break;
} }
} }
self.inner.lengths.validate_piece_index(n_opt? as u32)? self.inner.lengths.validate_piece_index(n_opt? as u32)?
}; };
@ -887,6 +926,7 @@ impl TorrentManager {
.get_live_mut(peer_handle)? .get_live_mut(peer_handle)?
.requested_pieces .requested_pieces
.insert(n); .insert(n);
g.peers.requested_pieces.insert(n);
g.chunks.reserve_needed_piece(n); g.chunks.reserve_needed_piece(n);
Some(n) Some(n)
} }
@ -1057,9 +1097,14 @@ impl TorrentManager {
.fetch_add(piece.block.len() as u64, Ordering::Relaxed); .fetch_add(piece.block.len() as u64, Ordering::Relaxed);
if !h.requested_pieces.contains(&chunk_info.piece_index) { if !h.requested_pieces.contains(&chunk_info.piece_index) {
// TODO: this is wrong, we need to distinguish between these cases.
warn!( warn!(
"peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece, "peer {} sent us a piece that we did not ask for, dropping it. Requested pieces: {:?}. Got: {:?}", handle, &h.requested_pieces, &piece,
); );
// this prevents a deadlock.
drop(g);
self.drop_peer(handle); self.drop_peer(handle);
return None; return None;
} }
@ -1075,21 +1120,24 @@ impl TorrentManager {
let index = piece.index; let index = piece.index;
this.write_chunk_blocking(handle, &piece, &chunk_info)?; this.write_chunk_blocking(handle, &piece, &chunk_info)?;
let piece_done = match this match this
.inner .inner
.locked .locked
.write() .write()
.chunks .chunks
.mark_chunk_downloaded(&piece) .mark_chunk_downloaded(&piece)
{ {
Some(true) => { Some(ChunkMarkingResult::Completed) => {
debug!("piece={} done by {}, will checksum", piece.index, handle);
}
Some(ChunkMarkingResult::PreviouslyCompleted) => {
debug!( debug!(
"piece={} done, requesting a piece from {}", "piece={} was done by someone else {}, ignoring",
piece.index, handle piece.index, handle
); );
true return Ok(());
} }
Some(false) => false, Some(ChunkMarkingResult::NotCompleted) => return Ok(()),
None => { None => {
warn!( warn!(
"bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer", "bogus data received from {}: {:?}, cannot map this to a chunk, dropping peer",
@ -1100,16 +1148,14 @@ impl TorrentManager {
} }
}; };
if !piece_done {
return Ok(());
}
// Ignore responses about this piece from now on. // Ignore responses about this piece from now on.
this.inner {
.locked let mut g = this.inner.locked.write();
.write() g.peers
.peers .get_live_mut(handle)
.get_live_mut(handle) .map(|l| l.requested_pieces.remove(&chunk_info.piece_index));
.map(|l| l.requested_pieces.remove(&chunk_info.piece_index)); g.peers.requested_pieces.remove(&chunk_info.piece_index);
}
let clone = this.clone(); let clone = this.clone();
match clone match clone
@ -1309,14 +1355,13 @@ impl TorrentManager {
} }
loop { loop {
let msg = let msg = match timeout(keep_alive_interval, outgoing_chan.recv()).await {
match tokio::time::timeout(keep_alive_interval, outgoing_chan.recv()).await { Ok(Some(msg)) => msg,
Ok(Some(msg)) => msg, Ok(None) => {
Ok(None) => { anyhow::bail!("closing writer, channel closed")
anyhow::bail!("closing writer, channel closed") }
} Err(_) => MessageOwned::KeepAlive,
Err(_) => MessageOwned::KeepAlive, };
};
let uploaded_add = match &msg { let uploaded_add = match &msg {
Message::Piece(p) => Some(p.block.len()), Message::Piece(p) => Some(p.block.len()),