Diffstat (limited to 'crates/atuin-server-postgres')
-rw-r--r--  crates/atuin-server-postgres/Cargo.toml                                            23
-rw-r--r--  crates/atuin-server-postgres/build.rs                                               5
-rw-r--r--  crates/atuin-server-postgres/migrations/20210425153745_create_history.sql          11
-rw-r--r--  crates/atuin-server-postgres/migrations/20210425153757_create_users.sql            10
-rw-r--r--  crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql          6
-rw-r--r--  crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql       51
-rw-r--r--  crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql 35
-rw-r--r--  crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql          3
-rw-r--r--  crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql          1
-rw-r--r--  crates/atuin-server-postgres/migrations/20220505082442_create-events.sql           14
-rw-r--r--  crates/atuin-server-postgres/migrations/20220610074049_history-length.sql           2
-rw-r--r--  crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql              2
-rw-r--r--  crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql           5
-rw-r--r--  crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql     30
-rw-r--r--  crates/atuin-server-postgres/migrations/20230623070418_records.sql                 15
-rw-r--r--  crates/atuin-server-postgres/migrations/20231202170508_create-store.sql            15
-rw-r--r--  crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql         2
-rw-r--r--  crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql       4
-rw-r--r--  crates/atuin-server-postgres/src/lib.rs                                           538
-rw-r--r--  crates/atuin-server-postgres/src/wrappers.rs                                       77
20 files changed, 849 insertions(+), 0 deletions(-)
diff --git a/crates/atuin-server-postgres/Cargo.toml b/crates/atuin-server-postgres/Cargo.toml
new file mode 100644
index 00000000..647d934a
--- /dev/null
+++ b/crates/atuin-server-postgres/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "atuin-server-postgres"
+edition = "2021"
+description = "server postgres database library for atuin"
+
+version = { workspace = true }
+authors = { workspace = true }
+license = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+
+[dependencies]
+atuin-common = { path = "../atuin-common", version = "18.2.0" }
+atuin-server-database = { path = "../atuin-server-database", version = "18.2.0" }
+
+eyre = { workspace = true }
+tracing = "0.1"
+time = { workspace = true }
+serde = { workspace = true }
+sqlx = { workspace = true }
+async-trait = { workspace = true }
+uuid = { workspace = true }
+futures-util = "0.3"
diff --git a/crates/atuin-server-postgres/build.rs b/crates/atuin-server-postgres/build.rs
new file mode 100644
index 00000000..d5068697
--- /dev/null
+++ b/crates/atuin-server-postgres/build.rs
@@ -0,0 +1,5 @@
+// generated by `sqlx migrate build-script`
+fn main() {
+ // trigger recompilation when a new migration is added
+ println!("cargo:rerun-if-changed=migrations");
+}
diff --git a/crates/atuin-server-postgres/migrations/20210425153745_create_history.sql b/crates/atuin-server-postgres/migrations/20210425153745_create_history.sql
new file mode 100644
index 00000000..2c2d17b0
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20210425153745_create_history.sql
@@ -0,0 +1,11 @@
+create table history (
+ id bigserial primary key,
+ client_id text not null unique, -- the client-generated ID
+ user_id bigserial not null, -- allow multiple users
+ hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
+ timestamp timestamp not null, -- one of the few pieces of non-encrypted metadata
+
+ data varchar(8192) not null, -- store the actual history data, encrypted. I don't wanna know!
+
+ created_at timestamp not null default current_timestamp
+);
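The unique client_id is what makes sync idempotent: re-uploading the same encrypted entry is a no-op. A minimal sketch of the insert the server performs (it mirrors add_history in src/lib.rs below; the literal values are hypothetical):

    insert into history (client_id, user_id, hostname, timestamp, data)
    values ('client-generated-uuid', 1, 'hashed-hostname', now(), 'base64-ciphertext')
    on conflict do nothing;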
diff --git a/crates/atuin-server-postgres/migrations/20210425153757_create_users.sql b/crates/atuin-server-postgres/migrations/20210425153757_create_users.sql
new file mode 100644
index 00000000..a25dcced
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20210425153757_create_users.sql
@@ -0,0 +1,10 @@
+create table users (
+ id bigserial primary key, -- also store our own ID
+ username varchar(32) not null unique, -- being able to contact users is useful
+ email varchar(128) not null unique, -- being able to contact users is useful
+ password varchar(128) not null unique
+);
+
+-- the prior index is case sensitive :(
+CREATE UNIQUE INDEX email_unique_idx on users (LOWER(email));
+CREATE UNIQUE INDEX username_unique_idx on users (LOWER(username));
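A query only uses these functional indexes when it applies the same expression, so lookups must wrap the column in LOWER() as well. A sketch with a hypothetical email:

    select id, username from users where lower(email) = lower('Someone@Example.com');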
diff --git a/crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql b/crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql
new file mode 100644
index 00000000..c2fb6559
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20210425153800_create_sessions.sql
@@ -0,0 +1,6 @@
+-- Add migration script here
+create table sessions (
+ id bigserial primary key,
+ user_id bigserial,
+ token varchar(128) unique not null
+);
diff --git a/crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql b/crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql
new file mode 100644
index 00000000..dd1afa88
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20220419082412_add_count_trigger.sql
@@ -0,0 +1,51 @@
+-- Prior to this, the count endpoint was super naive and just ran COUNT(1).
+-- That is painfully slow at any real scale. Now that we have some actual
+-- traffic, stop doing that!
+-- This maintains a running count, so we can read ONE row instead of ALL the
+-- rows. Much better.
+-- Future optimisation could use some sort of cache so we don't even need to hit
+-- postgres at all.
+
+create table total_history_count_user(
+ id bigserial primary key,
+ user_id bigserial,
+ total integer -- try and avoid using keywords - hence total, not count
+);
+
+create or replace function user_history_count()
+returns trigger as
+$func$
+begin
+ if (TG_OP='INSERT') then
+ update total_history_count_user set total = total + 1 where user_id = new.user_id;
+
+ if not found then
+ insert into total_history_count_user(user_id, total)
+ values (
+ new.user_id,
+ (select count(1) from history where user_id = new.user_id)
+ );
+ end if;
+
+ elsif (TG_OP='DELETE') then
+ update total_history_count_user set total = total - 1 where user_id = new.user_id;
+
+ if not found then
+ insert into total_history_count_user(user_id, total)
+ values (
+ new.user_id,
+ (select count(1) from history where user_id = new.user_id)
+ );
+ end if;
+ end if;
+
+ return NEW; -- this is actually ignored for an after trigger, but oh well
+end;
+$func$
+language plpgsql volatile -- volatile: the function writes to a table
+cost 100; -- default value
+
+create trigger tg_user_history_count
+ after insert or delete on history
+ for each row
+ execute procedure user_history_count();
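With the trigger maintaining the totals, counting becomes a single-row read; this is the query count_history_cached in src/lib.rs below runs:

    select total from total_history_count_user where user_id = $1;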
diff --git a/crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql b/crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql
new file mode 100644
index 00000000..6198f300
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20220421073605_fix_count_trigger_delete.sql
@@ -0,0 +1,35 @@
+-- the old version of this function used NEW in the delete part when it should
+-- use OLD
+
+create or replace function user_history_count()
+returns trigger as
+$func$
+begin
+ if (TG_OP='INSERT') then
+ update total_history_count_user set total = total + 1 where user_id = new.user_id;
+
+ if not found then
+ insert into total_history_count_user(user_id, total)
+ values (
+ new.user_id,
+ (select count(1) from history where user_id = new.user_id)
+ );
+ end if;
+
+ elsif (TG_OP='DELETE') then
+ update total_history_count_user set total = total - 1 where user_id = old.user_id;
+
+ if not found then
+ insert into total_history_count_user(user_id, total)
+ values (
+ old.user_id,
+ (select count(1) from history where user_id = old.user_id)
+ );
+ end if;
+ end if;
+
+ return NEW; -- this is actually ignored for an after trigger, but oh well
+end;
+$func$
+language plpgsql volatile -- volatile: the function writes to a table
+cost 100; -- default value
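In a row-level trigger, NEW is only assigned for INSERT and UPDATE, and OLD only for UPDATE and DELETE, which is why the DELETE branch must read OLD. A quick sanity check after applying this migration, with hypothetical values:

    delete from history where user_id = 42 and client_id = 'some-client-id';
    select total from total_history_count_user where user_id = 42; -- decremented, no error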
diff --git a/crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql b/crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql
new file mode 100644
index 00000000..0ac43433
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20220421174016_larger-commands.sql
@@ -0,0 +1,3 @@
+-- Make it 4x larger. Most commands are shorter than this, but since the data
+-- is base64-encoded, SOME exceed 8192 characters. Should be enough for now.
+ALTER TABLE history ALTER COLUMN data TYPE varchar(32768);
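base64 encodes every 3 bytes of ciphertext as 4 characters, so varchar(8192) only covered roughly 6 KiB of decoded data, while varchar(32768) covers roughly 24 KiB. A quick way to see how close existing rows come to the limit:

    select max(length(data)) from history;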
diff --git a/crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql b/crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql
new file mode 100644
index 00000000..a9138194
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20220426172813_user-created-at.sql
@@ -0,0 +1 @@
+alter table users add column created_at timestamp not null default now();
diff --git a/crates/atuin-server-postgres/migrations/20220505082442_create-events.sql b/crates/atuin-server-postgres/migrations/20220505082442_create-events.sql
new file mode 100644
index 00000000..57e16ec7
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20220505082442_create-events.sql
@@ -0,0 +1,14 @@
+create type event_type as enum ('create', 'delete');
+
+create table events (
+ id bigserial primary key,
+ client_id text not null unique, -- the client-generated ID
+ user_id bigserial not null, -- allow multiple users
+ hostname text not null, -- a unique identifier from the client (can be hashed, random, whatever)
+ timestamp timestamp not null, -- one of the few pieces of non-encrypted metadata
+
+ event_type event_type,
+ data text not null, -- store the actual history data, encrypted. I don't wanna know!
+
+ created_at timestamp not null default current_timestamp
+);
diff --git a/crates/atuin-server-postgres/migrations/20220610074049_history-length.sql b/crates/atuin-server-postgres/migrations/20220610074049_history-length.sql
new file mode 100644
index 00000000..b1c23016
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20220610074049_history-length.sql
@@ -0,0 +1,2 @@
+-- Add migration script here
+alter table history alter column data type text;
diff --git a/crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql b/crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql
new file mode 100644
index 00000000..fe3cae17
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20230315220537_drop-events.sql
@@ -0,0 +1,2 @@
+-- Add migration script here
+drop table events;
diff --git a/crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql b/crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql
new file mode 100644
index 00000000..9a9e6263
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20230315224203_create-deleted.sql
@@ -0,0 +1,5 @@
+-- Add migration script here
+alter table history add column if not exists deleted_at timestamp;
+
+-- queries will all be selecting the ids of history for a user, that has been deleted
+create index if not exists history_deleted_index on history(client_id, user_id, deleted_at);
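This is the query the index is built for, exactly as deleted_history in src/lib.rs below issues it:

    select client_id from history where user_id = $1 and deleted_at is not null;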
diff --git a/crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql b/crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql
new file mode 100644
index 00000000..3d0bba52
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20230515221038_trigger-delete-only.sql
@@ -0,0 +1,30 @@
+-- We do not need to run the trigger on deletes: the only time we delete
+-- history is when the user has already been deleted.
+-- Running it there actually slows down deleting all the history a good bit!
+
+create or replace function user_history_count()
+returns trigger as
+$func$
+begin
+ if (TG_OP='INSERT') then
+ update total_history_count_user set total = total + 1 where user_id = new.user_id;
+
+ if not found then
+ insert into total_history_count_user(user_id, total)
+ values (
+ new.user_id,
+ (select count(1) from history where user_id = new.user_id)
+ );
+ end if;
+ end if;
+
+ return NEW; -- this is actually ignored for an after trigger, but oh well
+end;
+$func$
+language plpgsql volatile -- volatile: the function writes to a table
+cost 100; -- default value
+
+create or replace trigger tg_user_history_count
+ after insert on history
+ for each row
+ execute procedure user_history_count();
diff --git a/crates/atuin-server-postgres/migrations/20230623070418_records.sql b/crates/atuin-server-postgres/migrations/20230623070418_records.sql
new file mode 100644
index 00000000..22437595
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20230623070418_records.sql
@@ -0,0 +1,15 @@
+-- Add migration script here
+create table records (
+ id uuid primary key, -- remember to use uuidv7 for happy indices <3
+ client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key
+ host uuid not null, -- a unique identifier for the host
+ parent uuid default null, -- the ID of the parent record, bearing in mind this is a linked list
+ timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision
+ version text not null,
+ tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host
+ data text not null, -- store the actual history data, encrypted. I don't wanna know!
+ cek text not null,
+
+ user_id bigint not null, -- allow multiple users
+ created_at timestamp not null default current_timestamp
+);
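Since parent forms a per-(host, tag) linked list, one log can be walked from its head with a recursive query. A sketch, assuming parent stores the previous record's client_id; the parameters are placeholders:

    with recursive chain as (
        select * from records
        where user_id = $1 and host = $2 and tag = $3 and parent is null
        union all
        select r.* from records r join chain c on r.parent = c.client_id
    )
    select client_id, timestamp from chain;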
diff --git a/crates/atuin-server-postgres/migrations/20231202170508_create-store.sql b/crates/atuin-server-postgres/migrations/20231202170508_create-store.sql
new file mode 100644
index 00000000..ffb57966
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20231202170508_create-store.sql
@@ -0,0 +1,15 @@
+-- Add migration script here
+create table store (
+ id uuid primary key, -- remember to use uuidv7 for happy indices <3
+ client_id uuid not null, -- I am too uncomfortable with the idea of a client-generated primary key, even though it's fine mathematically
+ host uuid not null, -- a unique identifier for the host
+ idx bigint not null, -- the index of the record in this store, identified by (host, tag)
+ timestamp bigint not null, -- not a timestamp type, as those do not have nanosecond precision
+ version text not null,
+ tag text not null, -- what is this? history, kv, whatever. Remember clients get a log per tag per host
+ data text not null, -- store the actual history data, encrypted. I don't wanna know!
+ cek text not null,
+
+ user_id bigint not null, -- allow multiple users
+ created_at timestamp not null default current_timestamp
+);
diff --git a/crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql b/crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql
new file mode 100644
index 00000000..56d67145
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20231203124112_create-store-idx.sql
@@ -0,0 +1,2 @@
+-- Add migration script here
+create unique index record_uniq ON store(user_id, host, tag, idx);
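This unique index makes record uploads idempotent (add_records in src/lib.rs inserts with "on conflict do nothing") and lets paged reads be served by an index scan. A sketch of such a read, with parameters ordered as in the next_records signature:

    select id, client_id, host, idx, timestamp, version, tag, data, cek, user_id
    from store
    where user_id = $1 and host = $2 and tag = $3 and idx >= $4
    order by idx asc
    limit $5;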
diff --git a/crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql b/crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql
new file mode 100644
index 00000000..ad2af5a1
--- /dev/null
+++ b/crates/atuin-server-postgres/migrations/20240108124837_drop-some-defaults.sql
@@ -0,0 +1,4 @@
+-- Add migration script here
+alter table history alter column user_id drop default;
+alter table sessions alter column user_id drop default;
+alter table total_history_count_user alter column user_id drop default;
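These user_id columns were declared bigserial, which attached an implicit sequence as their default; with the default dropped, an insert that omits user_id now fails with a not-null violation instead of silently drawing an id from the leftover sequence:

    insert into sessions (token) values ('some-token');
    -- error: null value in column "user_id" violates not-null constraint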
diff --git a/crates/atuin-server-postgres/src/lib.rs b/crates/atuin-server-postgres/src/lib.rs
new file mode 100644
index 00000000..6dc56fe4
--- /dev/null
+++ b/crates/atuin-server-postgres/src/lib.rs
@@ -0,0 +1,538 @@
+use std::ops::Range;
+
+use async_trait::async_trait;
+use atuin_common::record::{EncryptedData, HostId, Record, RecordIdx, RecordStatus};
+use atuin_server_database::models::{History, NewHistory, NewSession, NewUser, Session, User};
+use atuin_server_database::{Database, DbError, DbResult};
+use futures_util::TryStreamExt;
+use serde::{Deserialize, Serialize};
+use sqlx::postgres::PgPoolOptions;
+use sqlx::Row;
+
+use time::{OffsetDateTime, PrimitiveDateTime, UtcOffset};
+use tracing::instrument;
+use uuid::Uuid;
+use wrappers::{DbHistory, DbRecord, DbSession, DbUser};
+
+mod wrappers;
+
+const MIN_PG_VERSION: u32 = 14;
+
+#[derive(Clone)]
+pub struct Postgres {
+ pool: sqlx::Pool<sqlx::postgres::Postgres>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct PostgresSettings {
+ pub db_uri: String,
+}
+
+fn fix_error(error: sqlx::Error) -> DbError {
+ match error {
+ sqlx::Error::RowNotFound => DbError::NotFound,
+ error => DbError::Other(error.into()),
+ }
+}
+
+#[async_trait]
+impl Database for Postgres {
+ type Settings = PostgresSettings;
+ async fn new(settings: &PostgresSettings) -> DbResult<Self> {
+ let pool = PgPoolOptions::new()
+ .max_connections(100)
+ .connect(settings.db_uri.as_str())
+ .await
+ .map_err(fix_error)?;
+
+ // Call server_version_num to get the DB server's major version number
+ // The call returns None for servers older than 8.x.
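+ // server_version_num encodes the version as major * 10000 + minor
+ // (e.g. 140011 for PostgreSQL 14.11), so the integer division by 10000
+ // below recovers the major version compared against MIN_PG_VERSION.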
+ let pg_major_version: u32 = pool
+ .acquire()
+ .await
+ .map_err(fix_error)?
+ .server_version_num()
+ .ok_or(DbError::Other(eyre::Report::msg(
+ "could not get PostgreSQL version",
+ )))?
+ / 10000;
+
+ if pg_major_version < MIN_PG_VERSION {
+ return Err(DbError::Other(eyre::Report::msg(format!(
+ "unsupported PostgreSQL version {}, minimum required is {}",
+ pg_major_version, MIN_PG_VERSION
+ ))));
+ }
+
+ sqlx::migrate!("./migrations")
+ .run(&pool)
+ .await
+ .map_err(|error| DbError::Other(error.into()))?;
+
+ Ok(Self { pool })
+ }
+
+ #[instrument(skip_all)]
+ async fn get_session(&self, token: &str) -> DbResult<Session> {
+ sqlx::query_as("select id, user_id, token from sessions where token = $1")
+ .bind(token)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)
+ .map(|DbSession(session)| session)
+ }
+
+ #[instrument(skip_all)]
+ async fn get_user(&self, username: &str) -> DbResult<User> {
+ sqlx::query_as("select id, username, email, password from users where username = $1")
+ .bind(username)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)
+ .map(|DbUser(user)| user)
+ }
+
+ #[instrument(skip_all)]
+ async fn get_session_user(&self, token: &str) -> DbResult<User> {
+ sqlx::query_as(
+ "select users.id, users.username, users.email, users.password from users
+ inner join sessions
+ on users.id = sessions.user_id
+ and sessions.token = $1",
+ )
+ .bind(token)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)
+ .map(|DbUser(user)| user)
+ }
+
+ #[instrument(skip_all)]
+ async fn count_history(&self, user: &User) -> DbResult<i64> {
+ // The cache is new, and the user might not yet have a cache value.
+ // They will have one as soon as they post up some new history, but handle that
+ // edge case.
+
+ let res: (i64,) = sqlx::query_as(
+ "select count(1) from history
+ where user_id = $1",
+ )
+ .bind(user.id)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(res.0)
+ }
+
+ #[instrument(skip_all)]
+ async fn total_history(&self) -> DbResult<i64> {
+ // The cache is new, and the user might not yet have a cache value.
+ // They will have one as soon as they post up some new history, but handle that
+ // edge case.
+
+ let res: (i64,) = sqlx::query_as("select sum(total) from total_history_count_user")
+ .fetch_optional(&self.pool)
+ .await
+ .map_err(fix_error)?
+ .unwrap_or((0,));
+
+ Ok(res.0)
+ }
+
+ #[instrument(skip_all)]
+ async fn count_history_cached(&self, user: &User) -> DbResult<i64> {
+ let res: (i32,) = sqlx::query_as(
+ "select total from total_history_count_user
+ where user_id = $1",
+ )
+ .bind(user.id)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(res.0 as i64)
+ }
+
+ async fn delete_store(&self, user: &User) -> DbResult<()> {
+ sqlx::query(
+ "delete from store
+ where user_id = $1",
+ )
+ .bind(user.id)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ async fn delete_history(&self, user: &User, id: String) -> DbResult<()> {
+ sqlx::query(
+ "update history
+ set deleted_at = $3
+ where user_id = $1
+ and client_id = $2
+ and deleted_at is null", // don't just keep setting it
+ )
+ .bind(user.id)
+ .bind(id)
+ .bind(OffsetDateTime::now_utc())
+ .fetch_all(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ #[instrument(skip_all)]
+ async fn deleted_history(&self, user: &User) -> DbResult<Vec<String>> {
+ // The cache is new, and the user might not yet have a cache value.
+ // They will have one as soon as they post up some new history, but handle that
+ // edge case.
+
+ let res = sqlx::query(
+ "select client_id from history
+ where user_id = $1
+ and deleted_at is not null",
+ )
+ .bind(user.id)
+ .fetch_all(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ let res = res
+ .iter()
+ .map(|row| row.get::<String, _>("client_id"))
+ .collect();
+
+ Ok(res)
+ }
+
+ #[instrument(skip_all)]
+ async fn count_history_range(
+ &self,
+ user: &User,
+ range: Range<OffsetDateTime>,
+ ) -> DbResult<i64> {
+ let res: (i64,) = sqlx::query_as(
+ "select count(1) from history
+ where user_id = $1
+ and timestamp >= $2::date
+ and timestamp < $3::date",
+ )
+ .bind(user.id)
+ .bind(into_utc(range.start))
+ .bind(into_utc(range.end))
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(res.0)
+ }
+
+ #[instrument(skip_all)]
+ async fn list_history(
+ &self,
+ user: &User,
+ created_after: OffsetDateTime,
+ since: OffsetDateTime,
+ host: &str,
+ page_size: i64,
+ ) -> DbResult<Vec<History>> {
+ let res = sqlx::query_as(
+ "select id, client_id, user_id, hostname, timestamp, data, created_at from history
+ where user_id = $1
+ and hostname != $2
+ and created_at >= $3
+ and timestamp >= $4
+ order by timestamp asc
+ limit $5",
+ )
+ .bind(user.id)
+ .bind(host)
+ .bind(into_utc(created_after))
+ .bind(into_utc(since))
+ .bind(page_size)
+ .fetch(&self.pool)
+ .map_ok(|DbHistory(h)| h)
+ .try_collect()
+ .await
+ .map_err(fix_error)?;
+
+ Ok(res)
+ }
+
+ #[instrument(skip_all)]
+ async fn add_history(&self, history: &[NewHistory]) -> DbResult<()> {
+ let mut tx = self.pool.begin().await.map_err(fix_error)?;
+
+ for i in history {
+ let client_id: &str = &i.client_id;
+ let hostname: &str = &i.hostname;
+ let data: &str = &i.data;
+
+ sqlx::query(
+ "insert into history
+ (client_id, user_id, hostname, timestamp, data)
+ values ($1, $2, $3, $4, $5)
+ on conflict do nothing
+ ",
+ )
+ .bind(client_id)
+ .bind(i.user_id)
+ .bind(hostname)
+ .bind(i.timestamp)
+ .bind(data)
+ .execute(&mut *tx)
+ .await
+ .map_err(fix_error)?;
+ }
+
+ tx.commit().await.map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ #[instrument(skip_all)]
+ async fn delete_user(&self, u: &User) -> DbResult<()> {
+ sqlx::query("delete from sessions where user_id = $1")
+ .bind(u.id)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ sqlx::query("delete from users where id = $1")
+ .bind(u.id)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ sqlx::query("delete from history where user_id = $1")
+ .bind(u.id)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ sqlx::query("delete from total_history_count_user where user_id = $1")
+ .bind(u.id)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ #[instrument(skip_all)]
+ async fn update_user_password(&self, user: &User) -> DbResult<()> {
+ sqlx::query(
+ "update users
+ set password = $1
+ where id = $2",
+ )
+ .bind(&user.password)
+ .bind(user.id)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ #[instrument(skip_all)]
+ async fn add_user(&self, user: &NewUser) -> DbResult<i64> {
+ let email: &str = &user.email;
+ let username: &str = &user.username;
+ let password: &str = &user.password;
+
+ let res: (i64,) = sqlx::query_as(
+ "insert into users
+ (username, email, password)
+ values($1, $2, $3)
+ returning id",
+ )
+ .bind(username)
+ .bind(email)
+ .bind(password)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(res.0)
+ }
+
+ #[instrument(skip_all)]
+ async fn add_session(&self, session: &NewSession) -> DbResult<()> {
+ let token: &str = &session.token;
+
+ sqlx::query(
+ "insert into sessions
+ (user_id, token)
+ values($1, $2)",
+ )
+ .bind(session.user_id)
+ .bind(token)
+ .execute(&self.pool)
+ .await
+ .map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ #[instrument(skip_all)]
+ async fn get_user_session(&self, u: &User) -> DbResult<Session> {
+ sqlx::query_as("select id, user_id, token from sessions where user_id = $1")
+ .bind(u.id)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)
+ .map(|DbSession(session)| session)
+ }
+
+ #[instrument(skip_all)]
+ async fn oldest_history(&self, user: &User) -> DbResult<History> {
+ sqlx::query_as(
+ "select id, client_id, user_id, hostname, timestamp, data, created_at from history
+ where user_id = $1
+ order by timestamp asc
+ limit 1",
+ )
+ .bind(user.id)
+ .fetch_one(&self.pool)
+ .await
+ .map_err(fix_error)
+ .map(|DbHistory(h)| h)
+ }
+
+ #[instrument(skip_all)]
+ async fn add_records(&self, user: &User, records: &[Record<EncryptedData>]) -> DbResult<()> {
+ let mut tx = self.pool.begin().await.map_err(fix_error)?;
+
+ for i in records {
+ let id = atuin_common::utils::uuid_v7();
+
+ sqlx::query(
+ "insert into store
+ (id, client_id, host, idx, timestamp, version, tag, data, cek, user_id)
+ values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+ on conflict do nothing
+ ",
+ )
+ .bind(id)
+ .bind(i.id)
+ .bind(i.host.id)
+ .bind(i.idx as i64)
+ .bind(i.timestamp as i64) // throwing away some data, but i64 is still big in terms of time
+ .bind(&i.version)
+ .bind(&i.tag)
+ .bind(&i.data.data)
+ .bind(&i.data.content_encryption_key)
+ .bind(user.id)
+ .execute(&mut *tx)
+ .await
+ .map_err(fix_error)?;
+ }
+
+ tx.commit().await.map_err(fix_error)?;
+
+ Ok(())
+ }
+
+ #[instrument(skip_all)]
+ async fn next_records(
+ &self,
+ user: &User,
+ host: HostId,
+ tag: String,
+ start: Option<RecordIdx>,
+ count: u64,
+ ) -> DbResult<Vec<Record<EncryptedData>>> {
+ tracing::debug