flushing bitfield
This commit is contained in:
parent
bc9e72df60
commit
2fee0ca8c2
4 changed files with 54 additions and 12 deletions
|
|
@ -9,14 +9,12 @@ use bitvec::{
|
||||||
view::{AsBits, AsMutBits},
|
view::{AsBits, AsMutBits},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait BitV: Send + Sync {
|
pub trait BitV: Send + Sync {
|
||||||
fn as_slice(&self) -> &BitSlice<u8, Lsb0>;
|
fn as_slice(&self) -> &BitSlice<u8, Lsb0>;
|
||||||
fn as_slice_mut(&mut self) -> &mut BitSlice<u8, Lsb0>;
|
fn as_slice_mut(&mut self) -> &mut BitSlice<u8, Lsb0>;
|
||||||
fn into_dyn(self) -> Box<dyn BitV>;
|
fn into_dyn(self) -> Box<dyn BitV>;
|
||||||
fn as_bytes(&self) -> &[u8];
|
fn as_bytes(&self) -> &[u8];
|
||||||
|
fn flush(&mut self) -> anyhow::Result<()>;
|
||||||
async fn flush(&mut self) -> anyhow::Result<()>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type BoxBitV = Box<dyn BitV>;
|
pub type BoxBitV = Box<dyn BitV>;
|
||||||
|
|
@ -48,7 +46,7 @@ impl BitV for BitVec<u8, Lsb0> {
|
||||||
self.as_raw_slice()
|
self.as_raw_slice()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn flush(&mut self) -> anyhow::Result<()> {
|
fn flush(&mut self) -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -71,7 +69,7 @@ impl BitV for BitBox<u8, Lsb0> {
|
||||||
self.as_raw_slice()
|
self.as_raw_slice()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn flush(&mut self) -> anyhow::Result<()> {
|
fn flush(&mut self) -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -80,7 +78,6 @@ impl BitV for BitBox<u8, Lsb0> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl BitV for MmapBitV {
|
impl BitV for MmapBitV {
|
||||||
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
|
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
|
||||||
self.mmap.as_bits()
|
self.mmap.as_bits()
|
||||||
|
|
@ -94,7 +91,7 @@ impl BitV for MmapBitV {
|
||||||
&self.mmap
|
&self.mmap
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn flush(&mut self) -> anyhow::Result<()> {
|
fn flush(&mut self) -> anyhow::Result<()> {
|
||||||
Ok(self.mmap.flush()?)
|
Ok(self.mmap.flush()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -103,7 +100,6 @@ impl BitV for MmapBitV {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
impl BitV for Box<dyn BitV> {
|
impl BitV for Box<dyn BitV> {
|
||||||
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
|
fn as_slice(&self) -> &BitSlice<u8, Lsb0> {
|
||||||
(**self).as_slice()
|
(**self).as_slice()
|
||||||
|
|
@ -117,8 +113,8 @@ impl BitV for Box<dyn BitV> {
|
||||||
(**self).as_bytes()
|
(**self).as_bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn flush(&mut self) -> anyhow::Result<()> {
|
fn flush(&mut self) -> anyhow::Result<()> {
|
||||||
(**self).flush().await
|
(**self).flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn into_dyn(self) -> Box<dyn BitV> {
|
fn into_dyn(self) -> Box<dyn BitV> {
|
||||||
|
|
|
||||||
|
|
@ -185,6 +185,10 @@ impl ChunkTracker {
|
||||||
&*self.have
|
&*self.have
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_have_pieces_mut(&mut self) -> &mut dyn BitV {
|
||||||
|
&mut *self.have
|
||||||
|
}
|
||||||
|
|
||||||
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
|
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
|
||||||
self.queue_pieces.set(index.get() as usize, false)
|
self.queue_pieces.set(index.get() as usize, false)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,10 +70,22 @@ impl TorrentStateInitializing {
|
||||||
bitv_factory: Arc<dyn BitVFactory>,
|
bitv_factory: Arc<dyn BitVFactory>,
|
||||||
) -> anyhow::Result<TorrentStatePaused> {
|
) -> anyhow::Result<TorrentStatePaused> {
|
||||||
let id: TorrentIdOrHash = self.meta.info_hash.into();
|
let id: TorrentIdOrHash = self.meta.info_hash.into();
|
||||||
let have_pieces = bitv_factory
|
let mut have_pieces = bitv_factory
|
||||||
.load(id)
|
.load(id)
|
||||||
.await
|
.await
|
||||||
.context("error loading have_pieces")?;
|
.context("error loading have_pieces")?;
|
||||||
|
if let Some(hp) = have_pieces.as_ref() {
|
||||||
|
let actual = hp.as_bytes().len();
|
||||||
|
let expected = self.meta.lengths.piece_bitfield_bytes();
|
||||||
|
if actual != expected {
|
||||||
|
warn!(
|
||||||
|
actual,
|
||||||
|
expected,
|
||||||
|
"the bitfield loaded isn't of correct length, ignoring it, will do full check"
|
||||||
|
);
|
||||||
|
have_pieces = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
let have_pieces = match have_pieces {
|
let have_pieces = match have_pieces {
|
||||||
Some(h) => h,
|
Some(h) => h,
|
||||||
None => {
|
None => {
|
||||||
|
|
|
||||||
|
|
@ -131,6 +131,8 @@ pub(crate) struct TorrentStateLocked {
|
||||||
|
|
||||||
// If this is None, then it was already used
|
// If this is None, then it was already used
|
||||||
fatal_errors_tx: Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
|
fatal_errors_tx: Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
|
||||||
|
|
||||||
|
unflushed_bitv_bytes: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TorrentStateLocked {
|
impl TorrentStateLocked {
|
||||||
|
|
@ -145,6 +147,23 @@ impl TorrentStateLocked {
|
||||||
.as_mut()
|
.as_mut()
|
||||||
.context("chunk tracker empty, torrent was paused")
|
.context("chunk tracker empty, torrent was paused")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn try_flush_bitv(&mut self) {
|
||||||
|
if self.unflushed_bitv_bytes == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(Err(e)) = self
|
||||||
|
.chunks
|
||||||
|
.as_mut()
|
||||||
|
.map(|ct| ct.get_have_pieces_mut().flush())
|
||||||
|
{
|
||||||
|
warn!(error=?e, "error flushing bitfield");
|
||||||
|
} else {
|
||||||
|
trace!("flushed bitfield");
|
||||||
|
self.unflushed_bitv_bytes = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
|
|
@ -155,6 +174,8 @@ pub struct TorrentStateOptions {
|
||||||
pub peer_read_write_timeout: Option<Duration>,
|
pub peer_read_write_timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
|
||||||
|
|
||||||
pub struct TorrentStateLive {
|
pub struct TorrentStateLive {
|
||||||
peers: PeerStates,
|
peers: PeerStates,
|
||||||
meta: Arc<ManagedTorrentInfo>,
|
meta: Arc<ManagedTorrentInfo>,
|
||||||
|
|
@ -223,6 +244,7 @@ impl TorrentStateLive {
|
||||||
inflight_pieces: Default::default(),
|
inflight_pieces: Default::default(),
|
||||||
file_priorities,
|
file_priorities,
|
||||||
fatal_errors_tx: Some(fatal_errors_tx),
|
fatal_errors_tx: Some(fatal_errors_tx),
|
||||||
|
unflushed_bitv_bytes: 0,
|
||||||
}),
|
}),
|
||||||
files: paused.files,
|
files: paused.files,
|
||||||
stats: AtomicStats {
|
stats: AtomicStats {
|
||||||
|
|
@ -684,6 +706,7 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> {
|
fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> {
|
||||||
let mut g = self.lock_write("on_piece_completed");
|
let mut g = self.lock_write("on_piece_completed");
|
||||||
|
let g = &mut **g;
|
||||||
let chunks = g.get_chunks_mut()?;
|
let chunks = g.get_chunks_mut()?;
|
||||||
|
|
||||||
// if we have all the pieces of the file, reopen it read only
|
// if we have all the pieces of the file, reopen it read only
|
||||||
|
|
@ -701,13 +724,20 @@ impl TorrentStateLive {
|
||||||
self.streams
|
self.streams
|
||||||
.wake_streams_on_piece_completed(id, &self.meta.lengths);
|
.wake_streams_on_piece_completed(id, &self.meta.lengths);
|
||||||
|
|
||||||
|
g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64;
|
||||||
|
if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES {
|
||||||
|
g.try_flush_bitv()
|
||||||
|
}
|
||||||
|
|
||||||
|
let chunks = g.get_chunks()?;
|
||||||
if chunks.is_finished() {
|
if chunks.is_finished() {
|
||||||
if chunks.get_selected_pieces()[id.get_usize()] {
|
if chunks.get_selected_pieces()[id.get_usize()] {
|
||||||
|
g.try_flush_bitv();
|
||||||
info!("torrent finished downloading");
|
info!("torrent finished downloading");
|
||||||
}
|
}
|
||||||
self.finished_notify.notify_waiters();
|
self.finished_notify.notify_waiters();
|
||||||
|
|
||||||
if !self.has_active_streams_unfinished_files(&g) {
|
if !self.has_active_streams_unfinished_files(g) {
|
||||||
// There is not poing being connected to peers that have all the torrent, when
|
// There is not poing being connected to peers that have all the torrent, when
|
||||||
// we don't need anything from them, and they don't need anything from us.
|
// we don't need anything from them, and they don't need anything from us.
|
||||||
self.disconnect_all_peers_that_have_full_torrent();
|
self.disconnect_all_peers_that_have_full_torrent();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue