summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--atuin-client/src/database.rs27
-rw-r--r--atuin-client/src/history/store.rs104
-rw-r--r--atuin-client/src/record/store.rs5
-rw-r--r--atuin-client/src/record/sync.rs17
-rw-r--r--atuin/src/command/client.rs14
-rw-r--r--atuin/src/command/client/history.rs10
-rw-r--r--atuin/src/command/client/record.rs63
-rw-r--r--atuin/src/command/client/store.rs123
-rw-r--r--atuin/src/command/client/sync.rs29
9 files changed, 286 insertions, 106 deletions
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs
index 376c7b753..a69570931 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -18,7 +18,7 @@ use sqlx::{
};
use time::OffsetDateTime;
-use crate::history::HistoryStats;
+use crate::history::{HistoryId, HistoryStats};
use super::{
history::History,
@@ -93,6 +93,7 @@ pub trait Database: Send + Sync + 'static {
async fn before(&self, timestamp: OffsetDateTime, count: i64) -> Result<Vec<History>>;
async fn delete(&self, h: History) -> Result<()>;
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()>;
async fn deleted(&self) -> Result<Vec<History>>;
// Yes I know, it's a lot.
@@ -172,6 +173,18 @@ impl Sqlite {
Ok(())
}
+ async fn delete_row_raw(
+ tx: &mut sqlx::Transaction<'_, sqlx::Sqlite>,
+ id: HistoryId,
+ ) -> Result<()> {
+ sqlx::query("delete from history where id = ?1")
+ .bind(id.0.as_str())
+ .execute(&mut **tx)
+ .await?;
+
+ Ok(())
+ }
+
fn query_history(row: SqliteRow) -> History {
let deleted_at: Option<i64> = row.get("deleted_at");
@@ -567,6 +580,18 @@ impl Database for Sqlite {
Ok(())
}
+ async fn delete_rows(&self, ids: &[HistoryId]) -> Result<()> {
+ let mut tx = self.pool.begin().await?;
+
+ for id in ids {
+ Self::delete_row_raw(&mut tx, id.clone()).await?;
+ }
+
+ tx.commit().await?;
+
+ Ok(())
+ }
+
async fn stats(&self, h: &History) -> Result<HistoryStats> {
// We select the previous in the session by time
let mut prev = SqlBuilder::select_from("history");
diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs
index f4aa9d93c..a77854528 100644
--- a/atuin-client/src/history/store.rs
+++ b/atuin-client/src/history/store.rs
@@ -1,8 +1,11 @@
use eyre::{bail, eyre, Result};
use rmp::decode::Bytes;
-use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store};
-use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx};
+use crate::{
+ database::Database,
+ record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store},
+};
+use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordId, RecordIdx};
use super::{History, HistoryId, HISTORY_TAG, HISTORY_VERSION};
@@ -58,14 +61,14 @@ impl HistoryRecord {
Ok(DecryptedData(output))
}
- pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> {
+ pub fn deserialize(bytes: &DecryptedData, version: &str) -> Result<Self> {
use rmp::decode;
fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
eyre!("{err:?}")
}
- let mut bytes = Bytes::new(bytes);
+ let mut bytes = Bytes::new(&bytes.0);
let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
@@ -147,10 +150,89 @@ impl HistoryStore {
self.push_record(record).await
}
+
+ pub async fn history(&self) -> Result<Vec<HistoryRecord>> {
+ // Atm this loads all history into memory
+ // Not ideal as that is potentially quite a lot, although history will be small.
+ let records = self.store.all_tagged(HISTORY_TAG).await?;
+ let mut ret = Vec::with_capacity(records.len());
+
+ for record in records.into_iter() {
+ let hist = match record.version.as_str() {
+ HISTORY_VERSION => {
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)
+ }
+ version => bail!("unknown history version {version:?}"),
+ }?;
+
+ ret.push(hist);
+ }
+
+ Ok(ret)
+ }
+
+ pub async fn build(&self, database: &dyn Database) -> Result<()> {
+ // I'd like to change how we rebuild and not couple this with the database, but need to
+ // consider the structure more deeply. This will be easy to change.
+
+ // TODO(ellie): page or iterate this
+ let history = self.history().await?;
+
+ // In theory we could flatten this here
+ // The current issue is that the database may have history in it already, from the old sync
+ // This didn't actually delete old history
+ // If we're sure we have a DB only maintained by the new store, we can flatten
+ // create/delete before we even get to sqlite
+ let mut creates = Vec::new();
+ let mut deletes = Vec::new();
+
+ for i in history {
+ match i {
+ HistoryRecord::Create(h) => {
+ creates.push(h);
+ }
+ HistoryRecord::Delete(id) => {
+ deletes.push(id);
+ }
+ }
+ }
+
+ database.save_bulk(&creates).await?;
+ database.delete_rows(&deletes).await?;
+
+ Ok(())
+ }
+
+ pub async fn incremental_build(&self, database: &dyn Database, ids: &[RecordId]) -> Result<()> {
+ for id in ids {
+ let record = self.store.get(*id).await?;
+
+ if record.tag != HISTORY_TAG {
+ continue;
+ }
+
+ let decrypted = record.decrypt::<PASETO_V4>(&self.encryption_key)?;
+ let record = HistoryRecord::deserialize(&decrypted.data, HISTORY_VERSION)?;
+
+ match record {
+ HistoryRecord::Create(h) => {
+ // TODO: benchmark CPU time/memory tradeoff of batch commit vs one at a time
+ database.save(&h).await?;
+ }
+ HistoryRecord::Delete(id) => {
+ database.delete_rows(&[id]).await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
}
#[cfg(test)]
mod tests {
+ use atuin_common::record::DecryptedData;
use time::macros::datetime;
use crate::history::{store::HistoryRecord, HISTORY_VERSION};
@@ -187,13 +269,14 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
// check the snapshot too
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
@@ -208,12 +291,13 @@ mod tests {
let serialized = record.serialize().expect("failed to serialize history");
assert_eq!(serialized.0, bytes);
- let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ let deserialized = HistoryRecord::deserialize(&serialized, HISTORY_VERSION)
.expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
- let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
- .expect("failed to deserialize HistoryRecord");
+ let deserialized =
+ HistoryRecord::deserialize(&DecryptedData(Vec::from(bytes)), HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
assert_eq!(deserialized, record);
}
}
diff --git a/atuin-client/src/record/store.rs b/atuin-client/src/record/store.rs
index a5c156d68..efe2eb4a7 100644
--- a/atuin-client/src/record/store.rs
+++ b/atuin-client/src/record/store.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
use eyre::Result;
use atuin_common::record::{EncryptedData, HostId, Record, RecordId, RecordIdx, RecordStatus};
+
/// A record store stores records
/// In more detail - we tend to need to process this into _another_ format to actually query it.
/// As is, the record store is intended as the source of truth for arbitratry data, which could
@@ -44,8 +45,6 @@ pub trait Store {
async fn status(&self) -> Result<RecordStatus>;
- /// Get every start record for a given tag, regardless of host.
- /// Useful when actually operating on synchronized data, and will often have conflict
- /// resolution applied.
+ /// Get all records for a given tag
async fn all_tagged(&self, tag: &str) -> Result<Vec<Record<EncryptedData>>>;
}
diff --git a/atuin-client/src/record/sync.rs b/atuin-client/src/record/sync.rs
index 2694e0ff0..19b8dd1b1 100644
--- a/atuin-client/src/record/sync.rs
+++ b/atuin-client/src/record/sync.rs
@@ -7,7 +7,7 @@ use thiserror::Error;
use super::store::Store;
use crate::{api_client::Client, settings::Settings};
-use atuin_common::record::{Diff, HostId, RecordIdx, RecordStatus};
+use atuin_common::record::{Diff, HostId, RecordId, RecordIdx, RecordStatus};
#[derive(Error, Debug)]
pub enum SyncError {
@@ -198,11 +198,12 @@ async fn sync_download(
tag: String,
local: Option<RecordIdx>,
remote: RecordIdx,
-) -> Result<i64, SyncError> {
+) -> Result<Vec<RecordId>, SyncError> {
let local = local.unwrap_or(0);
let expected = remote - local;
let download_page_size = 100;
let mut progress = 0;
+ let mut ret = Vec::new();
println!(
"Downloading {} records from {}/{}",
@@ -230,6 +231,8 @@ async fn sync_download(
expected
);
+ ret.extend(page.iter().map(|f| f.id));
+
progress += page.len() as u64;
if progress >= expected {
@@ -237,14 +240,14 @@ async fn sync_download(
}
}
- Ok(progress as i64)
+ Ok(ret)
}
pub async fn sync_remote(
operations: Vec<Operation>,
local_store: &impl Store,
settings: &Settings,
-) -> Result<(i64, i64), SyncError> {
+) -> Result<(i64, Vec<RecordId>), SyncError> {
let client = Client::new(
&settings.sync_address,
&settings.session_token,
@@ -254,7 +257,7 @@ pub async fn sync_remote(
.expect("failed to create client");
let mut uploaded = 0;
- let mut downloaded = 0;
+ let mut downloaded = Vec::new();
// this can totally run in parallel, but lets get it working first
for i in operations {
@@ -271,9 +274,7 @@ pub async fn sync_remote(
tag,
local,
remote,
- } => {
- downloaded += sync_download(local_store, &client, host, tag, local, remote).await?
- }
+ } => downloaded = sync_download(local_store, &client, host, tag, local, remote).await?,
Operation::Noop { .. } => continue,
}
diff --git a/atuin/src/command/client.rs b/atuin/src/command/client.rs
index 9ca199fd9..6292d2638 100644
--- a/atuin/src/command/client.rs
+++ b/atuin/src/command/client.rs
@@ -16,9 +16,9 @@ mod config;
mod history;
mod import;
mod kv;
-mod record;
mod search;
mod stats;
+mod store;
#[derive(Subcommand, Debug)]
#[command(infer_subcommands = true)]
@@ -48,7 +48,7 @@ pub enum Cmd {
Kv(kv::Cmd),
#[command(subcommand)]
- Record(record::Cmd),
+ Store(store::Cmd),
/// Print example configuration
#[command()]
@@ -83,23 +83,23 @@ impl Cmd {
let record_store_path = PathBuf::from(settings.record_store_path.as_str());
let db = Sqlite::new(db_path).await?;
- let store = SqliteStore::new(record_store_path).await?;
+ let sqlite_store = SqliteStore::new(record_store_path).await?;
match self {
- Self::History(history) => history.run(&settings, &db, store).await,
+ Self::History(history) => history.run(&settings, &db, sqlite_store).await,
Self::Import(import) => import.run(&db).await,
Self::Stats(stats) => stats.run(&db, &settings).await,
Self::Search(search) => search.run(db, &mut settings).await,
#[cfg(feature = "sync")]
- Self::Sync(sync) => sync.run(settings, &db, &store).await,
+ Self::Sync(sync) => sync.run(settings, &db, sqlite_store).await,
#[cfg(feature = "sync")]
Self::Account(account) => account.run(settings).await,
- Self::Kv(kv) => kv.run(&settings, &store).await,
+ Self::Kv(kv) => kv.run(&settings, &sqlite_store).await,
- Self::Record(record) => record.run(&settings, &store).await,
+ Self::Store(store) => store.run(&settings, &db, sqlite_store).await,
Self::DefaultConfig => {
config::run();
diff --git a/atuin/src/command/client/history.rs b/atuin/src/command/client/history.rs
index e22ee6dbf..10f1feb60 100644
--- a/atuin/src/command/client/history.rs
+++ b/atuin/src/command/client/history.rs
@@ -317,14 +317,14 @@ impl Cmd {
if settings.sync.records {
let (diff, _) = record::sync::diff(settings, &store).await?;
let operations = record::sync::operations(diff, &store).await?;
- let (uploaded, downloaded) =
+ let (_, downloaded) =
record::sync::sync_remote(operations, &store, settings).await?;
- println!("{uploaded}/{downloaded} up/down to record store");
+ history_store.incremental_build(db, &downloaded).await?;
+ } else {
+ debug!("running periodic background sync");
+ sync::sync(settings, false, db).await?;
}
-
- debug!("running periodic background sync");
- sync::sync(settings, false, db).await?;
}
#[cfg(not(feature = "sync"))]
debug!("not compiled with sync support");
diff --git a/atuin/src/command/client/record.rs b/atuin/src/command/client/record.rs
deleted file mode 100644
index 3c91cdcc3..000000000
--- a/atuin/src/command/client/record.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use clap::Subcommand;
-use eyre::Result;
-
-use atuin_client::{record::store::Store, settings::Settings};
-use time::OffsetDateTime;
-
-#[derive(Subcommand, Debug)]
-#[command(infer_subcommands = true)]
-pub enum Cmd {
- Status,
-}
-
-impl Cmd {
- pub async fn run(
- &self,
- _settings: &Settings,
- store: &(impl Store + Send + Sync),
- ) -> Result<()> {
- let host_id = Settings::host_id().expect("failed to get host_id");
-
- let status = store.status().await?;
-
- // TODO: should probs build some data structure and then pretty-print it or smth
- for (host, st) in &status.hosts {
- let host_string = if host == &host_id {
- format!("host: {} <- CURRENT HOST", host.0.as_hyphenated())
- } else {
- format!("host: {}", host.0.as_hyphenated())
- };
-
- println!("{host_string}");
-
- for (tag, idx) in st {
- println!("\tstore: {tag}");
-
- let first = store.first(*host, tag).await?;
- let last = store.last(*host, tag).await?;
-
- println!("\t\tidx: {idx}");
-
- if let Some(first) = first {
- println!("\t\tfirst: {}", first.id.0.as_hyphenated());
-
- let time =
- OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?;
- println!("\t\t\tcreated: {time}");
- }
-
- if let Some(last) = last {
- println!("\t\tlast: {}", last.id.0.as_hyphenated());
-
- let time =
- OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?;
- println!("\t\t\tcreated: {time}");
- }
- }
-
- println!();
- }
-
- Ok(())
- }
-}
diff --git a/atuin/src/command/client/store.rs b/atuin/src/command/client/store.rs
new file mode 100644
index 000000000..640a284b6
--- /dev/null
+++ b/atuin/src/command/client/store.rs
@@ -0,0 +1,123 @@
+use clap::{Args, Subcommand};
+use eyre::{bail, Result};
+
+use atuin_client::{
+ database::Database,
+ encryption,
+ history::store::HistoryStore,
+ record::{sqlite_store::SqliteStore, store::Store},
+ settings::Settings,
+};
+use time::OffsetDateTime;
+
+#[derive(Args, Debug)]
+pub struct Rebuild {
+ pub tag: String,
+}
+
+impl Rebuild {
+ pub async fn run(
+ &self,
+ settings: &Settings,
+ store: SqliteStore,
+ database: &dyn Database,
+ ) -> Result<()> {
+ // keep it as a string and not an enum atm
+ // would be super cool to build this dynamically in the future
+ // eg register handles for rebuilding various tags without having to make this part of the
+ // binary big
+ match self.tag.as_str() {
+ "history" => {
+ self.rebuild_history(settings, store.clone(), database)
+ .await?;
+ }
+
+ tag => bail!("unknown tag: {tag}"),
+ }
+
+ Ok(())
+ }
+
+ async fn rebuild_history(
+ &self,
+ settings: &Settings,
+ store: SqliteStore,
+ database: &dyn Database,
+ ) -> Result<()> {
+ let encryption_key: [u8; 32] = encryption::load_key(settings)?.into();
+
+ let host_id = Settings::host_id().expect("failed to get host_id");
+ let history_store = HistoryStore::new(store, host_id, encryption_key);
+
+ history_store.build(database).await?;
+
+ Ok(())
+ }
+}
+
+#[derive(Subcommand, Debug)]
+#[command(infer_subcommands = true)]
+pub enum Cmd {
+ Status,
+ Rebuild(Rebuild),
+}
+
+impl Cmd {
+ pub async fn run(
+ &self,
+ settings: &Settings,
+ database: &dyn Database,
+ store: SqliteStore,
+ ) -> Result<()> {
+ match self {
+ Self::Status => self.status(store).await,
+ Self::Rebuild(rebuild) => rebuild.run(settings, store, database).await,
+ }
+ }
+
+ pub async fn status(&self, store: SqliteStore) -> Result<()> {
+ let host_id = Settings::host_id().expect("failed to get host_id");
+
+ let status = store.status().await?;
+
+ // TODO: should probs build some data structure and then pretty-print it or smth
+ for (host, st) in &status.hosts {
+ let host_string = if host == &host_id {
+ format!("host: {} <- CURRENT HOST", host.0.as_hyphenated())
+ } else {
+ format!("host: {}", host.0.as_hyphenated())
+ };
+
+ println!("{host_string}");
+
+ for (tag, idx) in st {
+ println!("\tstore: {tag}");
+
+ let first = store.first(*host, tag).await?;
+ let last = store.last(*host, tag).await?;
+
+ println!("\t\tidx: {idx}");
+
+ if let Some(first) = first {
+ println!("\t\tfirst: {}", first.id.0.as_hyphenated());
+
+ let time =
+ OffsetDateTime::from_unix_timestamp_nanos(i128::from(first.timestamp))?;
+ println!("\t\t\tcreated: {time}");
+ }
+
+ if let Some(last) = last {
+ println!("\t\tlast: {}", last.id.0.as_hyphenated());
+
+ let time =
+ OffsetDateTime::from_unix_timestamp_nanos(i128::from(last.timestamp))?;
+ println!("\t\t\tcreated: {time}");
+ }
+ }
+
+ println!();
+ }
+
+ Ok(())
+ }
+}
diff --git a/atuin/src/command/client/sync.rs b/atuin/src/command/client/sync.rs
index 1d2cdf4fe..2e58f07db 100644
--- a/atuin/src/command/client/sync.rs
+++ b/atuin/src/command/client/sync.rs
@@ -3,7 +3,9 @@ use eyre::{Result, WrapErr};
use atuin_client::{
database::Database,
- record::{store::Store, sync},
+ encryption,
+ history::store::HistoryStore,
+ record::{sqlite_store::SqliteStore, sync},
settings::Settings,
};
@@ -45,7 +47,7 @@ impl Cmd {
self,
settings: Settings,
db: &impl Database,
- store: &(impl Store + Send + Sync),
+ store: SqliteStore,
) -> Result<()> {
match self {
Self::Sync { force } => run(&settings, force, db, store).await,
@@ -75,17 +77,26 @@ async fn run(
settings: &Settings,
force: bool,
db: &impl Database,
- store: &(impl Store + Send + Sync),
+ store: SqliteStore,
) -> Result<()> {
if settings.sync.records {
- let (diff, _) = sync::diff(settings, store).await?;
- let operations = sync::operations(diff, store).await?;
- let (uploaded, downloaded) = sync::sync_remote(operations, store, settings).await?;
+ let (diff, _) = sync::diff(settings, &store).await?;
+ let operations = sync::operations(diff, &store).await?;
+ let (uploaded, downloaded) = sync::sync_remote(operations, &store, settings).await?;
- println!("{uploaded}/{downloaded} up/down to record store");
- }
+ let encryption_key: [u8; 32] = encryption::load_key(settings)
+ .context("could not load encryption key")?
+ .into();
+
+ let host_id = Settings::host_id().expect("failed to get host_id");
+ let history_store = HistoryStore::new(store.clone(), host_id, encryption_key);
- atuin_client::sync::sync(settings, force, db).await?;
+ history_store.incremental_build(db, &downloaded).await?;
+
+ println!("{uploaded}/{} up/down to record store", downloaded.len());
+ } else {
+ atuin_client::sync::sync(settings, force, db).await?;
+ }
println!(
"Sync complete! {} items in history database, force: {}",