author     Ellie Huxtable <ellie@elliehuxtable.com>  2024-01-16 11:25:09 +0000
committer  GitHub <noreply@github.com>               2024-01-16 11:25:09 +0000
commit     a2578c4521d4615d8265744ab51a1cc4f291605e (patch)
tree       26fb3da7a1d7312691703919cc700e433bbd1220 /atuin-client
parent     c2439d1ed6fc3e973371b76637c08eea0a8f60c8 (diff)
feat: add history rebuild (#1575)
* feat: add history rebuild

This adds a function that will

1. List all history from the store
2. Segment it by create/delete
3. Insert all creates into the database
4. Delete all deleted entries

This replaces the old history sync. Presently it's incomplete: there is no incremental rebuild, so it can only rebuild the entire history at once. It is run by `atuin store rebuild history`.

* fix tests
* add incremental sync
* add auto sync
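For orientation, steps 1-4 above boil down to the sketch below; the full implementation is in history/store.rs further down, and the types (HistoryStore, Database, HistoryRecord) are the ones this diff introduces or touches.

    // Condensed sketch of the rebuild: list every history record, segment by
    // operation, then apply each group to the query database in bulk.
    use eyre::Result;

    async fn rebuild(store: &HistoryStore, database: &dyn Database) -> Result<()> {
        let mut creates = Vec::new();
        let mut deletes = Vec::new();

        // Steps 1 + 2: list all history records and segment create/delete
        for record in store.history().await? {
            match record {
                HistoryRecord::Create(h) => creates.push(h),
                HistoryRecord::Delete(id) => deletes.push(id),
            }
        }

        // Steps 3 + 4: bulk-insert the creates, then drop the deleted rows
        database.save_bulk(&creates).await?;
        database.delete_rows(&deletes).await?;

        Ok(())
    }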
Diffstat (limited to 'atuin-client')
-rw-r--r--  atuin-client/src/database.rs       |  27
-rw-r--r--  atuin-client/src/history/store.rs  | 104
-rw-r--r--  atuin-client/src/record/store.rs   |   5
-rw-r--r--  atuin-client/src/record/sync.rs    |  17
4 files changed, 131 insertions, 22 deletions
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs
index 376c7b753..a69570931 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -18,7 +18,7 @@ use sqlx::{
};
use time::OffsetDateTime;
-use crate::history::HistoryStats;
+use crate::history::{HistoryId, HistoryStats};
use super::{
history::History,
@@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static {
async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>;
async fn delete(&self, h: History) -> Result<()>;
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>;
async fn deleted(&self) -> Result<Vec<History>>;
// Yes I know, it's a lot.
@@ -172,6 +173,18 @@ impl Sqlite {
Ok(())
}
+ async fn delete_row_raw(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ id: HistoryId,
+ ) -> Result<()> {
+ sqlx::query("delete from history where id = ?1")
+ .bind(id.0.as_str())
+ .execute(&mut **tx)
+ .await?;
+
+ Ok(())
+ }
+
fn query_history(row: SqliteRow) -> History {
let deleted_at: Option<i64> = row.get("deleted_at");
@@ -567,6 +580,18 @@ impl Database for Sqlite {
Ok(())
}
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> {
+ let mut tx = self.pool.begin().await?;
+
+ for id in ids {
+ Self::delete_row_raw(&mut tx, id.clone()).await?;
+ }
+
+ tx.commit().await?;
+
+ Ok(())
+ }
+
async fn stats(&self, h: &History) -> Result<HistoryStats> {
// We select the previous in the session by time
let mut prev = SqlBuilder::select_from("history");
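The new delete_rows runs every per-row delete inside a single transaction, so a rebuild with many tombstones pays for one commit rather than one per row. Below is a self-contained sketch of the same pattern, assuming an sqlx SqlitePool and plain string ids; delete_many is illustrative and not part of this diff.

    // Illustrative stand-alone version of the batch-delete pattern above:
    // one transaction, one statement per id, a single commit at the end.
    async fn delete_many(pool: &sqlx::SqlitePool, ids: &[String]) -> eyre::Result<()> {
        let mut tx = pool.begin().await?;

        for id in ids {
            sqlx::query("delete from history where id = ?1")
                .bind(id.as_str())
                .execute(&mut *tx)
                .await?;
        }

        // All-or-nothing: an error before this point rolls the batch back.
        tx.commit().await?;

        Ok(())
    }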
diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs
index f4aa9d93c..a77854528 100644
--- a/atuin-client/src/history/store.rs
+++ b/atuin-client/src/history/store.rs
@@ -1,8 +1,11 @@
use eyre::{bail, eyre, Result};
use rmp::decode::Bytes;
-use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store};
-use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx};
+use crate::{
+ database::Database,
+ record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
+};
+use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
@@ -58,14 +61,14 @@ impl HistoryRecord {
Ok(DecryptedData(output))
}
- pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> {
+ pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
use rmp::decode;
fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
eyre!("{err:?}")
}
- let mut bytes = Bytes::new(bytes);
+ let mut bytes = Bytes::new(&bytes.0);
let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
@@ -147,10 +150,89 @@ impl HistoryStore {
self.push_record(record).await
}
+
+ pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
+ // At the moment this loads all history into memory.
+ // Not ideal, as that is potentially quite a lot, although individual history entries are small.
+ let records = self.store.all_tagged(HISTORY_TAG).await?;
+ let mut ret = Vec::with_capacity(records.len());
+
+ for record in records.into_iter() {
+ let hist = match record.version.as_str() {
+ HISTORY_VERSION => {
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
+ }
+ version => bail!("unknown history version {version:?}"),
+ }?;
+
+ ret.push(hist);
+ }
+
+ Ok(ret)
+ }
+
+ pub async fn build(&self, database: &dyn Database) -> Result<()> {
+ // I'd like to change how we rebuild and not couple this with the database, but need to
+ // consider the structure more deeply. This will be easy to change.
+
+ // TODO(ellie): page or iterate this
+ let history = self.history().await?;
+
+ // In theory we could flatten this here
+ // The current issue is that the database may have history in it already, from the old sync
+ // This didn't actually delete old history
+ // If we're sure we have a DB only maintained by the new store, we can flatten
+ // create/delete before we even get to sqlite
+ let mut creates = Vec::new();
+ let mut deletes = Vec::new();
+
+ for i in history {
+ match i {
+ HistoryRecord::Create(h) => {
+ creates.push(h);
+ }
+ HistoryRecord::Delete(id) => {
+ deletes.push(id);
+ }
+ }
+ }
+
+ database.save_bulk(&creates).await?;
+ database.delete_rows(&deletes).await?;
+
+ Ok(())
+ }
+
+ pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
+ for id in ids {
+ let record = self.store.get(*id).await?;
+
+ if record.tag != HISTORY_TAG {
+ continue;
+ }
+
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
+
+ match record {
+ HistoryRecord::Create(h) => {
+ // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time
+ database.save(&h).await?;
+ }
+ HistoryRecord::Delete(id) => {
+ database.delete_rows(&[id]).await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
}
#[cfg(test)]
mod tests {
+ use atuin_common::record::DecryptedData;
use time::macros::datetime;
use crate::history::{store::HistoryRecord, HISTORY_VERSION};
@@ -187,13 +269,14 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
// check the snapshot too
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
@@ -208,12 +291,13 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
}
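The deserialize signature change above ripples through these tests: callers now pass the DecryptedData newtype straight through rather than reaching for its inner bytes. A small round-trip helper under the new API; roundtrip itself is illustrative.

    // Round-trip under the new signature: serialize returns DecryptedData,
    // which deserialize now consumes directly instead of a raw &[u8].
    fn roundtrip(record: &HistoryRecord) -> eyre::Result<()> {
        let serialized = record.serialize()?;
        let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)?;

        assert_eq!(&deserialized, record);
        Ok(())
    }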
diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs
index a5c156d68..efe2eb4a7 100644
--- a/atuin-client/src/record/store.rs
+++ b/atuin-client/src/record/store.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
use eyre::Result;
use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus};
+
/// A record store stores records
/// In more detail - we tend to need to process this into _another_ format to actually query it.
/// As is, the record store is intended as the source of truth for arbitrary data, which could
@@ -44,8 +45,6 @@ pub trait Store {
async fn status(&self) -> Result<RecordStatus>;
- /// Get every start record for a given tag, regardless of host.
- /// Useful when actually operating on synchronized data, and will often have conflict
- /// resolution applied.
+ /// Get all records for a given tag
async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>;
}
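The reworded doc comment matches how history() consumes this method above: one call returns every record carrying the tag, regardless of which host wrote it. A minimal consumer sketch, assuming any store implementing the trait and the HISTORY_TAG constant from the history module; count_history is illustrative.

    // Minimal consumer of all_tagged as documented above: every returned
    // record carries the requested tag, from every known host.
    async fn count_history(store: &impl Store) -> eyre::Result<usize> {
        let records = store.all_tagged(HISTORY_TAG).await?;
        Ok(records.len())
    }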
diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs
index 2694e0ff0..19b8dd1b1 100644
--- a/atuin-client/src/record/sync.rs
+++ b/atuin-client/src/record/sync.rs
@@ -7,7 +7,7 @@ use thiserror::Error;
use super::store::Store;
use crate::{api_client::Client, settings::Settings};
-use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus};
+use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus};
#[derive(Error, Debug)]
pub enum SyncError {
@@ -198,11 +198,12 @@ async fn sync_download(
tag: String,
local: Option<RecordIdx>,
remote: RecordIdx,
-) -> Result<i64, SyncError> {
+) -> Result<Vec<RecordId>, SyncError> {
let local = local.unwrap_or(0);
let expected = remote - local;
let download_page_size = 100;
let mut progress = 0;
+ let mut ret = Vec::new();
println!(
"Downloading {} records from {}/{}",
@@ -230,6 +231,8 @@ async fn sync_download(
expected
);
+ ret.extend(page.iter().map(|f| f.id));
+
progress += page.len() as u64;
if progress >= expected {
@@ -237,14 +240,14 @@ async fn sync_download(
}
}
- Ok(progress as i64)
+ Ok(ret)
}
pub async fn sync_remote(
operations: Vec<Operation>,
local_store: &impl Store,
settings: &Settings,
-) -> Result<(i64, i64), SyncError> {
+) -> Result<(i64, Vec<RecordId>), SyncError> {
let client = Client::new(
&settings.sync_address,
&settings.session_token,
@@ -254,7 +257,7 @@ pub async fn sync_remote(
.expect("failed to create client");
let mut uploaded = 0;
- let mut downloaded = 0;
+ let mut downloaded = Vec::new();
// this can totally run in parallel, but let's get it working first
for i in operations {
@@ -271,9 +274,7 @@ pub async fn sync_remote(
tag,
local,
remote,
- } => {
- downloaded += sync_download(local_store, &client, host, tag, local, remote).await?
- }
+ } => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?,
Operation::Noop { .. } => continue,
}