diff --git a/Makefile b/Makefile index dbd48bb..aa5a6d3 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ devserver-postgres: echo -n '' > /tmp/rqbit-log && cargo run -- \ --log-file /tmp/rqbit-log \ --log-file-rust-log=debug,librqbit=trace \ - server start --persistence-config postgres:///rqbit /tmp/scratch/ + server start --fastresume --persistence-config postgres:///rqbit /tmp/scratch/ @PHONY: clean clean: diff --git a/crates/librqbit/src/session_persistence/postgres.rs b/crates/librqbit/src/session_persistence/postgres.rs index 27bdebf..98dd6f3 100644 --- a/crates/librqbit/src/session_persistence/postgres.rs +++ b/crates/librqbit/src/session_persistence/postgres.rs @@ -8,7 +8,7 @@ use anyhow::Context; use futures::{stream::BoxStream, StreamExt}; use librqbit_core::Id20; use sqlx::{Pool, Postgres}; -use tracing::debug; +use tracing::error_span; use super::{SerializedTorrent, SessionPersistenceStore}; @@ -55,12 +55,20 @@ impl PostgresSessionStorage { .connect(connection_string) .await?; - sqlx::query("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;") - .execute(&pool) - .await - .context("error executing CREATE SEQUENCE")?; + macro_rules! exec { + ($q:expr) => { + sqlx::query($q) + .execute(&pool) + .await + .context($q) + .context("error running query")?; + }; + } - let create_q = "CREATE TABLE IF NOT EXISTS torrents ( + exec!("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;"); + + exec!( + "CREATE TABLE IF NOT EXISTS torrents ( id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'), info_hash BYTEA NOT NULL, torrent_bytes BYTEA NOT NULL, @@ -68,11 +76,10 @@ impl PostgresSessionStorage { output_folder TEXT NOT NULL, only_files INTEGER[], is_paused BOOLEAN NOT NULL - )"; - sqlx::query(create_q) - .execute(&pool) - .await - .context("error executing CREATE TABLE")?; + )" + ); + + exec!("ALTER TABLE torrents ADD COLUMN IF NOT EXISTS have_bitfield BYTEA"); Ok(Self { pool }) } @@ -172,18 +179,131 @@ impl SessionPersistenceStore for PostgresSessionStorage { } } +struct PgBitfield { + torrent_id: TorrentIdOrHash, + inmem: BF, + pool: Pool, +} + +impl BitV for PgBitfield { + fn as_slice(&self) -> &bitvec::prelude::BitSlice { + self.inmem.as_bitslice() + } + + fn as_slice_mut(&mut self) -> &mut bitvec::prelude::BitSlice { + self.inmem.as_mut_bitslice() + } + + fn into_dyn(self) -> Box { + Box::new(self) + } + + fn as_bytes(&self) -> &[u8] { + self.inmem.as_raw_slice() + } + + fn flush(&mut self) -> anyhow::Result<()> { + // TODO: make flush async, and don't spawn this, to avoid allocations and capture the result. + crate::spawn_utils::spawn( + "pg", + error_span!("pg_update_bitfield", id=?self.torrent_id), + { + let hb = self.as_bytes().to_owned(); + let pool = self.pool.clone(); + let torrent_id = self.torrent_id; + + macro_rules! exec { + ($q:expr, $bf:expr, $id:expr) => { + sqlx::query($q) + .bind($bf) + .bind($id) + .execute(&pool) + .await + .context($q) + .context("error executing query") + }; + } + + async move { + match torrent_id { + TorrentIdOrHash::Id(id) => { + let id: i32 = id.try_into()?; + exec!( + "UPDATE torrents SET have_bitfield = $1 WHERE id = $2", + &hb, + id + )?; + } + TorrentIdOrHash::Hash(h) => { + exec!( + "UPDATE torrents SET have_bitfield = $1 WHERE info_hash = $2", + &hb, + &h.0[..] + )?; + } + }; + Ok(()) + } + }, + ); + Ok(()) + } +} + #[async_trait::async_trait] impl BitVFactory for PostgresSessionStorage { - async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result>> { - debug!("BitVFactory not implemented for PostgresSessionStorage: fastresume not available"); - Ok(None) + async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result>> { + #[derive(sqlx::FromRow)] + struct HaveBitfield { + have_bitfield: Option>, + } + + macro_rules! exec { + ($q:expr, $v:expr) => { + sqlx::query_as($q) + .bind($v) + .fetch_one(&self.pool) + .await + .context($q) + .context("error executing query")? + }; + } + + let hb: HaveBitfield = match id { + TorrentIdOrHash::Id(id) => { + let id: i32 = id.try_into()?; + exec!("SELECT have_bitfield FROM torrents WHERE id = $1", id) + } + TorrentIdOrHash::Hash(h) => { + exec!( + "SELECT have_bitfield FROM torrents WHERE info_hash = $1", + &h.0[..] + ) + } + }; + + let hb = hb.have_bitfield; + Ok(hb.map(|b| { + PgBitfield { + torrent_id: id, + inmem: BF::from_boxed_slice(b.into_boxed_slice()), + pool: self.pool.clone(), + } + .into_dyn() + })) } async fn store_initial_check( &self, - _: TorrentIdOrHash, + id: TorrentIdOrHash, b: BF, ) -> anyhow::Result> { - Ok(b.into_dyn()) + let mut bf = PgBitfield { + torrent_id: id, + inmem: b, + pool: self.pool.clone(), + }; + bf.flush()?; + Ok(bf.into_dyn()) } }