Nothing much

This commit is contained in:
Igor Katson 2021-07-13 14:59:44 +01:00
parent d121efd4f4
commit 9e3e3a27ff
9 changed files with 161 additions and 52 deletions

25
Cargo.lock generated
View file

@ -851,6 +851,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8536030f9fea7127f841b45bb6243b27255787fb4eb83958aa1ef9d2fdc0c36"
dependencies = [
"num-bigint",
"num-complex",
"num-integer",
"num-iter",
@ -858,6 +859,17 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-bigint"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304"
dependencies = [
"autocfg",
"num-integer",
"num-traits",
]
[[package]]
name = "num-complex"
version = "0.2.4"
@ -896,6 +908,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef"
dependencies = [
"autocfg",
"num-bigint",
"num-integer",
"num-traits",
]
@ -995,6 +1008,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "parse_duration"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7037e5e93e0172a5a96874380bf73bc6ecef022e26fa25f2be26864d6b3ba95d"
dependencies = [
"lazy_static",
"num",
"regex",
]
[[package]]
name = "peer_binary_protocol"
version = "0.1.0"
@ -1296,6 +1320,7 @@ dependencies = [
"futures",
"librqbit",
"log",
"parse_duration",
"pretty_env_logger",
"regex",
"reqwest",

View file

@ -6,7 +6,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
use librqbit_core::torrent_metainfo::TorrentMetaV1Info;
use log::debug;
use crate::peer_info_reader;
use crate::{peer_connection::PeerConnectionOptions, peer_info_reader};
use librqbit_core::id20::Id20;
#[derive(Debug)]
@ -25,6 +25,7 @@ pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> +
peer_id: Id20,
info_hash: Id20,
mut addrs: A,
peer_connection_options: Option<PeerConnectionOptions>,
) -> ReadMetainfoResult<A> {
let mut seen = HashSet::<SocketAddr>::new();
let first_addr = match addrs.next().await {
@ -39,9 +40,14 @@ pub async fn read_metainfo_from_peer_receiver<A: StreamExt<Item = SocketAddr> +
let semaphore = &semaphore;
async move {
let token = semaphore.acquire().await?;
let ret = peer_info_reader::read_metainfo_from_peer(addr, peer_id, info_hash)
.await
.with_context(|| format!("error reading metainfo from {}", addr));
let ret = peer_info_reader::read_metainfo_from_peer(
addr,
peer_id,
info_hash,
peer_connection_options,
)
.await
.with_context(|| format!("error reading metainfo from {}", addr));
drop(token);
ret
}
@ -97,7 +103,7 @@ mod tests {
let dht = Dht::new().await.unwrap();
let peer_rx = dht.get_peers(info_hash).await;
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx).await {
match read_metainfo_from_peer_receiver(peer_id, info_hash, peer_rx, None).await {
ReadMetainfoResult::Found { info, .. } => dbg!(info),
ReadMetainfoResult::ChannelClosed { .. } => todo!("should not have happened"),
};

View file

@ -9,12 +9,12 @@ use warp::Filter;
use crate::torrent_manager::TorrentManagerHandle;
use crate::torrent_state::StatsSnapshot;
struct Inner {
struct ApiInternal {
startup_time: Instant,
torrent_managers: RwLock<Vec<TorrentManagerHandle>>,
}
impl Inner {
impl ApiInternal {
fn new() -> Self {
Self {
startup_time: Instant::now(),
@ -76,7 +76,7 @@ struct StatsResponse {
time_remaining: Option<Duration>,
}
impl Inner {
impl ApiInternal {
fn mgr_handle(&self, idx: usize) -> Option<TorrentManagerHandle> {
self.torrent_managers.read().get(idx).cloned()
}
@ -141,7 +141,7 @@ impl Inner {
#[derive(Clone)]
pub struct HttpApi {
inner: Arc<Inner>,
inner: Arc<ApiInternal>,
}
fn json_response<T: Serialize>(v: T) -> warp::reply::Response {
@ -170,7 +170,7 @@ fn json_or_404<T: Serialize>(idx: usize, v: Option<T>) -> warp::reply::Response
impl HttpApi {
pub fn new() -> Self {
Self {
inner: Arc::new(Inner::new()),
inner: Arc::new(ApiInternal::new()),
}
}
pub fn add_mgr(&self, handle: TorrentManagerHandle) -> usize {
@ -180,8 +180,6 @@ impl HttpApi {
idx
}
// TODO: this is all for debugging, not even JSON.
// After using this for a bit, not a big fan of warp.
pub async fn make_http_api_and_run(self, addr: SocketAddr) -> anyhow::Result<()> {
let inner = self.inner;

View file

@ -31,11 +31,18 @@ pub enum WriterRequest {
ReadChunkRequest(ChunkInfo),
}
#[derive(Default, Copy, Clone)]
pub struct PeerConnectionOptions {
pub connect_timeout: Option<Duration>,
pub keep_alive_interval: Option<Duration>,
}
pub struct PeerConnection<H> {
handler: H,
addr: SocketAddr,
info_hash: Id20,
peer_id: Id20,
options: PeerConnectionOptions,
}
// async fn read_one<'a, R: AsyncReadExt + Unpin>(
@ -94,12 +101,19 @@ macro_rules! read_one {
}
impl<H: PeerConnectionHandler> PeerConnection<H> {
pub fn new(addr: SocketAddr, info_hash: Id20, peer_id: Id20, handler: H) -> Self {
pub fn new(
addr: SocketAddr,
info_hash: Id20,
peer_id: Id20,
handler: H,
options: Option<PeerConnectionOptions>,
) -> Self {
PeerConnection {
handler,
addr,
info_hash,
peer_id,
options: options.unwrap_or_default(),
}
}
pub fn into_handler(self) -> H {
@ -112,7 +126,9 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut conn = match timeout(
Duration::from_secs(2),
self.options
.connect_timeout
.unwrap_or_else(|| Duration::from_secs(2)),
tokio::net::TcpStream::connect(self.addr),
)
.await
@ -191,7 +207,10 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
let (mut read_half, mut write_half) = tokio::io::split(conn);
let writer = async move {
let keep_alive_interval = Duration::from_secs(120);
let keep_alive_interval = self
.options
.keep_alive_interval
.unwrap_or_else(|| Duration::from_secs(120));
if self.handler.get_have_bytes() > 0 {
if let Some(len) = self

View file

@ -17,12 +17,15 @@ use peer_binary_protocol::{
use sha1w::{ISha1, Sha1};
use tokio::sync::mpsc::UnboundedSender;
use crate::peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest};
use crate::peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
};
pub async fn read_metainfo_from_peer(
addr: SocketAddr,
peer_id: Id20,
info_hash: Id20,
peer_connection_options: Option<PeerConnectionOptions>,
) -> anyhow::Result<TorrentMetaV1Info<ByteString>> {
let (result_tx, result_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteString>>>();
@ -34,7 +37,8 @@ pub async fn read_metainfo_from_peer(
result_tx: Mutex::new(Some(result_tx)),
locked: RwLock::new(None),
};
let connection = PeerConnection::new(addr, info_hash, peer_id, handler);
let connection =
PeerConnection::new(addr, info_hash, peer_id, handler, peer_connection_options);
let result_reader = async move { result_rx.await? };
let connection_runner = async move { connection.manage_peer(writer_rx).await };
@ -230,7 +234,7 @@ mod tests {
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
let peer_id = generate_peer_id();
let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
dbg!(read_metainfo_from_peer(addr, peer_id, info_hash)
dbg!(read_metainfo_from_peer(addr, peer_id, info_hash, None)
.await
.unwrap());
}

View file

@ -24,17 +24,24 @@ use crate::{
chunk_tracker::ChunkTracker,
file_ops::FileOps,
spawn_utils::{spawn, BlockingSpawner},
torrent_state::TorrentState,
torrent_state::{TorrentState, TorrentStateOptions},
tracker_comms::{TrackerError, TrackerRequest, TrackerRequestEvent, TrackerResponse},
};
#[derive(Default)]
struct TorrentManagerOptions {
force_tracker_interval: Option<Duration>,
peer_connect_timeout: Option<Duration>,
only_files: Option<Vec<usize>>,
peer_id: Option<Id20>,
overwrite: bool,
}
pub struct TorrentManagerBuilder {
info: TorrentMetaV1Info<ByteString>,
info_hash: Id20,
overwrite: bool,
output_folder: PathBuf,
only_files: Option<Vec<usize>>,
peer_id: Option<Id20>,
force_tracker_interval: Option<Duration>,
options: TorrentManagerOptions,
spawner: Option<BlockingSpawner>,
}
@ -47,27 +54,24 @@ impl TorrentManagerBuilder {
Self {
info,
info_hash,
overwrite: false,
output_folder: output_folder.as_ref().into(),
only_files: None,
peer_id: None,
force_tracker_interval: None,
spawner: None,
options: TorrentManagerOptions::default(),
}
}
pub fn only_files(&mut self, only_files: Vec<usize>) -> &mut Self {
self.only_files = Some(only_files);
self.options.only_files = Some(only_files);
self
}
pub fn overwrite(&mut self, overwrite: bool) -> &mut Self {
self.overwrite = overwrite;
self.options.overwrite = overwrite;
self
}
pub fn force_tracker_interval(&mut self, force_tracker_interval: Duration) -> &mut Self {
self.force_tracker_interval = Some(force_tracker_interval);
self.options.force_tracker_interval = Some(force_tracker_interval);
self
}
@ -77,7 +81,12 @@ impl TorrentManagerBuilder {
}
pub fn peer_id(&mut self, peer_id: Id20) -> &mut Self {
self.peer_id = Some(peer_id);
self.options.peer_id = Some(peer_id);
self
}
pub fn peer_connect_timeout(&mut self, timeout: Duration) -> &mut Self {
self.options.peer_connect_timeout = Some(timeout);
self
}
@ -86,11 +95,8 @@ impl TorrentManagerBuilder {
self.info,
self.info_hash,
self.output_folder,
self.overwrite,
self.only_files,
self.force_tracker_interval,
self.peer_id,
self.spawner.unwrap_or_else(|| BlockingSpawner::new(true)),
Some(self.options),
)
}
}
@ -136,7 +142,7 @@ struct TorrentManager {
#[allow(dead_code)]
speed_estimator: Arc<SpeedEstimator>,
trackers: Mutex<HashSet<Url>>,
force_tracker_interval: Option<Duration>,
options: TorrentManagerOptions,
}
fn make_lengths<ByteBuf: AsRef<[u8]>>(
@ -147,17 +153,14 @@ fn make_lengths<ByteBuf: AsRef<[u8]>>(
}
impl TorrentManager {
#[allow(clippy::too_many_arguments)]
fn start<P: AsRef<Path>>(
info: TorrentMetaV1Info<ByteString>,
info_hash: Id20,
out: P,
overwrite: bool,
only_files: Option<Vec<usize>>,
force_tracker_interval: Option<Duration>,
peer_id: Option<Id20>,
spawner: BlockingSpawner,
options: Option<TorrentManagerOptions>,
) -> anyhow::Result<TorrentManagerHandle> {
let options = options.unwrap_or_default();
let files = {
let mut files =
Vec::<Arc<Mutex<File>>>::with_capacity(info.iter_file_lengths().count());
@ -173,7 +176,7 @@ impl TorrentManager {
}
std::fs::create_dir_all(full_path.parent().unwrap())?;
let file = if overwrite {
let file = if options.overwrite {
OpenOptions::new()
.create(true)
.read(true)
@ -193,13 +196,13 @@ impl TorrentManager {
files
};
let peer_id = peer_id.unwrap_or_else(generate_peer_id);
let peer_id = options.peer_id.unwrap_or_else(generate_peer_id);
let lengths = make_lengths(&info).context("unable to compute Lengths from torrent")?;
debug!("computed lengths: {:?}", &lengths);
info!("Doing initial checksum validation, this might take a while...");
let initial_check_results =
FileOps::<Sha1>::new(&info, &files, &lengths).initial_check(only_files.as_deref())?;
let initial_check_results = FileOps::<Sha1>::new(&info, &files, &lengths)
.initial_check(options.only_files.as_deref())?;
info!(
"Initial check results: have {}, needed {}",
@ -213,6 +216,11 @@ impl TorrentManager {
lengths,
);
let state_options = TorrentStateOptions {
peer_connect_timeout: options.peer_connect_timeout,
..Default::default()
};
let state = TorrentState::new(
info,
info_hash,
@ -223,6 +231,7 @@ impl TorrentManager {
initial_check_results.have_bytes,
initial_check_results.needed_bytes,
spawner,
Some(state_options),
);
let estimator = Arc::new(SpeedEstimator::new(5));
@ -231,7 +240,7 @@ impl TorrentManager {
state,
speed_estimator: estimator.clone(),
trackers: Mutex::new(HashSet::new()),
force_tracker_interval,
options,
});
spawn("stats printer", {
@ -333,6 +342,7 @@ impl TorrentManager {
Ok(interval) => {
event = None;
let interval = self
.options
.force_tracker_interval
.unwrap_or_else(|| Duration::from_secs(interval));
debug!(

View file

@ -36,7 +36,9 @@ use tokio::{
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
file_ops::FileOps,
peer_connection::{PeerConnection, PeerConnectionHandler, WriterRequest},
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
peer_state::{InflightRequest, LivePeerState, PeerState},
spawn_utils::{spawn, BlockingSpawner},
type_aliases::{PeerHandle, BF},
@ -215,6 +217,11 @@ impl StatsSnapshot {
}
}
#[derive(Default)]
pub struct TorrentStateOptions {
pub peer_connect_timeout: Option<Duration>,
}
pub struct TorrentState {
info: TorrentMetaV1Info<ByteString>,
locked: Arc<RwLock<TorrentStateLocked>>,
@ -226,6 +233,8 @@ pub struct TorrentState {
stats: AtomicStats,
spawner: BlockingSpawner,
options: TorrentStateOptions,
peer_semaphore: Semaphore,
peer_queue_tx: UnboundedSender<(SocketAddr, UnboundedReceiver<WriterRequest>)>,
}
@ -242,7 +251,9 @@ impl TorrentState {
have_bytes: u64,
needed_bytes: u64,
spawner: BlockingSpawner,
options: Option<TorrentStateOptions>,
) -> Arc<Self> {
let options = options.unwrap_or_default();
let (peer_queue_tx, mut peer_queue_rx) = unbounded_channel();
let state = Arc::new(TorrentState {
info_hash,
@ -260,6 +271,7 @@ impl TorrentState {
needed: needed_bytes,
lengths,
spawner,
options,
peer_semaphore: Semaphore::new(128),
peer_queue_tx,
@ -285,8 +297,17 @@ impl TorrentState {
state: state.clone(),
spawner: state.spawner,
};
let peer_connection =
PeerConnection::new(addr, state.info_hash, state.peer_id, handler);
let options = PeerConnectionOptions {
connect_timeout: state.options.peer_connect_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.info_hash,
state.peer_id,
handler,
Some(options),
);
spawn(format!("manage_peer({})", addr), async move {
if let Err(e) = peer_connection.manage_peer(out_rx).await {
debug!("error managing peer {}: {:#}", addr, e)

View file

@ -18,6 +18,7 @@ pretty_env_logger = "0.4"
reqwest = "0.11"
regex = "1"
futures = "0.3"
parse_duration = "2"
[dev-dependencies]
futures = {version = "0.3"}

View file

@ -1,4 +1,4 @@
use std::{fs::File, io::Read, net::SocketAddr, time::Duration};
use std::{fs::File, io::Read, net::SocketAddr, str::FromStr, time::Duration};
use anyhow::Context;
use clap::Clap;
@ -7,6 +7,7 @@ use futures::StreamExt;
use librqbit::{
dht_utils::{read_metainfo_from_peer_receiver, ReadMetainfoResult},
generate_peer_id,
peer_connection::PeerConnectionOptions,
spawn_utils::{spawn, BlockingSpawner},
torrent_from_bytes,
torrent_manager::TorrentManagerBuilder,
@ -53,6 +54,16 @@ enum LogLevel {
Error,
}
#[derive(Debug, Clone, Copy)]
struct ParsedDuration(Duration);
impl FromStr for ParsedDuration {
type Err = parse_duration::parse::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
parse_duration::parse(s).map(ParsedDuration)
}
}
#[derive(Clap)]
#[clap(version = "1.0", author = "Igor Katson <igor.katson@gmail.com>")]
struct Opts {
@ -85,7 +96,7 @@ struct Opts {
#[clap(short = 'i', long = "tracker-refresh-interval")]
force_tracker_interval: Option<u64>,
/// The listen address for (debugging) HTTP API
/// The listen address for HTTP API
#[clap(long = "http-api-listen-addr", default_value = "127.0.0.1:3030")]
http_api_listen_addr: SocketAddr,
@ -97,6 +108,10 @@ struct Opts {
#[clap(long = "disable-dht")]
disable_dht: bool,
/// The connect timeout, e.g. 1s, 1.5s, 100ms etc.
#[clap(long = "peer-connect-timeout")]
peer_connect_timeout: Option<ParsedDuration>,
}
fn compute_only_files<ByteBuf: AsRef<[u8]>>(
@ -178,6 +193,11 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
Some(Dht::new().await.context("error initializing DHT")?)
};
let peer_opts = PeerConnectionOptions {
connect_timeout: opts.peer_connect_timeout.map(|p| p.0),
..Default::default()
};
if opts.torrent_path.starts_with("magnet:") {
let Magnet {
info_hash,
@ -188,7 +208,9 @@ async fn async_main(opts: Opts, spawner: BlockingSpawner) -> anyhow::Result<()>
.get_peers(info_hash)
.await;
let (info, dht_rx, initial_peers) =
match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx).await {
match read_metainfo_from_peer_receiver(peer_id, info_hash, dht_rx, Some(peer_opts))
.await
{
ReadMetainfoResult::Found { info, rx, seen } => (info, rx, seen),
ReadMetainfoResult::ChannelClosed { .. } => {
anyhow::bail!("DHT died, no way to discover torrent metainfo")
@ -293,6 +315,9 @@ async fn main_info(
if let Some(interval) = opts.force_tracker_interval {
builder.force_tracker_interval(Duration::from_secs(interval));
}
if let Some(t) = opts.peer_connect_timeout {
builder.peer_connect_timeout(t.0);
}
let http_api = librqbit::http_api::HttpApi::new();
spawn("HTTP API", {