diff options
author | Conrad Ludgate <conradludgate@gmail.com> | 2021-12-11 11:04:18 +0000 |
---|---|---|
committer | Conrad Ludgate <conradludgate@gmail.com> | 2021-12-11 12:18:43 +0000 |
commit | 1037af30361896546675722327fa77d0fdd35e86 (patch) | |
tree | da3e161b94dc8ac9f47ccc67ff2deb9a6e9cbbb0 | |
parent | 87df7d80eca0ede9e149d1ef533e71650e4b919a (diff) |
streamingv2streaming2
-rw-r--r-- | Cargo.lock | 19 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | atuin-client/Cargo.toml | 1 | ||||
-rw-r--r-- | atuin-client/src/database.rs | 112 | ||||
-rw-r--r-- | src/command/history.rs | 66 | ||||
-rw-r--r-- | src/command/search.rs | 88 | ||||
-rw-r--r-- | src/command/stats.rs | 4 |
7 files changed, 166 insertions, 125 deletions
@@ -84,6 +84,7 @@ dependencies = [ "directories", "eyre", "fork", + "futures", "humantime 2.1.0", "indicatif", "itertools", @@ -113,6 +114,7 @@ dependencies = [ "directories", "eyre", "fern", + "futures-core", "humantime 2.1.0", "indicatif", "itertools", @@ -632,6 +634,7 @@ checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -655,6 +658,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815" [[package]] +name = "futures-executor" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] name = "futures-intrusive" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -701,6 +715,7 @@ version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025" dependencies = [ + "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -2000,9 +2015,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.5.7" +version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4b94ab0f8c21ee4899b93b06451ef5d965f1a355982ee73684338228498440" +checksum = "7911b0031a0247af40095838002999c7a52fba29d9739e93326e71a5a1bc9d43" dependencies = [ "sqlx-core", "sqlx-macros", @@ -56,6 +56,7 @@ base64 = "0.13.0" humantime = "2.1.0" tabwriter = "1.2.1" crossbeam-channel = "0.5.1" +futures = "0.3" [profile.release] lto = "fat" diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml index ab0f24c9..bebaa380 100644 --- a/atuin-client/Cargo.toml +++ b/atuin-client/Cargo.toml @@ -41,3 +41,4 @@ itertools = "0.10.3" shellexpand = "2" sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "uuid", "chrono", "sqlite" ] } minspan = "0.1.1" +futures-core = "0.3" diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs index 8dff15fd..690190b6 100644 --- a/atuin-client/src/database.rs +++ b/atuin-client/src/database.rs @@ -6,6 +6,7 @@ use chrono::prelude::*; use chrono::Utc; use eyre::Result; +use futures_core::stream::BoxStream; use itertools::Itertools; use sqlx::sqlite::{ @@ -23,7 +24,6 @@ pub trait Database { async fn save_bulk(&mut self, h: &[History]) -> Result<()>; async fn load(&self, id: &str) -> Result<History>; - async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>>; async fn range( &self, from: chrono::DateTime<Utc>, @@ -44,7 +44,16 @@ pub trait Database { query: &str, ) -> Result<Vec<History>>; - async fn query_history(&self, query: &str) -> Result<Vec<History>>; + fn list(&self, max: Option<usize>, unique: bool) -> BoxStream<Result<History, sqlx::Error>>; + fn find( + &self, + session: Option<String>, + cwd: Option<String>, + ) -> BoxStream<Result<History, sqlx::Error>>; + fn query_history<'e, 'q: 'e>( + &'e self, + query: &'q str, + ) -> BoxStream<'e, Result<History, sqlx::Error>>; } // Intended for use on a developer machine and not a sync server. @@ -178,41 +187,70 @@ impl Database for Sqlite { } // make a unique list, that only shows the *newest* version of things - async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>> { + fn list(&self, max: Option<usize>, unique: bool) -> BoxStream<Result<History, sqlx::Error>> { debug!("listing history"); - // very likely vulnerable to SQL injection - // however, this is client side, and only used by the client, on their - // own data. They can just open the db file... - // otherwise building the query is awkward - let query = format!( - "select * from history h - {} + let query = match (max, unique) { + (None, false) => sqlx::query( + "select * from history h + order by timestamp desc", + ), + + (None, true) => sqlx::query( + "select * from history h + where timestamp = ( + select max(timestamp) from history + where h.command = history.command + ) + order by timestamp desc", + ), + + (Some(max), true) => sqlx::query( + "select * from history h + where timestamp = ( + select max(timestamp) from history + where h.command = history.command + ) + order by timestamp desc + limit ?1", + ) + .bind(max as u32), + + (Some(max), false) => sqlx::query( + "select * from history h order by timestamp desc - {}", - // inject the unique check - if unique { - "where timestamp = ( - select max(timestamp) from history - where h.command = history.command - )" - } else { - "" - }, - // inject the limit - if let Some(max) = max { - format!("limit {}", max) - } else { - "".to_string() + limit ?1", + ) + .bind(max as u32), + }; + + query.map(Self::query_history).fetch(&self.pool) + } + + fn find( + &self, + session: Option<String>, + cwd: Option<String>, + ) -> BoxStream<Result<History, sqlx::Error>> { + debug!("listing history"); + + let query = match (session, cwd) { + (None, None) => sqlx::query("select * from history h"), + + (None, Some(cwd)) => sqlx::query("select * from history h where cwd = ?1").bind(cwd), + + (Some(session), None) => { + sqlx::query("select * from history h where session = ?1").bind(session) } - ); - let res = sqlx::query(query.as_str()) - .map(Self::query_history) - .fetch_all(&self.pool) - .await?; + (Some(session), Some(cwd)) => { + sqlx::query("select * from history h where cwd = ?1 and session = ?2") + .bind(cwd) + .bind(session) + } + }; - Ok(res) + query.map(Self::query_history).fetch(&self.pool) } async fn range( @@ -313,13 +351,13 @@ impl Database for Sqlite { Ok(ordering::reorder_fuzzy(search_mode, orig_query, res)) } - async fn query_history(&self, query: &str) -> Result<Vec<History>> { - let res = sqlx::query(query) + fn query_history<'e, 'q: 'e>( + &'e self, + query: &'q str, + ) -> BoxStream<'e, Result<History, sqlx::Error>> { + sqlx::query(query) .map(Self::query_history) - .fetch_all(&self.pool) - .await?; - - Ok(res) + .fetch(&self.pool) } } diff --git a/src/command/history.rs b/src/command/history.rs index 4606b304..8ceaef75 100644 --- a/src/command/history.rs +++ b/src/command/history.rs @@ -3,6 +3,7 @@ use std::io::Write; use std::time::Duration; use eyre::Result; +use futures::{Stream, TryStreamExt}; use structopt::StructOpt; use tabwriter::TabWriter; @@ -61,10 +62,14 @@ pub enum Cmd { } #[allow(clippy::cast_sign_loss)] -pub fn print_list(h: &[History], human: bool, cmd_only: bool) { +pub async fn print_list<E: Send>( + h: impl Stream<Item = Result<History, E>> + Send, + human: bool, + cmd_only: bool, +) -> Result<(), E> { let mut writer = TabWriter::new(std::io::stdout()).padding(2); - let lines = h.iter().map(|h| { + let lines = h.map_ok(|h| { if human { let duration = humantime::format_duration(Duration::from_nanos(std::cmp::max( h.duration, 0, @@ -91,18 +96,23 @@ pub fn print_list(h: &[History], human: bool, cmd_only: bool) { } }); - for i in lines.rev() { + let fut = lines.try_for_each(|i| { writer .write_all(i.as_bytes()) .expect("failed to write to tab writer"); - } + + futures::future::ready(Ok(())) + }); + fut.await?; writer.flush().expect("failed to flush tab writer"); + + Ok(()) } impl Cmd { pub async fn run( - &self, + self, settings: &Settings, db: &mut (impl Database + Send + Sync), ) -> Result<()> { @@ -130,7 +140,7 @@ impl Cmd { return Ok(()); } - let mut h = db.load(id).await?; + let mut h = db.load(&id).await?; if h.duration > 0 { debug!("cannot end history - already has duration"); @@ -139,7 +149,7 @@ impl Cmd { return Ok(()); } - h.exit = *exit; + h.exit = exit; h.duration = chrono::Utc::now().timestamp_nanos() - h.timestamp.timestamp_nanos(); db.update(&h).await?; @@ -160,44 +170,20 @@ impl Cmd { human, cmd_only, } => { - let session = if *session { - Some(env::var("ATUIN_SESSION")?) - } else { - None - }; - let cwd = if *cwd { - Some(env::current_dir()?.display().to_string()) - } else { - None - }; - - let history = match (session, cwd) { - (None, None) => db.list(None, false).await?, - (None, Some(cwd)) => { - let query = format!("select * from history where cwd = {};", cwd); - db.query_history(&query).await? - } - (Some(session), None) => { - let query = format!("select * from history where session = {};", session); - db.query_history(&query).await? - } - (Some(session), Some(cwd)) => { - let query = format!( - "select * from history where cwd = {} and session = {};", - cwd, session - ); - db.query_history(&query).await? - } - }; - - print_list(&history, *human, *cmd_only); + let session = session.then(|| env::var("ATUIN_SESSION")).transpose()?; + let cwd = cwd + .then(|| env::current_dir().map(|x| x.display().to_string())) + .transpose()?; + + let history = db.find(session, cwd); + print_list(history, human, cmd_only).await?; Ok(()) } Self::Last { human, cmd_only } => { - let last = db.last().await?; - print_list(&[last], *human, *cmd_only); + let last = db.last().await; + print_list(futures::stream::iter(Some(last)), human, cmd_only).await?; Ok(()) } diff --git a/src/command/search.rs b/src/command/search.rs index 00e11f52..c88f6040 100644 --- a/src/command/search.rs +++ b/src/command/search.rs @@ -1,5 +1,6 @@ use chrono::Utc; use eyre::Result; +use futures::TryStreamExt; use std::time::Duration; use std::{io::stdout, ops::Sub}; @@ -157,7 +158,7 @@ async fn query_results( db: &mut (impl Database + Send + Sync), ) -> Result<()> { let results = match app.input.as_str() { - "" => db.list(Some(200), true).await?, + "" => db.list(Some(200), true).try_collect::<Vec<_>>().await?, i => db.search(Some(200), search_mode, i).await?, }; @@ -381,69 +382,68 @@ pub async fn run( let item = select_history(query, settings.search_mode, db).await?; eprintln!("{}", item); } else { - let results = db + let mut results = db .search(None, settings.search_mode, query.join(" ").as_str()) .await?; // TODO: This filtering would be better done in the SQL query, I just // need a nice way of building queries. - let results: Vec<History> = results - .iter() - .filter(|h| { - if let Some(exit) = exit { - if h.exit != exit { - return false; - } + results.retain(|h| { + if let Some(exit) = exit { + if h.exit != exit { + return false; } + } - if let Some(exit) = exclude_exit { - if h.exit == exit { - return false; - } + if let Some(exit) = exclude_exit { + if h.exit == exit { + return false; } + } - if let Some(cwd) = &exclude_cwd { - if h.cwd.as_str() == cwd.as_str() { - return false; - } + if let Some(cwd) = &exclude_cwd { + if h.cwd.as_str() == cwd.as_str() { + return false; } + } - if let Some(cwd) = &dir { - if h.cwd.as_str() != cwd.as_str() { - return false; - } + if let Some(cwd) = &dir { + if h.cwd.as_str() != cwd.as_str() { + return false; } + } - if let Some(before) = &before { - let before = chrono_english::parse_date_string( - before.as_str(), - Utc::now(), - chrono_english::Dialect::Uk, - ); + if let Some(before) = &before { + let before = chrono_english::parse_date_string( + before.as_str(), + Utc::now(), + chrono_english::Dialect::Uk, + ); - if before.is_err() || h.timestamp.gt(&before.unwrap()) { - return false; - } + if before.is_err() || h.timestamp.gt(&before.unwrap()) { + return false; } + } - if let Some(after) = &after { - let after = chrono_english::parse_date_string( - after.as_str(), - Utc::now(), - chrono_english::Dialect::Uk, - ); + if let Some(after) = &after { + let after = chrono_english::parse_date_string( + after.as_str(), + Utc::now(), + chrono_english::Dialect::Uk, + ); - if after.is_err() || h.timestamp.lt(&after.unwrap()) { - return false; - } + if after.is_err() || h.timestamp.lt(&after.unwrap()) { + return false; } + } - true - }) - .map(std::borrow::ToOwned::to_owned) - .collect(); + true + }); - super::history::print_list(&results, human, cmd_only); + let stream = futures::stream::iter(results.into_iter().map(Result::<_, ()>::Ok).rev()); + super::history::print_list(stream, human, cmd_only) + .await + .unwrap(); } Ok(()) diff --git a/src/command/stats.rs b/src/command/stats.rs index 742202ae..5e3a67c2 100644 --- a/src/command/stats.rs +++ b/src/command/stats.rs @@ -6,6 +6,7 @@ use chrono_english::parse_date_string; use cli_table::{format::Justify, print_stdout, Cell, Style, Table}; use eyre::{eyre, Result}; +use futures::TryStreamExt; use structopt::StructOpt; use atuin_client::database::Database; @@ -95,8 +96,7 @@ impl Cmd { } Self::All => { - let history = db.list(None, false).await?; - + let history: Vec<History> = db.list(None, false).try_collect().await?; compute_stats(&history)?; Ok(()) |