From a21737e2b7f8d1e426726bdd7536033f299d476a Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Tue, 20 Apr 2021 21:53:07 +0100 Subject: Use cargo workspaces (#37) * Switch to Cargo workspaces Breaking things into "client", "server" and "common" makes managing the codebase much easier! client - anything running on a user's machine for adding history server - handles storing/syncing history and running a HTTP server common - request/response API definitions, common utils, etc * Update dockerfile --- atuin-client/Cargo.toml | 40 ++++++ atuin-client/config.toml | 24 ++++ atuin-client/src/api_client.rs | 96 +++++++++++++++ atuin-client/src/database.rs | 272 +++++++++++++++++++++++++++++++++++++++++ atuin-client/src/encryption.rs | 108 ++++++++++++++++ atuin-client/src/history.rs | 66 ++++++++++ atuin-client/src/import.rs | 176 ++++++++++++++++++++++++++ atuin-client/src/lib.rs | 13 ++ atuin-client/src/settings.rs | 149 ++++++++++++++++++++++ atuin-client/src/sync.rs | 142 +++++++++++++++++++++ 10 files changed, 1086 insertions(+) create mode 100644 atuin-client/Cargo.toml create mode 100644 atuin-client/config.toml create mode 100644 atuin-client/src/api_client.rs create mode 100644 atuin-client/src/database.rs create mode 100644 atuin-client/src/encryption.rs create mode 100644 atuin-client/src/history.rs create mode 100644 atuin-client/src/import.rs create mode 100644 atuin-client/src/lib.rs create mode 100644 atuin-client/src/settings.rs create mode 100644 atuin-client/src/sync.rs (limited to 'atuin-client') diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml new file mode 100644 index 00000000..06c96a9a --- /dev/null +++ b/atuin-client/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "atuin-client" +version = "0.1.0" +authors = ["Ellie Huxtable "] +edition = "2018" +license = "MIT" +description = "client library for atuin" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +atuin-common = { path = "../atuin-common", version = "0.1.0" } + +log = "0.4" +fern = {version = "0.6.0", features = ["colored"] } +chrono = { version = "0.4", features = ["serde"] } +eyre = "0.6" +directories = "3" +uuid = { version = "0.8", features = ["v4"] } +indicatif = "0.15.0" +whoami = "1.1.2" +chrono-english = "0.1.4" +config = "0.11" +serde_derive = "1.0.125" +serde = "1.0.125" +serde_json = "1.0.64" +rmp-serde = "0.15.4" +sodiumoxide = "0.2.6" +reqwest = { version = "0.11", features = ["blocking", "json"] } +base64 = "0.13.0" +parse_duration = "2.1.1" +rand = "0.8.3" +rust-crypto = "^0.2" +tokio = { version = "1", features = ["full"] } +async-trait = "0.1.49" +urlencoding = "1.1.1" +humantime = "2.1.0" +rusqlite= { version = "0.25", features = ["bundled"] } +itertools = "0.10.0" +shellexpand = "2" diff --git a/atuin-client/config.toml b/atuin-client/config.toml new file mode 100644 index 00000000..33e5b545 --- /dev/null +++ b/atuin-client/config.toml @@ -0,0 +1,24 @@ +## where to store your database, default is your system data directory +## mac: ~/Library/Application Support/com.elliehuxtable.atuin/history.db +## linux: ~/.local/share/atuin/history.db +# db_path = "~/.history.db" + +## where to store your encryption key, default is your system data directory +# key_path = "~/.key" + +## where to store your auth session token, default is your system data directory +# session_path = "~/.key" + +## date format used, either "us" or "uk" +# dialect = "uk" + +## enable or disable automatic sync +# auto_sync = true + +## how often to sync history. note that this is only triggered when a command +## is ran, so sync intervals may well be longer +## set it to 0 to sync after every command +# sync_frequency = "5m" + +## address of the sync server +# sync_address = "https://api.atuin.sh" diff --git a/atuin-client/src/api_client.rs b/atuin-client/src/api_client.rs new file mode 100644 index 00000000..db2802c3 --- /dev/null +++ b/atuin-client/src/api_client.rs @@ -0,0 +1,96 @@ +use chrono::Utc; +use eyre::Result; +use reqwest::header::{HeaderMap, AUTHORIZATION}; +use reqwest::Url; +use sodiumoxide::crypto::secretbox; + +use atuin_common::api::{AddHistoryRequest, CountResponse, SyncHistoryResponse}; +use atuin_common::utils::hash_str; + +use crate::encryption::decrypt; +use crate::history::History; + +pub struct Client<'a> { + sync_addr: &'a str, + token: &'a str, + key: secretbox::Key, + client: reqwest::Client, +} + +impl<'a> Client<'a> { + pub fn new(sync_addr: &'a str, token: &'a str, key: secretbox::Key) -> Self { + Client { + sync_addr, + token, + key, + client: reqwest::Client::new(), + } + } + + pub async fn count(&self) -> Result { + let url = format!("{}/sync/count", self.sync_addr); + let url = Url::parse(url.as_str())?; + let token = format!("Token {}", self.token); + let token = token.parse()?; + + let mut headers = HeaderMap::new(); + headers.insert(AUTHORIZATION, token); + + let resp = self.client.get(url).headers(headers).send().await?; + + let count = resp.json::().await?; + + Ok(count.count) + } + + pub async fn get_history( + &self, + sync_ts: chrono::DateTime, + history_ts: chrono::DateTime, + host: Option, + ) -> Result> { + let host = match host { + None => hash_str(&format!("{}:{}", whoami::hostname(), whoami::username())), + Some(h) => h, + }; + + let url = format!( + "{}/sync/history?sync_ts={}&history_ts={}&host={}", + self.sync_addr, + urlencoding::encode(sync_ts.to_rfc3339().as_str()), + urlencoding::encode(history_ts.to_rfc3339().as_str()), + host, + ); + + let resp = self + .client + .get(url) + .header(AUTHORIZATION, format!("Token {}", self.token)) + .send() + .await?; + + let history = resp.json::().await?; + let history = history + .history + .iter() + .map(|h| serde_json::from_str(h).expect("invalid base64")) + .map(|h| decrypt(&h, &self.key).expect("failed to decrypt history! check your key")) + .collect(); + + Ok(history) + } + + pub async fn post_history(&self, history: &[AddHistoryRequest]) -> Result<()> { + let url = format!("{}/history", self.sync_addr); + let url = Url::parse(url.as_str())?; + + self.client + .post(url) + .json(history) + .header(AUTHORIZATION, format!("Token {}", self.token)) + .send() + .await?; + + Ok(()) + } +} diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs new file mode 100644 index 00000000..abc22bb8 --- /dev/null +++ b/atuin-client/src/database.rs @@ -0,0 +1,272 @@ +use chrono::prelude::*; +use chrono::Utc; +use std::path::Path; + +use eyre::Result; + +use rusqlite::{params, Connection}; +use rusqlite::{Params, Transaction}; + +use super::history::History; + +pub trait Database { + fn save(&mut self, h: &History) -> Result<()>; + fn save_bulk(&mut self, h: &[History]) -> Result<()>; + + fn load(&self, id: &str) -> Result; + fn list(&self) -> Result>; + fn range(&self, from: chrono::DateTime, to: chrono::DateTime) + -> Result>; + + fn query(&self, query: &str, params: impl Params) -> Result>; + fn update(&self, h: &History) -> Result<()>; + fn history_count(&self) -> Result; + + fn first(&self) -> Result; + fn last(&self) -> Result; + fn before(&self, timestamp: chrono::DateTime, count: i64) -> Result>; + + fn prefix_search(&self, query: &str) -> Result>; +} + +// Intended for use on a developer machine and not a sync server. +// TODO: implement IntoIterator +pub struct Sqlite { + conn: Connection, +} + +impl Sqlite { + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref(); + debug!("opening sqlite database at {:?}", path); + + let create = !path.exists(); + if create { + if let Some(dir) = path.parent() { + std::fs::create_dir_all(dir)?; + } + } + + let conn = Connection::open(path)?; + + Self::setup_db(&conn)?; + + Ok(Self { conn }) + } + + fn setup_db(conn: &Connection) -> Result<()> { + debug!("running sqlite database setup"); + + conn.execute( + "create table if not exists history ( + id text primary key, + timestamp integer not null, + duration integer not null, + exit integer not null, + command text not null, + cwd text not null, + session text not null, + hostname text not null, + + unique(timestamp, cwd, command) + )", + [], + )?; + + conn.execute( + "create table if not exists history_encrypted ( + id text primary key, + data blob not null + )", + [], + )?; + + Ok(()) + } + + fn save_raw(tx: &Transaction, h: &History) -> Result<()> { + tx.execute( + "insert or ignore into history ( + id, + timestamp, + duration, + exit, + command, + cwd, + session, + hostname + ) values (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + h.id, + h.timestamp.timestamp_nanos(), + h.duration, + h.exit, + h.command, + h.cwd, + h.session, + h.hostname + ], + )?; + + Ok(()) + } +} + +impl Database for Sqlite { + fn save(&mut self, h: &History) -> Result<()> { + debug!("saving history to sqlite"); + + let tx = self.conn.transaction()?; + Self::save_raw(&tx, h)?; + tx.commit()?; + + Ok(()) + } + + fn save_bulk(&mut self, h: &[History]) -> Result<()> { + debug!("saving history to sqlite"); + + let tx = self.conn.transaction()?; + for i in h { + Self::save_raw(&tx, i)? + } + tx.commit()?; + + Ok(()) + } + + fn load(&self, id: &str) -> Result { + debug!("loading history item"); + + let mut stmt = self.conn.prepare( + "select id, timestamp, duration, exit, command, cwd, session, hostname from history + where id = ?1", + )?; + + let history = stmt.query_row(params![id], |row| { + history_from_sqlite_row(Some(id.to_string()), row) + })?; + + Ok(history) + } + + fn update(&self, h: &History) -> Result<()> { + debug!("updating sqlite history"); + + self.conn.execute( + "update history + set timestamp = ?2, duration = ?3, exit = ?4, command = ?5, cwd = ?6, session = ?7, hostname = ?8 + where id = ?1", + params![h.id, h.timestamp.timestamp_nanos(), h.duration, h.exit, h.command, h.cwd, h.session, h.hostname], + )?; + + Ok(()) + } + + fn list(&self) -> Result> { + debug!("listing history"); + + let mut stmt = self + .conn + .prepare("SELECT * FROM history order by timestamp asc")?; + + let history_iter = stmt.query_map(params![], |row| history_from_sqlite_row(None, row))?; + + Ok(history_iter.filter_map(Result::ok).collect()) + } + + fn range( + &self, + from: chrono::DateTime, + to: chrono::DateTime, + ) -> Result> { + debug!("listing history from {:?} to {:?}", from, to); + + let mut stmt = self.conn.prepare( + "SELECT * FROM history where timestamp >= ?1 and timestamp <= ?2 order by timestamp asc", + )?; + + let history_iter = stmt.query_map( + params![from.timestamp_nanos(), to.timestamp_nanos()], + |row| history_from_sqlite_row(None, row), + )?; + + Ok(history_iter.filter_map(Result::ok).collect()) + } + + fn first(&self) -> Result { + let mut stmt = self + .conn + .prepare("SELECT * FROM history order by timestamp asc limit 1")?; + + let history = stmt.query_row(params![], |row| history_from_sqlite_row(None, row))?; + + Ok(history) + } + + fn last(&self) -> Result { + let mut stmt = self + .conn + .prepare("SELECT * FROM history order by timestamp desc limit 1")?; + + let history = stmt.query_row(params![], |row| history_from_sqlite_row(None, row))?; + + Ok(history) + } + + fn before(&self, timestamp: chrono::DateTime, count: i64) -> Result> { + let mut stmt = self + .conn + .prepare("SELECT * FROM history where timestamp < ? order by timestamp desc limit ?")?; + + let history_iter = stmt.query_map(params![timestamp.timestamp_nanos(), count], |row| { + history_from_sqlite_row(None, row) + })?; + + Ok(history_iter.filter_map(Result::ok).collect()) + } + + fn query(&self, query: &str, params: impl Params) -> Result> { + let mut stmt = self.conn.prepare(query)?; + + let history_iter = stmt.query_map(params, |row| history_from_sqlite_row(None, row))?; + + Ok(history_iter.filter_map(Result::ok).collect()) + } + + fn prefix_search(&self, query: &str) -> Result> { + self.query( + "select * from history where command like ?1 || '%' order by timestamp asc limit 1000", + &[query], + ) + } + + fn history_count(&self) -> Result { + let res: i64 = + self.conn + .query_row_and_then("select count(1) from history;", params![], |row| row.get(0))?; + + Ok(res) + } +} + +fn history_from_sqlite_row( + id: Option, + row: &rusqlite::Row, +) -> Result { + let id = match id { + Some(id) => id, + None => row.get(0)?, + }; + + Ok(History { + id, + timestamp: Utc.timestamp_nanos(row.get(1)?), + duration: row.get(2)?, + exit: row.get(3)?, + command: row.get(4)?, + cwd: row.get(5)?, + session: row.get(6)?, + hostname: row.get(7)?, + }) +} diff --git a/atuin-client/src/encryption.rs b/atuin-client/src/encryption.rs new file mode 100644 index 00000000..37153f94 --- /dev/null +++ b/atuin-client/src/encryption.rs @@ -0,0 +1,108 @@ +// The general idea is that we NEVER send cleartext history to the server +// This way the odds of anything private ending up where it should not are +// very low +// The server authenticates via the usual username and password. This has +// nothing to do with the encryption, and is purely authentication! The client +// generates its own secret key, and encrypts all shell history with libsodium's +// secretbox. The data is then sent to the server, where it is stored. All +// clients must share the secret in order to be able to sync, as it is needed +// to decrypt + +use std::fs::File; +use std::io::prelude::*; +use std::path::PathBuf; + +use eyre::{eyre, Result}; +use sodiumoxide::crypto::secretbox; + +use crate::history::History; +use crate::settings::Settings; + +#[derive(Debug, Serialize, Deserialize)] +pub struct EncryptedHistory { + pub ciphertext: Vec, + pub nonce: secretbox::Nonce, +} + +// Loads the secret key, will create + save if it doesn't exist +pub fn load_key(settings: &Settings) -> Result { + let path = settings.key_path.as_str(); + + if PathBuf::from(path).exists() { + let bytes = std::fs::read(path)?; + let key: secretbox::Key = rmp_serde::from_read_ref(&bytes)?; + Ok(key) + } else { + let key = secretbox::gen_key(); + let buf = rmp_serde::to_vec(&key)?; + + let mut file = File::create(path)?; + file.write_all(&buf)?; + + Ok(key) + } +} + +pub fn encrypt(history: &History, key: &secretbox::Key) -> Result { + // serialize with msgpack + let buf = rmp_serde::to_vec(history)?; + + let nonce = secretbox::gen_nonce(); + + let ciphertext = secretbox::seal(&buf, &nonce, key); + + Ok(EncryptedHistory { ciphertext, nonce }) +} + +pub fn decrypt(encrypted_history: &EncryptedHistory, key: &secretbox::Key) -> Result { + let plaintext = secretbox::open(&encrypted_history.ciphertext, &encrypted_history.nonce, key) + .map_err(|_| eyre!("failed to open secretbox - invalid key?"))?; + + let history = rmp_serde::from_read_ref(&plaintext)?; + + Ok(history) +} + +#[cfg(test)] +mod test { + use sodiumoxide::crypto::secretbox; + + use crate::local::history::History; + + use super::{decrypt, encrypt}; + + #[test] + fn test_encrypt_decrypt() { + let key1 = secretbox::gen_key(); + let key2 = secretbox::gen_key(); + + let history = History::new( + chrono::Utc::now(), + "ls".to_string(), + "/home/ellie".to_string(), + 0, + 1, + Some("beep boop".to_string()), + Some("booop".to_string()), + ); + + let e1 = encrypt(&history, &key1).unwrap(); + let e2 = encrypt(&history, &key2).unwrap(); + + assert_ne!(e1.ciphertext, e2.ciphertext); + assert_ne!(e1.nonce, e2.nonce); + + // test decryption works + // this should pass + match decrypt(&e1, &key1) { + Err(e) => assert!(false, "failed to decrypt, got {}", e), + Ok(h) => assert_eq!(h, history), + }; + + // this should err + match decrypt(&e2, &key1) { + Ok(_) => assert!(false, "expected an error decrypting with invalid key"), + Err(_) => {} + }; + } +} diff --git a/atuin-client/src/history.rs b/atuin-client/src/history.rs new file mode 100644 index 00000000..7f607784 --- /dev/null +++ b/atuin-client/src/history.rs @@ -0,0 +1,66 @@ +use std::env; +use std::hash::{Hash, Hasher}; + +use chrono::Utc; + +use atuin_common::utils::uuid_v4; + +// Any new fields MUST be Optional<>! +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct History { + pub id: String, + pub timestamp: chrono::DateTime, + pub duration: i64, + pub exit: i64, + pub command: String, + pub cwd: String, + pub session: String, + pub hostname: String, +} + +impl History { + pub fn new( + timestamp: chrono::DateTime, + command: String, + cwd: String, + exit: i64, + duration: i64, + session: Option, + hostname: Option, + ) -> Self { + let session = session + .or_else(|| env::var("ATUIN_SESSION").ok()) + .unwrap_or_else(uuid_v4); + let hostname = + hostname.unwrap_or_else(|| format!("{}:{}", whoami::hostname(), whoami::username())); + + Self { + id: uuid_v4(), + timestamp, + command, + cwd, + exit, + duration, + session, + hostname, + } + } +} + +impl PartialEq for History { + // for the sakes of listing unique history only, we do not care about + // anything else + // obviously this does not refer to the *same* item of history, but when + // we only render the command, it looks the same + fn eq(&self, other: &Self) -> bool { + self.command == other.command + } +} + +impl Eq for History {} + +impl Hash for History { + fn hash(&self, state: &mut H) { + self.command.hash(state); + } +} diff --git a/atuin-client/src/import.rs b/atuin-client/src/import.rs new file mode 100644 index 00000000..3b0b2a69 --- /dev/null +++ b/atuin-client/src/import.rs @@ -0,0 +1,176 @@ +// import old shell history! +// automatically hoover up all that we can find + +use std::io::{BufRead, BufReader, Seek, SeekFrom}; +use std::{fs::File, path::Path}; + +use chrono::prelude::*; +use chrono::Utc; +use eyre::{eyre, Result}; +use itertools::Itertools; + +use super::history::History; + +#[derive(Debug)] +pub struct Zsh { + file: BufReader, + + pub loc: u64, + pub counter: i64, +} + +// this could probably be sped up +fn count_lines(buf: &mut BufReader) -> Result { + let lines = buf.lines().count(); + buf.seek(SeekFrom::Start(0))?; + + Ok(lines) +} + +impl Zsh { + pub fn new(path: impl AsRef) -> Result { + let file = File::open(path)?; + let mut buf = BufReader::new(file); + let loc = count_lines(&mut buf)?; + + Ok(Self { + file: buf, + loc: loc as u64, + counter: 0, + }) + } +} + +fn parse_extended(line: &str, counter: i64) -> History { + let line = line.replacen(": ", "", 2); + let (time, duration) = line.splitn(2, ':').collect_tuple().unwrap(); + let (duration, command) = duration.splitn(2, ';').collect_tuple().unwrap(); + + let time = time + .parse::() + .unwrap_or_else(|_| chrono::Utc::now().timestamp()); + + let offset = chrono::Duration::milliseconds(counter); + let time = Utc.timestamp(time, 0); + let time = time + offset; + + let duration = duration.parse::().map_or(-1, |t| t * 1_000_000_000); + + // use nanos, because why the hell not? we won't display them. + History::new( + time, + command.trim_end().to_string(), + String::from("unknown"), + 0, // assume 0, we have no way of knowing :( + duration, + None, + None, + ) +} + +impl Zsh { + fn read_line(&mut self) -> Option> { + let mut line = String::new(); + + match self.file.read_line(&mut line) { + Ok(0) => None, + Ok(_) => Some(Ok(line)), + Err(e) => Some(Err(eyre!("failed to read line: {}", e))), // we can skip past things like invalid utf8 + } + } +} + +impl Iterator for Zsh { + type Item = Result; + + fn next(&mut self) -> Option { + // ZSH extended history records the timestamp + command duration + // These lines begin with : + // So, if the line begins with :, parse it. Otherwise it's just + // the command + let line = self.read_line()?; + + if let Err(e) = line { + return Some(Err(e)); // :( + } + + let mut line = line.unwrap(); + + while line.ends_with("\\\n") { + let next_line = self.read_line()?; + + if next_line.is_err() { + // There's a chance that the last line of a command has invalid + // characters, the only safe thing to do is break :/ + // usually just invalid utf8 or smth + // however, we really need to avoid missing history, so it's + // better to have some items that should have been part of + // something else, than to miss things. So break. + break; + } + + line.push_str(next_line.unwrap().as_str()); + } + + // We have to handle the case where a line has escaped newlines. + // Keep reading until we have a non-escaped newline + + let extended = line.starts_with(':'); + + if extended { + self.counter += 1; + Some(Ok(parse_extended(line.as_str(), self.counter))) + } else { + let time = chrono::Utc::now(); + let offset = chrono::Duration::seconds(self.counter); + let time = time - offset; + + self.counter += 1; + + Some(Ok(History::new( + time, + line.trim_end().to_string(), + String::from("unknown"), + -1, + -1, + None, + None, + ))) + } + } +} + +#[cfg(test)] +mod test { + use chrono::prelude::*; + use chrono::Utc; + + use super::parse_extended; + + #[test] + fn test_parse_extended_simple() { + let parsed = parse_extended(": 1613322469:0;cargo install atuin", 0); + + assert_eq!(parsed.command, "cargo install atuin"); + assert_eq!(parsed.duration, 0); + assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); + + let parsed = parse_extended(": 1613322469:10;cargo install atuin;cargo update", 0); + + assert_eq!(parsed.command, "cargo install atuin;cargo update"); + assert_eq!(parsed.duration, 10_000_000_000); + assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); + + let parsed = parse_extended(": 1613322469:10;cargo :b̷i̶t̴r̵o̴t̴ ̵i̷s̴ ̷r̶e̵a̸l̷", 0); + + assert_eq!(parsed.command, "cargo :b̷i̶t̴r̵o̴t̴ ̵i̷s̴ ̷r̶e̵a̸l̷"); + assert_eq!(parsed.duration, 10_000_000_000); + assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); + + let parsed = parse_extended(": 1613322469:10;cargo install \\n atuin\n", 0); + + assert_eq!(parsed.command, "cargo install \\n atuin"); + assert_eq!(parsed.duration, 10_000_000_000); + assert_eq!(parsed.timestamp, Utc.timestamp(1_613_322_469, 0)); + } +} diff --git a/atuin-client/src/lib.rs b/atuin-client/src/lib.rs new file mode 100644 index 00000000..1207bfdb --- /dev/null +++ b/atuin-client/src/lib.rs @@ -0,0 +1,13 @@ +#[macro_use] +extern crate log; + +#[macro_use] +extern crate serde_derive; + +pub mod api_client; +pub mod database; +pub mod encryption; +pub mod history; +pub mod import; +pub mod settings; +pub mod sync; diff --git a/atuin-client/src/settings.rs b/atuin-client/src/settings.rs new file mode 100644 index 00000000..e28963c0 --- /dev/null +++ b/atuin-client/src/settings.rs @@ -0,0 +1,149 @@ +use std::fs::{create_dir_all, File}; +use std::io::prelude::*; +use std::path::{Path, PathBuf}; + +use chrono::prelude::*; +use chrono::Utc; +use config::{Config, Environment, File as ConfigFile}; +use directories::ProjectDirs; +use eyre::{eyre, Result}; +use parse_duration::parse; + +pub const HISTORY_PAGE_SIZE: i64 = 100; + +#[derive(Clone, Debug, Deserialize)] +pub struct Settings { + pub dialect: String, + pub auto_sync: bool, + pub sync_address: String, + pub sync_frequency: String, + pub db_path: String, + pub key_path: String, + pub session_path: String, + + // This is automatically loaded when settings is created. Do not set in + // config! Keep secrets and settings apart. + pub session_token: String, +} + +impl Settings { + pub fn save_sync_time() -> Result<()> { + let sync_time_path = ProjectDirs::from("com", "elliehuxtable", "atuin") + .ok_or_else(|| eyre!("could not determine key file location"))?; + let sync_time_path = sync_time_path.data_dir().join("last_sync_time"); + + std::fs::write(sync_time_path, Utc::now().to_rfc3339())?; + + Ok(()) + } + + pub fn last_sync() -> Result> { + let sync_time_path = ProjectDirs::from("com", "elliehuxtable", "atuin"); + + if sync_time_path.is_none() { + debug!("failed to load projectdirs, not syncing"); + return Err(eyre!("could not load project dirs")); + } + + let sync_time_path = sync_time_path.unwrap(); + let sync_time_path = sync_time_path.data_dir().join("last_sync_time"); + + if !sync_time_path.exists() { + return Ok(Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)); + } + + let time = std::fs::read_to_string(sync_time_path)?; + let time = chrono::DateTime::parse_from_rfc3339(time.as_str())?; + + Ok(time.with_timezone(&Utc)) + } + + pub fn should_sync(&self) -> Result { + if !self.auto_sync { + return Ok(false); + } + + match parse(self.sync_frequency.as_str()) { + Ok(d) => { + let d = chrono::Duration::from_std(d).unwrap(); + Ok(Utc::now() - Settings::last_sync()? >= d) + } + Err(e) => Err(eyre!("failed to check sync: {}", e)), + } + } + + pub fn new() -> Result { + let config_dir = ProjectDirs::from("com", "elliehuxtable", "atuin").unwrap(); + let config_dir = config_dir.config_dir(); + + create_dir_all(config_dir)?; + + let config_file = if let Ok(p) = std::env::var("ATUIN_CONFIG") { + PathBuf::from(p) + } else { + let mut config_file = PathBuf::new(); + config_file.push(config_dir); + config_file.push("config.toml"); + config_file + }; + + let mut s = Config::new(); + + let db_path = ProjectDirs::from("com", "elliehuxtable", "atuin") + .ok_or_else(|| eyre!("could not determine db file location"))? + .data_dir() + .join("history.db"); + + let key_path = ProjectDirs::from("com", "elliehuxtable", "atuin") + .ok_or_else(|| eyre!("could not determine key file location"))? + .data_dir() + .join("key"); + + let session_path = ProjectDirs::from("com", "elliehuxtable", "atuin") + .ok_or_else(|| eyre!("could not determine session file location"))? + .data_dir() + .join("session"); + + s.set_default("db_path", db_path.to_str())?; + s.set_default("key_path", key_path.to_str())?; + s.set_default("session_path", session_path.to_str())?; + s.set_default("dialect", "us")?; + s.set_default("auto_sync", true)?; + s.set_default("sync_frequency", "5m")?; + s.set_default("sync_address", "https://api.atuin.sh")?; + + if config_file.exists() { + s.merge(ConfigFile::with_name(config_file.to_str().unwrap()))?; + } else { + let example_config = include_bytes!("../config.toml"); + let mut file = File::create(config_file)?; + file.write_all(example_config)?; + } + + s.merge(Environment::with_prefix("atuin").separator("_"))?; + + // all paths should be expanded + let db_path = s.get_str("db_path")?; + let db_path = shellexpand::full(db_path.as_str())?; + s.set("db_path", db_path.to_string())?; + + let key_path = s.get_str("key_path")?; + let key_path = shellexpand::full(key_path.as_str())?; + s.set("key_path", key_path.to_string())?; + + let session_path = s.get_str("session_path")?; + let session_path = shellexpand::full(session_path.as_str())?; + s.set("session_path", session_path.to_string())?; + + // Finally, set the auth token + if Path::new(session_path.to_string().as_str()).exists() { + let token = std::fs::read_to_string(session_path.to_string())?; + s.set("session_token", token.trim())?; + } else { + s.set("session_token", "not logged in")?; + } + + s.try_into() + .map_err(|e| eyre!("failed to deserialize: {}", e)) + } +} diff --git a/atuin-client/src/sync.rs b/atuin-client/src/sync.rs new file mode 100644 index 00000000..0ca8d3a6 --- /dev/null +++ b/atuin-client/src/sync.rs @@ -0,0 +1,142 @@ +use std::convert::TryInto; + +use chrono::prelude::*; +use eyre::Result; + +use atuin_common::{api::AddHistoryRequest, utils::hash_str}; + +use crate::api_client; +use crate::database::Database; +use crate::encryption::{encrypt, load_key}; +use crate::settings::{Settings, HISTORY_PAGE_SIZE}; + +// Currently sync is kinda naive, and basically just pages backwards through +// history. This means newly added stuff shows up properly! We also just use +// the total count in each database to indicate whether a sync is needed. +// I think this could be massively improved! If we had a way of easily +// indicating count per time period (hour, day, week, year, etc) then we can +// easily pinpoint where we are missing data and what needs downloading. Start +// with year, then find the week, then the day, then the hour, then download it +// all! The current naive approach will do for now. + +// Check if remote has things we don't, and if so, download them. +// Returns (num downloaded, total local) +async fn sync_download( + force: bool, + client: &api_client::Client<'_>, + db: &mut (impl Database + Send), +) -> Result<(i64, i64)> { + let remote_count = client.count().await?; + + let initial_local = db.history_count()?; + let mut local_count = initial_local; + + let mut last_sync = if force { + Utc.timestamp_millis(0) + } else { + Settings::last_sync()? + }; + + let mut last_timestamp = Utc.timestamp_millis(0); + + let host = if force { Some(String::from("")) } else { None }; + + while remote_count > local_count { + let page = client + .get_history(last_sync, last_timestamp, host.clone()) + .await?; + + if page.len() < HISTORY_PAGE_SIZE.try_into().unwrap() { + break; + } + + db.save_bulk(&page)?; + + local_count = db.history_count()?; + + let page_last = page + .last() + .expect("could not get last element of page") + .timestamp; + + // in the case of a small sync frequency, it's possible for history to + // be "lost" between syncs. In this case we need to rewind the sync + // timestamps + if page_last == last_timestamp { + last_timestamp = Utc.timestamp_millis(0); + last_sync = last_sync - chrono::Duration::hours(1); + } else { + last_timestamp = page_last; + } + } + + Ok((local_count - initial_local, local_count)) +} + +// Check if we have things remote doesn't, and if so, upload them +async fn sync_upload( + settings: &Settings, + _force: bool, + client: &api_client::Client<'_>, + db: &mut (impl Database + Send), +) -> Result<()> { + let initial_remote_count = client.count().await?; + let mut remote_count = initial_remote_count; + + let local_count = db.history_count()?; + + let key = load_key(settings)?; // encryption key + + // first just try the most recent set + + let mut cursor = Utc::now(); + + while local_count > remote_count { + let last = db.before(cursor, HISTORY_PAGE_SIZE)?; + let mut buffer = Vec::::new(); + + if last.is_empty() { + break; + } + + for i in last { + let data = encrypt(&i, &key)?; + let data = serde_json::to_string(&data)?; + + let add_hist = AddHistoryRequest { + id: i.id, + timestamp: i.timestamp, + data, + hostname: hash_str(i.hostname.as_str()), + }; + + buffer.push(add_hist); + } + + // anything left over outside of the 100 block size + client.post_history(&buffer).await?; + cursor = buffer.last().unwrap().timestamp; + + remote_count = client.count().await?; + } + + Ok(()) +} + +pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { + let client = api_client::Client::new( + settings.sync_address.as_str(), + settings.session_token.as_str(), + load_key(settings)?, + ); + + sync_upload(settings, force, &client, db).await?; + + let download = sync_download(force, &client, db).await?; + + debug!("sync downloaded {}", download.0); + + Settings::save_sync_time()?; + + Ok(()) +} -- cgit v1.2.3