Uploading seems to work fine now

This commit is contained in:
Igor Katson 2023-12-05 20:31:06 +00:00
parent efaa36a161
commit 4784f3f14b
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
3 changed files with 17 additions and 45 deletions

View file

@ -102,7 +102,7 @@ use self::{
atomic::PeerCountersAtomic as AtomicPeerCounters, atomic::PeerCountersAtomic as AtomicPeerCounters,
snapshot::{PeerStatsFilter, PeerStatsSnapshot}, snapshot::{PeerStatsFilter, PeerStatsSnapshot},
}, },
InflightRequest, PeerRx, PeerState, PeerTx, SendMany, InflightRequest, PeerRx, PeerState, PeerTx,
}, },
peers::PeerStates, peers::PeerStates,
stats::{atomic::AtomicStats, snapshot::StatsSnapshot}, stats::{atomic::AtomicStats, snapshot::StatsSnapshot},
@ -571,30 +571,6 @@ impl TorrentStateLive {
TimedExistence::new(timeit(reason, || self.locked.write()), reason) TimedExistence::new(timeit(reason, || self.locked.write()), reason)
} }
fn get_next_needed_piece(
&self,
peer_handle: PeerHandle,
) -> anyhow::Result<Option<ValidPieceIndex>> {
self.peers
.with_live_mut(peer_handle, "l(get_next_needed_piece)", |live| {
let g = self.lock_read("g(get_next_needed_piece)");
let bf = &live.bitfield;
for n in g.get_chunks()?.iter_needed_pieces() {
if bf.get(n).map(|v| *v) == Some(true) {
// in theory it should be safe without validation, but whatever.
return Ok(self.lengths.validate_piece_index(n as u32));
}
}
Ok(None)
})
.transpose()
.map(|r| r.flatten())
}
fn am_i_interested_in_peer(&self, handle: PeerHandle) -> bool {
matches!(self.get_next_needed_piece(handle), Ok(Some(_)))
}
fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) { fn set_peer_live<B>(&self, handle: PeerHandle, h: Handshake<B>) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| { self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.state p.state
@ -1172,16 +1148,27 @@ impl PeerHandler {
let handle = self.addr; let handle = self.addr;
self.wait_for_bitfield().await; self.wait_for_bitfield().await;
if !self.state.am_i_interested_in_peer(self.addr) { // TODO: this check needs to happen more often
if self.state.is_finished() {
self.tx self.tx
.send(WriterRequest::Message(MessageOwned::NotInterested))?; .send(WriterRequest::Message(MessageOwned::NotInterested))?;
if self.state.is_finished() {
if self
.state
.peers
.with_live(self.addr, |l| {
l.has_full_torrent(self.state.lengths.total_pieces() as usize)
})
.unwrap_or_default()
{
debug!("both peer and us have full torrent, disconnecting");
self.tx.send(WriterRequest::Disconnect)?; self.tx.send(WriterRequest::Disconnect)?;
return Ok(());
} }
return Ok(());
} }
self.tx self.tx
.send_many([WriterRequest::Message(MessageOwned::Interested)])?; .send(WriterRequest::Message(MessageOwned::Interested))?;
loop { loop {
self.wait_for_unchoke().await; self.wait_for_unchoke().await;

View file

@ -2,8 +2,6 @@ pub mod stats;
use std::collections::HashSet; use std::collections::HashSet;
use anyhow::Context;
use librqbit_core::id20::Id20; use librqbit_core::id20::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex}; use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
@ -29,23 +27,9 @@ impl From<&ChunkInfo> for InflightRequest {
} }
} }
// TODO: Arc can be removed probably, as UnboundedSender should be clone + it can be downgraded to weak.
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>; pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
pub(crate) type PeerTx = UnboundedSender<WriterRequest>; pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
pub trait SendMany {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()>;
}
impl SendMany for PeerTx {
fn send_many(&self, requests: impl IntoIterator<Item = WriterRequest>) -> anyhow::Result<()> {
requests
.into_iter()
.try_for_each(|r| self.send(r))
.context("peer dropped")
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub(crate) struct Peer { pub(crate) struct Peer {
pub state: PeerStateNoMut, pub state: PeerStateNoMut,

View file

@ -338,6 +338,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> {
disable_dht: opts.disable_dht, disable_dht: opts.disable_dht,
disable_dht_persistence: opts.disable_dht_persistence, disable_dht_persistence: opts.disable_dht_persistence,
dht_config: None, dht_config: None,
// This will be overriden by "server start" below if needed.
persistence: false, persistence: false,
persistence_filename: None, persistence_filename: None,
peer_id: None, peer_id: None,