summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEllie Huxtable <ellie@elliehuxtable.com>2024-01-05 17:57:49 +0000
committerGitHub <noreply@github.com>2024-01-05 17:57:49 +0000
commit7bc6ccdd70422f8fc763e2fd814a481bc79ce7b5 (patch)
treea1c064a7c7394d261711c6e046d4c60791e6cf6f
parent604ae40b9dee24e3ebe66cbc1ef1020683a34699 (diff)
feat: rework record sync for improved reliability (#1478)
* feat: rework record sync for improved reliability So, to tell a story 1. We introduced the record sync, intended to be the new algorithm to sync history. 2. On top of this, I added the KV store. This was intended as a simple test of the record sync, and to see if people wanted that sort of functionality 3. History remained syncing via the old means, as while it had issues it worked more-or-less OK. And we are aware of its flaws 4. If KV syncing worked ok, history would be moved across KV syncing ran ok for 6mo or so, so I started to move across history. For several weeks, I ran a local fork of Atuin + the server that synced via records instead. The record store maintained ordering via a linked list, which was a mistake. It performed well in testing, but was really difficult to debug and reason about. So when a few small sync issues occured, they took an extremely long time to debug. This PR is huge, which I regret. It involves replacing the "parent" relationship that records once had (pointing to the previous record) with a simple index (generally referred to as idx). This also means we had to change the recordindex, which referenced "tails". Tails were the last item in the chain. Now that we use an "array" vs linked list, that logic was also replaced. And is much simpler :D Same for the queries that act on this data. ---- This isn't final - we still need to add 1. Proper server/client error handling, which has been lacking for a while 2. The actual history implementation on top This exists in a branch, just without deletions. Won't be much to add that, I just don't want to make this any larger than it already is The _only_ caveat here is that we basically lose data synced via the old record store. This is the KV data from before. It hasn't been deleted or anything, just no longer hooked up. So it's totally possible to write a migration script. I just need to do that. * update .gitignore * use correct endpoint * fix for stores with length of 1 * use create/delete enum for history store * lint, remove unneeded host_id * remove prints * add command to import old history * add enable/disable switch for record sync * add record sync to auto sync * satisfy the almighty clippy * remove file that I did not mean to commit * feedback
-rw-r--r--.gitignore2
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--atuin-client/Cargo.toml1
-rw-r--r--atuin-client/record-migrations/20231127090831_create-store.sql15
-rw-r--r--atuin-client/src/api_client.rs48
-rw-r--r--atuin-client/src/history.rs210
-rw-r--r--atuin-client/src/history/store.rs219
-rw-r--r--atuin-client/src/kv.rs100
-rw-r--r--atuin-client/src/record/encryption.rs29
-rw-r--r--atuin-client/src/record/sqlite_store.rs250
-rw-r--r--atuin-client/src/record/store.rs36
-rw-r--r--atuin-client/src/record/sync.rs498
-rw-r--r--atuin-client/src/settings.rs9
-rw-r--r--atuin-common/src/record.rs166
-rw-r--r--atuin-server-database/src/lib.rs6
-rw-r--r--atuin-server-postgres/migrations/20231202170508_create-store.sql15
-rw-r--r--atuin-server-postgres/migrations/20231203124112_create-store-idx.sql2
-rw-r--r--atuin-server-postgres/src/lib.rs98
-rw-r--r--atuin-server-postgres/src/wrappers.rs7
-rw-r--r--atuin-server/src/handlers/mod.rs1
-rw-r--r--atuin-server/src/handlers/record.rs109
-rw-r--r--atuin-server/src/handlers/v0/mod.rs1
-rw-r--r--atuin-server/src/handlers/v0/record.rs111
-rw-r--r--atuin-server/src/router.rs11
-rw-r--r--atuin/src/command/client.rs14
-rw-r--r--atuin/src/command/client/history.rs70
-rw-r--r--atuin/src/command/client/kv.rs6
-rw-r--r--atuin/src/command/client/record.rs63
-rw-r--r--atuin/src/command/client/sync.rs15
30 files changed, 1484 insertions, 630 deletions
diff --git a/.gitignore b/.gitignore
index 17c0b070..3d544414 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
+.DS_Store
/target
*/target
.env
.idea/
.vscode/
result
+publish.sh
diff --git a/Cargo.lock b/Cargo.lock
index 28c00fb8..46499c34 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -242,6 +242,7 @@ dependencies = [
"shellexpand",
"sql-builder",
"sqlx",
+ "thiserror",
"time",
"tokio",
"typed-builder",
diff --git a/Cargo.toml b/Cargo.toml
index 1d9f6176..9aa03831 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ uuid = { version = "1.3", features = ["v4", "v7", "serde"] }
whoami = "1.1.2"
typed-builder = "0.18.0"
pretty_assertions = "1.3.0"
+thiserror = "1.0"
[workspace.dependencies.reqwest]
version = "0.11"
diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml
index c3d9cab7..cbb8d016 100644
--- a/atuin-client/Cargo.toml
+++ b/atuin-client/Cargo.toml
@@ -48,6 +48,7 @@ rmp = { version = "0.8.11" }
typed-builder = { workspace = true }
tokio = { workspace = true }
semver = { workspace = true }
+thiserror = { workspace = true }
futures = "0.3"
crypto_secretbox = "0.1.1"
generic-array = { version = "0.14", features = ["serde"] }
diff --git a/atuin-client/record-migrations/20231127090831_create-store.sql b/atuin-client/record-migrations/20231127090831_create-store.sql
new file mode 100644
index 00000000..53d78860
--- /dev/null
+++ b/atuin-client/record-migrations/20231127090831_create-store.sql
@@ -0,0 +1,15 @@
+-- Add migration script here
+create table if not exists store (
+ id text primary key, -- globally unique ID
+
+ idx integer, -- incrementing integer ID unique per (host, tag)
+ host text not null, -- references the host row
+ tag text not null,
+
+ timestamp integer not null,
+ version text not null,
+ data blob not null,
+ cek blob not null
+);
+
+create unique index record_uniq ON store(host, tag, idx);
diff --git a/atuin-client/src/api_client.rs b/atuin-client/src/api_client.rs
index ae8df5ad..9007b9ab 100644
--- a/atuin-client/src/api_client.rs
+++ b/atuin-client/src/api_client.rs
@@ -13,11 +13,11 @@ use atuin_common::{
AddHistoryRequest, CountResponse, DeleteHistoryRequest, ErrorResponse, IndexResponse,
LoginRequest, LoginResponse, RegisterResponse, StatusResponse, SyncHistoryResponse,
},
- record::RecordIndex,
+ record::RecordStatus,
};
use atuin_common::{
api::{ATUIN_CARGO_VERSION, ATUIN_HEADER_VERSION, ATUIN_VERSION},
- record::{EncryptedData, HostId, Record, RecordId},
+ record::{EncryptedData, HostId, Record, RecordIdx},
};
use semver::Version;
use time::format_description::well_known::Rfc3339;
@@ -267,10 +267,18 @@ impl<'a> Client<'a> {
}
pub async fn post_records(&self, records: &[Record<EncryptedData>]) -> Result<()> {
- let url = format!("{}/record", self.sync_addr);
+ let url = format!("{}/api/v0/record", self.sync_addr);
let url = Url::parse(url.as_str())?;
- self.client.post(url).json(records).send().await?;
+ let resp = self.client.post(url).json(records).send().await?;
+ info!("posted records, got {}", resp.status());
+
+ if !resp.status().is_success() {
+ error!(
+ "failed to post records to server; got: {:?}",
+ resp.text().await
+ );
+ }
Ok(())
}
@@ -279,24 +287,22 @@ impl<'a> Client<'a> {
&self,
host: HostId,
tag: String,
- start: Option<RecordId>,
+ start: RecordIdx,
count: u64,
) -> Result<Vec<Record<EncryptedData>>> {
+ debug!(
+ "fetching record/s from host {}/{}/{}",
+ host.0.to_string(),
+ tag,
+ start
+ );
+
let url = format!(
- "{}/record/next?host={}&tag={}&count={}",
- self.sync_addr, host.0, tag, count
+ "{}/api/v0/record/next?host={}&tag={}&count={}&start={}",
+ self.sync_addr, host.0, tag, count, start
);
- let mut url = Url::parse(url.as_str())?;
-
- if let Some(start) = start {
- url.set_query(Some(
- format!(
- "host={}&tag={}&count={}&start={}",
- host.0, tag, count, start.0
- )
- .as_str(),
- ));
- }
+
+ let url = Url::parse(url.as_str())?;
let resp = self.client.get(url).send().await?;
@@ -305,8 +311,8 @@ impl<'a> Client<'a> {
Ok(records)
}
- pub async fn record_index(&self) -> Result<RecordIndex> {
- let url = format!("{}/record", self.sync_addr);
+ pub async fn record_status(&self) -> Result<RecordStatus> {
+ let url = format!("{}/api/v0/record", self.sync_addr);
let url = Url::parse(url.as_str())?;
let resp = self.client.get(url).send().await?;
@@ -317,6 +323,8 @@ impl<'a> Client<'a> {
let index = resp.json().await?;
+ debug!("got remote index {:?}", index);
+
Ok(index)
}
diff --git a/atuin-client/src/history.rs b/atuin-client/src/history.rs
index fbcb169c..2b2c41ee 100644
--- a/atuin-client/src/history.rs
+++ b/atuin-client/src/history.rs
@@ -1,12 +1,21 @@
+use rmp::decode::ValueReadError;
+use rmp::{decode::Bytes, Marker};
use std::env;
+use atuin_common::record::DecryptedData;
use atuin_common::utils::uuid_v7;
+
+use eyre::{bail, eyre, Result};
use regex::RegexSet;
use crate::{secrets::SECRET_PATTERNS, settings::Settings};
use time::OffsetDateTime;
mod builder;
+pub mod store;
+
+const HISTORY_VERSION: &str = "v0";
+const HISTORY_TAG: &str = "history";
/// Client-side history entry.
///
@@ -81,6 +90,108 @@ impl History {
}
}
+ pub fn serialize(&self) -> Result<DecryptedData> {
+ // This is pretty much the same as what we used for the old history, with one difference -
+ // it uses integers for timestamps rather than a string format.
+
+ use rmp::encode;
+
+ let mut output = vec![];
+
+ // write the version
+ encode::write_u16(&mut output, 0)?;
+ // INFO: ensure this is updated when adding new fields
+ encode::write_array_len(&mut output, 9)?;
+
+ encode::write_str(&mut output, &self.id)?;
+ encode::write_u64(&mut output, self.timestamp.unix_timestamp_nanos() as u64)?;
+ encode::write_sint(&mut output, self.duration)?;
+ encode::write_sint(&mut output, self.exit)?;
+ encode::write_str(&mut output, &self.command)?;
+ encode::write_str(&mut output, &self.cwd)?;
+ encode::write_str(&mut output, &self.session)?;
+ encode::write_str(&mut output, &self.hostname)?;
+
+ match self.deleted_at {
+ Some(d) => encode::write_u64(&mut output, d.unix_timestamp_nanos() as u64)?,
+ None => encode::write_nil(&mut output)?,
+ }
+
+ Ok(DecryptedData(output))
+ }
+
+ fn deserialize_v0(bytes: &[u8]) -> Result<History> {
+ use rmp::decode;
+
+ fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
+ eyre!("{err:?}")
+ }
+
+ let mut bytes = Bytes::new(bytes);
+
+ let version = decode::read_u16(&mut bytes).map_err(error_report)?;
+
+ if version != 0 {
+ bail!("expected decoding v0 record, found v{version}");
+ }
+
+ let nfields = decode::read_array_len(&mut bytes).map_err(error_report)?;
+
+ if nfields != 9 {
+ bail!("cannot decrypt history from a different version of Atuin");
+ }
+
+ let bytes = bytes.remaining_slice();
+ let (id, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
+
+ let mut bytes = Bytes::new(bytes);
+ let timestamp = decode::read_u64(&mut bytes).map_err(error_report)?;
+ let duration = decode::read_int(&mut bytes).map_err(error_report)?;
+ let exit = decode::read_int(&mut bytes).map_err(error_report)?;
+
+ let bytes = bytes.remaining_slice();
+ let (command, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
+ let (cwd, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
+ let (session, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
+ let (hostname, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
+
+ // if we have more fields, try and get the deleted_at
+ let mut bytes = Bytes::new(bytes);
+
+ let (deleted_at, bytes) = match decode::read_u64(&mut bytes) {
+ Ok(unix) => (Some(unix), bytes.remaining_slice()),
+ // we accept null here
+ Err(ValueReadError::TypeMismatch(Marker::Null)) => (None, bytes.remaining_slice()),
+ Err(err) => return Err(error_report(err)),
+ };
+
+ if !bytes.is_empty() {
+ bail!("trailing bytes in encoded history. malformed")
+ }
+
+ Ok(History {
+ id: id.to_owned(),
+ timestamp: OffsetDateTime::from_unix_timestamp_nanos(timestamp as i128)?,
+ duration,
+ exit,
+ command: command.to_owned(),
+ cwd: cwd.to_owned(),
+ session: session.to_owned(),
+ hostname: hostname.to_owned(),
+ deleted_at: deleted_at
+ .map(|t| OffsetDateTime::from_unix_timestamp_nanos(t as i128))
+ .transpose()?,
+ })
+ }
+
+ pub fn deserialize(bytes: &[u8], version: &str) -> Result<History> {
+ match version {
+ HISTORY_VERSION => Self::deserialize_v0(bytes),
+
+ _ => bail!("unknown version {version:?}"),
+ }
+ }
+
/// Builder for a history entry that is imported from shell history.
///
/// The only two required fields are `timestamp` and `command`.
@@ -202,8 +313,9 @@ impl History {
#[cfg(test)]
mod tests {
use regex::RegexSet;
+ use time::macros::datetime;
- use crate::settings::Settings;
+ use crate::{history::HISTORY_VERSION, settings::Settings};
use super::History;
@@ -274,4 +386,100 @@ mod tests {
assert!(stripe_key.should_save(&settings));
}
+
+ #[test]
+ fn test_serialize_deserialize() {
+ let bytes = [
+ 205, 0, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
+ 53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
+ 98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
+ 97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
+ 46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
+ 47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
+ 51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
+ 102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
+ 99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
+ ];
+
+ let history = History {
+ id: "66d16cbee7cd47538e5c5b8b44e9006e".to_owned(),
+ timestamp: datetime!(2023-05-28 18:35:40.633872 +00:00),
+ duration: 49206000,
+ exit: 0,
+ command: "git status".to_owned(),
+ cwd: "/Users/conrad.ludgate/Documents/code/atuin".to_owned(),
+ session: "b97d9a306f274473a203d2eba41f9457".to_owned(),
+ hostname: "fvfg936c0kpf:conrad.ludgate".to_owned(),
+ deleted_at: None,
+ };
+
+ let serialized = history.serialize().expect("failed to serialize history");
+ assert_eq!(serialized.0, bytes);
+
+ let deserialized = History::deserialize(&serialized.0, HISTORY_VERSION)
+ .expect("failed to deserialize history");
+ assert_eq!(history, deserialized);
+
+ // test the snapshot too
+ let deserialized =
+ History::deserialize(&bytes, HISTORY_VERSION).expect("failed to deserialize history");
+ assert_eq!(history, deserialized);
+ }
+
+ #[test]
+ fn test_serialize_deserialize_deleted() {
+ let history = History {
+ id: "66d16cbee7cd47538e5c5b8b44e9006e".to_owned(),
+ timestamp: datetime!(2023-05-28 18:35:40.633872 +00:00),
+ duration: 49206000,
+ exit: 0,
+ command: "git status".to_owned(),
+ cwd: "/Users/conrad.ludgate/Documents/code/atuin".to_owned(),
+ session: "b97d9a306f274473a203d2eba41f9457".to_owned(),
+ hostname: "fvfg936c0kpf:conrad.ludgate".to_owned(),
+ deleted_at: Some(datetime!(2023-11-19 20:18 +00:00)),
+ };
+
+ let serialized = history.serialize().expect("failed to serialize history");
+
+ let deserialized = History::deserialize(&serialized.0, HISTORY_VERSION)
+ .expect("failed to deserialize history");
+
+ assert_eq!(history, deserialized);
+ }
+
+ #[test]
+ fn test_serialize_deserialize_version() {
+ // v0
+ let bytes_v0 = [
+ 205, 0, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
+ 53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
+ 98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
+ 97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
+ 46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
+ 47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
+ 51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
+ 102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
+ 99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
+ ];
+
+ // some other version
+ let bytes_v1 = [
+ 205, 1, 0, 153, 217, 32, 54, 54, 100, 49, 54, 99, 98, 101, 101, 55, 99, 100, 52, 55,
+ 53, 51, 56, 101, 53, 99, 53, 98, 56, 98, 52, 52, 101, 57, 48, 48, 54, 101, 207, 23, 99,
+ 98, 117, 24, 210, 246, 128, 206, 2, 238, 210, 240, 0, 170, 103, 105, 116, 32, 115, 116,
+ 97, 116, 117, 115, 217, 42, 47, 85, 115, 101, 114, 115, 47, 99, 111, 110, 114, 97, 100,
+ 46, 108, 117, 100, 103, 97, 116, 101, 47, 68, 111, 99, 117, 109, 101, 110, 116, 115,
+ 47, 99, 111, 100, 101, 47, 97, 116, 117, 105, 110, 217, 32, 98, 57, 55, 100, 57, 97,
+ 51, 48, 54, 102, 50, 55, 52, 52, 55, 51, 97, 50, 48, 51, 100, 50, 101, 98, 97, 52, 49,
+ 102, 57, 52, 53, 55, 187, 102, 118, 102, 103, 57, 51, 54, 99, 48, 107, 112, 102, 58,
+ 99, 111, 110, 114, 97, 100, 46, 108, 117, 100, 103, 97, 116, 101, 192,
+ ];
+
+ let deserialized = History::deserialize(&bytes_v0, HISTORY_VERSION);
+ assert!(deserialized.is_ok());
+
+ let deserialized = History::deserialize(&bytes_v1, HISTORY_VERSION);
+ assert!(deserialized.is_err());
+ }
}
diff --git a/atuin-client/src/history/store.rs b/atuin-client/src/history/store.rs
new file mode 100644
index 00000000..bf74a0a8
--- /dev/null
+++ b/atuin-client/src/history/store.rs
@@ -0,0 +1,219 @@
+use eyre::{bail, eyre, Result};
+use rmp::decode::Bytes;
+
+use crate::record::{encryption::PASETO_V4, sqlite_store::SqliteStore, store::Store};
+use atuin_common::record::{DecryptedData, Host, HostId, Record, RecordIdx};
+
+use super::{History, HISTORY_TAG, HISTORY_VERSION};
+
+#[derive(Debug)]
+pub struct HistoryStore {
+ pub store: SqliteStore,
+ pub host_id: HostId,
+ pub encryption_key: [u8; 32],
+}
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub enum HistoryRecord {
+ Create(History), // Create a history record
+ Delete(String), // Delete a history record, identified by ID
+}
+
+impl HistoryRecord {
+ /// Serialize a history record, returning DecryptedData
+ /// The record will be of a certain type
+ /// We map those like so:
+ ///
+ /// HistoryRecord::Create -> 0
+ /// HistoryRecord::Delete-> 1
+ ///
+ /// This numeric identifier is then written as the first byte to the buffer. For history, we
+ /// append the serialized history right afterwards, to avoid having to handle serialization
+ /// twice.
+ ///
+ /// Deletion simply refers to the history by ID
+ pub fn serialize(&self) -> Result<DecryptedData> {
+ // probably don't actually need to use rmp here, but if we ever need to extend it, it's a
+ // nice wrapper around raw byte stuff
+ use rmp::encode;
+
+ let mut output = vec![];
+
+ match self {
+ HistoryRecord::Create(history) => {
+ // 0 -> a history create
+ encode::write_u8(&mut output, 0)?;
+
+ let bytes = history.serialize()?;
+
+ encode::write_bin(&mut output, &bytes.0)?;
+ }
+ HistoryRecord::Delete(id) => {
+ // 1 -> a history delete
+ encode::write_u8(&mut output, 1)?;
+ encode::write_str(&mut output, id)?;
+ }
+ };
+
+ Ok(DecryptedData(output))
+ }
+
+ pub fn deserialize(bytes: &[u8], version: &str) -> Result<Self> {
+ use rmp::decode;
+
+ fn error_report<E: std::fmt::Debug>(err: E) -> eyre::Report {
+ eyre!("{err:?}")
+ }
+
+ let mut bytes = Bytes::new(bytes);
+
+ let record_type = decode::read_u8(&mut bytes).map_err(error_report)?;
+
+ match record_type {
+ // 0 -> HistoryRecord::Create
+ 0 => {
+ // not super useful to us atm, but perhaps in the future
+ // written by write_bin above
+ let _ = decode::read_bin_len(&mut bytes).map_err(error_report)?;
+
+ let record = History::deserialize(bytes.remaining_slice(), version)?;
+
+ Ok(HistoryRecord::Create(record))
+ }
+
+ // 1 -> HistoryRecord::Delete
+ 1 => {
+ let bytes = bytes.remaining_slice();
+ let (id, bytes) = decode::read_str_from_slice(bytes).map_err(error_report)?;
+
+ if !bytes.is_empty() {
+ bail!(
+ "trailing bytes decoding HistoryRecord::Delete - malformed? got {bytes:?}"
+ );
+ }
+
+ Ok(HistoryRecord::Delete(id.to_string()))
+ }
+
+ n => {
+ bail!("unknown HistoryRecord type {n}")
+ }
+ }
+ }
+}
+
+impl HistoryStore {
+ pub fn new(store: SqliteStore, host_id: HostId, encryption_key: [u8; 32]) -> Self {
+ HistoryStore {
+ store,
+ host_id,
+ encryption_key,
+ }
+ }
+
+ async fn push_record(&self, record: HistoryRecord) -> Result<RecordIdx> {
+ let bytes = record.serialize()?;
+ let idx = self
+ .store
+ .last(self.host_id, HISTORY_TAG)
+ .await?
+ .map_or(0, |p| p.idx + 1);
+
+ let record = Record::builder()
+ .host(Host::new(self.host_id))
+ .version(HISTORY_VERSION.to_string())
+ .tag(HISTORY_TAG.to_string())
+ .idx(idx)
+ .data(bytes)
+ .build();
+
+ self.store
+ .push(&record.encrypt::<PASETO_V4>(&self.encryption_key))
+ .await?;
+
+ Ok(idx)
+ }
+
+ pub async fn delete(&self, id: String) -> Result<RecordIdx> {
+ let record = HistoryRecord::Delete(id);
+
+ self.push_record(record).await
+ }
+
+ pub async fn push(&self, history: History) -> Result<RecordIdx> {
+ // TODO(ellie): move the history store to its own file
+ // it's tiny rn so fine as is
+ let record = HistoryRecord::Create(history);
+
+ self.push_record(record).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use time::macros::datetime;
+
+ use crate::history::{store::HistoryRecord, HISTORY_VERSION};
+
+ use super::History;
+
+ #[test]
+ fn test_serialize_deserialize_create() {
+ let bytes = [
+ 204, 0, 196, 141, 205, 0, 0, 153, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49,
+ 55, 53, 55, 99, 100, 50, 97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99,
+ 56, 49, 207, 23, 166, 251, 212, 181, 82, 0, 0, 100, 0, 162, 108, 115, 217, 41, 47, 85,
+ 115, 101, 114, 115, 47, 101, 108, 108, 105, 101, 47, 115, 114, 99, 47, 103, 105, 116,
+ 104, 117, 98, 46, 99, 111, 109, 47, 97, 116, 117, 105, 110, 115, 104, 47, 97, 116, 117,
+ 105, 110, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 97, 100, 56, 57, 55, 53, 57, 55,
+ 56, 53, 50, 53, 50, 55, 97, 51, 49, 99, 57, 57, 56, 48, 53, 57, 170, 98, 111, 111, 112,
+ 58, 101, 108, 108, 105, 101, 192,
+ ];
+
+ let history = History {
+ id: "018cd4fe81757cd2aee65cd7861f9c81".to_owned(),
+ timestamp: datetime!(2024-01-04 00:00:00.000000 +00:00),
+ duration: 100,
+ exit: 0,
+ command: "ls".to_owned(),
+ cwd: "/Users/ellie/src/github.com/atuinsh/atuin".to_owned(),
+ session: "018cd4fead897597852527a31c998059".to_owned(),
+ hostname: "boop:ellie".to_owned(),
+ deleted_at: None,
+ };
+
+ let record = HistoryRecord::Create(history);
+
+ let serialized = record.serialize().expect("failed to serialize history");
+ assert_eq!(serialized.0, bytes);
+
+ let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
+ assert_eq!(deserialized, record);
+
+ // check the snapshot too
+ let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
+ assert_eq!(deserialized, record);
+ }
+
+ #[test]
+ fn test_serialize_deserialize_delete() {
+ let bytes = [
+ 204, 1, 217, 32, 48, 49, 56, 99, 100, 52, 102, 101, 56, 49, 55, 53, 55, 99, 100, 50,
+ 97, 101, 101, 54, 53, 99, 100, 55, 56, 54, 49, 102, 57, 99, 56, 49,
+ ];
+ let record = HistoryRecord::Delete("018cd4fe81757cd2aee65cd7861f9c81".to_string());
+
+ let serialized = record.serialize().expect("failed to serialize history");
+ assert_eq!(serialized.0, bytes);
+
+ let deserialized = HistoryRecord::deserialize(&serialized.0, HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
+ assert_eq!(deserialized, record);
+
+ let deserialized = HistoryRecord::deserialize(&bytes, HISTORY_VERSION)
+ .expect("failed to deserialize HistoryRecord");
+ assert_eq!(deserialized, record);
+ }
+}
diff --git a/atuin-client/src/kv.rs b/atuin-client/src/kv.rs
index 1ca6b5e8..cee7063d 100644
--- a/atuin-client/src/kv.rs
+++ b/atuin-client/src/kv.rs
@@ -1,6 +1,6 @@
use std::collections::BTreeMap;
-use atuin_common::record::{DecryptedData, HostId};
+use atuin_common::record::{DecryptedData, Host, HostId};
use eyre::{bail, ensure, eyre, Result};
use serde::Deserialize;
@@ -89,7 +89,7 @@ impl KvStore {
pub async fn set(
&self,
- store: &mut (impl Store + Send + Sync),
+ store: &(impl Store + Send + Sync),
encryption_key: &[u8; 32],
host_id: HostId,
namespace: &str,
@@ -111,13 +111,16 @@ impl KvStore {
let bytes = record.serialize()?;
- let parent = store.tail(host_id, KV_TAG).await?.map(|entry| entry.id);
+ let idx = store
+ .last(host_id, KV_TAG)
+ .await?
+ .map