diff options
author | Conrad Ludgate <conradludgate@gmail.com> | 2021-11-21 19:40:50 +0000 |
---|---|---|
committer | Conrad Ludgate <conradludgate@gmail.com> | 2021-11-21 19:40:50 +0000 |
commit | 04c3f6cdc659c157d48e773539a4b3bc0851f9d5 (patch) | |
tree | b7b36211860d0a04dd08f42cf1ed63a625352c17 | |
parent | 6e8ec8689d85bf97e79455f7c3644daa5c8e02fd (diff) |
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | atuin-client/Cargo.toml | 1 | ||||
-rw-r--r-- | atuin-client/src/database.rs | 104 | ||||
-rw-r--r-- | atuin-client/src/sync.rs | 8 | ||||
-rw-r--r-- | src/command/history.rs | 50 | ||||
-rw-r--r-- | src/command/import.rs | 16 | ||||
-rw-r--r-- | src/command/search.rs | 24 | ||||
-rw-r--r-- | src/command/stats.rs | 8 | ||||
-rw-r--r-- | src/command/sync.rs | 4 |
10 files changed, 94 insertions, 124 deletions
@@ -105,6 +105,7 @@ dependencies = [ "directories", "eyre", "fork", + "futures", "humantime 2.1.0", "indicatif", "itertools", @@ -134,6 +135,7 @@ dependencies = [ "directories", "eyre", "fern", + "futures", "humantime 2.1.0", "indicatif", "itertools", @@ -56,6 +56,7 @@ base64 = "0.13.0" humantime = "2.1.0" tabwriter = "1.2.1" crossbeam-channel = "0.5.1" +futures = "0.3" [profile.release] lto = "fat" diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml index e0e50ac1..593fa86b 100644 --- a/atuin-client/Cargo.toml +++ b/atuin-client/Cargo.toml @@ -41,6 +41,7 @@ itertools = "0.10.1" shellexpand = "2" sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "uuid", "chrono", "sqlite" ] } minspan = "0.1.1" +futures = "0.3" [dev-dependencies] tokio-test = "*" diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs index 8dff15fd..536b6811 100644 --- a/atuin-client/src/database.rs +++ b/atuin-client/src/database.rs @@ -1,11 +1,11 @@ use std::path::Path; use std::str::FromStr; -use async_trait::async_trait; use chrono::prelude::*; use chrono::Utc; use eyre::Result; +use futures::Stream; use itertools::Itertools; use sqlx::sqlite::{ @@ -17,36 +17,6 @@ use super::history::History; use super::ordering; use super::settings::SearchMode; -#[async_trait] -pub trait Database { - async fn save(&mut self, h: &History) -> Result<()>; - async fn save_bulk(&mut self, h: &[History]) -> Result<()>; - - async fn load(&self, id: &str) -> Result<History>; - async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>>; - async fn range( - &self, - from: chrono::DateTime<Utc>, - to: chrono::DateTime<Utc>, - ) -> Result<Vec<History>>; - - async fn update(&self, h: &History) -> Result<()>; - async fn history_count(&self) -> Result<i64>; - - async fn first(&self) -> Result<History>; - async fn last(&self) -> Result<History>; - async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>>; - - async fn search( - &self, - limit: Option<i64>, - search_mode: SearchMode, - query: &str, - ) -> Result<Vec<History>>; - - async fn query_history(&self, query: &str) -> Result<Vec<History>>; -} - // Intended for use on a developer machine and not a sync server. // TODO: implement IntoIterator pub struct Sqlite { @@ -103,7 +73,7 @@ impl Sqlite { Ok(()) } - fn query_history(row: SqliteRow) -> History { + fn query_history_row(row: SqliteRow) -> History { History { id: row.get("id"), timestamp: Utc.timestamp_nanos(row.get("timestamp")), @@ -117,9 +87,8 @@ impl Sqlite { } } -#[async_trait] -impl Database for Sqlite { - async fn save(&mut self, h: &History) -> Result<()> { +impl Sqlite { + pub async fn save(&mut self, h: &History) -> Result<()> { debug!("saving history to sqlite"); let mut tx = self.pool.begin().await?; @@ -129,7 +98,7 @@ impl Database for Sqlite { Ok(()) } - async fn save_bulk(&mut self, h: &[History]) -> Result<()> { + pub async fn save_bulk(&mut self, h: &[History]) -> Result<()> { debug!("saving history to sqlite"); let mut tx = self.pool.begin().await?; @@ -143,19 +112,19 @@ impl Database for Sqlite { Ok(()) } - async fn load(&self, id: &str) -> Result<History> { + pub async fn load(&self, id: &str) -> Result<History> { debug!("loading history item {}", id); let res = sqlx::query("select * from history where id = ?1") .bind(id) - .map(Self::query_history) + .map(Self::query_history_row) .fetch_one(&self.pool) .await?; Ok(res) } - async fn update(&self, h: &History) -> Result<()> { + pub async fn update(&self, h: &History) -> Result<()> { debug!("updating sqlite history"); sqlx::query( @@ -178,14 +147,14 @@ impl Database for Sqlite { } // make a unique list, that only shows the *newest* version of things - async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>> { + pub fn list(&self, max: Option<usize>, unique: bool) -> String { debug!("listing history"); // very likely vulnerable to SQL injection // however, this is client side, and only used by the client, on their // own data. They can just open the db file... // otherwise building the query is awkward - let query = format!( + format!( "select * from history h {} order by timestamp desc @@ -205,17 +174,14 @@ impl Database for Sqlite { } else { "".to_string() } - ); - - let res = sqlx::query(query.as_str()) - .map(Self::query_history) - .fetch_all(&self.pool) - .await?; + ) - Ok(res) + // sqlx::query(query.as_str()) + // .map(Self::query_history_row) + // .fetch(&self.pool) } - async fn range( + pub async fn range( &self, from: chrono::DateTime<Utc>, to: chrono::DateTime<Utc>, @@ -227,48 +193,41 @@ impl Database for Sqlite { ) .bind(from) .bind(to) - .map(Self::query_history) + .map(Self::query_history_row) .fetch_all(&self.pool) .await?; Ok(res) } - async fn first(&self) -> Result<History> { + pub async fn first(&self) -> Result<History> { let res = sqlx::query("select * from history where duration >= 0 order by timestamp asc limit 1") - .map(Self::query_history) + .map(Self::query_history_row) .fetch_one(&self.pool) .await?; Ok(res) } - async fn last(&self) -> Result<History> { - let res = sqlx::query( - "select * from history where duration >= 0 order by timestamp desc limit 1", - ) - .map(Self::query_history) - .fetch_one(&self.pool) - .await?; - - Ok(res) + pub fn last(&self) -> &'static str { + "select * from history where duration >= 0 order by timestamp desc limit 1" } - async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> { + pub async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> { let res = sqlx::query( "select * from history where timestamp < ?1 order by timestamp desc limit ?2", ) .bind(timestamp.timestamp_nanos()) .bind(count) - .map(Self::query_history) + .map(Self::query_history_row) .fetch_all(&self.pool) .await?; Ok(res) } - async fn history_count(&self) -> Result<i64> { + pub async fn history_count(&self) -> Result<i64> { let res: (i64,) = sqlx::query_as("select count(1) from history") .fetch_one(&self.pool) .await?; @@ -276,7 +235,7 @@ impl Database for Sqlite { Ok(res.0) } - async fn search( + pub async fn search( &self, limit: Option<i64>, search_mode: SearchMode, @@ -306,20 +265,17 @@ impl Database for Sqlite { .as_str(), ) .bind(query) - .map(Self::query_history) + .map(Self::query_history_row) .fetch_all(&self.pool) .await?; Ok(ordering::reorder_fuzzy(search_mode, orig_query, res)) } - async fn query_history(&self, query: &str) -> Result<Vec<History>> { - let res = sqlx::query(query) - .map(Self::query_history) - .fetch_all(&self.pool) - .await?; - - Ok(res) + pub fn query_history<'q: 'e, 'e>(&'e self, query: &'q str) -> impl Stream<Item = Result<History, sqlx::Error>> +'e { + sqlx::query(query) + .map(Self::query_history_row) + .fetch(&self.pool) } } @@ -327,7 +283,7 @@ impl Database for Sqlite { mod test { use super::*; - async fn new_history_item(db: &mut impl Database, cmd: &str) -> Result<()> { + async fn new_history_item(db: &mut Sqlite, cmd: &str) -> Result<()> { let history = History::new( chrono::Utc::now(), cmd.to_string(), diff --git a/atuin-client/src/sync.rs b/atuin-client/src/sync.rs index c1c02b0a..1175cd4f 100644 --- a/atuin-client/src/sync.rs +++ b/atuin-client/src/sync.rs @@ -6,7 +6,7 @@ use eyre::Result; use atuin_common::{api::AddHistoryRequest, utils::hash_str}; use crate::api_client; -use crate::database::Database; +use crate::database::Sqlite; use crate::encryption::{encrypt, load_encoded_key, load_key}; use crate::settings::{Settings, HISTORY_PAGE_SIZE}; @@ -24,7 +24,7 @@ use crate::settings::{Settings, HISTORY_PAGE_SIZE}; async fn sync_download( force: bool, client: &api_client::Client<'_>, - db: &mut (impl Database + Send), + db: &mut Sqlite, ) -> Result<(i64, i64)> { debug!("starting sync download"); @@ -80,7 +80,7 @@ async fn sync_upload( settings: &Settings, _force: bool, client: &api_client::Client<'_>, - db: &mut (impl Database + Send), + db: &mut Sqlite, ) -> Result<()> { debug!("starting sync upload"); @@ -130,7 +130,7 @@ async fn sync_upload( Ok(()) } -pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> { +pub async fn sync(settings: &Settings, force: bool, db: &mut Sqlite) -> Result<()> { let client = api_client::Client::new( &settings.sync_address, &settings.session_token, diff --git a/src/command/history.rs b/src/command/history.rs index 4606b304..9795d672 100644 --- a/src/command/history.rs +++ b/src/command/history.rs @@ -3,10 +3,11 @@ use std::io::Write; use std::time::Duration; use eyre::Result; +use futures::stream::{Stream, TryStreamExt}; use structopt::StructOpt; use tabwriter::TabWriter; -use atuin_client::database::Database; +use atuin_client::database::Sqlite; use atuin_client::history::History; use atuin_client::settings::Settings; use atuin_client::sync; @@ -61,10 +62,10 @@ pub enum Cmd { } #[allow(clippy::cast_sign_loss)] -pub fn print_list(h: &[History], human: bool, cmd_only: bool) { +pub async fn print_list<E>(h: impl Stream<Item = Result<History, E>>, human: bool, cmd_only: bool) -> Result<(), E> { let mut writer = TabWriter::new(std::io::stdout()).padding(2); - let lines = h.iter().map(|h| { + let lines = h.map_ok(|h| { if human { let duration = humantime::format_duration(Duration::from_nanos(std::cmp::max( h.duration, 0, @@ -91,20 +92,26 @@ pub fn print_list(h: &[History], human: bool, cmd_only: bool) { } }); - for i in lines.rev() { - writer - .write_all(i.as_bytes()) - .expect("failed to write to tab writer"); - } + let fut = lines + .try_for_each(|i| { + writer + .write_all(i.as_bytes()) + .expect("failed to write to tab writer"); + + futures::future::ready(Ok(())) + }); + fut.await?; writer.flush().expect("failed to flush tab writer"); + + Ok(()) } impl Cmd { pub async fn run( &self, settings: &Settings, - db: &mut (impl Database + Send + Sync), + db: &mut Sqlite, ) -> Result<()> { match self { Self::Start { command: words } => { @@ -171,33 +178,34 @@ impl Cmd { None }; - let history = match (session, cwd) { - (None, None) => db.list(None, false).await?, + let query = match (session, cwd) { + (None, None) => db.list(None, false), (None, Some(cwd)) => { - let query = format!("select * from history where cwd = {};", cwd); - db.query_history(&query).await? + format!("select * from history where cwd = {};", cwd) } (Some(session), None) => { - let query = format!("select * from history where session = {};", session); - db.query_history(&query).await? + format!("select * from history where session = {};", session) } (Some(session), Some(cwd)) => { - let query = format!( + format!( "select * from history where cwd = {} and session = {};", cwd, session - ); - db.query_history(&query).await? + ) } }; - print_list(&history, *human, *cmd_only); + let history = db.query_history(&query); + + print_list(history, *human, *cmd_only).await?; Ok(()) } Self::Last { human, cmd_only } => { - let last = db.last().await?; - print_list(&[last], *human, *cmd_only); + let last = db.last(); + let history = db.query_history(last); + + print_list(history, *human, *cmd_only).await?; Ok(()) } diff --git a/src/command/import.rs b/src/command/import.rs index 53940abb..4fd5b31e 100644 --- a/src/command/import.rs +++ b/src/command/import.rs @@ -4,7 +4,7 @@ use eyre::{eyre, Result}; use structopt::StructOpt; use atuin_client::import::{bash::Bash, zsh::Zsh}; -use atuin_client::{database::Database, import::Importer}; +use atuin_client::{database::Sqlite, import::Importer}; use atuin_client::{history::History, import::resh::Resh}; use indicatif::ProgressBar; @@ -38,7 +38,7 @@ pub enum Cmd { const BATCH_SIZE: usize = 100; impl Cmd { - pub async fn run(&self, db: &mut (impl Database + Send + Sync)) -> Result<()> { + pub async fn run(&self, db: &mut Sqlite) -> Result<()> { println!(" Atuin "); println!("======================"); println!(" \u{1f30d} "); @@ -53,22 +53,22 @@ impl Cmd { if shell.ends_with("/zsh") { println!("Detected ZSH"); - import::<Zsh<_>, _>(db, BATCH_SIZE).await + import::<Zsh<_>>(db, BATCH_SIZE).await } else { println!("cannot import {} history", shell); Ok(()) } } - Self::Zsh => import::<Zsh<_>, _>(db, BATCH_SIZE).await, - Self::Bash => import::<Bash<_>, _>(db, BATCH_SIZE).await, - Self::Resh => import::<Resh, _>(db, BATCH_SIZE).await, + Self::Zsh => import::<Zsh<_>>(db, BATCH_SIZE).await, + Self::Bash => import::<Bash<_>>(db, BATCH_SIZE).await, + Self::Resh => import::<Resh>(db, BATCH_SIZE).await, } } } -async fn import<I: Importer + Send, DB: Database + Send + Sync>( - db: &mut DB, +async fn import<I: Importer + Send>( + db: &mut Sqlite, buf_size: usize, ) -> Result<()> where diff --git a/src/command/search.rs b/src/command/search.rs index 00e11f52..6b6e50e3 100644 --- a/src/command/search.rs +++ b/src/command/search.rs @@ -14,7 +14,7 @@ use tui::{ }; use unicode_width::UnicodeWidthStr; -use atuin_client::database::Database; +use atuin_client::database::Sqlite; use atuin_client::history::History; use atuin_client::settings::{SearchMode, Settings}; @@ -151,13 +151,9 @@ impl State { } } -async fn query_results( - app: &mut State, - search_mode: SearchMode, - db: &mut (impl Database + Send + Sync), -) -> Result<()> { +async fn query_results(app: &mut State, search_mode: SearchMode, db: &mut Sqlite) -> Result<()> { let results = match app.input.as_str() { - "" => db.list(Some(200), true).await?, + // "" => db.list(Some(200), true).await?, i => db.search(Some(200), search_mode, i).await?, }; @@ -175,7 +171,7 @@ async fn query_results( async fn key_handler( input: Key, search_mode: SearchMode, - db: &mut (impl Database + Send + Sync), + db: &mut Sqlite, app: &mut State, ) -> Option<String> { match input { @@ -314,7 +310,7 @@ fn draw<T: Backend>(f: &mut Frame<'_, T>, history_count: i64, app: &mut State) { async fn select_history( query: &[String], search_mode: SearchMode, - db: &mut (impl Database + Send + Sync), + db: &mut Sqlite, ) -> Result<String> { let stdout = stdout().into_raw_mode()?; let stdout = MouseTerminal::from(stdout); @@ -361,7 +357,7 @@ pub async fn run( after: Option<String>, cmd_only: bool, query: &[String], - db: &mut (impl Database + Send + Sync), + db: &mut Sqlite, ) -> Result<()> { let dir = if let Some(cwd) = cwd { if cwd == "." { @@ -443,7 +439,13 @@ pub async fn run( .map(std::borrow::ToOwned::to_owned) .collect(); - super::history::print_list(&results, human, cmd_only); + super::history::print_list( + futures::stream::iter(results.into_iter().map(Result::<_, ()>::Ok)), + human, + cmd_only, + ) + .await + .unwrap(); } Ok(()) diff --git a/src/command/stats.rs b/src/command/stats.rs index 742202ae..cb5bb304 100644 --- a/src/command/stats.rs +++ b/src/command/stats.rs @@ -8,7 +8,7 @@ use cli_table::{format::Justify, print_stdout, Cell, Style, Table}; use eyre::{eyre, Result}; use structopt::StructOpt; -use atuin_client::database::Database; +use atuin_client::database::Sqlite; use atuin_client::history::History; use atuin_client::settings::Settings; @@ -73,7 +73,7 @@ fn compute_stats(history: &[History]) -> Result<()> { impl Cmd { pub async fn run( &self, - db: &mut (impl Database + Send + Sync), + db: &mut Sqlite, settings: &Settings, ) -> Result<()> { match self { @@ -95,9 +95,9 @@ impl Cmd { } Self::All => { - let history = db.list(None, false).await?; + // let history = db.list(None, false).await?; - compute_stats(&history)?; + // compute_stats(&history)?; Ok(()) } diff --git a/src/command/sync.rs b/src/command/sync.rs index f8bfd5e2..0bceca26 100644 --- a/src/command/sync.rs +++ b/src/command/sync.rs @@ -1,13 +1,13 @@ use eyre::Result; -use atuin_client::database::Database; +use atuin_client::database::Sqlite; use atuin_client::settings::Settings; use atuin_client::sync; pub async fn run( settings: &Settings, force: bool, - db: &mut (impl Database + Send + Sync), + db: &mut Sqlite, ) -> Result<()> { sync::sync(settings, force, db).await?; println!( |