diff --git a/Makefile b/Makefile index 800c7d9..aa5a6d3 100644 --- a/Makefile +++ b/Makefile @@ -20,14 +20,14 @@ devserver: echo -n '' > /tmp/rqbit-log && cargo run -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ - server start /tmp/scratch/ + server start --fastresume /tmp/scratch/ @PHONY: devserver devserver-postgres: echo -n '' > /tmp/rqbit-log && cargo run -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ - server start --persistence-config postgres:///rqbit /tmp/scratch/ + server start --fastresume --persistence-config postgres:///rqbit /tmp/scratch/ @PHONY: clean clean: diff --git a/crates/librqbit/src/api.rs b/crates/librqbit/src/api.rs index 4bbfdd3..6003966 100644 --- a/crates/librqbit/src/api.rs +++ b/crates/librqbit/src/api.rs @@ -363,7 +363,7 @@ impl Api { pub fn api_dump_haves(&self, idx: TorrentIdOrHash) -> Result { let mgr = self.mgr_handle(idx)?; - Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?) + Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces().as_slice()))?) 
} pub fn api_stream(&self, idx: TorrentIdOrHash, file_id: usize) -> Result { diff --git a/crates/librqbit/src/bitv.rs b/crates/librqbit/src/bitv.rs new file mode 100644 index 0000000..f3c0129 --- /dev/null +++ b/crates/librqbit/src/bitv.rs @@ -0,0 +1,123 @@ +use std::fs::File; + +use anyhow::Context; +use bitvec::{ + boxed::BitBox, + order::Msb0, + slice::BitSlice, + vec::BitVec, + view::{AsBits, AsMutBits}, +}; + +pub trait BitV: Send + Sync { + fn as_slice(&self) -> &BitSlice; + fn as_slice_mut(&mut self) -> &mut BitSlice; + fn into_dyn(self) -> Box; + fn as_bytes(&self) -> &[u8]; + fn flush(&mut self) -> anyhow::Result<()>; +} + +pub type BoxBitV = Box; + +pub struct MmapBitV { + _file: File, + mmap: memmap2::MmapMut, +} + +impl MmapBitV { + pub fn new(file: File) -> anyhow::Result { + let mmap = + unsafe { memmap2::MmapOptions::new().map_mut(&file) }.context("error mmapping file")?; + Ok(Self { mmap, _file: file }) + } +} + +#[async_trait::async_trait] +impl BitV for BitVec { + fn as_slice(&self) -> &BitSlice { + self.as_bitslice() + } + + fn as_slice_mut(&mut self) -> &mut BitSlice { + self.as_mut_bitslice() + } + + fn as_bytes(&self) -> &[u8] { + self.as_raw_slice() + } + + fn flush(&mut self) -> anyhow::Result<()> { + Ok(()) + } + + fn into_dyn(self) -> Box { + Box::new(self) + } +} + +#[async_trait::async_trait] +impl BitV for BitBox { + fn as_slice(&self) -> &BitSlice { + self.as_bitslice() + } + + fn as_slice_mut(&mut self) -> &mut BitSlice { + self.as_mut_bitslice() + } + + fn as_bytes(&self) -> &[u8] { + self.as_raw_slice() + } + + fn flush(&mut self) -> anyhow::Result<()> { + Ok(()) + } + + fn into_dyn(self) -> Box { + Box::new(self) + } +} + +impl BitV for MmapBitV { + fn as_slice(&self) -> &BitSlice { + self.mmap.as_bits() + } + + fn as_slice_mut(&mut self) -> &mut BitSlice { + self.mmap.as_mut_bits() + } + + fn as_bytes(&self) -> &[u8] { + &self.mmap + } + + fn flush(&mut self) -> anyhow::Result<()> { + Ok(self.mmap.flush()?) 
+ } + + fn into_dyn(self) -> Box { + Box::new(self) + } +} + +impl BitV for Box { + fn as_slice(&self) -> &BitSlice { + (**self).as_slice() + } + + fn as_slice_mut(&mut self) -> &mut BitSlice { + (**self).as_slice_mut() + } + + fn as_bytes(&self) -> &[u8] { + (**self).as_bytes() + } + + fn flush(&mut self) -> anyhow::Result<()> { + (**self).flush() + } + + fn into_dyn(self) -> Box { + self + } +} diff --git a/crates/librqbit/src/bitv_factory.rs b/crates/librqbit/src/bitv_factory.rs new file mode 100644 index 0000000..57f0791 --- /dev/null +++ b/crates/librqbit/src/bitv_factory.rs @@ -0,0 +1,28 @@ +use crate::{api::TorrentIdOrHash, bitv::BitV, type_aliases::BF}; + +#[async_trait::async_trait] +pub trait BitVFactory: Send + Sync { + async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>>; + async fn store_initial_check( + &self, + id: TorrentIdOrHash, + b: BF, + ) -> anyhow::Result>; +} + +pub struct NonPersistentBitVFactory {} + +#[async_trait::async_trait] +impl BitVFactory for NonPersistentBitVFactory { + async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result>> { + Ok(None) + } + + async fn store_initial_check( + &self, + _id: TorrentIdOrHash, + b: BF, + ) -> anyhow::Result> { + Ok(Box::new(b)) + } +} diff --git a/crates/librqbit/src/chunk_tracker.rs b/crates/librqbit/src/chunk_tracker.rs index d3a87cb..7c25458 100644 --- a/crates/librqbit/src/chunk_tracker.rs +++ b/crates/librqbit/src/chunk_tracker.rs @@ -6,8 +6,9 @@ use peer_binary_protocol::Piece; use tracing::{debug, trace}; use crate::{ + bitv::{BitV, BoxBitV}, file_info::FileInfo, - type_aliases::{FileInfos, FilePriorities, BF}, + type_aliases::{FileInfos, FilePriorities, BF, BS}, }; pub struct ChunkTracker { @@ -26,7 +27,7 @@ pub struct ChunkTracker { chunk_status: BF, // These are the pieces that we actually have, fully checked and downloaded. - have: BF, + have: BoxBitV, // The pieces that the user selected. This doesn't change unless update_only_files // was called. 
@@ -70,7 +71,7 @@ impl HaveNeededSelected { // Comput the have-status of chunks. // // Save as "have_pieces", but there's one bit per chunk (not per piece). -fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Result { +fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BS) -> anyhow::Result { if have_pieces.len() < lengths.total_pieces() as usize { anyhow::bail!( "bug: have_pieces.len() < lengths.total_pieces(); {} < {}", @@ -98,15 +99,19 @@ fn compute_chunk_have_status(lengths: &Lengths, have_pieces: &BF) -> anyhow::Res Ok(chunk_bf) } -fn compute_queued_pieces_unchecked(have_pieces: &BF, selected_pieces: &BF) -> BF { +fn compute_queued_pieces_unchecked(have_pieces: &BS, selected_pieces: &BS) -> BF { // it's needed ONLY if it's selected and we don't have it. use core::ops::BitAnd; use core::ops::Not; - have_pieces.clone().not().bitand(selected_pieces) + have_pieces + .to_bitvec() + .not() + .bitand(selected_pieces) + .into_boxed_bitslice() } -fn compute_queued_pieces(have_pieces: &BF, selected_pieces: &BF) -> anyhow::Result { +fn compute_queued_pieces(have_pieces: &BS, selected_pieces: &BS) -> anyhow::Result { if have_pieces.len() != selected_pieces.len() { anyhow::bail!( "have_pieces.len() != selected_pieces.len(), {} != {}", @@ -131,20 +136,20 @@ pub enum ChunkMarkingResult { impl ChunkTracker { pub fn new( // Have pieces are the ones we have already downloaded and verified. - have_pieces: BF, + have_pieces: BoxBitV, // Selected pieces are the ones the user has selected selected_pieces: BF, lengths: Lengths, file_infos: &FileInfos, ) -> anyhow::Result { - let needed_pieces = compute_queued_pieces(&have_pieces, &selected_pieces) + let needed_pieces = compute_queued_pieces(have_pieces.as_slice(), &selected_pieces) .context("error computing needed pieces")?; // TODO: ideally this needs to be a list based on needed files, e.g. // last needed piece for each file. But let's keep simple for now. 
let mut ct = Self { - chunk_status: compute_chunk_have_status(&lengths, &have_pieces) + chunk_status: compute_chunk_have_status(&lengths, have_pieces.as_slice()) .context("error computing chunk status")?, queue_pieces: needed_pieces, selected: selected_pieces, @@ -163,7 +168,7 @@ impl ChunkTracker { *slot = fi .piece_range .clone() - .filter(|p| self.have[*p as usize]) + .filter(|p| self.have.as_slice()[*p as usize]) .map(|id| { self.lengths .size_of_piece_in_file(id, fi.offset_in_torrent, fi.len) @@ -176,8 +181,12 @@ impl ChunkTracker { &self.lengths } - pub fn get_have_pieces(&self) -> &BF { - &self.have + pub fn get_have_pieces(&self) -> &dyn BitV { + &*self.have + } + + pub fn get_have_pieces_mut(&mut self) -> &mut dyn BitV { + &mut *self.have } pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) { @@ -193,7 +202,7 @@ impl ChunkTracker { for piece in self.lengths.iter_piece_infos() { let id = piece.piece_index.get() as usize; let len = piece.len as u64; - let is_have = self.have[id]; + let is_have = self.have.as_slice()[id]; let is_selected = self.selected[id]; let is_needed = is_selected && !is_have; hns.have_bytes += len * (is_have as u64); @@ -219,12 +228,13 @@ impl ChunkTracker { } pub(crate) fn is_piece_have(&self, id: ValidPieceIndex) -> bool { - self.have[id.get() as usize] + self.have.as_slice()[id.get() as usize] } pub fn mark_piece_broken_if_not_have(&mut self, index: ValidPieceIndex) { if self .have + .as_slice() .get(index.get() as usize) .map(|r| *r) .unwrap_or_default() @@ -240,8 +250,8 @@ impl ChunkTracker { pub fn mark_piece_downloaded(&mut self, idx: ValidPieceIndex) { let id = idx.get() as usize; - if !self.have[id] { - self.have.set(id, true); + if !self.have.as_slice()[id] { + self.have.as_slice_mut().set(id, true); let len = self.lengths.piece_length(idx) as u64; self.hns.have_bytes += len; if self.selected[id] { @@ -252,6 +262,7 @@ impl ChunkTracker { pub fn is_chunk_ready_to_upload(&self, chunk: &ChunkInfo) -> bool { self.have 
+ .as_slice() .get(chunk.piece_index.get() as usize) .map(|b| *b) .unwrap_or(false) @@ -327,7 +338,8 @@ impl ChunkTracker { current_piece_remaining -= TryInto::::try_into(shift)?; if current_piece_remaining == 0 { - let current_piece_have = self.have[current_piece.piece_index.get() as usize]; + let current_piece_have = + self.have.as_slice()[current_piece.piece_index.get() as usize]; if current_piece_have { have_bytes += current_piece.len as u64; } @@ -380,6 +392,7 @@ impl ChunkTracker { pub fn is_file_finished(&self, file_info: &FileInfo) -> bool { self.have + .as_slice() .get(file_info.piece_range_usize()) .map(|r| r.all()) .unwrap_or(true) @@ -412,11 +425,10 @@ impl ChunkTracker { #[cfg(test)] mod tests { + use librqbit_core::{constants::CHUNK_SIZE, lengths::Lengths}; use std::collections::HashSet; - use librqbit_core::{constants::CHUNK_SIZE, lengths::Lengths}; - - use crate::{chunk_tracker::HaveNeededSelected, type_aliases::BF}; + use crate::{bitv::BitV, chunk_tracker::HaveNeededSelected, type_aliases::BF}; use super::{compute_chunk_have_status, ChunkTracker}; @@ -539,7 +551,7 @@ mod tests { // Initially, we need all files and all pieces. let mut ct = ChunkTracker::new( - initial_have.clone(), + initial_have.clone().into_dyn(), initial_selected.clone(), l, &Default::default(), @@ -556,7 +568,7 @@ mod tests { needed_bytes: total_len, } ); - assert_eq!(ct.have, initial_have); + assert_eq!(ct.have.as_slice(), initial_have.as_bitslice()); assert_eq!(ct.queue_pieces, initial_selected); // Select only the first file. 
diff --git a/crates/librqbit/src/file_ops.rs b/crates/librqbit/src/file_ops.rs index e8a200d..237b08f 100644 --- a/crates/librqbit/src/file_ops.rs +++ b/crates/librqbit/src/file_ops.rs @@ -19,29 +19,6 @@ use crate::{ type_aliases::{FileInfos, PeerHandle, BF}, }; -pub(crate) struct InitialCheckResults { - // A piece as flags based on these dimensions: - // - if the asked for it or not (only_files) - // - if we have it downloaded and verified - // - if we need to queue it for downloading - // this one depends if we queued it already or not. - - // The pieces we have downloaded. - pub have_pieces: BF, - // The pieces that the user selected to download. - pub selected_pieces: BF, - - // How many bytes we have. This can be MORE than "total_selected_bytes", - // if we downloaded some pieces, and later the "only_files" was changed. - pub have_bytes: u64, - // How many bytes we need to download. - pub needed_bytes: u64, - - // How many bytes are in selected pieces. - // If all selected, this must be equal to total torrent length. - pub selected_bytes: u64, -} - pub fn update_hash_from_file( file_id: usize, mut pos: u64, @@ -88,26 +65,16 @@ impl<'a> FileOps<'a> { } } - pub fn initial_check( - &self, - only_files: Option<&[usize]>, - progress: &AtomicU64, - ) -> anyhow::Result { - let mut needed_pieces = + // Returns the bitvector with pieces we have. 
+ pub fn initial_check(&self, progress: &AtomicU64) -> anyhow::Result { + let mut have_pieces = BF::from_boxed_slice(vec![0u8; self.lengths.piece_bitfield_bytes()].into()); - let mut have_pieces = needed_pieces.clone(); - let mut selected_pieces = needed_pieces.clone(); - - let mut have_bytes = 0u64; - let mut needed_bytes = 0u64; - let mut total_selected_bytes = 0u64; let mut piece_files = Vec::::new(); #[derive(Debug)] struct CurrentFile<'a> { index: usize, fi: &'a FileInfo, - full_file_required: bool, processed_bytes: u64, is_broken: bool, } @@ -119,20 +86,16 @@ impl<'a> FileOps<'a> { self.processed_bytes += bytes } } - let mut file_iterator = self.file_infos.iter().enumerate().map(|(idx, fi)| { - let full_file_required = if let Some(only_files) = only_files { - only_files.contains(&idx) - } else { - true - }; - CurrentFile { + let mut file_iterator = self + .file_infos + .iter() + .enumerate() + .map(|(idx, fi)| CurrentFile { index: idx, fi, - full_file_required, processed_bytes: 0, is_broken: false, - } - }); + }); let mut current_file = file_iterator .next() @@ -145,7 +108,6 @@ impl<'a> FileOps<'a> { let mut computed_hash = Sha1::new(); let mut piece_remaining = piece_info.len as usize; let mut some_files_broken = false; - let mut piece_selected = current_file.full_file_required; progress.fetch_add(piece_info.len as u64, Ordering::Relaxed); while piece_remaining > 0 { @@ -158,8 +120,6 @@ impl<'a> FileOps<'a> { .next() .ok_or_else(|| anyhow::anyhow!("broken torrent metadata"))?; - piece_selected |= current_file.full_file_required; - to_read_in_file = std::cmp::min(current_file.remaining(), piece_remaining as u64) .try_into()?; @@ -193,18 +153,11 @@ impl<'a> FileOps<'a> { } } - if piece_selected { - total_selected_bytes += piece_info.len as u64; - selected_pieces.set(piece_info.piece_index.get() as usize, true); - } - - if piece_selected && some_files_broken { + if some_files_broken { trace!( "piece {} had errors, marking as needed", piece_info.piece_index ); - 
- needed_bytes += piece_info.len as u64; continue; } @@ -213,34 +166,11 @@ impl<'a> FileOps<'a> { .compare_hash(piece_info.piece_index.get(), computed_hash.finish()) .context("bug: either torrent info broken or we have a bug - piece index invalid")? { - trace!( - "piece {} is fine, not marking as needed", - piece_info.piece_index - ); - have_bytes += piece_info.len as u64; have_pieces.set(piece_info.piece_index.get() as usize, true); - } else if piece_selected { - trace!( - "piece {} hash does not match, marking as needed", - piece_info.piece_index - ); - needed_bytes += piece_info.len as u64; - needed_pieces.set(piece_info.piece_index.get() as usize, true); - } else { - trace!( - "piece {} hash does not match, but it is not required by any of the requested files, ignoring", - piece_info.piece_index - ); } } - Ok(InitialCheckResults { - have_pieces, - selected_pieces, - have_bytes, - needed_bytes, - selected_bytes: total_selected_bytes, - }) + Ok(have_pieces) } pub fn check_piece( diff --git a/crates/librqbit/src/lib.rs b/crates/librqbit/src/lib.rs index 16ea5be..b6028c7 100644 --- a/crates/librqbit/src/lib.rs +++ b/crates/librqbit/src/lib.rs @@ -40,6 +40,8 @@ macro_rules! 
aframe { pub mod api; mod api_error; +mod bitv; +mod bitv_factory; mod chunk_tracker; mod create_torrent_file; mod dht_utils; diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 07ee4c6..01ac3ef 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -10,13 +10,12 @@ use std::{ use crate::{ api::TorrentIdOrHash, + bitv_factory::{BitVFactory, NonPersistentBitVFactory}, dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult}, merge_streams::merge_streams, peer_connection::PeerConnectionOptions, read_buf::ReadBuf, - session_persistence::{ - json::JsonSessionPersistenceStore, BoxSessionPersistenceStore, SessionPersistenceStore, - }, + session_persistence::{json::JsonSessionPersistenceStore, SessionPersistenceStore}, spawn_utils::BlockingSpawner, storage::{ filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage, @@ -94,7 +93,8 @@ impl SessionDatabase { pub struct Session { peer_id: Id20, dht: Option, - persistence: Option>, + persistence: Option>, + bitv_factory: Arc, peer_opts: PeerConnectionOptions, spawner: BlockingSpawner, next_id: AtomicUsize, @@ -371,6 +371,9 @@ pub struct SessionOptions { /// librqbit instances at a time. pub dht_config: Option, + /// Enable fastresume, to restore state quickly after restart. + pub fastresume: bool, + /// Turn on to dump session contents into a file periodically, so that on next start /// all remembered torrents will continue where they left off. pub persistence: Option, @@ -506,7 +509,18 @@ impl Session { async fn persistence_factory( opts: &SessionOptions, - ) -> anyhow::Result> { + ) -> anyhow::Result<(Option>, Arc)> { + + macro_rules! 
make_result { + ($store:expr) => { + if opts.fastresume { + Ok((Some($store.clone()), $store)) + } else { + Ok((Some($store), Arc::new(NonPersistentBitVFactory {}))) + } + }; + } + match &opts.persistence { Some(SessionPersistenceConfig::Json { folder }) => { let folder = match folder.as_ref() { @@ -514,23 +528,25 @@ impl Session { None => SessionPersistenceConfig::default_json_persistence_folder()?, }; - Ok(Some(Box::new( + let s = Arc::new( JsonSessionPersistenceStore::new(folder) .await .context("error initializing JsonSessionPersistenceStore")?, - ))) + ); + + make_result!(s) }, #[cfg(feature = "postgres")] Some(SessionPersistenceConfig::Postgres { connection_string }) => { use crate::session_persistence::postgres::PostgresSessionStorage; - let p = PostgresSessionStorage::new(connection_string).await?; - Ok(Some(Box::new(p))) + let p = Arc::new(PostgresSessionStorage::new(connection_string).await?); + make_result!(p) } - None => Ok(None), + None => Ok((None, Arc::new(NonPersistentBitVFactory {}))), } } - let persistence = persistence_factory(&opts) + let (persistence, bitv_factory) = persistence_factory(&opts) .await .context("error initializing session persistence store")?; @@ -570,6 +586,7 @@ impl Session { let session = Arc::new(Self { persistence, + bitv_factory, peer_id, dht, peer_opts, @@ -1129,6 +1146,7 @@ impl Session { opts.paused, self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), + self.bitv_factory.clone(), ) .context("error starting torrent")?; } @@ -1284,6 +1302,7 @@ impl Session { false, self.cancellation_token.child_token(), self.concurrent_initialize_semaphore.clone(), + self.bitv_factory.clone(), )?; self.try_update_persistence_metadata(handle).await; Ok(()) diff --git a/crates/librqbit/src/session_persistence/json.rs b/crates/librqbit/src/session_persistence/json.rs index c681a7e..d415375 100644 --- a/crates/librqbit/src/session_persistence/json.rs +++ b/crates/librqbit/src/session_persistence/json.rs @@ -1,8 
+1,14 @@ use std::{any::TypeId, collections::HashMap, path::PathBuf}; use crate::{ - session::TorrentId, storage::filesystem::FilesystemStorageFactory, - torrent_state::ManagedTorrentHandle, ManagedTorrentState, + api::TorrentIdOrHash, + bitv::{BitV, MmapBitV}, + bitv_factory::BitVFactory, + session::TorrentId, + storage::filesystem::FilesystemStorageFactory, + torrent_state::ManagedTorrentHandle, + type_aliases::BF, + ManagedTorrentState, }; use anyhow::{bail, Context}; use async_trait::async_trait; @@ -65,6 +71,20 @@ impl JsonSessionPersistenceStore { }) } + async fn to_hash(&self, id: TorrentIdOrHash) -> anyhow::Result { + match id { + TorrentIdOrHash::Id(id) => self + .db_content + .read() + .await + .torrents + .get(&id) + .map(|v| *v.info_hash()) + .context("not found"), + TorrentIdOrHash::Hash(h) => Ok(h), + } + } + async fn flush(&self) -> anyhow::Result<()> { let tmp_filename = format!("{}.tmp", self.db_filename.to_str().unwrap()); let mut tmp = tokio::fs::OpenOptions::new() @@ -97,6 +117,10 @@ impl JsonSessionPersistenceStore { self.output_folder.join(format!("{:?}.torrent", info_hash)) } + fn bitv_filename(&self, info_hash: &Id20) -> PathBuf { + self.output_folder.join(format!("{:?}.bitv", info_hash)) + } + async fn update_db( &self, id: TorrentId, @@ -152,6 +176,58 @@ impl JsonSessionPersistenceStore { } } +#[async_trait::async_trait] +impl BitVFactory for JsonSessionPersistenceStore { + async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>> { + let h = self.to_hash(id).await?; + let filename = self.bitv_filename(&h); + let f = match std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&filename) + { + Ok(f) => f, + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => return Ok(None), + _ => return Err(e).with_context(|| format!("error opening {filename:?}")), + }, + }; + Ok(Some(MmapBitV::new(f)?.into_dyn())) + } + + async fn store_initial_check( + &self, + id: TorrentIdOrHash, + b: BF, + ) -> anyhow::Result> { + let h = 
self.to_hash(id).await?; + let filename = self.bitv_filename(&h); + let tmp_filename = format!("{}.tmp", filename.to_str().context("bug")?); + let mut dst = tokio::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&tmp_filename) + .await + .with_context(|| format!("error opening {tmp_filename:?}"))?; + tokio::io::copy(&mut b.as_raw_slice(), &mut dst) + .await + .with_context(|| format!("error writing bitslice to {tmp_filename:?}"))?; + tokio::fs::rename(&tmp_filename, &filename) + .await + .with_context(|| format!("error renaming {tmp_filename:?} to {filename:?}"))?; + let f = std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&filename) + .with_context(|| format!("error opening {filename:?}"))?; + trace!(?filename, "stored initial check bitfield"); + Ok(MmapBitV::new(f) + .with_context(|| format!("error constructing MmapBitV from file {filename:?}"))? + .into_dyn()) + } +} + #[async_trait] impl SessionPersistenceStore for JsonSessionPersistenceStore { async fn next_id(&self) -> anyhow::Result { @@ -175,11 +251,15 @@ impl SessionPersistenceStore for JsonSessionPersistenceStore { if let Some(t) = removed { debug!(?id, "deleted from in-memory db, flushing"); self.flush().await?; - let tf = self.torrent_bytes_filename(&t.info_hash); - if let Err(e) = tokio::fs::remove_file(&tf).await { - warn!(error=?e, filename=?tf, "error removing torrent file"); - } else { - debug!(filename=?tf, "removed"); + for tf in [ + self.torrent_bytes_filename(&t.info_hash), + self.bitv_filename(&t.info_hash), + ] { + if let Err(e) = tokio::fs::remove_file(&tf).await { + warn!(error=?e, filename=?tf, "error removing"); + } else { + debug!(filename=?tf, "removed"); + } } } else { bail!("error deleting: didn't find torrent id={id}") diff --git a/crates/librqbit/src/session_persistence/mod.rs b/crates/librqbit/src/session_persistence/mod.rs index 3de287d..f1459f8 100644 --- a/crates/librqbit/src/session_persistence/mod.rs +++ b/crates/librqbit/src/session_persistence/mod.rs
@@ -13,7 +13,8 @@ use librqbit_core::Id20; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{ - session::TorrentId, torrent_state::ManagedTorrentHandle, AddTorrent, AddTorrentOptions, + bitv_factory::BitVFactory, session::TorrentId, torrent_state::ManagedTorrentHandle, AddTorrent, + AddTorrentOptions, }; #[derive(Serialize, Deserialize, Clone)] @@ -63,7 +64,7 @@ impl SerializedTorrent { // TODO: make this info_hash first, ID-second. #[async_trait] -pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync { +pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync + BitVFactory { async fn next_id(&self) -> anyhow::Result; async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()>; async fn delete(&self, id: TorrentId) -> anyhow::Result<()>; @@ -78,8 +79,6 @@ pub trait SessionPersistenceStore: core::fmt::Debug + Send + Sync { ) -> anyhow::Result>>; } -pub type BoxSessionPersistenceStore = Box; - fn serialize_info_hash(id: &Id20, serializer: S) -> Result where S: Serializer, diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index bd1d45b..98dd6f3 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -1,10 +1,14 @@ use std::path::PathBuf; -use crate::{session::TorrentId, torrent_state::ManagedTorrentHandle}; +use crate::{ + api::TorrentIdOrHash, bitv::BitV, bitv_factory::BitVFactory, session::TorrentId, + torrent_state::ManagedTorrentHandle, type_aliases::BF, +}; use anyhow::Context; use futures::{stream::BoxStream, StreamExt}; use librqbit_core::Id20; use sqlx::{Pool, Postgres}; +use tracing::error_span; use super::{SerializedTorrent, SessionPersistenceStore}; @@ -51,12 +55,20 @@ impl PostgresSessionStorage { .connect(connection_string) .await?; - sqlx::query("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;") - .execute(&pool) - .await - 
.context("error executing CREATE SEQUENCE")?; + macro_rules! exec { + ($q:expr) => { + sqlx::query($q) + .execute(&pool) + .await + .context($q) + .context("error running query")?; + }; + } - let create_q = "CREATE TABLE IF NOT EXISTS torrents ( + exec!("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;"); + + exec!( + "CREATE TABLE IF NOT EXISTS torrents ( id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'), info_hash BYTEA NOT NULL, torrent_bytes BYTEA NOT NULL, @@ -64,11 +76,10 @@ impl PostgresSessionStorage { output_folder TEXT NOT NULL, only_files INTEGER[], is_paused BOOLEAN NOT NULL - )"; - sqlx::query(create_q) - .execute(&pool) - .await - .context("error executing CREATE TABLE")?; + )" + ); + + exec!("ALTER TABLE torrents ADD COLUMN IF NOT EXISTS have_bitfield BYTEA"); Ok(Self { pool }) } @@ -167,3 +178,132 @@ impl SessionPersistenceStore for PostgresSessionStorage { Ok(futures::stream::iter(torrents).boxed()) } } + +struct PgBitfield { + torrent_id: TorrentIdOrHash, + inmem: BF, + pool: Pool, +} + +impl BitV for PgBitfield { + fn as_slice(&self) -> &bitvec::prelude::BitSlice { + self.inmem.as_bitslice() + } + + fn as_slice_mut(&mut self) -> &mut bitvec::prelude::BitSlice { + self.inmem.as_mut_bitslice() + } + + fn into_dyn(self) -> Box { + Box::new(self) + } + + fn as_bytes(&self) -> &[u8] { + self.inmem.as_raw_slice() + } + + fn flush(&mut self) -> anyhow::Result<()> { + // TODO: make flush async, and don't spawn this, to avoid allocations and capture the result. + crate::spawn_utils::spawn( + "pg", + error_span!("pg_update_bitfield", id=?self.torrent_id), + { + let hb = self.as_bytes().to_owned(); + let pool = self.pool.clone(); + let torrent_id = self.torrent_id; + + macro_rules! 
exec { + ($q:expr, $bf:expr, $id:expr) => { + sqlx::query($q) + .bind($bf) + .bind($id) + .execute(&pool) + .await + .context($q) + .context("error executing query") + }; + } + + async move { + match torrent_id { + TorrentIdOrHash::Id(id) => { + let id: i32 = id.try_into()?; + exec!( + "UPDATE torrents SET have_bitfield = $1 WHERE id = $2", + &hb, + id + )?; + } + TorrentIdOrHash::Hash(h) => { + exec!( + "UPDATE torrents SET have_bitfield = $1 WHERE info_hash = $2", + &hb, + &h.0[..] + )?; + } + }; + Ok(()) + } + }, + ); + Ok(()) + } +} + +#[async_trait::async_trait] +impl BitVFactory for PostgresSessionStorage { + async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>> { + #[derive(sqlx::FromRow)] + struct HaveBitfield { + have_bitfield: Option>, + } + + macro_rules! exec { + ($q:expr, $v:expr) => { + sqlx::query_as($q) + .bind($v) + .fetch_one(&self.pool) + .await + .context($q) + .context("error executing query")? + }; + } + + let hb: HaveBitfield = match id { + TorrentIdOrHash::Id(id) => { + let id: i32 = id.try_into()?; + exec!("SELECT have_bitfield FROM torrents WHERE id = $1", id) + } + TorrentIdOrHash::Hash(h) => { + exec!( + "SELECT have_bitfield FROM torrents WHERE info_hash = $1", + &h.0[..] 
+ ) + } + }; + + let hb = hb.have_bitfield; + Ok(hb.map(|b| { + PgBitfield { + torrent_id: id, + inmem: BF::from_boxed_slice(b.into_boxed_slice()), + pool: self.pool.clone(), + } + .into_dyn() + })) + } + + async fn store_initial_check( + &self, + id: TorrentIdOrHash, + b: BF, + ) -> anyhow::Result> { + let mut bf = PgBitfield { + torrent_id: id, + inmem: b, + pool: self.pool.clone(), + }; + bf.flush()?; + Ok(bf.into_dyn()) + } +} diff --git a/crates/librqbit/src/tests/e2e.rs b/crates/librqbit/src/tests/e2e.rs index 01496d0..e39d316 100644 --- a/crates/librqbit/src/tests/e2e.rs +++ b/crates/librqbit/src/tests/e2e.rs @@ -10,7 +10,7 @@ use tokio::{ spawn, time::{interval, timeout}, }; -use tracing::{error_span, info, Instrument}; +use tracing::{error, error_span, info, Instrument}; use crate::{ create_torrent, @@ -35,6 +35,10 @@ async fn test_e2e_download() { async fn _test_e2e_download() { let _ = tracing_subscriber::fmt::try_init(); + match crate::try_increase_nofile_limit() { + Ok(limit) => info!(limit, "increased ulimit"), + Err(e) => error!(error=?e, "error increasing ulimit"), + }; spawn_debug_server(); @@ -187,6 +191,7 @@ async fn _test_e2e_download() { persistence: Some(SessionPersistenceConfig::Json { folder: Some(session_persistence), }), + fastresume: true, listen_port_range: None, enable_upnp_port_forwarding: false, root_span: Some(error_span!("client")), diff --git a/crates/librqbit/src/tests/test_util.rs b/crates/librqbit/src/tests/test_util.rs index 4f4dd3b..ad1de10 100644 --- a/crates/librqbit/src/tests/test_util.rs +++ b/crates/librqbit/src/tests/test_util.rs @@ -5,7 +5,7 @@ use axum::{response::IntoResponse, routing::get, Router}; use librqbit_core::Id20; use rand::{thread_rng, Rng, RngCore, SeedableRng}; use tempfile::TempDir; -use tracing::info; +use tracing::{debug, info}; pub fn create_new_file_with_random_content(path: &Path, mut size: usize) { let mut file = std::fs::OpenOptions::new() @@ -14,7 +14,7 @@ pub fn 
create_new_file_with_random_content(path: &Path, mut size: usize) { .open(path) .unwrap(); - eprintln!("creating temp file {:?}", path); + debug!(?path, "creating temp file"); const BUF_SIZE: usize = 8192 * 16; let mut rng = rand::rngs::SmallRng::from_entropy(); @@ -32,7 +32,7 @@ pub fn create_default_random_dir_with_torrents( tempdir_prefix: Option<&str>, ) -> TempDir { let dir = TempDir::with_prefix(tempdir_prefix.unwrap_or("rqbit_test")).unwrap(); - dbg!(dir.path()); + info!(path=?dir.path(), "created tempdir"); for f in 0..num_files { create_new_file_with_random_content(&dir.path().join(&format!("{f}.data")), file_size); } diff --git a/crates/librqbit/src/torrent_state/initializing.rs b/crates/librqbit/src/torrent_state/initializing.rs index 953bfd8..5f7fdb8 100644 --- a/crates/librqbit/src/torrent_state/initializing.rs +++ b/crates/librqbit/src/torrent_state/initializing.rs @@ -5,10 +5,19 @@ use std::{ use anyhow::Context; +use librqbit_core::lengths::Lengths; use size_format::SizeFormatterBinary as SF; use tracing::{debug, info, warn}; -use crate::{chunk_tracker::ChunkTracker, file_ops::FileOps, type_aliases::FileStorage}; +use crate::{ + api::TorrentIdOrHash, + bitv::BitV, + bitv_factory::BitVFactory, + chunk_tracker::ChunkTracker, + file_ops::FileOps, + type_aliases::{FileStorage, BF}, + FileInfos, +}; use super::{paused::TorrentStatePaused, ManagedTorrentInfo}; @@ -19,6 +28,24 @@ pub struct TorrentStateInitializing { pub(crate) checked_bytes: AtomicU64, } +fn compute_selected_pieces( + lengths: &Lengths, + only_files: Option<&[usize]>, + file_infos: &FileInfos, +) -> BF { + let mut bf = BF::from_boxed_slice(vec![0u8; lengths.piece_bitfield_bytes()].into_boxed_slice()); + for (_, fi) in file_infos + .iter() + .enumerate() + .filter(|(id, _)| only_files.map(|of| of.contains(id)).unwrap_or(true)) // None: all files + { + if let Some(r) = bf.get_mut(fi.piece_range_usize()) { + r.fill(true); + } + } + bf +} + impl TorrentStateInitializing { pub fn new( meta: Arc, @@ -38,23
+65,68 @@ impl TorrentStateInitializing { .load(std::sync::atomic::Ordering::Relaxed) } - pub async fn check(&self) -> anyhow::Result { - info!("Doing initial checksum validation, this might take a while..."); - let initial_check_results = self.meta.spawner.spawn_block_in_place(|| { - FileOps::new( - &self.meta.info, - &self.files, - &self.meta.file_infos, - &self.meta.lengths, - ) - .initial_check(self.only_files.as_deref(), &self.checked_bytes) - })?; + pub async fn check( + &self, + bitv_factory: Arc, + ) -> anyhow::Result { + let id: TorrentIdOrHash = self.meta.info_hash.into(); + let mut have_pieces = bitv_factory + .load(id) + .await + .context("error loading have_pieces")?; + if let Some(hp) = have_pieces.as_ref() { + let actual = hp.as_bytes().len(); + let expected = self.meta.lengths.piece_bitfield_bytes(); + if actual != expected { + warn!( + actual, + expected, + "the bitfield loaded isn't of correct length, ignoring it, will do full check" + ); + have_pieces = None; + } + } + let have_pieces = match have_pieces { + Some(h) => h, + None => { + info!("Doing initial checksum validation, this might take a while..."); + let have_pieces = self.meta.spawner.spawn_block_in_place(|| { + FileOps::new( + &self.meta.info, + &self.files, + &self.meta.file_infos, + &self.meta.lengths, + ) + .initial_check(&self.checked_bytes) + })?; + bitv_factory + .store_initial_check(id, have_pieces) + .await + .context("error storing initial check bitfield")? 
+ } + }; + + let selected_pieces = compute_selected_pieces( + &self.meta.lengths, + self.only_files.as_deref(), + &self.meta.file_infos, + ); + + let chunk_tracker = ChunkTracker::new( + have_pieces.into_dyn(), + selected_pieces, + self.meta.lengths, + &self.meta.file_infos, + ) + .context("error creating chunk tracker")?; + + let hns = chunk_tracker.get_hns(); info!( "Initial check results: have {}, needed {}, total selected {}", - SF::new(initial_check_results.have_bytes), - SF::new(initial_check_results.needed_bytes), - SF::new(initial_check_results.selected_bytes) + SF::new(hns.have_bytes), + SF::new(hns.needed_bytes), + SF::new(hns.selected_bytes) ); // Ensure file lenghts are correct, and reopen read-only. @@ -85,14 +157,6 @@ impl TorrentStateInitializing { Ok::<_, anyhow::Error>(()) })?; - let chunk_tracker = ChunkTracker::new( - initial_check_results.have_pieces, - initial_check_results.selected_pieces, - self.meta.lengths, - &self.meta.file_infos, - ) - .context("error creating chunk tracker")?; - let paused = TorrentStatePaused { info: self.meta.clone(), files: self.files.take()?, diff --git a/crates/librqbit/src/torrent_state/live/mod.rs b/crates/librqbit/src/torrent_state/live/mod.rs index 7d913fe..a537fd0 100644 --- a/crates/librqbit/src/torrent_state/live/mod.rs +++ b/crates/librqbit/src/torrent_state/live/mod.rs @@ -131,6 +131,8 @@ pub(crate) struct TorrentStateLocked { // If this is None, then it was already used fatal_errors_tx: Option>, + + unflushed_bitv_bytes: u64, } impl TorrentStateLocked { @@ -145,6 +147,23 @@ impl TorrentStateLocked { .as_mut() .context("chunk tracker empty, torrent was paused") } + + fn try_flush_bitv(&mut self) { + if self.unflushed_bitv_bytes == 0 { + return; + } + trace!("trying to flush bitfield"); + if let Some(Err(e)) = self + .chunks + .as_mut() + .map(|ct| ct.get_have_pieces_mut().flush()) + { + warn!(error=?e, "error flushing bitfield"); + } else { + trace!("flushed bitfield"); + self.unflushed_bitv_bytes = 0; + } 
+ } } #[derive(Default)] @@ -155,6 +174,8 @@ pub struct TorrentStateOptions { pub peer_read_write_timeout: Option, } +const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024; + pub struct TorrentStateLive { peers: PeerStates, meta: Arc, @@ -223,6 +244,7 @@ impl TorrentStateLive { inflight_pieces: Default::default(), file_priorities, fatal_errors_tx: Some(fatal_errors_tx), + unflushed_bitv_bytes: 0, }), files: paused.files, stats: AtomicStats { @@ -684,6 +706,7 @@ impl TorrentStateLive { fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> { let mut g = self.lock_write("on_piece_completed"); + let g = &mut **g; let chunks = g.get_chunks_mut()?; // if we have all the pieces of the file, reopen it read only @@ -701,13 +724,20 @@ impl TorrentStateLive { self.streams .wake_streams_on_piece_completed(id, &self.meta.lengths); + g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64; + if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES { + g.try_flush_bitv() + } + + let chunks = g.get_chunks()?; if chunks.is_finished() { if chunks.get_selected_pieces()[id.get_usize()] { + g.try_flush_bitv(); info!("torrent finished downloading"); } self.finished_notify.notify_waiters(); - if !self.has_active_streams_unfinished_files(&g) { + if !self.has_active_streams_unfinished_files(g) { // There is not poing being connected to peers that have all the torrent, when // we don't need anything from them, and they don't need anything from us. 
self.disconnect_all_peers_that_have_full_torrent(); @@ -835,7 +865,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler { fn serialize_bitfield_message_to_buf(&self, buf: &mut Vec) -> anyhow::Result { let g = self.state.lock_read("serialize_bitfield_message_to_buf"); - let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_raw_slice())); + let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes())); let len = msg.serialize(buf, &|| None)?; trace!("sending: {:?}, length={}", &msg, len); Ok(len) diff --git a/crates/librqbit/src/torrent_state/mod.rs b/crates/librqbit/src/torrent_state/mod.rs index 68f4d04..2002924 100644 --- a/crates/librqbit/src/torrent_state/mod.rs +++ b/crates/librqbit/src/torrent_state/mod.rs @@ -34,6 +34,7 @@ use tracing::debug; use tracing::error_span; use tracing::warn; +use crate::bitv_factory::BitVFactory; use crate::chunk_tracker::ChunkTracker; use crate::file_info::FileInfo; use crate::session::TorrentId; @@ -209,6 +210,7 @@ impl ManagedTorrent { start_paused: bool, live_cancellation_token: CancellationToken, init_semaphore: Arc, + bitv_factory: Arc, ) -> anyhow::Result<()> { let mut g = self.locked.write(); @@ -301,7 +303,7 @@ impl ManagedTorrent { .await .context("bug: concurrent init semaphore was closed")?; - match init.check().await { + match init.check(bitv_factory).await { Ok(paused) => { let mut g = t.locked.write(); if let ManagedTorrentState::Initializing(_) = &g.state { @@ -368,6 +370,7 @@ impl ManagedTorrent { start_paused, live_cancellation_token, init_semaphore, + bitv_factory, ) } ManagedTorrentState::None => bail!("bug: torrent is in empty state"), diff --git a/crates/librqbit/src/torrent_state/streaming.rs b/crates/librqbit/src/torrent_state/streaming.rs index 19009eb..cd711a2 100644 --- a/crates/librqbit/src/torrent_state/streaming.rs +++ b/crates/librqbit/src/torrent_state/streaming.rs @@ -187,7 +187,7 @@ impl AsyncRead for FileStream { // if the piece is not there, register to 
wake when it is // check if we have the piece for real let have = poll_try_io!(self.torrent.with_chunk_tracker(|ct| { - let have = ct.get_have_pieces()[current.id.get() as usize]; + let have = ct.get_have_pieces().as_slice()[current.id.get() as usize]; if !have { self.streams .register_waker(self.stream_id, cx.waker().clone()); diff --git a/crates/librqbit/src/type_aliases.rs b/crates/librqbit/src/type_aliases.rs index be5741e..337ce8c 100644 --- a/crates/librqbit/src/type_aliases.rs +++ b/crates/librqbit/src/type_aliases.rs @@ -4,6 +4,9 @@ use futures::stream::BoxStream; use crate::{file_info::FileInfo, storage::TorrentStorage}; +// NOTE: Msb0 is used because that's what bittorrent protocol uses for bitfield. +// Don't change to Lsb0 even though it might be a bit faster (in theory) on LE architectures. +pub type BS = bitvec::slice::BitSlice; pub type BF = bitvec::boxed::BitBox; pub type PeerHandle = SocketAddr; diff --git a/crates/peer_binary_protocol/src/lib.rs b/crates/peer_binary_protocol/src/lib.rs index a6b2e7a..bd2d1a8 100644 --- a/crates/peer_binary_protocol/src/lib.rs +++ b/crates/peer_binary_protocol/src/lib.rs @@ -199,8 +199,8 @@ pub enum Message { pub type MessageBorrowed<'a> = Message>; pub type MessageOwned = Message; -pub type BitfieldBorrowed<'a> = &'a bitvec::slice::BitSlice; -pub type BitfieldOwned = bitvec::vec::BitVec; +pub type BitfieldBorrowed<'a> = &'a bitvec::slice::BitSlice; +pub type BitfieldOwned = bitvec::vec::BitVec; pub struct Bitfield<'a> { pub data: BitfieldBorrowed<'a>, diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index 02fcb90..90d50f3 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -143,6 +143,10 @@ struct ServerStartOptions { /// The folder to store session data in. By default uses OS specific folder. #[arg(long = "persistence-config")] persistence_config: Option, + + /// [Experimental] if set, will try to resume quickly after restart and skip checksumming. 
+ #[arg(long = "fastresume")] + fastresume: bool, } #[derive(Parser)] @@ -341,6 +345,7 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { socks_proxy_url: socks_url, concurrent_init_limit: Some(opts.concurrent_init_limit), root_span: None, + fastresume: false, }; let stats_printer = |session: Arc| async move { @@ -421,6 +426,8 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { } } + sopts.fastresume = start_opts.fastresume; + let session = Session::new_with_opts(PathBuf::from(&start_opts.output_folder), sopts) .await