summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEllie Huxtable <ellie@elliehuxtable.com>2024-01-16 11:25:09 +0000
committerGitHub <noreply@github.com>2024-01-16 11:25:09 +0000
commita2578c4521d4615d8265744ab51a1cc4f291605e (patch)
tree26fb3da7a1d7312691703919cc700e433bbd1220
parentc2439d1ed6fc3e973371b76637c08eea0a8f60c8 (diff)
feat: add history rebuild (#1575)
* feat: add history rebuild This adds a function that will 1. List all history from the store 2. Segment by create/delete 3. Insert all creates into the database 4. Delete all deleted This replaces the old history sync. Presently it's incomplete. There is no incremental rebuild, it can only do the entire thing at once. This is run by `atuin store rebuild history` * fix tests * add incremental sync * add auto sync
-rw-r--r--atuin-client/src/database.rs27
-rw-r--r--atuin-client/src/history/store.rs104
-rw-r--r--atuin-client/src/record/store.rs5
-rw-r--r--atuin-client/src/record/sync.rs17
-rw-r--r--atuin/src/command/client.rs14
-rw-r--r--atuin/src/command/client/history.rs10
-rw-r--r--atuin/src/command/client/record.rs63
-rw-r--r--atuin/src/command/client/store.rs123
-rw-r--r--atuin/src/command/client/sync.rs29
9 files changed, 286 insertions, 106 deletions
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs
index 376c7b75..a6957093 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -18,7 +18,7 @@ use sqlx::{
};
use time::OffsetDateTime;
-use crate::history::HistoryStats;
+use crate::history::{HistoryId, HistoryStats};
use super::{
history::History,
@@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static {
async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>;
async fn delete(&self, h: History) -> Result<()>;
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>;
async fn deleted(&self) -> Result<Vec<History>>;
// Yes I know, it's a lot.
@@ -172,6 +173,18 @@ impl Sqlite {
Ok(())
}
+ async fn delete_row_raw(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ id: HistoryId,
+ ) -> Result<()> {
+ sqlx::query("delete from history where id = ?1")
+ .bind(id.0.as_str())
+ .execute(&mut **tx)
+ .await?;
+
+ Ok(())
+ }
+
fn query_history(row: SqliteRow) -> History {
let deleted_at: Option<i64> = row.get("deleted_at");
@@ -567,6 +580,18 @@ impl Database for Sqlite {
Ok(())
}
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> {
+ let mut tx = self.pool.begin().await?;
+
+ for id in ids {
+ Self::delete_row_raw(&mut tx, id.clone()).await?;
+ }
+
+ tx.commit().await?;
+
+ Ok(())
+ }
+
async fn stats(&self, h: &History) -> Result<HistoryStats> {
// We select the previous in the session by time
let mut prev = SqlBuilder::select_from("history");
diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs
index f4aa9d93..a7785452 100644
--- a/atuin-client/src/history/store.rs
+++ b/atuin-client/src/history/store.rs
@@ -1,8 +1,11 @@
use eyre::{bail, eyre, Result};
use rmp::decode::Bytes;
-use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store};
-use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx};
+use crate::{
+ database::Database,
+ record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
+};
+use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
@@ -58,14 +61,14 @@ impl HistoryRecord {
Ok(DecryptedData(output))
}
- pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> {
+ pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
use rmp::decode;
fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
eyre!("{err:?}")
}
- let mut bytes = Bytes::new(bytes);
+ let mut bytes = Bytes::new(&bytes.0);
let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
@@ -147,10 +150,89 @@ impl HistoryStore {
self.push_record(record).await
}
+
+ pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
+ // Atm this loads all history into memory
+ // Not ideal as that is potentially quite a lot, although history will be small.
+ let records = self.store.all_tagged(HISTORY_TAG).await?;
+ let mut ret = Vec::with_capacity(records.len());
+
+ for record in records.into_iter() {
+ let hist = match record.version.as_str() {
+ HISTORY_VERSION => {
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
+ }
+ version => bail!("unknown history version {version:?}"),
+ }?;
+
+ ret.push(hist);
+ }
+
+ Ok(ret)
+ }
+
+ pub async fn build(&self, database: &dyn Database) -> Result<()> {
+ // I'd like to change how we rebuild and not couple this with the database, but need to
+ // consider the structure more deeply. This will be easy to change.
+
+ // TODO(ellie): page or iterate this
+ let history = self.history().await?;
+
+ // In theory we could flatten this here
+ // The current issue is that the database may have history in it already, from the old sync
+ // This didn't actually delete old history
+ // If we're sure we have a DB only maintained by the new store, we can flatten
+ // create/delete before we even get to sqlite
+ let mut creates = Vec::new();
+ let mut deletes = Vec::new();
+
+ for i in history {
+ match i {
+ HistoryRecord::Create(h) => {
+ creates.push(h);
+ }
+ HistoryRecord::Delete(id) => {
+ deletes.push(id);
+ }
+ }
+ }
+
+ database.save_bulk(&creates).await?;
+ database.delete_rows(&deletes).await?;
+
+ Ok(())
+ }
+
+ pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
+ for id in ids {
+ let record = self.store.get(*id).await?;
+
+ if record.tag != HISTORY_TAG {
+ continue;
+ }
+
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
+
+ match record {
+ HistoryRecord::Create(h) => {
+ // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time
+ database.save(&h).await?;
+ }
+ HistoryRecord::Delete(id) => {
+ database.delete_rows(&[id]).await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
}
#[cfg(test)]
mod tests {
+ use atuin_common::record::DecryptedData;
use time::macros::datetime;
use crate::history::{store::HistoryRecord, HISTORY_VERSION};
@@ -187,13 +269,14 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
// check the snapshot too
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
@@ -208,12 +291,13 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
}
diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs
index a5c156d6..efe2eb4a 100644
--- a/atuin-client/src/record/store.rs
+++ b/atuin-client/src/record/store.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
use eyre::Result;
use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus};
+
/// A record store stores records
/// In more detail - we tend to need to process this into _another_ format to actually query it.
/// As is, the record store is intended as the source of truth for arbitratry data, which could
@@ -44,8 +45,6 @@ pub trait Store {
async fn status(&self) -> Result<RecordStatus>;
- /// Get every start record for a given tag, regardless of host.
- /// Useful when actually operating on synchronized data, and will often have conflict
- /// resolution applied.
+ /// Get all records for a given tag
async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>;
}
diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs
index 2694e0ff..19b8dd1b 100644
--- a/atuin-client/src/record/sync.rs
+++ b/atuin-client/src/record/sync.rs
@@ -7,7 +7,7 @@ use thiserror::Error;
use super::store::Store;
use crate::{api_client::Client, settings::Settings};
-use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus};
+use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus};
#[derive(Error, Debug)]
pub enum SyncError {
@@ -198,11 +198,12 @@ async fn sync_download(
tag: String,
local: Option<RecordIdx>,
remote: RecordIdx,
-) -> Result<i64, SyncError> {
+) -> Result<Vec<RecordId>, SyncError> {
let local = local.unwrap_or(0);
let expected = remote - local;
let download_page_size = 100;
let mut progress = 0;
+ let mut ret = Vec::new();
println!(
"Downloading {} records from {}/{}",
@@ -230,6 +231,8 @@ async fn sync_download(
expected
);
+ ret.extend(page.iter().map(|f| f.id));
+
progress += page.len() as u64;
if progress >= expected {
@@ -237,14 +240,14 @@ async fn sync_download(
}
}
- Ok(progress as i64)
+ Ok(ret)
}
pub async fn sync_remote(
operations: Vec<Operation>,
local_store: &impl Store,
settings: &Settings,
-) -> Result<(i64, i64), SyncError> {
+) -> Result<(i64, Vec<RecordId>), SyncError> {
let client = Client::new(
&settings.sync_address,
&settings.session_token,
@@ -254,7 +257,7 @@ pub async fn sync_remote(
.expect("failed to create client");
let mut uploaded = 0;
- let mut downloaded = 0;
+ let mut downloaded = Vec::new();
// this can totally run in parallel, but lets get it working first
for i in operations {
@@ -271,9 +274,7 @@ pub async fn sync_remote(
tag,
local,
remote,
- } => {
- downloaded += sync_download(local_store, &client, host, tag, local, remote).await?
- }
+ } => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?,
Operation::Noop { .. } => continue,
}
diff --git a/atuin/src/command/client.rs b/atuin/src/command/client.rs
index 9ca199fd..6292d263 100644
--- a/atuin/src/command/client.rs
+++ b/atuin/src/command/client.rs
@@ -16,9 +16,9 @@ mod config;
mod history;
mod import;
mod kv;
-mod record;
mod search;
mod stats;
+mod store;
#[derive(Subcommand, Debug)]
#[command(infer_subcommands = true)]
@@ -48,7 +48,7 @@ pub enum Cmd {
Kv(kv::Cmd),
#[command(subcommand)]
- Record(record::Cmd),
+ Store(store::Cmd),
/// Print example configuration
#[command()]
@@ -83,23 +83,23 @@ impl Cmd {
let record_store_path = PathBuf::from(settings.record_store_path.as_str());
let db = Sqlite::new(db_path).await?;
- let store = SqliteStore::new(record_store_path).await?;
+ let sqlite_store = SqliteStore::new(record_store_path).await?;
match self {
- Self::History(history) => history.run(&settings, &db, store).await,
+ Self::History(history) => history.run(&settings, &db, sqlite_store).await,
Self::Import(import) => import.run(&db).await,
Self::Stats(stats) => stats.run(&db, &settings).await,
Self::Search(search) => search.run(db, &mut settings).await,
#[cfg(feature = "sync")]
- Self::Sync(sync) => sync.run(settings, &db, &store).await,
+ Self::Sync(sync) => sync.run(settings, &db, sqlite_store).await,
#[cfg(feature = "sync")]
Self::Account(account) => account.run(settings).await,
- Self::Kv(kv) => kv.run(&settings, &store).await,
+ Self::Kv(kv) => kv.run(&settings, &sqlite_store).await,
- Self::Record(record) => record.run(&settings, &store).await,
+ Self::Store(store) => store.run(&settings, &db, sqlite_store).await,
Self::DefaultConfig => {
config::run();
diff --git a/atuin/src/command/client/history.rs b/atuin/src/command/client/history.rs
index e22ee6db..10f1feb6 100644
--- a/atuin/src/command/client/history.rs
+++ b/atuin/src/command/client/history.rs
@@ -317,14 +317,14 @@ impl Cmd {
if settings.sync.records {
let (diff, _) = record::sync::diff(settings, &store).await?;
let operations = record::sync::operations(diff, &store).await?;
- let (uploaded, downloaded) =
+ let (_, downloaded) =
record::sync::sync_remote(operations, &store, settings).await?;
- println!("{uploaded}/{downloaded} up/down to record store");
+ history_store.incremental_build(db, &downloaded).await?;
+ } else {
+ debug!("running periodic background sync");
+ sync::sync(settings, false, db).await?;
}
-
- debug!("running periodic background sync");
- sync::sync(settings, false, db).await?;
}
#[cfg(not(feature = "sync"))]
debug!("not compiled with sync support");
diff --git a/atuin/src/command/client/record.rs b/atuin/src/command/client/record.rs
deleted file mode 100644
index 3c91cdcc..00000000
--- a/atuin/src/command/client/record.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use clap::Subcommand;
-use eyre::Result;
-
-use atuin_client::{record::store::Store, settings::Settings};
-use time::OffsetDateTime;
-
-#[derive(Subcommand, Debug)]
-#[command(infer_subcommands = true)]
-pub enum Cmd {
- Status,
-}
-
-impl Cmd {
- pub async fn run(
- &self,
- _settings: &Settings,
- store: &(impl Store + Send + Sync),
- ) -> Result<()> {
- let host_id = Settings::host_id().expect("failed to get host_id");
-
- let status = store.status().await?;
-
- // TODO: should probs build some data structure and then pretty-print it or smth
- for (host, st) in &status.hosts {
- let host_string = if host == &host_id {
- format!("host: {} <- CURRENT HOST", host.0.as_hyphenated())
- } else {
- format!("host: {}", host.0.as_hyphenated())
- };
-
- println!("{host_string}");
-
- for (tag, idx) in st {
- println!("\tstore: {tag}");
-
- let first = store.first(*host, tag).await?;
- let last = store.last(*host, tag).await?;
-
- println!("\t\tidx: {idx}");
-
- if let Some(first) = first {
- println!("\t\tfirst: {}", first.id.0.as_hyphenated());
-
- let time =
- OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?;
- println!("\t\t\tcreated: {time}");
- }
-
- if let Some(last) = last {
- println!("\t\tlast: {}", last.id.0.as_hyphenated());
-
- let time =
- OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?;
- println!("\t\t\tcreated: {time}");
- }
- }
-
- println!();
- }
-
- Ok(())
- }
-}
diff --git a/atuin/src/command/client/store.rs b/atuin/src/command/client/store.rs
new file mode 100644
index 00000000..640a284b
--- /dev/null
+++ b/atuin/src/command/client/store.rs
@@ -0,0 +1,123 @@
+use clap::{Args, Subcommand};
+use eyre::{bail, Result};
+
+use atuin_client::{
+ database::Database,
+ encryption,
+ history::store::HistoryStore,
+ record::{sqlite_store::SqliteStore, store::Store},
+ settings::Settings,
+};
+use time::OffsetDateTime;
+
+#[derive(Args, Debug)]
+pub struct Rebuild {
+ pub tag: String,
+}
+
+impl Rebuild {
+ pub async fn run(
+ &self,
+ settings: &Settings,
+ store: SqliteStore,
+ database: &dyn Database,
+ ) -> Result<()> {
+ // keep it as a string and not an enum atm
+ // would be super cool to build this dynamically in the future
+ // eg register handles for rebuilding various tags without having to make this part of the
+ // binary big
+ match self.tag.as_str() {
+ "history" => {
+ self.rebuild_history(settings, store.clone(), database)
+ .await?;
+ }
+
+ tag => bail!("unknown tag: {tag}"),
+ }
+
+ Ok(())
+ }
+
+ async fn rebuild_history(
+ &self,
+ settings: &Settings,
+ store: SqliteStore,
+ database: &dyn Database,
+ ) -> Result<()> {
+ let encryption_key: [u8; 32] = encryption::load_key(settings)?.into();
+
+ let host_id = Settings::host_id().expect("failed to get host_id");
+ let history_store = HistoryStore::new(store, host_id, encryption_key);
+
+ history_store.build(database).await?;
+
+ Ok(())
+ }
+}
+
+#[derive(Subcommand, Debug)]
+#[command(infer_subcommands = true)]
+pub enum Cmd {
+ Status,
+ Rebuild(Rebuild),
+}
+
+impl Cmd {
+ pub async fn run(
+ &self,
+ settings: &Settings,
+ database: &dyn Database,
+ store: SqliteStore,
+ ) -> Result<()> {
+ match self {
+ Self::Status => self.status(store).await,
+ Self::Rebuild(rebuild) => rebuild.run(settings, store, database).await,
+ }
+ }
+
+ pub async fn status(&self, store: SqliteStore) -> Result<()> {
+ let host_id = Settings::host_id().expect("failed to get host_id");
+
+ let status = store.status().await?;
+
+ // TODO: should probs build some data structure and then pretty-print it or smth
+ for (host, st) in &status.hosts {
+ let host_string = if host == &host_id {
+ format!("host: {} <- CURRENT HOST", host.0.as_hyphenated())
+ } else {
+ format!("host: {}", host.0.as_hyphenated())
+ };
+
+ println!("{host_string}");
+
+ for (tag, idx) in st {
+ println!("\tstore: {tag}");
+
+ let first = store.first(*host, tag).await?;
+ let last = store.last(*host, tag).await?;
+
+ println!("\t\tidx: {idx}");
+
+ if let Some(first) = first {
+ println!("\t\tfirst: {}", first.id.0.as_hyphenated());
+
+ let time =
+ OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?;
+ println!("\t\t\tcreated: {time}");
+ }
+
+ if let Some(last) = last {
+ println!("\t\tlast: {}", last.id.0.as_hyphenated());
+
+ let time =
+ OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?;
+ println!("\t\t\tcreated: {time}");
+ }
+ }
+
+ println!();
+ }
+
+ Ok(())
+ }
+}
diff --git a/atuin/src/command/client/sync.rs b/atuin/src/command/client/sync.rs
index 1d2cdf4f..2e58f07d 100644
--- a/atuin/src/command/client/sync.rs
+++ b/atuin/src/command/client/sync.rs
@@ -3,7 +3,9 @@ use eyre::{Result, WrapErr};
use atuin_client::{
database::Database,
- record::{store::Store, sync},
+ encryption,
+ history::store::HistoryStore,
+ record::{sqlite_store::SqliteStore, sync},
settings::Settings,
};
@@ -45,7 +47,7 @@ impl Cmd {
self,
settings: Settings,
db: &impl Database,
- store: &(impl Store + Send + Sync),
+ store: SqliteStore,
) -> Result<()> {
match self {
Self::Sync { force } => run(&settings, force, db, store).await,
@@ -75,17 +77,26 @@ async fn run(
settings: &Settings,
force: bool,
db: &impl Database,
- store: &(impl Store + Send + Sync),
+ store: SqliteStore,
) -> Result<()> {
if settings.sync.records {
- let (diff, _) = sync::diff(settings, store).await?;
- let operations = sync::operations(diff, store).await?;
- let (uploaded, downloaded) = sync::sync_remote(operations, store, settings).await?;
+ let (diff, _) = sync::diff(settings, &store).await?;
+ let operations = sync::operations(diff, &store).await?;
+ let (uploaded, downloaded) = sync::sync_remote(operations, &store, settings).await?;
- println!("{uploaded}/{downloaded} up/down to record store");
- }
+ let encryption_key: [u8; 32] = encryption::load_key(settings)
+ .context("could not load encryption key")?
+ .into();
+
+ let host_id = Settings::host_id().expect("failed to get host_id");
+ let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
- atuin_client::sync::sync(settings, force, db).await?;
+ history_store.incremental_build(db, &downloaded).await?;
+
+ println!("{uploaded}/{} up/down to record store", downloaded.len());
+ } else {
+ atuin_client::sync::sync(settings, force, db).await?;
+ }
println!(
"Sync complete! {} items in history database, force: {}",