postgres session storage backend

This commit is contained in:
Igor Katson 2024-08-15 14:18:55 +01:00
parent f22814c77b
commit 2871c358e3
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
8 changed files with 99 additions and 48 deletions

View file

@ -1,5 +1,5 @@
pub mod json;
// #[cfg(feature = "postgres")]
#[cfg(feature = "postgres")]
pub mod postgres;
use std::{collections::HashSet, path::PathBuf};

View file

@ -1,4 +1,4 @@
use std::{path::PathBuf, str::FromStr};
use std::path::PathBuf;
use crate::{session::TorrentId, torrent_state::ManagedTorrentHandle};
use anyhow::Context;
@ -16,7 +16,7 @@ pub struct PostgresSessionStorage {
#[derive(sqlx::FromRow)]
struct TorrentsTableRecord {
id: i32,
info_hash: String,
info_hash: Vec<u8>,
torrent_bytes: Vec<u8>,
trackers: Vec<String>,
output_folder: String,
@ -29,7 +29,7 @@ impl TorrentsTableRecord {
Some((
self.id as TorrentId,
SerializedTorrent {
info_hash: Id20::from_str(&self.info_hash).ok()?,
info_hash: Id20::from_bytes(&self.info_hash).ok()?,
torrent_bytes: self.torrent_bytes.into(),
trackers: self.trackers.into_iter().collect(),
output_folder: PathBuf::from(self.output_folder),
@ -51,21 +51,24 @@ impl PostgresSessionStorage {
.connect(connection_string)
.await?;
sqlx::query(
"
CREATE SEQUENCE IF NOT EXISTS torrents_id;
sqlx::query("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;")
.execute(&pool)
.await
.context("error executing CREATE SEQUENCE")?;
CREATE TABLE IF NOT EXISTS torrents (
id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'),
info_hash BYTEA NOT NULL, -- Assuming the info_hash is unique and fits in 64 characters
torrent_bytes BYTEA NOT NULL,
trackers TEXT[] NOT NULL,
output_folder TEXT NOT NULL,
only_files INTEGER[], -- Nullable array of integers (usize in Rust is equivalent to integer)
is_paused BOOLEAN NOT NULL -- Boolean value to indicate if the torrent is paused
);
"
).execute(&pool).await?;
let create_q = "CREATE TABLE IF NOT EXISTS torrents (
id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'),
info_hash BYTEA NOT NULL,
torrent_bytes BYTEA NOT NULL,
trackers TEXT[] NOT NULL,
output_folder TEXT NOT NULL,
only_files INTEGER[],
is_paused BOOLEAN NOT NULL
)";
sqlx::query(create_q)
.execute(&pool)
.await
.context("error executing CREATE TABLE")?;
Ok(Self { pool })
}
@ -74,22 +77,21 @@ impl PostgresSessionStorage {
#[async_trait::async_trait]
impl SessionPersistenceStore for PostgresSessionStorage {
async fn next_id(&self) -> anyhow::Result<TorrentId> {
let (id,): (i32,) = sqlx::query_as("SELECT nextval('torrents_id')")
let (id,): (i32,) = sqlx::query_as("SELECT nextval('torrents_id')::int")
.fetch_one(&self.pool)
.await?;
.await
.context("error executing SELECT nextval")?;
Ok(id as usize)
}
async fn store(&self, id: TorrentId, torrent: &ManagedTorrentHandle) -> anyhow::Result<()> {
let torrent_bytes: &[u8] = &torrent.info().torrent_bytes;
let q = "
INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused)
VALUES(?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
";
let q = "INSERT INTO torrents (id, info_hash, torrent_bytes, trackers, output_folder, only_files, is_paused)
VALUES($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT(id) DO NOTHING";
sqlx::query(q)
.bind::<i32>(id.try_into()?)
.bind(torrent.info_hash().as_string())
.bind(&torrent.info_hash().0[..])
.bind(torrent_bytes)
.bind(torrent.info().trackers.iter().cloned().collect::<Vec<_>>())
.bind(
@ -108,15 +110,17 @@ impl SessionPersistenceStore for PostgresSessionStorage {
}))
.bind(torrent.is_paused())
.execute(&self.pool)
.await?;
.await
.context("error executing INSERT INTO torrents")?;
Ok(())
}
async fn delete(&self, id: TorrentId) -> anyhow::Result<()> {
sqlx::query("DELETE * FROM torrents WHERE id = ?")
sqlx::query("DELETE FROM torrents WHERE id = $1")
.bind::<i32>(id.try_into()?)
.execute(&self.pool)
.await?;
.await
.context("error executing DELETE FROM torrents")?;
Ok(())
}
@ -124,7 +128,8 @@ impl SessionPersistenceStore for PostgresSessionStorage {
let row = sqlx::query_as::<_, TorrentsTableRecord>("SELECT * FROM torrents WHERE id = ?")
.bind::<i32>(id.try_into()?)
.fetch_one(&self.pool)
.await?;
.await
.context("error executing SELECT * FROM torrents")?;
row.into_serialized_torrent()
.context("bug")
.map(|(_, st)| st)
@ -135,7 +140,7 @@ impl SessionPersistenceStore for PostgresSessionStorage {
id: TorrentId,
torrent: &ManagedTorrentHandle,
) -> anyhow::Result<()> {
sqlx::query("UPDATE torrents SET only_files = ?, paused = ? WHERE id = ?")
sqlx::query("UPDATE torrents SET only_files = $1, is_paused = $2 WHERE id = $3")
.bind(torrent.only_files().map(|v| {
v.into_iter()
.filter_map(|f| f.try_into().ok())
@ -144,21 +149,21 @@ impl SessionPersistenceStore for PostgresSessionStorage {
.bind(torrent.is_paused())
.bind::<i32>(id.try_into()?)
.execute(&self.pool)
.await?;
.await
.context("error executing UPDATE torrents")?;
Ok(())
}
async fn stream_all(
&self,
) -> anyhow::Result<BoxStream<'_, anyhow::Result<(TorrentId, SerializedTorrent)>>> {
let res = futures::stream::iter(
sqlx::query_as::<_, TorrentsTableRecord>("SELECT * FROM torrents")
.fetch_all(&self.pool)
.await?
.into_iter()
.filter_map(TorrentsTableRecord::into_serialized_torrent)
.map(Ok),
);
Ok(res.boxed())
let torrents = sqlx::query_as::<_, TorrentsTableRecord>("SELECT * FROM torrents")
.fetch_all(&self.pool)
.await
.context("error executing SELECT * FROM torrents")?
.into_iter()
.filter_map(TorrentsTableRecord::into_serialized_torrent)
.map(Ok);
Ok(futures::stream::iter(torrents).boxed())
}
}