Implement fastresume for postgres

This commit is contained in:
Igor Katson 2024-08-20 22:57:08 +01:00
parent cd4d812aca
commit 627c0ac28f
No known key found for this signature in database
GPG key ID: B4EC22B66D61A3F5
2 changed files with 137 additions and 17 deletions

View file

@ -27,7 +27,7 @@ devserver-postgres:
echo -n '' > /tmp/rqbit-log && cargo run -- \ echo -n '' > /tmp/rqbit-log && cargo run -- \
--log-file /tmp/rqbit-log \ --log-file /tmp/rqbit-log \
--log-file-rust-log=debug,librqbit=trace \ --log-file-rust-log=debug,librqbit=trace \
server start --persistence-config postgres:///rqbit /tmp/scratch/ server start --fastresume --persistence-config postgres:///rqbit /tmp/scratch/
@PHONY: clean @PHONY: clean
clean: clean:

View file

@ -8,7 +8,7 @@ use anyhow::Context;
use futures::{stream::BoxStream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use librqbit_core::Id20; use librqbit_core::Id20;
use sqlx::{Pool, Postgres}; use sqlx::{Pool, Postgres};
use tracing::debug; use tracing::error_span;
use super::{SerializedTorrent, SessionPersistenceStore}; use super::{SerializedTorrent, SessionPersistenceStore};
@ -55,12 +55,20 @@ impl PostgresSessionStorage {
.connect(connection_string) .connect(connection_string)
.await?; .await?;
sqlx::query("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;") macro_rules! exec {
.execute(&pool) ($q:expr) => {
.await sqlx::query($q)
.context("error executing CREATE SEQUENCE")?; .execute(&pool)
.await
.context($q)
.context("error running query")?;
};
}
let create_q = "CREATE TABLE IF NOT EXISTS torrents ( exec!("CREATE SEQUENCE IF NOT EXISTS torrents_id AS integer;");
exec!(
"CREATE TABLE IF NOT EXISTS torrents (
id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'), id INTEGER PRIMARY KEY DEFAULT nextval('torrents_id'),
info_hash BYTEA NOT NULL, info_hash BYTEA NOT NULL,
torrent_bytes BYTEA NOT NULL, torrent_bytes BYTEA NOT NULL,
@ -68,11 +76,10 @@ impl PostgresSessionStorage {
output_folder TEXT NOT NULL, output_folder TEXT NOT NULL,
only_files INTEGER[], only_files INTEGER[],
is_paused BOOLEAN NOT NULL is_paused BOOLEAN NOT NULL
)"; )"
sqlx::query(create_q) );
.execute(&pool)
.await exec!("ALTER TABLE torrents ADD COLUMN IF NOT EXISTS have_bitfield BYTEA");
.context("error executing CREATE TABLE")?;
Ok(Self { pool }) Ok(Self { pool })
} }
@ -172,18 +179,131 @@ impl SessionPersistenceStore for PostgresSessionStorage {
} }
} }
struct PgBitfield {
torrent_id: TorrentIdOrHash,
inmem: BF,
pool: Pool<Postgres>,
}
impl BitV for PgBitfield {
fn as_slice(&self) -> &bitvec::prelude::BitSlice<u8, bitvec::prelude::Msb0> {
self.inmem.as_bitslice()
}
fn as_slice_mut(&mut self) -> &mut bitvec::prelude::BitSlice<u8, bitvec::prelude::Msb0> {
self.inmem.as_mut_bitslice()
}
fn into_dyn(self) -> Box<dyn BitV> {
Box::new(self)
}
fn as_bytes(&self) -> &[u8] {
self.inmem.as_raw_slice()
}
fn flush(&mut self) -> anyhow::Result<()> {
// TODO: make flush async, and don't spawn this, to avoid allocations and capture the result.
crate::spawn_utils::spawn(
"pg",
error_span!("pg_update_bitfield", id=?self.torrent_id),
{
let hb = self.as_bytes().to_owned();
let pool = self.pool.clone();
let torrent_id = self.torrent_id;
macro_rules! exec {
($q:expr, $bf:expr, $id:expr) => {
sqlx::query($q)
.bind($bf)
.bind($id)
.execute(&pool)
.await
.context($q)
.context("error executing query")
};
}
async move {
match torrent_id {
TorrentIdOrHash::Id(id) => {
let id: i32 = id.try_into()?;
exec!(
"UPDATE torrents SET have_bitfield = $1 WHERE id = $2",
&hb,
id
)?;
}
TorrentIdOrHash::Hash(h) => {
exec!(
"UPDATE torrents SET have_bitfield = $1 WHERE info_hash = $2",
&hb,
&h.0[..]
)?;
}
};
Ok(())
}
},
);
Ok(())
}
}
#[async_trait::async_trait] #[async_trait::async_trait]
impl BitVFactory for PostgresSessionStorage { impl BitVFactory for PostgresSessionStorage {
async fn load(&self, _: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>> { async fn load(&self, id: TorrentIdOrHash) -> anyhow::Result<Option<Box<dyn BitV>>> {
debug!("BitVFactory not implemented for PostgresSessionStorage: fastresume not available"); #[derive(sqlx::FromRow)]
Ok(None) struct HaveBitfield {
have_bitfield: Option<Vec<u8>>,
}
macro_rules! exec {
($q:expr, $v:expr) => {
sqlx::query_as($q)
.bind($v)
.fetch_one(&self.pool)
.await
.context($q)
.context("error executing query")?
};
}
let hb: HaveBitfield = match id {
TorrentIdOrHash::Id(id) => {
let id: i32 = id.try_into()?;
exec!("SELECT have_bitfield FROM torrents WHERE id = $1", id)
}
TorrentIdOrHash::Hash(h) => {
exec!(
"SELECT have_bitfield FROM torrents WHERE info_hash = $1",
&h.0[..]
)
}
};
let hb = hb.have_bitfield;
Ok(hb.map(|b| {
PgBitfield {
torrent_id: id,
inmem: BF::from_boxed_slice(b.into_boxed_slice()),
pool: self.pool.clone(),
}
.into_dyn()
}))
} }
async fn store_initial_check( async fn store_initial_check(
&self, &self,
_: TorrentIdOrHash, id: TorrentIdOrHash,
b: BF, b: BF,
) -> anyhow::Result<Box<dyn BitV>> { ) -> anyhow::Result<Box<dyn BitV>> {
Ok(b.into_dyn()) let mut bf = PgBitfield {
torrent_id: id,
inmem: b,
pool: self.pool.clone(),
};
bf.flush()?;
Ok(bf.into_dyn())
} }
} }