Force set file length on stream

This commit is contained in:
Igor Katson 2024-04-29 19:28:05 +01:00
parent 0ebd4aa128
commit 4a73739871
3 changed files with 23 additions and 13 deletions

View file

@ -4,7 +4,7 @@
- [x] use some concurrent hashmap e.g. flurry or dashmap - [x] use some concurrent hashmap e.g. flurry or dashmap
- [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug - [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug
test-log for tests test-log for tests
- [x] reopen read only is bugged - [x] (reopen) read only is bugged
- [x] initializing/checking - [x] initializing/checking
- [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while - [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while
- [x] checking torrents should be visible right away - [x] checking torrents should be visible right away
@ -98,3 +98,4 @@ Other:
- [ ] keepalive is useless, the tieout is 120s, and read timeout is 10s. Need to send keepalive only if nothing was done recently. - [ ] keepalive is useless, the tieout is 120s, and read timeout is 10s. Need to send keepalive only if nothing was done recently.
- [x] url should have the filename - [x] url should have the filename
- [ ] reopening files: get rid of it!!! Even on Windows it should be alright - no need to reopen them.

View file

@ -1,5 +1,5 @@
use std::{ use std::{
fs::{File, OpenOptions}, fs::OpenOptions,
sync::{atomic::AtomicU64, Arc}, sync::{atomic::AtomicU64, Arc},
time::Instant, time::Instant,
}; };

View file

@ -14,7 +14,7 @@ use librqbit_core::lengths::{Lengths, ValidPieceIndex};
use tokio::io::{AsyncRead, AsyncSeek}; use tokio::io::{AsyncRead, AsyncSeek};
use tracing::{debug, trace}; use tracing::{debug, trace};
use crate::{opened_file::OpenedFile, ManagedTorrent}; use crate::{opened_file::OpenedFile, type_aliases::OpenedFiles, ManagedTorrent};
use super::ManagedTorrentHandle; use super::ManagedTorrentHandle;
@ -292,9 +292,9 @@ impl Drop for FileStream {
} }
impl ManagedTorrent { impl ManagedTorrent {
fn with_opened_file<F, R>(&self, file_id: usize, f: F) -> anyhow::Result<R> fn with_opened_files<F, R>(&self, f: F) -> anyhow::Result<R>
where where
F: FnOnce(&OpenedFile) -> R, F: FnOnce(&OpenedFiles) -> R,
{ {
self.with_state(|s| { self.with_state(|s| {
let files = match s { let files = match s {
@ -302,11 +302,20 @@ impl ManagedTorrent {
crate::ManagedTorrentState::Live(l) => &l.files, crate::ManagedTorrentState::Live(l) => &l.files,
s => anyhow::bail!("with_opened_file: invalid state {}", s.name()), s => anyhow::bail!("with_opened_file: invalid state {}", s.name()),
}; };
let fd = files.get(file_id).context("invalid file id")?; Ok(f(files))
Ok(f(fd))
}) })
} }
fn with_opened_file<F, R>(&self, file_id: usize, f: F) -> anyhow::Result<R>
where
F: FnOnce(&OpenedFile) -> R,
{
self.with_opened_files(|opened_files| {
let fd = opened_files.get(file_id).context("invalid file id")?;
Ok(f(fd))
})?
}
fn streams(&self) -> anyhow::Result<Arc<TorrentStreams>> { fn streams(&self) -> anyhow::Result<Arc<TorrentStreams>> {
self.with_state(|s| match s { self.with_state(|s| match s {
crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()), crate::ManagedTorrentState::Paused(p) => Ok(p.streams.clone()),
@ -343,12 +352,12 @@ impl ManagedTorrent {
torrent: self, torrent: self,
}; };
if s.torrent.maybe_reconnect_needed_peers_for_file(file_id) { if s.torrent.maybe_reconnect_needed_peers_for_file(file_id) {
s.torrent.with_opened_file(file_id, |fd| { // TODO: get rid of reopening files, it's such a source of bugs and complexity
fd.reopen(false)?; s.torrent.with_opened_files(|files| {
fd.file for file in files {
.lock() file.reopen(false)?;
.set_len(fd.len) }
.context("error setting file length") Ok::<_, anyhow::Error>(())
})??; })??;
} }
streams.streams.insert( streams.streams.insert(