summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEllie Huxtable <ellie@elliehuxtable.com>2022-04-21 08:03:39 +0100
committerGitHub <noreply@github.com>2022-04-21 08:03:39 +0100
commited4e07d2e63af1584a262037781729a5144a5502 (patch)
tree38bdb691cd060860e7a3fced7901c74283ddf2f1
parent6e11b8e0ed2594e09a41ca59359a20544de1462a (diff)
Use the count cache (#312)
* Use the count cache By default read from the count cache - if there is no value there, then do a full COUNT. The cache will be filled when the user posts up some more history * clean up server db error handling Co-authored-by: Conrad Ludgate <conrad.ludgate@truelayer.com>
-rw-r--r--atuin-server/src/database.rs79
-rw-r--r--atuin-server/src/handlers/history.rs13
-rw-r--r--atuin-server/src/handlers/user.rs25
3 files changed, 63 insertions, 54 deletions
diff --git a/atuin-server/src/database.rs b/atuin-server/src/database.rs
index 9043c2d5..e163d3a7 100644
--- a/atuin-server/src/database.rs
+++ b/atuin-server/src/database.rs
@@ -1,8 +1,7 @@
use async_trait::async_trait;
use std::collections::HashMap;
-use eyre::{eyre, Result};
-use sqlx::postgres::PgPoolOptions;
+use sqlx::{postgres::PgPoolOptions, Result};
use crate::settings::HISTORY_PAGE_SIZE;
@@ -25,6 +24,7 @@ pub trait Database {
async fn add_user(&self, user: &NewUser) -> Result<i64>;
async fn count_history(&self, user: &User) -> Result<i64>;
+ async fn count_history_cached(&self, user: &User) -> Result<i64>;
async fn count_history_range(
&self,
@@ -63,7 +63,7 @@ pub struct Postgres {
}
impl Postgres {
- pub async fn new(uri: &str) -> Result<Self, sqlx::Error> {
+ pub async fn new(uri: &str) -> Result<Self> {
let pool = PgPoolOptions::new()
.max_connections(100)
.connect(uri)
@@ -78,52 +78,36 @@ impl Postgres {
#[async_trait]
impl Database for Postgres {
async fn get_session(&self, token: &str) -> Result<Session> {
- let res: Option<Session> =
- sqlx::query_as::<_, Session>("select * from sessions where token = $1")
- .bind(token)
- .fetch_optional(&self.pool)
- .await?;
-
- if let Some(s) = res {
- Ok(s)
- } else {
- Err(eyre!("could not find session"))
- }
+ sqlx::query_as::<_, Session>("select * from sessions where token = $1")
+ .bind(token)
+ .fetch_one(&self.pool)
+ .await
}
async fn get_user(&self, username: &str) -> Result<User> {
- let res: Option<User> =
- sqlx::query_as::<_, User>("select * from users where username = $1")
- .bind(username)
- .fetch_optional(&self.pool)
- .await?;
-
- if let Some(u) = res {
- Ok(u)
- } else {
- Err(eyre!("could not find user"))
- }
+ sqlx::query_as::<_, User>("select * from users where username = $1")
+ .bind(username)
+ .fetch_one(&self.pool)
+ .await
}
async fn get_session_user(&self, token: &str) -> Result<User> {
- let res: Option<User> = sqlx::query_as::<_, User>(
+ sqlx::query_as::<_, User>(
"select * from users
inner join sessions
on users.id = sessions.user_id
and sessions.token = $1",
)
.bind(token)
- .fetch_optional(&self.pool)
- .await?;
-
- if let Some(u) = res {
- Ok(u)
- } else {
- Err(eyre!("could not find user"))
- }
+ .fetch_one(&self.pool)
+ .await
}
async fn count_history(&self, user: &User) -> Result<i64> {
+ // The cache is new, and the user might not yet have a cache value.
+ // They will have one as soon as they post up some new history, but handle that
+ // edge case.
+
let res: (i64,) = sqlx::query_as(
"select count(1) from history
where user_id = $1",
@@ -135,6 +119,18 @@ impl Database for Postgres {
Ok(res.0)
}
+ async fn count_history_cached(&self, user: &User) -> Result<i64> {
+ let res: (i64,) = sqlx::query_as(
+ "select total from total_history_count_user
+ where user_id = $1",
+ )
+ .bind(user.id)
+ .fetch_one(&self.pool)
+ .await?;
+
+ Ok(res.0)
+ }
+
async fn count_history_range(
&self,
user: &User,
@@ -300,17 +296,10 @@ impl Database for Postgres {
}
async fn get_user_session(&self, u: &User) -> Result<Session> {
- let res: Option<Session> =
- sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
- .bind(u.id)
- .fetch_optional(&self.pool)
- .await?;
-
- if let Some(s) = res {
- Ok(s)
- } else {
- Err(eyre!("could not find session"))
- }
+ sqlx::query_as::<_, Session>("select * from sessions where user_id = $1")
+ .bind(u.id)
+ .fetch_one(&self.pool)
+ .await
}
async fn oldest_history(&self, user: &User) -> Result<History> {
diff --git a/atuin-server/src/handlers/history.rs b/atuin-server/src/handlers/history.rs
index fde7cf2d..4fa2a962 100644
--- a/atuin-server/src/handlers/history.rs
+++ b/atuin-server/src/handlers/history.rs
@@ -13,10 +13,17 @@ pub async fn count(
user: User,
db: Extension<Postgres>,
) -> Result<Json<CountResponse>, ErrorResponseStatus<'static>> {
- match db.count_history(&user).await {
+ match db.count_history_cached(&user).await {
+ // By default read out the cached value
Ok(count) => Ok(Json(CountResponse { count })),
- Err(_) => Err(ErrorResponse::reply("failed to query history count")
- .with_status(StatusCode::INTERNAL_SERVER_ERROR)),
+
+ // If that fails, fallback on a full COUNT. Cache is built on a POST
+ // only
+ Err(_) => match db.count_history(&user).await {
+ Ok(count) => Ok(Json(CountResponse { count })),
+ Err(_) => Err(ErrorResponse::reply("failed to query history count")
+ .with_status(StatusCode::INTERNAL_SERVER_ERROR)),
+ },
}
}
diff --git a/atuin-server/src/handlers/user.rs b/atuin-server/src/handlers/user.rs
index 1bcfce2f..42e4aa33 100644
--- a/atuin-server/src/handlers/user.rs
+++ b/atuin-server/src/handlers/user.rs
@@ -32,10 +32,15 @@ pub async fn get(
) -> Result<Json<UserResponse>, ErrorResponseStatus<'static>> {
let user = match db.get_user(username.as_ref()).await {
Ok(user) => user,
- Err(e) => {
- debug!("user not found: {}", e);
+ Err(sqlx::Error::RowNotFound) => {
+ debug!("user not found: {}", username);
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
}
+ Err(err) => {
+ error!("database error: {}", err);
+ return Err(ErrorResponse::reply("database error")
+ .with_status(StatusCode::INTERNAL_SERVER_ERROR));
+ }
};
Ok(Json(UserResponse {
@@ -96,20 +101,28 @@ pub async fn login(
) -> Result<Json<LoginResponse>, ErrorResponseStatus<'static>> {
let user = match db.get_user(login.username.borrow()).await {
Ok(u) => u,
+ Err(sqlx::Error::RowNotFound) => {
+ return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
+ }
Err(e) => {
error!("failed to get user {}: {}", login.username.clone(), e);
- return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
+ return Err(ErrorResponse::reply("database error")
+ .with_status(StatusCode::INTERNAL_SERVER_ERROR));
}
};
let session = match db.get_user_session(&user).await {
Ok(u) => u,
- Err(e) => {
- error!("failed to get session for {}: {}", login.username, e);
-
+ Err(sqlx::Error::RowNotFound) => {
+ debug!("user session not found for user id={}", user.id);
return Err(ErrorResponse::reply("user not found").with_status(StatusCode::NOT_FOUND));
}
+ Err(err) => {
+ error!("database error for user {}: {}", login.username, err);
+ return Err(ErrorResponse::reply("database error")
+ .with_status(StatusCode::INTERNAL_SERVER_ERROR));
+ }
};
let verified = verify_str(user.password.as_str(), login.password.borrow());