diff --git a/Makefile b/Makefile index 53bd9b1..800c7d9 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,13 @@ devserver: --log-file-rust-log=debug,librqbit=trace \ server start /tmp/scratch/ +@PHONY: devserver +devserver-postgres: + echo -n '' > /tmp/rqbit-log && cargo run -- \ + --log-file /tmp/rqbit-log \ + --log-file-rust-log=debug,librqbit=trace \ + server start --persistence-config postgres:///rqbit /tmp/scratch/ + @PHONY: clean clean: rm -rf target diff --git a/crates/librqbit/Cargo.toml b/crates/librqbit/Cargo.toml index dacb034..5d7fd62 100644 --- a/crates/librqbit/Cargo.toml +++ b/crates/librqbit/Cargo.toml @@ -19,9 +19,13 @@ default-tls = ["reqwest/default-tls"] rust-tls = ["reqwest/rustls-tls"] storage_middleware = ["lru"] storage_examples = [] +postgres = ["sqlx"] [dependencies] -sqlx = { version = "0.7", features = ["runtime-tokio", "postgres"] } +sqlx = { version = "0.7", features = [ + "runtime-tokio", + "postgres", +], optional = true } bencode = { path = "../bencode", default-features = false, package = "librqbit-bencode", version = "2.2.3" } tracker_comms = { path = "../tracker_comms", default-features = false, package = "librqbit-tracker-comms", version = "1.0.3" } buffers = { path = "../buffers", package = "librqbit-buffers", version = "3.0.1" } diff --git a/crates/librqbit/src/session.rs b/crates/librqbit/src/session.rs index 70afda0..848e4f4 100644 --- a/crates/librqbit/src/session.rs +++ b/crates/librqbit/src/session.rs @@ -332,6 +332,8 @@ impl<'a> AddTorrent<'a> { pub enum SessionPersistenceConfig { /// The filename for persistence. By default uses an OS-specific folder. Json { folder: Option }, + #[cfg(feature = "postgres")] + Postgres { connection_string: String }, } impl SessionPersistenceConfig { @@ -494,6 +496,12 @@ impl Session { .await .context("error initializing JsonSessionPersistenceStore")?, ))) + }, + #[cfg(feature = "postgres")] + Some(SessionPersistenceConfig::Postgres { connection_string }) => { + use crate::session_persistence::postgres::PostgresSessionStorage; + let p = PostgresSessionStorage::new(connection_string).await?; + Ok(Some(Box::new(p))) } None => Ok(None), } diff --git a/crates/librqbit/src/session_persistence/mod.rs b/crates/librqbit/src/session_persistence/mod.rs index f710d9a..3de287d 100644 --- a/crates/librqbit/src/session_persistence/mod.rs +++ b/crates/librqbit/src/session_persistence/mod.rs @@ -1,5 +1,5 @@ pub mod json; -// #[cfg(feature = "postgres")] +#[cfg(feature = "postgres")] pub mod postgres; use std::{collections::HashSet, path::PathBuf}; diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 0485e8f..bd1d45b 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, str::FromStr}; +use std::path::PathBuf; use crate::{session::TorrentId, torrent_state::ManagedTorrentHandle}; use anyhow::Context; @@ -16,7 +16,7 @@ pub struct PostgresSessionStorage { #[derive(sqlx::FromRow)] struct TorrentsTableRecord { id: i32, - info_hash: String, + info_hash: Vec, torrent_bytes: Vec, trackers: Vec, output_folder: String, @@ -29,7 +29,7 @@ impl TorrentsTableRecord { Some(( self.id as TorrentId, SerializedTorrent { - info_hash: Id20::from_str(&self.info_hash).ok()?, + info_hash: Id20::from_bytes(&self.info_hash).ok()?, torrent_bytes: self.torrent_bytes.into(), trackers: self.trackers.into_iter().collect(), output_folder: PathBuf::from(self.output_folder), @@ -51,21 +51,24 @@ impl PostgresSessionStorage { .connect(connection_string) .await?; - sqlx::query( - " - CREATE SEQUENCE IF NOT EXISTS torrents_id; + sqlx::query("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;") + .execute(&pool) + .await + .context("error executing CREATE SEQUENCE")?; - CREATE TABLE IF NOT EXISTS torrents ( - id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'), - info_hash BYTEA NOT NULL, -- Assuming the info_hash is unique and fits in 64 characters - torrent_bytes BYTEA NOT NULL, - trackers TEXT[] NOT NULL, - output_folder TEXT NOT NULL, - only_files INTEGER[], -- Nullable array of integers (usize in Rust is equivalent to integer) - is_paused BOOLEAN NOT NULL -- Boolean value to indicate if the torrent is paused - ); - " - ).execute(&pool).await?; + let create_q = "CREATE TABLE IF NOT EXISTS torrents ( + id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'), + info_hash BYTEA NOT NULL, + torrent_bytes BYTEA NOT NULL, + trackers TEXT[] NOT NULL, + output_folder TEXT NOT NULL, + only_files INTEGER[], + is_paused BOOLEAN NOT NULL + )"; + sqlx::query(create_q) + .execute(&pool) + .await + .context("error executing CREATE TABLE")?; Ok(Self { pool }) } @@ -74,22 +77,21 @@ impl PostgresSessionStorage { #[async_trait::async_trait] impl SessionPersistenceStore for PostgresSessionStorage { async fn next_id(&self) -> anyhow::Result { - let (id,): (i32,) = sqlx::query_as("SELECT nextval('torrents_id')") + let (id,): (i32,) = sqlx::query_as("SELECT nextval('torrents_id')::int") .fetch_one(&self.pool) - .await?; + .await + .context("error executing SELECT nextval")?; Ok(id as usize) } async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> { let torrent_bytes: &[u8] = &torrent.info().torrent_bytes; - let q = " - INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) - VALUES(?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO NOTHING - "; + let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused) + VALUES($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT(id) DO NOTHING"; sqlx::query(q) .bind::(id.try_into()?) - .bind(torrent.info_hash().as_string()) + .bind(&torrent.info_hash().0[..]) .bind(torrent_bytes) .bind(torrent.info().trackers.iter().cloned().collect::>()) .bind( @@ -108,15 +110,17 @@ impl SessionPersistenceStore for PostgresSessionStorage { })) .bind(torrent.is_paused()) .execute(&self.pool) - .await?; + .await + .context("error executing INSERT INTO torrents")?; Ok(()) } async fn delete(&self, id: TorrentId) -> anyhow::Result<()> { - sqlx::query("DELETE * FROM torrents WHERE id = ?") + sqlx::query("DELETE FROM torrents WHERE id = $1") .bind::(id.try_into()?) .execute(&self.pool) - .await?; + .await + .context("error executing DELETE FROM torrents")?; Ok(()) } @@ -124,7 +128,8 @@ impl SessionPersistenceStore for PostgresSessionStorage { let row = sqlx::query_as::<_, TorrentsTableRecord>("SELECT * FROM torrents WHERE id = ?") .bind::(id.try_into()?) .fetch_one(&self.pool) - .await?; + .await + .context("error executing SELECT * FROM torrents")?; row.into_serialized_torrent() .context("bug") .map(|(_, st)| st) @@ -135,7 +140,7 @@ impl SessionPersistenceStore for PostgresSessionStorage { id: TorrentId, torrent: &ManagedTorrentHandle, ) -> anyhow::Result<()> { - sqlx::query("UPDATE torrents SET only_files = ?, paused = ? WHERE id = ?") + sqlx::query("UPDATE torrents SET only_files = $1, is_paused = $2 WHERE id = $3") .bind(torrent.only_files().map(|v| { v.into_iter() .filter_map(|f| f.try_into().ok()) @@ -144,21 +149,21 @@ impl SessionPersistenceStore for PostgresSessionStorage { .bind(torrent.is_paused()) .bind::(id.try_into()?) .execute(&self.pool) - .await?; + .await + .context("error executing UPDATE torrents")?; Ok(()) } async fn stream_all( &self, ) -> anyhow::Result>> { - let res = futures::stream::iter( - sqlx::query_as::<_, TorrentsTableRecord>("SELECT * FROM torrents") - .fetch_all(&self.pool) - .await? - .into_iter() - .filter_map(TorrentsTableRecord::into_serialized_torrent) - .map(Ok), - ); - Ok(res.boxed()) + let torrents = sqlx::query_as::<_, TorrentsTableRecord>("SELECT * FROM torrents") + .fetch_all(&self.pool) + .await + .context("error executing SELECT * FROM torrents")? + .into_iter() + .filter_map(TorrentsTableRecord::into_serialized_torrent) + .map(Ok); + Ok(futures::stream::iter(torrents).boxed()) } } diff --git a/crates/librqbit_core/src/hash_id.rs b/crates/librqbit_core/src/hash_id.rs index 7d9c4ae..08ae619 100644 --- a/crates/librqbit_core/src/hash_id.rs +++ b/crates/librqbit_core/src/hash_id.rs @@ -14,6 +14,15 @@ impl Id { hex::encode(self.0) } + pub fn from_bytes(b: &[u8]) -> anyhow::Result { + let mut v = [0u8; N]; + if b.len() != N { + anyhow::bail!("buffer length must be {}, but it's {}", N, b.len()); + } + v.copy_from_slice(b); + Ok(Id(v)) + } + pub fn distance(&self, other: &Id) -> Id { let mut xor = [0u8; N]; for (idx, (s, o)) in self diff --git a/crates/rqbit/Cargo.toml b/crates/rqbit/Cargo.toml index b880994..8aec1d7 100644 --- a/crates/rqbit/Cargo.toml +++ b/crates/rqbit/Cargo.toml @@ -12,7 +12,7 @@ readme = "README.md" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["default-tls", "webui"] +default = ["default-tls", "webui", "postgres"] openssl-vendored = ["openssl/vendored"] tokio-console = ["console-subscriber", "tokio/tracing"] webui = ["librqbit/webui"] @@ -20,6 +20,7 @@ timed_existence = ["librqbit/timed_existence"] default-tls = ["librqbit/default-tls"] rust-tls = ["librqbit/rust-tls"] debug_slow_disk = ["librqbit/storage_middleware"] +postgres = ["librqbit/postgres"] [dependencies] librqbit = { path = "../librqbit", default-features = false, version = "6.0.0" } diff --git a/crates/rqbit/src/main.rs b/crates/rqbit/src/main.rs index e5799b3..19fedb7 100644 --- a/crates/rqbit/src/main.rs +++ b/crates/rqbit/src/main.rs @@ -137,8 +137,8 @@ struct ServerStartOptions { disable_persistence: bool, /// The folder to store session data in. By default uses OS specific folder. - #[arg(long = "persistence-folder")] - persistence_folder: Option, + #[arg(long = "persistence-config")] + persistence_config: Option, } #[derive(Parser)] @@ -393,9 +393,26 @@ async fn async_main(opts: Opts) -> anyhow::Result<()> { SubCommand::Server(server_opts) => match &server_opts.subcommand { ServerSubcommand::Start(start_opts) => { if !start_opts.disable_persistence { - sopts.persistence = Some(SessionPersistenceConfig::Json { - folder: start_opts.persistence_folder.clone().map(PathBuf::from), - }) + if let Some(p) = start_opts.persistence_config.as_ref() { + if p.starts_with("postgres://") { + #[cfg(feature = "postgres")] + { + sopts.persistence = Some(SessionPersistenceConfig::Postgres { + connection_string: p.clone(), + }) + } + #[cfg(not(feature = "postgres"))] + { + anyhow::bail!("rqbit was compiled without postgres support") + } + } else { + sopts.persistence = Some(SessionPersistenceConfig::Json { + folder: Some(p.into()), + }) + } + } else { + sopts.persistence = Some(SessionPersistenceConfig::Json { folder: None }) + } } let session =