This commit is contained in:
Igor Katson 2024-08-21 16:12:20 +01:00
parent b4512e4809
commit 451debedbb
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
19 changed files with 127 additions and 114 deletions

View file

@ -106,7 +106,7 @@ use super::{
paused::TorrentStatePaused,
streaming::TorrentStreams,
utils::{timeit, TimedExistence},
ManagedTorrentInfo,
ManagedTorrentShared,
};
#[derive(Debug)]
@ -180,7 +180,7 @@ const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
pub struct TorrentStateLive {
peers: PeerStates,
meta: Arc<ManagedTorrentInfo>,
torrent: Arc<ManagedTorrentShared>,
locked: RwLock<TorrentStateLocked>,
pub(crate) files: FileStorage,
@ -241,7 +241,7 @@ impl TorrentStateLive {
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
let state = Arc::new(TorrentStateLive {
meta: paused.info.clone(),
torrent: paused.info.clone(),
peers: PeerStates {
session_stats: session_stats.clone(),
stats: Default::default(),
@ -277,7 +277,7 @@ impl TorrentStateLive {
});
state.spawn(
error_span!(parent: state.meta.span.clone(), "speed_estimator_updater"),
error_span!(parent: state.torrent.span.clone(), "speed_estimator_updater"),
{
let state = Arc::downgrade(&state);
async move {
@ -303,7 +303,7 @@ impl TorrentStateLive {
);
state.spawn(
error_span!(parent: state.meta.span.clone(), "peer_adder"),
error_span!(parent: state.torrent.span.clone(), "peer_adder"),
state.clone().task_peer_adder(peer_queue_rx),
);
Ok(state)
@ -330,7 +330,7 @@ impl TorrentStateLive {
}
fn disk_work_tx(&self) -> Option<&DiskWorkQueueSender> {
self.meta.options.disk_write_queue.as_ref()
self.torrent.options.disk_write_queue.as_ref()
}
pub(crate) fn add_incoming_peer(
@ -378,7 +378,7 @@ impl TorrentStateLive {
self.spawn(
error_span!(
parent: self.meta.span.clone(),
parent: self.torrent.span.clone(),
"manage_incoming_peer",
addr = %checked_peer.addr
),
@ -410,18 +410,18 @@ impl TorrentStateLive {
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: self.meta.options.peer_connect_timeout,
read_write_timeout: self.meta.options.peer_read_write_timeout,
connect_timeout: self.torrent.options.peer_connect_timeout,
read_write_timeout: self.torrent.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
checked_peer.addr,
self.meta.info_hash,
self.meta.peer_id,
self.torrent.info_hash,
self.torrent.peer_id,
&handler,
Some(options),
self.meta.spawner,
self.meta.connector.clone(),
self.torrent.spawner,
self.torrent.connector.clone(),
);
let requester = handler.task_peer_chunk_requester();
@ -474,18 +474,18 @@ impl TorrentStateLive {
first_message_received: AtomicBool::new(false),
};
let options = PeerConnectionOptions {
connect_timeout: state.meta.options.peer_connect_timeout,
read_write_timeout: state.meta.options.peer_read_write_timeout,
connect_timeout: state.torrent.options.peer_connect_timeout,
read_write_timeout: state.torrent.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.meta.info_hash,
state.meta.peer_id,
state.torrent.info_hash,
state.torrent.peer_id,
&handler,
Some(options),
state.meta.spawner,
state.meta.connector.clone(),
state.torrent.spawner,
state.torrent.connector.clone(),
);
let requester = aframe!(handler
.task_peer_chunk_requester()
@ -532,30 +532,30 @@ impl TorrentStateLive {
let permit = state.peer_semaphore.clone().acquire_owned().await?;
state.spawn(
error_span!(parent: state.meta.span.clone(), "manage_peer", peer = addr.to_string()),
error_span!(parent: state.torrent.span.clone(), "manage_peer", peer = addr.to_string()),
aframe!(state.clone().task_manage_outgoing_peer(addr, permit)),
);
}
}
pub fn meta(&self) -> &ManagedTorrentInfo {
&self.meta
pub fn torrent(&self) -> &ManagedTorrentShared {
&self.torrent
}
pub fn info(&self) -> &TorrentMetaV1Info<ByteBufOwned> {
&self.meta.info
&self.torrent.info
}
pub fn info_hash(&self) -> Id20 {
self.meta.info_hash
self.torrent.info_hash
}
pub fn peer_id(&self) -> Id20 {
self.meta.peer_id
self.torrent.peer_id
}
pub(crate) fn file_ops(&self) -> FileOps<'_> {
FileOps::new(
&self.meta.info,
&self.torrent.info,
&*self.files,
&self.meta().file_infos,
&self.torrent().file_infos,
&self.lengths,
)
}
@ -664,7 +664,7 @@ impl TorrentStateLive {
// g.chunks;
Ok(TorrentStatePaused {
info: self.meta.clone(),
info: self.torrent.clone(),
files: self.files.take()?,
chunk_tracker,
streams: self.streams.clone(),
@ -687,7 +687,8 @@ impl TorrentStateLive {
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let mut g = self.lock_write("update_only_files");
let ct = g.get_chunks_mut()?;
let hns = ct.update_only_files(self.meta().file_infos.iter().map(|f| f.len), only_files)?;
let hns =
ct.update_only_files(self.torrent().file_infos.iter().map(|f| f.len), only_files)?;
if !hns.finished() {
self.reconnect_all_not_needed_peers();
}
@ -706,7 +707,7 @@ impl TorrentStateLive {
};
self.streams
.streamed_file_ids()
.any(|file_id| !chunks.is_file_finished(&self.meta.file_infos[file_id]))
.any(|file_id| !chunks.is_file_finished(&self.torrent.file_infos[file_id]))
}
// We might have the torrent "finished" i.e. no selected files. But if someone is streaming files despite
@ -725,7 +726,7 @@ impl TorrentStateLive {
// if we have all the pieces of the file, reopen it read only
for (idx, file_info) in self
.meta()
.torrent()
.file_infos
.iter()
.enumerate()
@ -736,9 +737,9 @@ impl TorrentStateLive {
}
self.streams
.wake_streams_on_piece_completed(id, &self.meta.lengths);
.wake_streams_on_piece_completed(id, &self.torrent.lengths);
g.unflushed_bitv_bytes += self.meta.lengths.piece_length(id) as u64;
g.unflushed_bitv_bytes += self.torrent.lengths.piece_length(id) as u64;
if g.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES {
g.try_flush_bitv()
}
@ -930,7 +931,7 @@ impl<'a> PeerConnectionHandler for &'a PeerHandler {
&self,
handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
let info_bytes = &self.state.meta().info_bytes;
let info_bytes = &self.state.torrent().info_bytes;
if !info_bytes.is_empty() {
if let Ok(len) = info_bytes.len().try_into() {
handshake.metadata_size = Some(len);
@ -1010,7 +1011,7 @@ impl PeerHandler {
if let Some(dur) = backoff {
self.state.clone().spawn(
error_span!(
parent: self.state.meta.span.clone(),
parent: self.state.torrent.span.clone(),
"wait_for_peer",
peer = handle.to_string(),
duration = format!("{dur:?}")
@ -1069,7 +1070,7 @@ impl PeerHandler {
&& !g.inflight_pieces.contains_key(pid)
});
let natural_order_pieces = chunk_tracker
.iter_queued_pieces(&g.file_priorities, &self.state.meta().file_infos);
.iter_queued_pieces(&g.file_priorities, &self.state.torrent().file_infos);
for n in priority_streamed_pieces.chain(natural_order_pieces) {
if bf.get(n.get() as usize).map(|v| *v) == Some(true) {
n_opt = Some(n);
@ -1612,7 +1613,7 @@ impl PeerHandler {
dtx.send(Box::new(work)).await?;
} else {
self.state
.meta
.torrent
.spawner
.spawn_block_in_place(|| {
write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info)
@ -1624,7 +1625,7 @@ impl PeerHandler {
}
fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> {
let data = &self.state.meta().info_bytes;
let data = &self.state.torrent().info_bytes;
let metadata_size = data.len();
if metadata_size == 0 {
anyhow::bail!("peer requested for info metadata but we don't have it")