Clear have_bitfield on error
This commit is contained in:
parent
b7ed850918
commit
c697809e50
8 changed files with 88 additions and 32 deletions
|
|
@ -3,6 +3,7 @@ use crate::{api::TorrentIdOrHash, bitv::BitV, type_aliases::BF};
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait BitVFactory: Send + Sync {
|
pub trait BitVFactory: Send + Sync {
|
||||||
async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>>;
|
async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>>;
|
||||||
|
async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()>;
|
||||||
async fn store_initial_check(
|
async fn store_initial_check(
|
||||||
&self,
|
&self,
|
||||||
id: TorrentIdOrHash,
|
id: TorrentIdOrHash,
|
||||||
|
|
@ -18,6 +19,10 @@ impl BitVFactory for NonPersistentBitVFactory {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn clear(&self, _id: TorrentIdOrHash) -> anyhow::Result<()> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn store_initial_check(
|
async fn store_initial_check(
|
||||||
&self,
|
&self,
|
||||||
_id: TorrentIdOrHash,
|
_id: TorrentIdOrHash,
|
||||||
|
|
|
||||||
|
|
@ -1148,6 +1148,7 @@ impl Session {
|
||||||
minfo.clone(),
|
minfo.clone(),
|
||||||
only_files.clone(),
|
only_files.clone(),
|
||||||
minfo.storage_factory.create_and_init(&minfo)?,
|
minfo.storage_factory.create_and_init(&minfo)?,
|
||||||
|
false,
|
||||||
));
|
));
|
||||||
let handle = Arc::new(ManagedTorrent {
|
let handle = Arc::new(ManagedTorrent {
|
||||||
locked: RwLock::new(ManagedTorrentLocked {
|
locked: RwLock::new(ManagedTorrentLocked {
|
||||||
|
|
|
||||||
|
|
@ -196,6 +196,14 @@ impl BitVFactory for JsonSessionPersistenceStore {
|
||||||
Ok(Some(MmapBitV::new(f)?.into_dyn()))
|
Ok(Some(MmapBitV::new(f)?.into_dyn()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()> {
|
||||||
|
let h = self.to_hash(id).await?;
|
||||||
|
let filename = self.bitv_filename(&h);
|
||||||
|
tokio::fs::remove_file(&filename)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("error removing {filename:?}"))
|
||||||
|
}
|
||||||
|
|
||||||
async fn store_initial_check(
|
async fn store_initial_check(
|
||||||
&self,
|
&self,
|
||||||
id: TorrentIdOrHash,
|
id: TorrentIdOrHash,
|
||||||
|
|
|
||||||
|
|
@ -313,4 +313,31 @@ impl BitVFactory for PostgresSessionStorage {
|
||||||
bf.flush()?;
|
bf.flush()?;
|
||||||
Ok(bf.into_dyn())
|
Ok(bf.into_dyn())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn clear(&self, id: TorrentIdOrHash) -> anyhow::Result<()> {
|
||||||
|
macro_rules! exec {
|
||||||
|
($q:expr, $v:expr) => {
|
||||||
|
sqlx::query($q)
|
||||||
|
.bind($v)
|
||||||
|
.execute(&self.pool)
|
||||||
|
.await
|
||||||
|
.context($q)
|
||||||
|
.context("error executing query")?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
match id {
|
||||||
|
TorrentIdOrHash::Id(id) => {
|
||||||
|
let id: i32 = id.try_into()?;
|
||||||
|
exec!("UPDATE torrents SET have_bitfield = NULL WHERE id = $1", id);
|
||||||
|
}
|
||||||
|
TorrentIdOrHash::Hash(h) => {
|
||||||
|
exec!(
|
||||||
|
"UPDATE torrents SET have_bitfield = NULL WHERE info_hash = $1",
|
||||||
|
&h.0[..]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ use tracing::{debug, info, warn};
|
||||||
use crate::{
|
use crate::{
|
||||||
api::TorrentIdOrHash,
|
api::TorrentIdOrHash,
|
||||||
bitv::BitV,
|
bitv::BitV,
|
||||||
bitv_factory::BitVFactory,
|
|
||||||
chunk_tracker::ChunkTracker,
|
chunk_tracker::ChunkTracker,
|
||||||
file_ops::FileOps,
|
file_ops::FileOps,
|
||||||
type_aliases::{FileStorage, BF},
|
type_aliases::{FileStorage, BF},
|
||||||
|
|
@ -23,9 +22,10 @@ use super::{paused::TorrentStatePaused, ManagedTorrentShared};
|
||||||
|
|
||||||
pub struct TorrentStateInitializing {
|
pub struct TorrentStateInitializing {
|
||||||
pub(crate) files: FileStorage,
|
pub(crate) files: FileStorage,
|
||||||
pub(crate) meta: Arc<ManagedTorrentShared>,
|
pub(crate) shared: Arc<ManagedTorrentShared>,
|
||||||
pub(crate) only_files: Option<Vec<usize>>,
|
pub(crate) only_files: Option<Vec<usize>>,
|
||||||
pub(crate) checked_bytes: AtomicU64,
|
pub(crate) checked_bytes: AtomicU64,
|
||||||
|
previously_errored: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn compute_selected_pieces(
|
fn compute_selected_pieces(
|
||||||
|
|
@ -51,12 +51,14 @@ impl TorrentStateInitializing {
|
||||||
meta: Arc<ManagedTorrentShared>,
|
meta: Arc<ManagedTorrentShared>,
|
||||||
only_files: Option<Vec<usize>>,
|
only_files: Option<Vec<usize>>,
|
||||||
files: FileStorage,
|
files: FileStorage,
|
||||||
|
previously_errored: bool,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
meta,
|
shared: meta,
|
||||||
only_files,
|
only_files,
|
||||||
files,
|
files,
|
||||||
checked_bytes: AtomicU64::new(0),
|
checked_bytes: AtomicU64::new(0),
|
||||||
|
previously_errored,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -65,18 +67,30 @@ impl TorrentStateInitializing {
|
||||||
.load(std::sync::atomic::Ordering::Relaxed)
|
.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn check(
|
pub async fn check(&self) -> anyhow::Result<TorrentStatePaused> {
|
||||||
&self,
|
let id: TorrentIdOrHash = self.shared.info_hash.into();
|
||||||
bitv_factory: Arc<dyn BitVFactory>,
|
let bitv_factory = self
|
||||||
) -> anyhow::Result<TorrentStatePaused> {
|
.shared
|
||||||
let id: TorrentIdOrHash = self.meta.info_hash.into();
|
.session
|
||||||
let mut have_pieces = bitv_factory
|
.upgrade()
|
||||||
.load(id)
|
.context("session is dead")?
|
||||||
.await
|
.bitv_factory
|
||||||
.context("error loading have_pieces")?;
|
.clone();
|
||||||
|
let mut have_pieces = if self.previously_errored {
|
||||||
|
if let Err(e) = bitv_factory.clear(id).await {
|
||||||
|
warn!(error=?e, "error clearing bitfield");
|
||||||
|
}
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
bitv_factory
|
||||||
|
.load(id)
|
||||||
|
.await
|
||||||
|
.context("error loading have_pieces")?
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(hp) = have_pieces.as_ref() {
|
if let Some(hp) = have_pieces.as_ref() {
|
||||||
let actual = hp.as_bytes().len();
|
let actual = hp.as_bytes().len();
|
||||||
let expected = self.meta.lengths.piece_bitfield_bytes();
|
let expected = self.shared.lengths.piece_bitfield_bytes();
|
||||||
if actual != expected {
|
if actual != expected {
|
||||||
warn!(
|
warn!(
|
||||||
actual,
|
actual,
|
||||||
|
|
@ -90,12 +104,12 @@ impl TorrentStateInitializing {
|
||||||
Some(h) => h,
|
Some(h) => h,
|
||||||
None => {
|
None => {
|
||||||
info!("Doing initial checksum validation, this might take a while...");
|
info!("Doing initial checksum validation, this might take a while...");
|
||||||
let have_pieces = self.meta.spawner.spawn_block_in_place(|| {
|
let have_pieces = self.shared.spawner.spawn_block_in_place(|| {
|
||||||
FileOps::new(
|
FileOps::new(
|
||||||
&self.meta.info,
|
&self.shared.info,
|
||||||
&self.files,
|
&self.files,
|
||||||
&self.meta.file_infos,
|
&self.shared.file_infos,
|
||||||
&self.meta.lengths,
|
&self.shared.lengths,
|
||||||
)
|
)
|
||||||
.initial_check(&self.checked_bytes)
|
.initial_check(&self.checked_bytes)
|
||||||
})?;
|
})?;
|
||||||
|
|
@ -107,16 +121,16 @@ impl TorrentStateInitializing {
|
||||||
};
|
};
|
||||||
|
|
||||||
let selected_pieces = compute_selected_pieces(
|
let selected_pieces = compute_selected_pieces(
|
||||||
&self.meta.lengths,
|
&self.shared.lengths,
|
||||||
self.only_files.as_deref(),
|
self.only_files.as_deref(),
|
||||||
&self.meta.file_infos,
|
&self.shared.file_infos,
|
||||||
);
|
);
|
||||||
|
|
||||||
let chunk_tracker = ChunkTracker::new(
|
let chunk_tracker = ChunkTracker::new(
|
||||||
have_pieces.into_dyn(),
|
have_pieces.into_dyn(),
|
||||||
selected_pieces,
|
selected_pieces,
|
||||||
self.meta.lengths,
|
self.shared.lengths,
|
||||||
&self.meta.file_infos,
|
&self.shared.file_infos,
|
||||||
)
|
)
|
||||||
.context("error creating chunk tracker")?;
|
.context("error creating chunk tracker")?;
|
||||||
|
|
||||||
|
|
@ -130,8 +144,8 @@ impl TorrentStateInitializing {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Ensure file lenghts are correct, and reopen read-only.
|
// Ensure file lenghts are correct, and reopen read-only.
|
||||||
self.meta.spawner.spawn_block_in_place(|| {
|
self.shared.spawner.spawn_block_in_place(|| {
|
||||||
for (idx, fi) in self.meta.file_infos.iter().enumerate() {
|
for (idx, fi) in self.shared.file_infos.iter().enumerate() {
|
||||||
if self
|
if self
|
||||||
.only_files
|
.only_files
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
|
@ -158,7 +172,7 @@ impl TorrentStateInitializing {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
let paused = TorrentStatePaused {
|
let paused = TorrentStatePaused {
|
||||||
info: self.meta.clone(),
|
shared: self.shared.clone(),
|
||||||
files: self.files.take()?,
|
files: self.files.take()?,
|
||||||
chunk_tracker,
|
chunk_tracker,
|
||||||
streams: Arc::new(Default::default()),
|
streams: Arc::new(Default::default()),
|
||||||
|
|
|
||||||
|
|
@ -217,7 +217,7 @@ impl TorrentStateLive {
|
||||||
) -> anyhow::Result<Arc<Self>> {
|
) -> anyhow::Result<Arc<Self>> {
|
||||||
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
|
||||||
let session = paused
|
let session = paused
|
||||||
.info
|
.shared
|
||||||
.session
|
.session
|
||||||
.upgrade()
|
.upgrade()
|
||||||
.context("session is dead, cannot start torrent")?;
|
.context("session is dead, cannot start torrent")?;
|
||||||
|
|
@ -230,11 +230,11 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
// TODO: make it configurable
|
// TODO: make it configurable
|
||||||
let file_priorities = {
|
let file_priorities = {
|
||||||
let mut pri = (0..paused.info.file_infos.len()).collect::<Vec<usize>>();
|
let mut pri = (0..paused.shared.file_infos.len()).collect::<Vec<usize>>();
|
||||||
// sort by filename, cause many torrents have random sort order.
|
// sort by filename, cause many torrents have random sort order.
|
||||||
pri.sort_unstable_by_key(|id| {
|
pri.sort_unstable_by_key(|id| {
|
||||||
paused
|
paused
|
||||||
.info
|
.shared
|
||||||
.file_infos
|
.file_infos
|
||||||
.get(*id)
|
.get(*id)
|
||||||
.map(|fi| fi.relative_filename.as_path())
|
.map(|fi| fi.relative_filename.as_path())
|
||||||
|
|
@ -245,7 +245,7 @@ impl TorrentStateLive {
|
||||||
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
|
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
|
||||||
|
|
||||||
let state = Arc::new(TorrentStateLive {
|
let state = Arc::new(TorrentStateLive {
|
||||||
torrent: paused.info.clone(),
|
torrent: paused.shared.clone(),
|
||||||
peers: PeerStates {
|
peers: PeerStates {
|
||||||
session_stats: session_stats.clone(),
|
session_stats: session_stats.clone(),
|
||||||
stats: Default::default(),
|
stats: Default::default(),
|
||||||
|
|
@ -668,7 +668,7 @@ impl TorrentStateLive {
|
||||||
|
|
||||||
// g.chunks;
|
// g.chunks;
|
||||||
Ok(TorrentStatePaused {
|
Ok(TorrentStatePaused {
|
||||||
info: self.torrent.clone(),
|
shared: self.torrent.clone(),
|
||||||
files: self.files.take()?,
|
files: self.files.take()?,
|
||||||
chunk_tracker,
|
chunk_tracker,
|
||||||
streams: self.streams.clone(),
|
streams: self.streams.clone(),
|
||||||
|
|
|
||||||
|
|
@ -311,7 +311,7 @@ impl ManagedTorrent {
|
||||||
.await
|
.await
|
||||||
.context("bug: concurrent init semaphore was closed")?;
|
.context("bug: concurrent init semaphore was closed")?;
|
||||||
|
|
||||||
match init.check(session.bitv_factory.clone()).await {
|
match init.check().await {
|
||||||
Ok(paused) => {
|
Ok(paused) => {
|
||||||
let mut g = t.locked.write();
|
let mut g = t.locked.write();
|
||||||
if let ManagedTorrentState::Initializing(_) = &g.state {
|
if let ManagedTorrentState::Initializing(_) = &g.state {
|
||||||
|
|
@ -365,6 +365,7 @@ impl ManagedTorrent {
|
||||||
self.shared.clone(),
|
self.shared.clone(),
|
||||||
g.only_files.clone(),
|
g.only_files.clone(),
|
||||||
self.shared.storage_factory.create_and_init(self.shared())?,
|
self.shared.storage_factory.create_and_init(self.shared())?,
|
||||||
|
true,
|
||||||
));
|
));
|
||||||
g.state = ManagedTorrentState::Initializing(initializing.clone());
|
g.state = ManagedTorrentState::Initializing(initializing.clone());
|
||||||
drop(g);
|
drop(g);
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ use crate::{
|
||||||
use super::{streaming::TorrentStreams, ManagedTorrentShared};
|
use super::{streaming::TorrentStreams, ManagedTorrentShared};
|
||||||
|
|
||||||
pub struct TorrentStatePaused {
|
pub struct TorrentStatePaused {
|
||||||
pub(crate) info: Arc<ManagedTorrentShared>,
|
pub(crate) shared: Arc<ManagedTorrentShared>,
|
||||||
pub(crate) files: FileStorage,
|
pub(crate) files: FileStorage,
|
||||||
pub(crate) chunk_tracker: ChunkTracker,
|
pub(crate) chunk_tracker: ChunkTracker,
|
||||||
pub(crate) streams: Arc<TorrentStreams>,
|
pub(crate) streams: Arc<TorrentStreams>,
|
||||||
|
|
@ -17,7 +17,7 @@ pub struct TorrentStatePaused {
|
||||||
impl TorrentStatePaused {
|
impl TorrentStatePaused {
|
||||||
pub(crate) fn update_only_files(&mut self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
|
pub(crate) fn update_only_files(&mut self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
|
||||||
self.chunk_tracker
|
self.chunk_tracker
|
||||||
.update_only_files(self.info.info.iter_file_lengths()?, only_files)?;
|
.update_only_files(self.shared.info.iter_file_lengths()?, only_files)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue