SAving, its broken

This commit is contained in:
Igor Katson 2023-12-05 20:02:54 +00:00
parent b15815d12f
commit efaa36a161
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
4 changed files with 67 additions and 43 deletions

View file

@ -157,6 +157,8 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let h_supports_extended = handshake.supports_extended(); let h_supports_extended = handshake.supports_extended();
self.handler.on_handshake(handshake)?;
self.manage_peer( self.manage_peer(
h_supports_extended, h_supports_extended,
read_so_far, read_so_far,

View file

@ -376,6 +376,7 @@ impl TorrentStateLive {
let peer = Peer::new_live_for_incoming_connection( let peer = Peer::new_live_for_incoming_connection(
Id20(checked_peer.handshake.peer_id), Id20(checked_peer.handshake.peer_id),
tx.clone(), tx.clone(),
&self.peers.stats,
); );
let counters = peer.stats.counters.clone(); let counters = peer.stats.counters.clone();
vac.insert(peer); vac.insert(peer);
@ -400,7 +401,6 @@ impl TorrentStateLive {
rx: PeerRx, rx: PeerRx,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// TODO: bump counters for incoming // TODO: bump counters for incoming
let handler = PeerHandler { let handler = PeerHandler {
addr: checked_peer.addr, addr: checked_peer.addr,
on_bitfield_notify: Default::default(), on_bitfield_notify: Default::default(),
@ -427,7 +427,7 @@ impl TorrentStateLive {
Some(options), Some(options),
self.meta.spawner, self.meta.spawner,
); );
let requester = handler.task_peer_chunk_requester(checked_peer.addr); let requester = handler.task_peer_chunk_requester();
let res = tokio::select! { let res = tokio::select! {
r = requester => {r} r = requester => {r}
@ -458,7 +458,6 @@ impl TorrentStateLive {
async fn task_manage_outgoing_peer(self: Arc<Self>, addr: SocketAddr) -> anyhow::Result<()> { async fn task_manage_outgoing_peer(self: Arc<Self>, addr: SocketAddr) -> anyhow::Result<()> {
let state = self; let state = self;
let (rx, tx) = state.peers.mark_peer_connecting(addr)?; let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
let counters = state let counters = state
.peers .peers
.with_peer(addr, |p| p.stats.counters.clone()) .with_peer(addr, |p| p.stats.counters.clone())
@ -490,7 +489,7 @@ impl TorrentStateLive {
Some(options), Some(options),
state.meta.spawner, state.meta.spawner,
); );
let requester = handler.task_peer_chunk_requester(addr); let requester = handler.task_peer_chunk_requester();
handler handler
.counters .counters
@ -597,18 +596,10 @@ impl TorrentStateLive {
} }
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) { fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
let result = self.peers.with_peer_mut(handle, "set_peer_live", |p| { self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state p.state
.connecting_to_live(Id20(h.peer_id), &self.peers.stats) .connecting_to_live(Id20(h.peer_id), &self.peers.stats);
.is_some()
}); });
match result {
Some(true) => {
trace!("set peer to live")
}
Some(false) => debug!("can't set peer live, it was in wrong state"),
None => debug!("can't set peer live, it disappeared"),
}
} }
pub fn get_uploaded_bytes(&self) -> u64 { pub fn get_uploaded_bytes(&self) -> u64 {
@ -867,6 +858,8 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> { fn on_handshake<B>(&self, handshake: Handshake<B>) -> anyhow::Result<()> {
self.state.set_peer_live(self.addr, handshake); self.state.set_peer_live(self.addr, handshake);
self.tx
.send(WriterRequest::Message(MessageOwned::Unchoke))?;
Ok(()) Ok(())
} }
@ -1116,7 +1109,8 @@ impl PeerHandler {
self.state self.state
.peers .peers
.with_live_mut(self.addr, "on_have", |live| { .with_live_mut(self.addr, "on_have", |live| {
// If bitfield wasn't allocated yet, let's do it. Some clients send haves before bitfield. // If bitfield wasn't allocated yet, let's do it. Some clients start empty so they never
// send bitfields.
if live.bitfield.is_empty() { if live.bitfield.is_empty() {
live.bitfield = live.bitfield =
BF::from_vec(vec![0; self.state.lengths.piece_bitfield_bytes()]); BF::from_vec(vec![0; self.state.lengths.piece_bitfield_bytes()]);
@ -1129,6 +1123,7 @@ impl PeerHandler {
} }
}; };
trace!("updated bitfield with have={}", have); trace!("updated bitfield with have={}", have);
self.on_bitfield_notify.notify_waiters();
}); });
} }
@ -1144,10 +1139,40 @@ impl PeerHandler {
self.state self.state
.peers .peers
.update_bitfield_from_vec(self.addr, bitfield.0); .update_bitfield_from_vec(self.addr, bitfield.0);
self.on_bitfield_notify.notify_waiters();
Ok(())
}
async fn wait_for_any_notify(&self, notify: &Notify, check: impl Fn() -> bool) {
// To remove possibility of races, we first grab a token, then check
// if we need it, and only if so, await.
let notified = notify.notified();
if check() {
return;
}
notified.await;
}
async fn wait_for_bitfield(&self) {
self.wait_for_any_notify(&self.on_bitfield_notify, || {
self.state
.peers
.with_live(self.addr, |live| !live.bitfield.is_empty())
.unwrap_or_default()
})
.await;
}
async fn wait_for_unchoke(&self) {
self.wait_for_any_notify(&self.unchoke_notify, || !self.locked.read().i_am_choked)
.await;
}
async fn task_peer_chunk_requester(&self) -> anyhow::Result<()> {
let handle = self.addr;
self.wait_for_bitfield().await;
if !self.state.am_i_interested_in_peer(self.addr) { if !self.state.am_i_interested_in_peer(self.addr) {
self.tx
.send(WriterRequest::Message(MessageOwned::Unchoke))?;
self.tx self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?; .send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self.state.is_finished() { if self.state.is_finished() {
@ -1155,32 +1180,11 @@ impl PeerHandler {
} }
return Ok(()); return Ok(());
} }
self.tx
self.on_bitfield_notify.notify_waiters(); .send_many([WriterRequest::Message(MessageOwned::Interested)])?;
Ok(())
}
async fn task_peer_chunk_requester(&self, handle: PeerHandle) -> anyhow::Result<()> {
self.on_bitfield_notify.notified().await;
self.tx.send_many([
WriterRequest::Message(MessageOwned::Unchoke),
WriterRequest::Message(MessageOwned::Interested),
])?;
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await;
}
loop { loop {
if self.locked.read().i_am_choked { self.wait_for_unchoke().await;
debug!("we are choked, can't reserve next piece");
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), self.unchoke_notify.notified()).await;
}
continue;
}
if self.state.is_finished() { if self.state.is_finished() {
debug!("nothing left to download, looping forever until manage_peer quits"); debug!("nothing left to download, looping forever until manage_peer quits");

View file

@ -53,9 +53,15 @@ pub(crate) struct Peer {
} }
impl Peer { impl Peer {
pub fn new_live_for_incoming_connection(peer_id: Id20, tx: PeerTx) -> Self { pub fn new_live_for_incoming_connection(
peer_id: Id20,
tx: PeerTx,
counters: &AggregatePeerStatsAtomic,
) -> Self {
let state = PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx)));
counters.inc(&state.0);
Self { Self {
state: PeerStateNoMut(PeerState::Live(LivePeerState::new(peer_id, tx))), state,
stats: Default::default(), stats: Default::default(),
} }
} }
@ -118,6 +124,13 @@ impl PeerStateNoMut {
std::mem::replace(&mut self.0, new) std::mem::replace(&mut self.0, new)
} }
pub fn get_live(&self) -> Option<&LivePeerState> {
match &self.0 {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> { pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.0 { match &mut self.0 {
PeerState::Live(l) => Some(l), PeerState::Live(l) => Some(l),

View file

@ -53,6 +53,11 @@ impl PeerStates {
.map(|e| f(TimedExistence::new(e, reason).value_mut())) .map(|e| f(TimedExistence::new(e, reason).value_mut()))
} }
pub fn with_live<R>(&self, addr: PeerHandle, f: impl FnOnce(&LivePeerState) -> R) -> Option<R> {
self.with_peer(addr, |peer| peer.state.get_live().map(f))
.flatten()
}
pub fn with_live_mut<R>( pub fn with_live_mut<R>(
&self, &self,
addr: PeerHandle, addr: PeerHandle,