summaryrefslogtreecommitdiffstats
path: root/atuin-common
diff options
context:
space:
mode:
authorEllie Huxtable <ellie@elliehuxtable.com>2024-01-05 17:57:49 +0000
committerGitHub <noreply@github.com>2024-01-05 17:57:49 +0000
commit7bc6ccdd70422f8fc763e2fd814a481bc79ce7b5 (patch)
treea1c064a7c7394d261711c6e046d4c60791e6cf6f /atuin-common
parent604ae40b9dee24e3ebe66cbc1ef1020683a34699 (diff)
feat: rework record sync for improved reliability (#1478)
* feat: rework record sync for improved reliability So, to tell a story 1. We introduced the record sync, intended to be the new algorithm to sync history. 2. On top of this, I added the KV store. This was intended as a simple test of the record sync, and to see if people wanted that sort of functionality 3. History remained syncing via the old means, as while it had issues it worked more-or-less OK. And we are aware of its flaws 4. If KV syncing worked ok, history would be moved across KV syncing ran ok for 6mo or so, so I started to move across history. For several weeks, I ran a local fork of Atuin + the server that synced via records instead. The record store maintained ordering via a linked list, which was a mistake. It performed well in testing, but was really difficult to debug and reason about. So when a few small sync issues occured, they took an extremely long time to debug. This PR is huge, which I regret. It involves replacing the "parent" relationship that records once had (pointing to the previous record) with a simple index (generally referred to as idx). This also means we had to change the recordindex, which referenced "tails". Tails were the last item in the chain. Now that we use an "array" vs linked list, that logic was also replaced. And is much simpler :D Same for the queries that act on this data. ---- This isn't final - we still need to add 1. Proper server/client error handling, which has been lacking for a while 2. The actual history implementation on top This exists in a branch, just without deletions. Won't be much to add that, I just don't want to make this any larger than it already is The _only_ caveat here is that we basically lose data synced via the old record store. This is the KV data from before. It hasn't been deleted or anything, just no longer hooked up. So it's totally possible to write a migration script. I just need to do that. * update .gitignore * use correct endpoint * fix for stores with length of 1 * use create/delete enum for history store * lint, remove unneeded host_id * remove prints * add command to import old history * add enable/disable switch for record sync * add record sync to auto sync * satisfy the almighty clippy * remove file that I did not mean to commit * feedback
Diffstat (limited to 'atuin-common')
-rw-r--r--atuin-common/src/record.rs166
1 files changed, 93 insertions, 73 deletions
diff --git a/atuin-common/src/record.rs b/atuin-common/src/record.rs
index cba0917a..e6ce2647 100644
--- a/atuin-common/src/record.rs
+++ b/atuin-common/src/record.rs
@@ -14,13 +14,34 @@ pub struct EncryptedData {
pub content_encryption_key: String,
}
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, PartialOrd, Ord, Eq)]
pub struct Diff {
pub host: HostId,
pub tag: String,
- pub tail: RecordId,
+ pub local: Option<RecordIdx>,
+ pub remote: Option<RecordIdx>,
}
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
+pub struct Host {
+ pub id: HostId,
+ pub name: String,
+}
+
+impl Host {
+ pub fn new(id: HostId) -> Self {
+ Host {
+ id,
+ name: String::new(),
+ }
+ }
+}
+
+new_uuid!(RecordId);
+new_uuid!(HostId);
+
+pub type RecordIdx = u64;
+
/// A single record stored inside of our local database
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, TypedBuilder)]
pub struct Record<Data> {
@@ -28,18 +49,14 @@ pub struct Record<Data> {
#[builder(default = RecordId(crate::utils::uuid_v7()))]
pub id: RecordId,
+ /// The integer record ID. This is only unique per (host, tag).
+ pub idx: RecordIdx,
+
/// The unique ID of the host.
// TODO(ellie): Optimize the storage here. We use a bunch of IDs, and currently store
// as strings. I would rather avoid normalization, so store as UUID binary instead of
// encoding to a string and wasting much more storage.
- pub host: HostId,
-
- /// The ID of the parent entry
- // A store is technically just a double linked list
- // We can do some cheating with the timestamps, but should not rely upon them.
- // Clocks are tricksy.
- #[builder(default)]
- pub parent: Option<RecordId>,
+ pub host: Host,
/// The creation time in nanoseconds since unix epoch
#[builder(default = time::OffsetDateTime::now_utc().unix_timestamp_nanos() as u64)]
@@ -56,25 +73,22 @@ pub struct Record<Data> {
pub data: Data,
}
-new_uuid!(RecordId);
-new_uuid!(HostId);
-
/// Extra data from the record that should be encoded in the data
#[derive(Debug, Copy, Clone)]
pub struct AdditionalData<'a> {
pub id: &'a RecordId,
+ pub idx: &'a u64,
pub version: &'a str,
pub tag: &'a str,
pub host: &'a HostId,
- pub parent: Option<&'a RecordId>,
}
impl<Data> Record<Data> {
- pub fn new_child(&self, data: Vec<u8>) -> Record<DecryptedData> {
+ pub fn append(&self, data: Vec<u8>) -> Record<DecryptedData> {
Record::builder()
- .host(self.host)
+ .host(self.host.clone())
.version(self.version.clone())
- .parent(Some(self.id))
+ .idx(self.idx + 1)
.tag(self.tag.clone())
.data(DecryptedData(data))
.build()
@@ -84,74 +98,76 @@ impl<Data> Record<Data> {
/// An index representing the current state of the record stores
/// This can be both remote, or local, and compared in either direction
#[derive(Debug, Serialize, Deserialize)]
-pub struct RecordIndex {
- // A map of host -> tag -> tail
- pub hosts: HashMap<HostId, HashMap<String, RecordId>>,
+pub struct RecordStatus {
+ // A map of host -> tag -> max(idx)
+ pub hosts: HashMap<HostId, HashMap<String, RecordIdx>>,
}
-impl Default for RecordIndex {
+impl Default for RecordStatus {
fn default() -> Self {
Self::new()
}
}
-impl Extend<(HostId, String, RecordId)> for RecordIndex {
- fn extend<T: IntoIterator<Item = (HostId, String, RecordId)>>(&mut self, iter: T) {
- for (host, tag, tail_id) in iter {
- self.set_raw(host, tag, tail_id);
+impl Extend<(HostId, String, RecordIdx)> for RecordStatus {
+ fn extend<T: IntoIterator<Item = (HostId, String, RecordIdx)>>(&mut self, iter: T) {
+ for (host, tag, tail_idx) in iter {
+ self.set_raw(host, tag, tail_idx);
}
}
}
-impl RecordIndex {
- pub fn new() -> RecordIndex {
- RecordIndex {
+impl RecordStatus {
+ pub fn new() -> RecordStatus {
+ RecordStatus {
hosts: HashMap::new(),
}
}
/// Insert a new tail record into the store
pub fn set(&mut self, tail: Record<DecryptedData>) {
- self.set_raw(tail.host, tail.tag, tail.id)
+ self.set_raw(tail.host.id, tail.tag, tail.idx)
}
- pub fn set_raw(&mut self, host: HostId, tag: String, tail_id: RecordId) {
+ pub fn set_raw(&mut self, host: HostId, tag: String, tail_id: RecordIdx) {
self.hosts.entry(host).or_default().insert(tag, tail_id);
}
- pub fn get(&self, host: HostId, tag: String) -> Option<RecordId> {
+ pub fn get(&self, host: HostId, tag: String) -> Option<RecordIdx> {
self.hosts.get(&host).and_then(|v| v.get(&tag)).cloned()
}
/// Diff this index with another, likely remote index.
/// The two diffs can then be reconciled, and the optimal change set calculated
/// Returns a tuple, with (host, tag, Option(OTHER))
- /// OTHER is set to the value of the tail on the other machine. For example, if the
- /// other machine has a different tail, it will be the differing tail. This is useful to
- /// check if the other index is ahead of us, or behind.
- /// If the other index does not have the (host, tag) pair, then the other value will be None.
+ /// OTHER is set to the value of the idx on the other machine. If it is greater than our index,
+ /// then we need to do some downloading. If it is smaller, then we need to do some uploading
+ /// Note that we cannot upload if we are not the owner of the record store - hosts can only
+ /// write to their own store.
pub fn diff(&self, other: &Self) -> Vec<Diff> {
let mut ret = Vec::new();
// First, we check if other has everything that self has
for (host, tag_map) in self.hosts.iter() {
- for (tag, tail) in tag_map.iter() {
+ for (tag, idx) in tag_map.iter() {
match other.get(*host, tag.clone()) {
// The other store is all up to date! No diff.
- Some(t) if t.eq(tail) => continue,
+ Some(t) if t.eq(idx) => continue,
- // The other store does exist, but it is either ahead or behind us. A diff regardless
+ // The other store does exist, and it is either ahead or behind us. A diff regardless
Some(t) => ret.push(Diff {
host: *host,
tag: tag.clone(),
- tail: t,
+ local: Some(*idx),
+ remote: Some(t),
}),
// The other store does not exist :O
None => ret.push(Diff {
host: *host,
tag: tag.clone(),
- tail: *tail,
+ local: Some(*idx),
+ remote: None,
}),
};
}
@@ -162,7 +178,7 @@ impl RecordIndex {
// account for that!
for (host, tag_map) in other.hosts.iter() {
- for (tag, tail) in tag_map.iter() {
+ for (tag, idx) in tag_map.iter() {
match self.get(*host, tag.clone()) {
// If we have this host/tag combo, the comparison and diff will have already happened above
Some(_) => continue,
@@ -170,13 +186,15 @@ impl RecordIndex {
None => ret.push(Diff {
host: *host,
tag: tag.clone(),
- tail: *tail,
+ remote: Some(*idx),
+ local: None,
}),
};
}
}
- ret.sort_by(|a, b| (a.host, a.tag.clone(), a.tail).cmp(&(b.host, b.tag.clone(), b.tail)));
+ // Stability is a nice property to have
+ ret.sort();
ret
}
}
@@ -201,14 +219,14 @@ impl Record<DecryptedData> {
id: &self.id,
version: &self.version,
tag: &self.tag,
- host: &self.host,
- parent: self.parent.as_ref(),
+ host: &self.host.id,
+ idx: &self.idx,
};
Record {
data: E::encrypt(self.data, ad, key),
id: self.id,
host: self.host,
- parent: self.parent,
+ idx: self.idx,
timestamp: self.timestamp,
version: self.version,
tag: self.tag,
@@ -222,14 +240,14 @@ impl Record<EncryptedData> {
id: &self.id,
version: &self.version,
tag: &self.tag,
- host: &self.host,
- parent: self.parent.as_ref(),
+ host: &self.host.id,
+ idx: &self.idx,
};
Ok(Record {
data: E::decrypt(self.data, ad, key)?,
id: self.id,
host: self.host,
- parent: self.parent,
+ idx: self.idx,
timestamp: self.timestamp,
version: self.version,
tag: self.tag,
@@ -245,14 +263,14 @@ impl Record<EncryptedData> {
id: &self.id,
version: &self.version,
tag: &self.tag,
- host: &self.host,
- parent: self.parent.as_ref(),
+ host: &self.host.id,
+ idx: &self.idx,
};
Ok(Record {
data: E::re_encrypt(self.data, ad, old_key, new_key)?,
id: self.id,
host: self.host,
- parent: self.parent,
+ idx: self.idx,
timestamp: self.timestamp,
version: self.version,
tag: self.tag,
@@ -262,31 +280,32 @@ impl Record<EncryptedData> {
#[cfg(test)]
mod tests {
- use crate::record::HostId;
+ use crate::record::{Host, HostId};
- use super::{DecryptedData, Diff, Record, RecordIndex};
+ use super::{DecryptedData, Diff, Record, RecordStatus};
use pretty_assertions::assert_eq;
fn test_record() -> Record<DecryptedData> {
Record::builder()
- .host(HostId(crate::utils::uuid_v7()))
+ .host(Host::new(HostId(crate::utils::uuid_v7())))
.version("v1".into())
.tag(crate::utils::uuid_v7().simple().to_string())
.data(DecryptedData(vec![0, 1, 2, 3]))
+ .idx(0)
.build()
}
#[test]
fn record_index() {
- let mut index = RecordIndex::new();
+ let mut index = RecordStatus::new();
let record = test_record();
index.set(record.clone());
- let tail = index.get(record.host, record.tag);
+ let tail = index.get(record.host.id, record.tag);
assert_eq!(
- record.id,
+ record.idx,
tail.expect("tail not in store"),
"tail in store did not match"
);
@@ -294,17 +313,17 @@ mod tests {
#[test]
fn record_index_overwrite() {
- let mut index = RecordIndex::new();
+ let mut index = RecordStatus::new();
let record = test_record();
- let child = record.new_child(vec![1, 2, 3]);
+ let child = record.append(vec![1, 2, 3]);
index.set(record.clone());
index.set(child.clone());
- let tail = index.get(record.host, record.tag);
+ let tail = index.get(record.host.id, record.tag);
assert_eq!(
- child.id,
+ child.idx,
tail.expect("tail not in store"),
"tail in store did not match"
);
@@ -314,8 +333,8 @@ mod tests {
fn record_index_no_diff() {
// Here, they both have the same version and should have no diff
- let mut index1 = RecordIndex::new();
- let mut index2 = RecordIndex::new();
+ let mut index1 = RecordStatus::new();
+ let mut index2 = RecordStatus::new();
let record1 = test_record();
@@ -331,11 +350,11 @@ mod tests {
fn record_index_single_diff() {
// Here, they both have the same stores, but one is ahead by a single record
- let mut index1 = RecordIndex::new();
- let mut index2 = RecordIndex::new();
+ let mut index1 = RecordStatus::new();
+ let mut index2 = RecordStatus::new();
let record1 = test_record();
- let record2 = record1.new_child(vec![1, 2, 3]);
+ let record2 = record1.append(vec![1, 2, 3]);
index1.set(record1);
index2.set(record2.clone());
@@ -346,9 +365,10 @@ mod tests {
assert_eq!(
diff[0],
Diff {
- host: record2.host,
+ host: record2.host.id,
tag: record2.tag,
- tail: record2.id
+ remote: Some(1),
+ local: Some(0)
}
);
}
@@ -356,14 +376,14 @@ mod tests {
#[test]
fn record_index_multi_diff() {
// A much more complex case, with a bunch more checks
- let mut index1 = RecordIndex::new();
- let mut index2 = RecordIndex::new();
+ let mut index1 = RecordStatus::new();
+ let mut index2 = RecordStatus::new();
let store1record1 = test_record();
- let store1record2 = store1record1.new_child(vec![1, 2, 3]);
+ let store1record2 = store1record1.append(vec![1, 2, 3]);
let store2record1 = test_record();
- let store2record2 = store2record1.new_child(vec![1, 2, 3]);
+ let store2record2 = store2record1.append(vec![1, 2, 3]);
let store3record1 = test_record();