Split out TorrentMetadata

This commit is contained in:
Igor Katson 2024-12-05 22:57:34 +00:00
parent e440f03970
commit 100b7116df
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
20 changed files with 411 additions and 225 deletions

View file

@ -24,11 +24,12 @@ use crate::{
FileInfos,
};
use super::{paused::TorrentStatePaused, ManagedTorrentShared};
use super::{paused::TorrentStatePaused, ManagedTorrentShared, TorrentMetadata};
pub struct TorrentStateInitializing {
pub(crate) files: FileStorage,
pub(crate) shared: Arc<ManagedTorrentShared>,
pub(crate) metadata: Arc<TorrentMetadata>,
pub(crate) only_files: Option<Vec<usize>>,
pub(crate) checked_bytes: AtomicU64,
previously_errored: bool,
@ -54,13 +55,15 @@ fn compute_selected_pieces(
impl TorrentStateInitializing {
pub fn new(
meta: Arc<ManagedTorrentShared>,
shared: Arc<ManagedTorrentShared>,
metadata: Arc<TorrentMetadata>,
only_files: Option<Vec<usize>>,
files: FileStorage,
previously_errored: bool,
) -> Self {
Self {
shared: meta,
shared,
metadata,
only_files,
files,
checked_bytes: AtomicU64::new(0),
@ -80,7 +83,7 @@ impl TorrentStateInitializing {
) -> Option<Box<dyn BitV>> {
let hp = have_pieces?;
let actual = hp.as_bytes().len();
let expected = self.shared.lengths.piece_bitfield_bytes();
let expected = self.metadata.lengths.piece_bitfield_bytes();
if actual != expected {
warn!(
actual,
@ -92,21 +95,21 @@ impl TorrentStateInitializing {
let is_broken = self.shared.spawner.spawn_block_in_place(|| {
let fo = crate::file_ops::FileOps::new(
&self.shared.info,
&self.metadata.info,
&self.files,
&self.shared.file_infos,
&self.shared.lengths,
&self.metadata.file_infos,
&self.metadata.lengths,
);
use rand::seq::SliceRandom;
let mut to_validate = BF::from_boxed_slice(
vec![0u8; self.shared.lengths.piece_bitfield_bytes()].into_boxed_slice(),
vec![0u8; self.metadata.lengths.piece_bitfield_bytes()].into_boxed_slice(),
);
let mut queue = hp.as_slice().to_owned();
// Validate at least one piece from each file, if we claim we have it.
for fi in self.shared.file_infos.iter() {
for fi in self.metadata.file_infos.iter() {
let prange = fi.piece_range_usize();
let offset = prange.start;
for piece_id in hp
@ -136,7 +139,7 @@ impl TorrentStateInitializing {
for (id, piece_id) in to_validate
.iter_ones()
.filter_map(|id| {
self.shared
self.metadata
.lengths
.validate_piece_index(id.try_into().ok()?)
})
@ -147,10 +150,10 @@ impl TorrentStateInitializing {
}
#[allow(clippy::cast_possible_truncation)]
let progress = (self.shared.lengths.total_length() as f64
let progress = (self.metadata.lengths.total_length() as f64
/ to_validate_count as f64
* (id + 1) as f64) as u64;
let progress = progress.min(self.shared.lengths.total_length());
let progress = progress.min(self.metadata.lengths.total_length());
self.checked_bytes.store(progress, Ordering::Relaxed);
}
@ -198,10 +201,10 @@ impl TorrentStateInitializing {
info!("Doing initial checksum validation, this might take a while...");
let have_pieces = self.shared.spawner.spawn_block_in_place(|| {
FileOps::new(
&self.shared.info,
&self.metadata.info,
&self.files,
&self.shared.file_infos,
&self.shared.lengths,
&self.metadata.file_infos,
&self.metadata.lengths,
)
.initial_check(&self.checked_bytes)
})?;
@ -213,16 +216,16 @@ impl TorrentStateInitializing {
};
let selected_pieces = compute_selected_pieces(
&self.shared.lengths,
&self.metadata.lengths,
self.only_files.as_deref(),
&self.shared.file_infos,
&self.metadata.file_infos,
);
let chunk_tracker = ChunkTracker::new(
have_pieces.into_dyn(),
selected_pieces,
self.shared.lengths,
&self.shared.file_infos,
self.metadata.lengths,
&self.metadata.file_infos,
)
.context("error creating chunk tracker")?;
@ -237,7 +240,7 @@ impl TorrentStateInitializing {
// Ensure file lenghts are correct, and reopen read-only.
self.shared.spawner.spawn_block_in_place(|| {
for (idx, fi) in self.shared.file_infos.iter().enumerate() {
for (idx, fi) in self.metadata.file_infos.iter().enumerate() {
if self
.only_files
.as_ref()
@ -268,6 +271,7 @@ impl TorrentStateInitializing {
let paused = TorrentStatePaused {
shared: self.shared.clone(),
metadata: self.metadata.clone(),
files: self.files.take()?,
chunk_tracker,
streams: Arc::new(Default::default()),

View file

@ -109,7 +109,7 @@ use super::{
paused::TorrentStatePaused,
streaming::TorrentStreams,
utils::{timeit, TimedExistence},
ManagedTorrentShared,
ManagedTorrentShared, TorrentMetadata,
};
#[derive(Debug)]
@ -175,7 +175,8 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
pub struct TorrentStateLive {
peers: PeerStates,
torrent: Arc<ManagedTorrentShared>,
shared: Arc<ManagedTorrentShared>,
metadata: Arc<TorrentMetadata>,
locked: RwLock<TorrentStateLocked>,
pub(crate) files: FileStorage,
@ -231,11 +232,11 @@ impl TorrentStateLive {
// TODO: make it configurable
let file_priorities = {
let mut pri = (0..paused.shared.file_infos.len()).collect::<Vec<usize>>();
let mut pri = (0..paused.metadata.file_infos.len()).collect::<Vec<usize>>();
// sort by filename, cause many torrents have random sort order.
pri.sort_unstable_by_key(|id| {
paused
.shared
.metadata
.file_infos
.get(*id)
.map(|fi| fi.relative_filename.as_path())
@ -252,7 +253,8 @@ impl TorrentStateLive {
let ratelimits = Limits::new(paused.shared.options.ratelimits);
let state = Arc::new(TorrentStateLive {
torrent: paused.shared.clone(),
shared: paused.shared.clone(),
metadata: paused.metadata.clone(),
peers: PeerStates {
session_stats: session_stats.clone(),
stats: Default::default(),
@ -291,7 +293,7 @@ impl TorrentStateLive {
});
state.spawn(
error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"),
error_span!(parent: state.shared.span.clone(), "speed_estimator_updater"),
{
let state = Arc::downgrade(&state);
async move {
@ -317,12 +319,12 @@ impl TorrentStateLive {
);
state.spawn(
error_span!(parent: state.torrent.span.clone(), "peer_adder"),
error_span!(parent: state.shared.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx),
);
state.spawn(
error_span!(parent: state.torrent.span.clone(), "upload_scheduler"),
error_span!(parent: state.shared.span.clone(), "upload_scheduler"),
state.clone().task_upload_scheduler(ratelimit_upload_rx),
);
Ok(state)
@ -346,7 +348,7 @@ impl TorrentStateLive {
}
fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> {
self.torrent.options.disk_write_queue.as_ref()
self.shared.options.disk_write_queue.as_ref()
}
pub(crate) fn add_incoming_peer(
@ -394,7 +396,7 @@ impl TorrentStateLive {
self.spawn(
error_span!(
parent: self.torrent.span.clone(),
parent: self.shared.span.clone(),
"manage_incoming_peer",
addr = %checked_peer.addr
),
@ -416,7 +418,7 @@ impl TorrentStateLive {
self.ratelimits
.prepare_for_upload(NonZeroU32::new(ci.size).unwrap())
.await?;
if let Some(session) = self.torrent.session.upgrade() {
if let Some(session) = self.shared.session.upgrade() {
session
.ratelimits
.prepare_for_upload(NonZeroU32::new(ci.size).unwrap())
@ -449,18 +451,18 @@ impl TorrentStateLive {
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: self.torrent.options.peer_connect_timeout,
read_write_timeout: self.torrent.options.peer_read_write_timeout,
connect_timeout: self.shared.options.peer_connect_timeout,
read_write_timeout: self.shared.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
checked_peer.addr,
self.torrent.info_hash,
self.torrent.peer_id,
self.shared.info_hash,
self.shared.peer_id,
&handler,
Some(options),
self.torrent.spawner,
self.torrent.connector.clone(),
self.shared.spawner,
self.shared.connector.clone(),
);
let requester = handler.task_peer_chunk_requester();
@ -514,18 +516,18 @@ impl TorrentStateLive {
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: state.torrent.options.peer_connect_timeout,
read_write_timeout: state.torrent.options.peer_read_write_timeout,
connect_timeout: state.shared.options.peer_connect_timeout,
read_write_timeout: state.shared.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.torrent.info_hash,
state.torrent.peer_id,
state.shared.info_hash,
state.shared.peer_id,
&handler,
Some(options),
state.torrent.spawner,
state.torrent.connector.clone(),
state.shared.spawner,
state.shared.connector.clone(),
);
let requester = aframe!(handler
.task_peer_chunk_requester()
@ -564,7 +566,7 @@ impl TorrentStateLive {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.context("torrent closed")?;
if state.torrent.options.disable_upload() && state.is_finished_and_no_active_streams() {
if state.shared.options.disable_upload() && state.is_finished_and_no_active_streams() {
debug!("ignoring peer {} as we are finished", addr);
state.peers.mark_peer_not_needed(addr);
continue;
@ -572,30 +574,30 @@ impl TorrentStateLive {
let permit = state.peer_semaphore.clone().acquire_owned().await?;
state.spawn(
error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()),
error_span!(parent: state.shared.span.clone(), "manage_peer", peer = addr.to_string()),
aframe!(state.clone().task_manage_outgoing_peer(addr, permit)),
);
}
}
pub fn torrent(&self) -> &ManagedTorrentShared {
&self.torrent
&self.shared
}
pub fn info(&self) -> &TorrentMetaV1Info<ByteBufOwned> {
&self.torrent.info
&self.metadata.info
}
pub fn info_hash(&self) -> Id20 {
self.torrent.info_hash
self.shared.info_hash
}
pub fn peer_id(&self) -> Id20 {
self.torrent.peer_id
self.shared.peer_id
}
pub(crate) fn file_ops(&self) -> FileOps<'_> {
FileOps::new(
&self.torrent.info,
&self.metadata.info,
&*self.files,
&self.torrent().file_infos,
&self.metadata.file_infos,
&self.lengths,
)
}
@ -703,7 +705,8 @@ impl TorrentStateLive {
// g.chunks;
Ok(TorrentStatePaused {
shared: self.torrent.clone(),
shared: self.shared.clone(),
metadata: self.metadata.clone(),
files: self.files.take()?,
chunk_tracker,
streams: self.streams.clone(),
@ -727,7 +730,7 @@ impl TorrentStateLive {
let mut g = self.lock_write("update_only_files");
let ct = g.get_chunks_mut()?;
let hns =
ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?;
ct.update_only_files(self.metadata.file_infos.iter().map(|f| f.len), only_files)?;
if !hns.finished() {
self.reconnect_all_not_needed_peers();
}
@ -746,7 +749,7 @@ impl TorrentStateLive {
};
self.streams
.streamed_file_ids()
.any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id]))
.any(|file_id| !chunks.is_file_finished(&self.metadata.file_infos[file_id]))
}
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
@ -768,7 +771,7 @@ impl TorrentStateLive {
// if we have all the pieces of the file, reopen it read only
for (idx, file_info) in self
.torrent()
.metadata
.file_infos
.iter()
.enumerate()
@ -779,9 +782,9 @@ impl TorrentStateLive {
}
self.streams
.wake_streams_on_piece_completed(id, &self.torrent.lengths);
.wake_streams_on_piece_completed(id, &self.metadata.lengths);
locked.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64;
locked.unflushed_bitv_bytes += self.metadata.lengths.piece_length(id) as u64;
if locked.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES {
locked.try_flush_bitv()
}
@ -1021,7 +1024,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
if let Some(_peer_pex_msg_id) = hs.ut_pex() {
self.state.clone().spawn(
error_span!(
parent: self.state.torrent.span.clone(),
parent: self.state.shared.span.clone(),
"sending_pex_to_peer",
peer = self.addr.to_string()
),
@ -1054,7 +1057,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
}
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool {
if self.state.torrent.options.disable_upload() {
if self.state.shared.options.disable_upload() {
return false;
}
let have = self
@ -1071,7 +1074,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
&self,
handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
let info_bytes = &self.state.torrent().info_bytes;
let info_bytes = &self.state.metadata.info_bytes;
if !info_bytes.is_empty() {
if let Ok(len) = info_bytes.len().try_into() {
handshake.metadata_size = Some(len);
@ -1159,7 +1162,7 @@ impl PeerHandler {
if let Some(dur) = backoff {
self.state.clone().spawn(
error_span!(
parent: self.state.torrent.span.clone(),
parent: self.state.shared.span.clone(),
"wait_for_peer",
peer = handle.to_string(),
duration = format!("{dur:?}")
@ -1218,7 +1221,7 @@ impl PeerHandler {
&& !g.inflight_pieces.contains_key(pid)
});
let natural_order_pieces = chunk_tracker
.iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos);
.iter_queued_pieces(&g.file_priorities, &self.state.metadata.file_infos);
for n in priority_streamed_pieces.chain(natural_order_pieces) {
if bf.get(n.get() as usize).map(|v| *v) == Some(true) {
n_opt = Some(n);
@ -1787,7 +1790,7 @@ impl PeerHandler {
dtx.send(Box::new(work)).await?;
} else {
self.state
.torrent
.shared
.spawner
.spawn_block_in_place(|| {
write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info)
@ -1799,7 +1802,7 @@ impl PeerHandler {
}
fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> {
let data = &self.state.torrent().info_bytes;
let data = &self.state.metadata.info_bytes;
let metadata_size = data.len();
if metadata_size == 0 {
anyhow::bail!("peer requested for info metadata but we don't have it")

View file

@ -15,6 +15,7 @@ use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use arc_swap::ArcSwapOption;
use buffers::ByteBufOwned;
use bytes::Bytes;
use futures::future::BoxFuture;
@ -37,6 +38,7 @@ use tracing::trace;
use tracing::warn;
use crate::chunk_tracker::ChunkTracker;
use crate::file_info::FileInfo;
use crate::limits::LimitsConfig;
use crate::session::TorrentId;
use crate::spawn_utils::BlockingSpawner;
@ -123,6 +125,44 @@ impl ManagedTorrentOptions {
}
}
// Torrent bencodee "info" + some precomputed fields based on it for frequent access.
pub struct TorrentMetadata {
pub info: TorrentMetaV1Info<ByteBufOwned>,
pub torrent_bytes: Bytes,
pub info_bytes: Bytes,
pub lengths: Lengths,
pub file_infos: FileInfos,
}
impl TorrentMetadata {
pub(crate) fn new(
info: TorrentMetaV1Info<ByteBufOwned>,
torrent_bytes: Bytes,
info_bytes: Bytes,
) -> anyhow::Result<Self> {
let lengths = Lengths::from_torrent(&info)?;
let file_infos = info
.iter_file_details_ext(&lengths)?
.map(|fd| {
Ok::<_, anyhow::Error>(FileInfo {
relative_filename: fd.details.filename.to_pathbuf()?,
offset_in_torrent: fd.offset,
piece_range: fd.pieces,
len: fd.details.len,
attrs: fd.details.attrs(),
})
})
.collect::<anyhow::Result<Vec<FileInfo>>>()?;
Ok(Self {
info,
torrent_bytes,
info_bytes,
lengths,
file_infos,
})
}
}
/// Common information about torrent shared among all possible states.
///
// The reason it's not inlined into ManagedTorrent is to break the Arc cycle:
@ -130,15 +170,10 @@ impl ManagedTorrentOptions {
// of stuff, but it shouldn't access the state.
pub struct ManagedTorrentShared {
pub id: TorrentId,
pub info: TorrentMetaV1Info<ByteBufOwned>,
pub torrent_bytes: Bytes,
pub info_bytes: Bytes,
pub info_hash: Id20,
pub(crate) spawner: BlockingSpawner,
pub trackers: HashSet<String>,
pub peer_id: Id20,
pub lengths: Lengths,
pub file_infos: FileInfos,
pub span: tracing::Span,
pub(crate) options: ManagedTorrentOptions,
pub(crate) connector: Arc<StreamConnector>,
@ -148,6 +183,7 @@ pub struct ManagedTorrentShared {
pub struct ManagedTorrent {
pub shared: Arc<ManagedTorrentShared>,
pub metadata: ArcSwapOption<TorrentMetadata>,
pub(crate) state_change_notify: Notify,
pub(crate) locked: RwLock<ManagedTorrentLocked>,
}
@ -161,8 +197,13 @@ impl ManagedTorrent {
&self.shared
}
pub fn get_total_bytes(&self) -> u64 {
self.shared.lengths.total_length()
pub fn with_metadata<R>(
&self,
mut f: impl FnMut(&Arc<TorrentMetadata>) -> R,
) -> anyhow::Result<R> {
let r = self.metadata.load();
let r = r.as_ref().context("torrent is not resolved")?;
Ok(f(r))
}
pub fn info_hash(&self) -> Id20 {
@ -384,10 +425,14 @@ impl ManagedTorrent {
Ok(())
}
ManagedTorrentState::Error(_) => {
let metadata = self.metadata.load_full().expect("TODO");
let initializing = Arc::new(TorrentStateInitializing::new(
self.shared.clone(),
metadata.clone(),
g.only_files.clone(),
self.shared.storage_factory.create_and_init(self.shared())?,
self.shared
.storage_factory
.create_and_init(self.shared(), &metadata)?,
true,
));
g.state = ManagedTorrentState::Initializing(initializing.clone());
@ -433,7 +478,12 @@ impl ManagedTorrent {
pub fn stats(&self) -> TorrentStats {
use stats::TorrentStatsState as S;
let mut resp = TorrentStats {
total_bytes: self.shared().lengths.total_length(),
total_bytes: self
.metadata
.load()
.as_ref()
.map(|r| r.lengths.total_length())
.unwrap_or_default(),
file_progress: Vec::new(),
state: S::Error,
error: None,
@ -534,7 +584,9 @@ impl ManagedTorrent {
// Returns true if needed to unpause torrent.
// This is just implementation detail - it's easier to pause/unpause than to tinker with internals.
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let file_count = self.shared().info.iter_file_lengths()?.count();
let metadata = self.metadata.load();
let metadata = metadata.as_ref().context("torrent is not resolved")?;
let file_count = metadata.file_infos.len();
for f in only_files.iter().copied() {
if f >= file_count {
anyhow::bail!("only_files contains invalid value {f}")

View file

@ -5,10 +5,11 @@ use crate::{
type_aliases::FileStorage,
};
use super::{streaming::TorrentStreams, ManagedTorrentShared};
use super::{streaming::TorrentStreams, ManagedTorrentShared, TorrentMetadata};
pub struct TorrentStatePaused {
pub(crate) shared: Arc<ManagedTorrentShared>,
pub(crate) metadata: Arc<TorrentMetadata>,
pub(crate) files: FileStorage,
pub(crate) chunk_tracker: ChunkTracker,
pub(crate) streams: Arc<TorrentStreams>,
@ -17,7 +18,7 @@ pub struct TorrentStatePaused {
impl TorrentStatePaused {
pub(crate) fn update_only_files(&mut self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
self.chunk_tracker
.update_only_files(self.shared.info.iter_file_lengths()?, only_files)?;
.update_only_files(self.metadata.info.iter_file_lengths()?, only_files)?;
Ok(())
}

View file

@ -19,7 +19,7 @@ use crate::{
file_info::FileInfo, spawn_utils::BlockingSpawner, storage::TorrentStorage, ManagedTorrent,
};
use super::ManagedTorrentHandle;
use super::{ManagedTorrentHandle, TorrentMetadata};
type StreamId = usize;
@ -130,6 +130,7 @@ impl TorrentStreams {
pub struct FileStream {
torrent: ManagedTorrentHandle,
metadata: Arc<TorrentMetadata>,
streams: Arc<TorrentStreams>,
stream_id: usize,
file_id: usize,
@ -178,8 +179,7 @@ impl AsyncRead for FileStream {
}
let current = poll_try_io!(self
.torrent
.shared()
.metadata
.lengths
.compute_current_piece(self.position, self.file_torrent_abs_offset)
.context("invalid position"));
@ -216,11 +216,14 @@ impl AsyncRead for FileStream {
);
poll_try_io!(poll_try_io!(self.spawner.spawn_block_in_place(|| {
self.torrent
.with_storage_and_file(self.file_id, |files, _fi| {
self.torrent.with_storage_and_file(
self.file_id,
|files, _fi| {
files.pread_exact(self.file_id, self.position, buf)?;
Ok::<_, anyhow::Error>(())
})
},
&self.metadata,
)
})));
self.as_mut().advance(bytes_to_read as u64);
@ -269,7 +272,12 @@ impl Drop for FileStream {
}
impl ManagedTorrent {
fn with_storage_and_file<F, R>(&self, file_id: usize, f: F) -> anyhow::Result<R>
fn with_storage_and_file<F, R>(
&self,
file_id: usize,
f: F,
metadata: &TorrentMetadata,
) -> anyhow::Result<R>
where
F: FnOnce(&dyn TorrentStorage, &FileInfo) -> R,
{
@ -279,11 +287,7 @@ impl ManagedTorrent {
crate::ManagedTorrentState::Live(l) => &*l.files,
s => anyhow::bail!("with_storage_and_file: invalid state: {}", s.name()),
};
let fi = self
.shared()
.file_infos
.get(file_id)
.context("invalid file")?;
let fi = metadata.file_infos.get(file_id).context("invalid file")?;
Ok(f(files, fi))
})
}
@ -310,14 +314,26 @@ impl ManagedTorrent {
}
fn is_file_finished(&self, file_id: usize) -> bool {
let metadata = self.metadata.load();
let metadata = match metadata.as_ref() {
Some(r) => r,
None => return false,
};
// TODO: would be nice to remove locking
self.with_chunk_tracker(|ct| ct.is_file_finished(&self.shared.file_infos[file_id]))
self.with_chunk_tracker(|ct| ct.is_file_finished(&metadata.file_infos[file_id]))
.unwrap_or(false)
}
pub fn stream(self: Arc<Self>, file_id: usize) -> anyhow::Result<FileStream> {
let (fd_len, fd_offset) =
self.with_storage_and_file(file_id, |_fd, fi| (fi.len, fi.offset_in_torrent))?;
let metadata = self
.metadata
.load_full()
.context("torrent metadata is not resolved")?;
let (fd_len, fd_offset) = self.with_storage_and_file(
file_id,
|_fd, fi| (fi.len, fi.offset_in_torrent),
&metadata,
)?;
let streams = self.streams()?;
let s = FileStream {
stream_id: streams.next_id(),
@ -329,6 +345,7 @@ impl ManagedTorrent {
file_torrent_abs_offset: fd_offset,
torrent: self,
spawner: BlockingSpawner::default(),
metadata,
};
s.torrent.maybe_reconnect_needed_peers_for_file(file_id);
streams.streams.insert(