Remove Option<BF> to just BF

This commit is contained in:
Igor Katson 2023-11-20 09:17:42 +00:00
parent 1de690a74b
commit 34ee9d9bd9
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 32 additions and 23 deletions

View file

@ -223,10 +223,24 @@ pub struct LivePeerState {
pub peer_id: Id20, pub peer_id: Id20,
pub i_am_choked: bool, pub i_am_choked: bool,
pub peer_interested: bool, pub peer_interested: bool,
// This is used to limit the number of requests we send to a peer at a time.
pub requests_sem: Arc<Semaphore>, pub requests_sem: Arc<Semaphore>,
// This is used to unpause processes after we were choked.
pub have_notify: Arc<Notify>, pub have_notify: Arc<Notify>,
pub bitfield: Option<BF>,
// This is used to track the pieces the peer has.
pub bitfield: BF,
// This is used to only request a piece from a peer once when stealing from others.
// So that you don't steal then re-steal the same piece in a loop.
pub previously_requested_pieces: BF,
// When the peer sends us data this is used to track if we asked for it.
pub inflight_requests: HashSet<InflightRequest>, pub inflight_requests: HashSet<InflightRequest>,
// The main channel to send requests to peer.
pub tx: PeerTx, pub tx: PeerTx,
} }
@ -236,7 +250,8 @@ impl LivePeerState {
peer_id, peer_id,
i_am_choked: true, i_am_choked: true,
peer_interested: false, peer_interested: false,
bitfield: None, bitfield: BF::new(),
previously_requested_pieces: BF::new(),
have_notify: Arc::new(Notify::new()), have_notify: Arc::new(Notify::new()),
requests_sem: Arc::new(Semaphore::new(0)), requests_sem: Arc::new(Semaphore::new(0)),
inflight_requests: Default::default(), inflight_requests: Default::default(),
@ -245,10 +260,8 @@ impl LivePeerState {
} }
pub fn has_full_torrent(&self, total_pieces: usize) -> bool { pub fn has_full_torrent(&self, total_pieces: usize) -> bool {
let bf = match self.bitfield.as_ref() { self.bitfield
Some(bf) => bf, .get(0..total_pieces)
None => return false, .map_or(false, |s| s.all())
};
bf.get(0..total_pieces).map_or(false, |s| s.all())
} }
} }

View file

@ -168,16 +168,11 @@ impl PeerStates {
prev prev
}) })
} }
pub fn update_bitfield_from_vec( pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec<u8>) -> Option<()> {
&self,
handle: PeerHandle,
bitfield: Vec<u8>,
) -> Option<Option<BF>> {
self.with_live_mut(handle, "update_bitfield_from_vec", |live| { self.with_live_mut(handle, "update_bitfield_from_vec", |live| {
let bitfield = BF::from_vec(bitfield); let bitfield = BF::from_vec(bitfield);
let prev = live.bitfield.take(); live.previously_requested_pieces = BF::with_capacity(bitfield.len());
live.bitfield = Some(bitfield); live.bitfield = bitfield;
prev
}) })
} }
pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> { pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result<PeerRx> {
@ -543,7 +538,7 @@ impl TorrentState {
self.peers self.peers
.with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| { .with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| {
let g = self.lock_read("g(get_next_needed_piece)"); let g = self.lock_read("g(get_next_needed_piece)");
let bf = live.bitfield.as_ref()?; let bf = &live.bitfield;
for n in g.chunks.iter_needed_pieces() { for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) { if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever. // in theory it should be safe without validation, but whatever.
@ -567,9 +562,10 @@ impl TorrentState {
return None; return None;
} }
let mut g = self.lock_write("reserve_next_needed_piece"); let mut g = self.lock_write("reserve_next_needed_piece");
let n = { let n = {
let mut n_opt = None; let mut n_opt = None;
let bf = live.bitfield.as_ref()?; let bf = &live.bitfield;
for n in g.chunks.iter_needed_pieces() { for n in g.chunks.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) { if bf.get(n).map(|v| *v) == Some(true) {
n_opt = Some(n); n_opt = Some(n);
@ -627,6 +623,7 @@ impl TorrentState {
None None
} }
// TODO: need to throttle this or make it smarter as it may loop and steal pieces forever from each other.
fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> { fn try_steal_piece(&self, handle: PeerHandle) -> Option<ValidPieceIndex> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
use rand::seq::IteratorRandom; use rand::seq::IteratorRandom;
@ -782,8 +779,8 @@ impl TorrentState {
if live if live
.bitfield .bitfield
.as_ref() .get(index.get() as usize)
.and_then(|b| b.get(index.get() as usize).map(|v| *v)) .map(|v| *v)
.unwrap_or(false) .unwrap_or(false)
{ {
continue; continue;
@ -986,10 +983,8 @@ impl PeerHandler {
fn on_have(&self, handle: PeerHandle, have: u32) { fn on_have(&self, handle: PeerHandle, have: u32) {
self.state.peers.with_live_mut(handle, "on_have", |live| { self.state.peers.with_live_mut(handle, "on_have", |live| {
if let Some(bitfield) = live.bitfield.as_mut() { live.bitfield.set(have as usize, true);
bitfield.set(have as usize, true); debug!("updated bitfield with have={}", have);
debug!("updated bitfield with have={}", have);
}
}); });
} }
@ -1081,6 +1076,7 @@ impl PeerHandler {
None => return Ok(()), None => return Ok(()),
} }
// Try steal a pice from a very slow peer first.
let next = match self.state.try_steal_old_slow_piece(handle) { let next = match self.state.try_steal_old_slow_piece(handle) {
Some(next) => next, Some(next) => next,
None => match self.state.reserve_next_needed_piece(handle) { None => match self.state.reserve_next_needed_piece(handle) {