diff --git a/crates/librqbit/src/peer_state.rs b/crates/librqbit/src/peer_state.rs index 4fc59ae..874146e 100644 --- a/crates/librqbit/src/peer_state.rs +++ b/crates/librqbit/src/peer_state.rs @@ -223,10 +223,24 @@ pub struct LivePeerState { pub peer_id: Id20, pub i_am_choked: bool, pub peer_interested: bool, + + // This is used to limit the number of requests we send to a peer at a time. pub requests_sem: Arc, + + // This is used to unpause processes after we were choked. pub have_notify: Arc, - pub bitfield: Option, + + // This is used to track the pieces the peer has. + pub bitfield: BF, + + // This is used to only request a piece from a peer once when stealing from others. + // So that you don't steal then re-steal the same piece in a loop. + pub previously_requested_pieces: BF, + + // When the peer sends us data this is used to track if we asked for it. pub inflight_requests: HashSet, + + // The main channel to send requests to peer. pub tx: PeerTx, } @@ -236,7 +250,8 @@ impl LivePeerState { peer_id, i_am_choked: true, peer_interested: false, - bitfield: None, + bitfield: BF::new(), + previously_requested_pieces: BF::new(), have_notify: Arc::new(Notify::new()), requests_sem: Arc::new(Semaphore::new(0)), inflight_requests: Default::default(), @@ -245,10 +260,8 @@ impl LivePeerState { } pub fn has_full_torrent(&self, total_pieces: usize) -> bool { - let bf = match self.bitfield.as_ref() { - Some(bf) => bf, - None => return false, - }; - bf.get(0..total_pieces).map_or(false, |s| s.all()) + self.bitfield + .get(0..total_pieces) + .map_or(false, |s| s.all()) } } diff --git a/crates/librqbit/src/torrent_state.rs b/crates/librqbit/src/torrent_state.rs index 12ba450..ee7affd 100644 --- a/crates/librqbit/src/torrent_state.rs +++ b/crates/librqbit/src/torrent_state.rs @@ -168,16 +168,11 @@ impl PeerStates { prev }) } - pub fn update_bitfield_from_vec( - &self, - handle: PeerHandle, - bitfield: Vec, - ) -> Option> { + pub fn update_bitfield_from_vec(&self, handle: PeerHandle, bitfield: Vec) -> Option<()> { self.with_live_mut(handle, "update_bitfield_from_vec", |live| { let bitfield = BF::from_vec(bitfield); - let prev = live.bitfield.take(); - live.bitfield = Some(bitfield); - prev + live.previously_requested_pieces = BF::with_capacity(bitfield.len()); + live.bitfield = bitfield; }) } pub fn mark_peer_connecting(&self, h: PeerHandle) -> anyhow::Result { @@ -543,7 +538,7 @@ impl TorrentState { self.peers .with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| { let g = self.lock_read("g(get_next_needed_piece)"); - let bf = live.bitfield.as_ref()?; + let bf = &live.bitfield; for n in g.chunks.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { // in theory it should be safe without validation, but whatever. @@ -567,9 +562,10 @@ impl TorrentState { return None; } let mut g = self.lock_write("reserve_next_needed_piece"); + let n = { let mut n_opt = None; - let bf = live.bitfield.as_ref()?; + let bf = &live.bitfield; for n in g.chunks.iter_needed_pieces() { if bf.get(n).map(|v| *v) == Some(true) { n_opt = Some(n); @@ -627,6 +623,7 @@ impl TorrentState { None } + // TODO: need to throttle this or make it smarter as it may loop and steal pieces forever from each other. fn try_steal_piece(&self, handle: PeerHandle) -> Option { let mut rng = rand::thread_rng(); use rand::seq::IteratorRandom; @@ -782,8 +779,8 @@ impl TorrentState { if live .bitfield - .as_ref() - .and_then(|b| b.get(index.get() as usize).map(|v| *v)) + .get(index.get() as usize) + .map(|v| *v) .unwrap_or(false) { continue; @@ -986,10 +983,8 @@ impl PeerHandler { fn on_have(&self, handle: PeerHandle, have: u32) { self.state.peers.with_live_mut(handle, "on_have", |live| { - if let Some(bitfield) = live.bitfield.as_mut() { - bitfield.set(have as usize, true); - debug!("updated bitfield with have={}", have); - } + live.bitfield.set(have as usize, true); + debug!("updated bitfield with have={}", have); }); } @@ -1081,6 +1076,7 @@ impl PeerHandler { None => return Ok(()), } + // Try steal a pice from a very slow peer first. let next = match self.state.try_steal_old_slow_piece(handle) { Some(next) => next, None => match self.state.reserve_next_needed_piece(handle) {