summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorConrad Ludgate <conradludgate@gmail.com>2021-12-11 11:04:18 +0000
committerConrad Ludgate <conradludgate@gmail.com>2021-12-11 12:18:43 +0000
commit1037af30361896546675722327fa77d0fdd35e86 (patch)
treeda3e161b94dc8ac9f47ccc67ff2deb9a6e9cbbb0
parent87df7d80eca0ede9e149d1ef533e71650e4b919a (diff)
streamingv2streaming2
-rw-r--r--Cargo.lock19
-rw-r--r--Cargo.toml1
-rw-r--r--atuin-client/Cargo.toml1
-rw-r--r--atuin-client/src/database.rs112
-rw-r--r--src/command/history.rs66
-rw-r--r--src/command/search.rs88
-rw-r--r--src/command/stats.rs4
7 files changed, 166 insertions, 125 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8f3c04fe..df209d96 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -84,6 +84,7 @@ dependencies = [
"directories",
"eyre",
"fork",
+ "futures",
"humantime 2.1.0",
"indicatif",
"itertools",
@@ -113,6 +114,7 @@ dependencies = [
"directories",
"eyre",
"fern",
+ "futures-core",
"humantime 2.1.0",
"indicatif",
"itertools",
@@ -632,6 +634,7 @@ checksum = "a9d5813545e459ad3ca1bff9915e9ad7f1a47dc6a91b627ce321d5863b7dd253"
dependencies = [
"futures-channel",
"futures-core",
+ "futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@@ -655,6 +658,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "098cd1c6dda6ca01650f1a37a794245eb73181d0d4d4e955e2f3c37db7af1815"
[[package]]
+name = "futures-executor"
+version = "0.3.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10f6cb7042eda00f0049b1d2080aa4b93442997ee507eb3828e8bd7577f94c9d"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-intrusive"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -701,6 +715,7 @@ version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c144ad54d60f23927f0a6b6d816e4271278b64f005ad65e4e35291d2de9c025"
dependencies = [
+ "futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -2000,9 +2015,9 @@ dependencies = [
[[package]]
name = "sqlx"
-version = "0.5.7"
+version = "0.5.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e4b94ab0f8c21ee4899b93b06451ef5d965f1a355982ee73684338228498440"
+checksum = "7911b0031a0247af40095838002999c7a52fba29d9739e93326e71a5a1bc9d43"
dependencies = [
"sqlx-core",
"sqlx-macros",
diff --git a/Cargo.toml b/Cargo.toml
index f5504024..c7666b8a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -56,6 +56,7 @@ base64 = "0.13.0"
humantime = "2.1.0"
tabwriter = "1.2.1"
crossbeam-channel = "0.5.1"
+futures = "0.3"
[profile.release]
lto = "fat"
diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml
index ab0f24c9..bebaa380 100644
--- a/atuin-client/Cargo.toml
+++ b/atuin-client/Cargo.toml
@@ -41,3 +41,4 @@ itertools = "0.10.3"
shellexpand = "2"
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "uuid", "chrono", "sqlite" ] }
minspan = "0.1.1"
+futures-core = "0.3"
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs
index 8dff15fd..690190b6 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -6,6 +6,7 @@ use chrono::prelude::*;
use chrono::Utc;
use eyre::Result;
+use futures_core::stream::BoxStream;
use itertools::Itertools;
use sqlx::sqlite::{
@@ -23,7 +24,6 @@ pub trait Database {
async fn save_bulk(&mut self, h: &[History]) -> Result<()>;
async fn load(&self, id: &str) -> Result<History>;
- async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>>;
async fn range(
&self,
from: chrono::DateTime<Utc>,
@@ -44,7 +44,16 @@ pub trait Database {
query: &str,
) -> Result<Vec<History>>;
- async fn query_history(&self, query: &str) -> Result<Vec<History>>;
+ fn list(&self, max: Option<usize>, unique: bool) -> BoxStream<Result<History, sqlx::Error>>;
+ fn find(
+ &self,
+ session: Option<String>,
+ cwd: Option<String>,
+ ) -> BoxStream<Result<History, sqlx::Error>>;
+ fn query_history<'e, 'q: 'e>(
+ &'e self,
+ query: &'q str,
+ ) -> BoxStream<'e, Result<History, sqlx::Error>>;
}
// Intended for use on a developer machine and not a sync server.
@@ -178,41 +187,70 @@ impl Database for Sqlite {
}
// make a unique list, that only shows the *newest* version of things
- async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>> {
+ fn list(&self, max: Option<usize>, unique: bool) -> BoxStream<Result<History, sqlx::Error>> {
debug!("listing history");
- // very likely vulnerable to SQL injection
- // however, this is client side, and only used by the client, on their
- // own data. They can just open the db file...
- // otherwise building the query is awkward
- let query = format!(
- "select * from history h
- {}
+ let query = match (max, unique) {
+ (None, false) => sqlx::query(
+ "select * from history h
+ order by timestamp desc",
+ ),
+
+ (None, true) => sqlx::query(
+ "select * from history h
+ where timestamp = (
+ select max(timestamp) from history
+ where h.command = history.command
+ )
+ order by timestamp desc",
+ ),
+
+ (Some(max), true) => sqlx::query(
+ "select * from history h
+ where timestamp = (
+ select max(timestamp) from history
+ where h.command = history.command
+ )
+ order by timestamp desc
+ limit ?1",
+ )
+ .bind(max as u32),
+
+ (Some(max), false) => sqlx::query(
+ "select * from history h
order by timestamp desc
- {}",
- // inject the unique check
- if unique {
- "where timestamp = (
- select max(timestamp) from history
- where h.command = history.command
- )"
- } else {
- ""
- },
- // inject the limit
- if let Some(max) = max {
- format!("limit {}", max)
- } else {
- "".to_string()
+ limit ?1",
+ )
+ .bind(max as u32),
+ };
+
+ query.map(Self::query_history).fetch(&self.pool)
+ }
+
+ fn find(
+ &self,
+ session: Option<String>,
+ cwd: Option<String>,
+ ) -> BoxStream<Result<History, sqlx::Error>> {
+ debug!("listing history");
+
+ let query = match (session, cwd) {
+ (None, None) => sqlx::query("select * from history h"),
+
+ (None, Some(cwd)) => sqlx::query("select * from history h where cwd = ?1").bind(cwd),
+
+ (Some(session), None) => {
+ sqlx::query("select * from history h where session = ?1").bind(session)
}
- );
- let res = sqlx::query(query.as_str())
- .map(Self::query_history)
- .fetch_all(&self.pool)
- .await?;
+ (Some(session), Some(cwd)) => {
+ sqlx::query("select * from history h where cwd = ?1 and session = ?2")
+ .bind(cwd)
+ .bind(session)
+ }
+ };
- Ok(res)
+ query.map(Self::query_history).fetch(&self.pool)
}
async fn range(
@@ -313,13 +351,13 @@ impl Database for Sqlite {
Ok(ordering::reorder_fuzzy(search_mode, orig_query, res))
}
- async fn query_history(&self, query: &str) -> Result<Vec<History>> {
- let res = sqlx::query(query)
+ fn query_history<'e, 'q: 'e>(
+ &'e self,
+ query: &'q str,
+ ) -> BoxStream<'e, Result<History, sqlx::Error>> {
+ sqlx::query(query)
.map(Self::query_history)
- .fetch_all(&self.pool)
- .await?;
-
- Ok(res)
+ .fetch(&self.pool)
}
}
diff --git a/src/command/history.rs b/src/command/history.rs
index 4606b304..8ceaef75 100644
--- a/src/command/history.rs
+++ b/src/command/history.rs
@@ -3,6 +3,7 @@ use std::io::Write;
use std::time::Duration;
use eyre::Result;
+use futures::{Stream, TryStreamExt};
use structopt::StructOpt;
use tabwriter::TabWriter;
@@ -61,10 +62,14 @@ pub enum Cmd {
}
#[allow(clippy::cast_sign_loss)]
-pub fn print_list(h: &[History], human: bool, cmd_only: bool) {
+pub async fn print_list<E: Send>(
+ h: impl Stream<Item = Result<History, E>> + Send,
+ human: bool,
+ cmd_only: bool,
+) -> Result<(), E> {
let mut writer = TabWriter::new(std::io::stdout()).padding(2);
- let lines = h.iter().map(|h| {
+ let lines = h.map_ok(|h| {
if human {
let duration = humantime::format_duration(Duration::from_nanos(std::cmp::max(
h.duration, 0,
@@ -91,18 +96,23 @@ pub fn print_list(h: &[History], human: bool, cmd_only: bool) {
}
});
- for i in lines.rev() {
+ let fut = lines.try_for_each(|i| {
writer
.write_all(i.as_bytes())
.expect("failed to write to tab writer");
- }
+
+ futures::future::ready(Ok(()))
+ });
+ fut.await?;
writer.flush().expect("failed to flush tab writer");
+
+ Ok(())
}
impl Cmd {
pub async fn run(
- &self,
+ self,
settings: &Settings,
db: &mut (impl Database + Send + Sync),
) -> Result<()> {
@@ -130,7 +140,7 @@ impl Cmd {
return Ok(());
}
- let mut h = db.load(id).await?;
+ let mut h = db.load(&id).await?;
if h.duration > 0 {
debug!("cannot end history - already has duration");
@@ -139,7 +149,7 @@ impl Cmd {
return Ok(());
}
- h.exit = *exit;
+ h.exit = exit;
h.duration = chrono::Utc::now().timestamp_nanos() - h.timestamp.timestamp_nanos();
db.update(&h).await?;
@@ -160,44 +170,20 @@ impl Cmd {
human,
cmd_only,
} => {
- let session = if *session {
- Some(env::var("ATUIN_SESSION")?)
- } else {
- None
- };
- let cwd = if *cwd {
- Some(env::current_dir()?.display().to_string())
- } else {
- None
- };
-
- let history = match (session, cwd) {
- (None, None) => db.list(None, false).await?,
- (None, Some(cwd)) => {
- let query = format!("select * from history where cwd = {};", cwd);
- db.query_history(&query).await?
- }
- (Some(session), None) => {
- let query = format!("select * from history where session = {};", session);
- db.query_history(&query).await?
- }
- (Some(session), Some(cwd)) => {
- let query = format!(
- "select * from history where cwd = {} and session = {};",
- cwd, session
- );
- db.query_history(&query).await?
- }
- };
-
- print_list(&history, *human, *cmd_only);
+ let session = session.then(|| env::var("ATUIN_SESSION")).transpose()?;
+ let cwd = cwd
+ .then(|| env::current_dir().map(|x| x.display().to_string()))
+ .transpose()?;
+
+ let history = db.find(session, cwd);
+ print_list(history, human, cmd_only).await?;
Ok(())
}
Self::Last { human, cmd_only } => {
- let last = db.last().await?;
- print_list(&[last], *human, *cmd_only);
+ let last = db.last().await;
+ print_list(futures::stream::iter(Some(last)), human, cmd_only).await?;
Ok(())
}
diff --git a/src/command/search.rs b/src/command/search.rs
index 00e11f52..c88f6040 100644
--- a/src/command/search.rs
+++ b/src/command/search.rs
@@ -1,5 +1,6 @@
use chrono::Utc;
use eyre::Result;
+use futures::TryStreamExt;
use std::time::Duration;
use std::{io::stdout, ops::Sub};
@@ -157,7 +158,7 @@ async fn query_results(
db: &mut (impl Database + Send + Sync),
) -> Result<()> {
let results = match app.input.as_str() {
- "" => db.list(Some(200), true).await?,
+ "" => db.list(Some(200), true).try_collect::<Vec<_>>().await?,
i => db.search(Some(200), search_mode, i).await?,
};
@@ -381,69 +382,68 @@ pub async fn run(
let item = select_history(query, settings.search_mode, db).await?;
eprintln!("{}", item);
} else {
- let results = db
+ let mut results = db
.search(None, settings.search_mode, query.join(" ").as_str())
.await?;
// TODO: This filtering would be better done in the SQL query, I just
// need a nice way of building queries.
- let results: Vec<History> = results
- .iter()
- .filter(|h| {
- if let Some(exit) = exit {
- if h.exit != exit {
- return false;
- }
+ results.retain(|h| {
+ if let Some(exit) = exit {
+ if h.exit != exit {
+ return false;
}
+ }
- if let Some(exit) = exclude_exit {
- if h.exit == exit {
- return false;
- }
+ if let Some(exit) = exclude_exit {
+ if h.exit == exit {
+ return false;
}
+ }
- if let Some(cwd) = &exclude_cwd {
- if h.cwd.as_str() == cwd.as_str() {
- return false;
- }
+ if let Some(cwd) = &exclude_cwd {
+ if h.cwd.as_str() == cwd.as_str() {
+ return false;
}
+ }
- if let Some(cwd) = &dir {
- if h.cwd.as_str() != cwd.as_str() {
- return false;
- }
+ if let Some(cwd) = &dir {
+ if h.cwd.as_str() != cwd.as_str() {
+ return false;
}
+ }
- if let Some(before) = &before {
- let before = chrono_english::parse_date_string(
- before.as_str(),
- Utc::now(),
- chrono_english::Dialect::Uk,
- );
+ if let Some(before) = &before {
+ let before = chrono_english::parse_date_string(
+ before.as_str(),
+ Utc::now(),
+ chrono_english::Dialect::Uk,
+ );
- if before.is_err() || h.timestamp.gt(&before.unwrap()) {
- return false;
- }
+ if before.is_err() || h.timestamp.gt(&before.unwrap()) {
+ return false;
}
+ }
- if let Some(after) = &after {
- let after = chrono_english::parse_date_string(
- after.as_str(),
- Utc::now(),
- chrono_english::Dialect::Uk,
- );
+ if let Some(after) = &after {
+ let after = chrono_english::parse_date_string(
+ after.as_str(),
+ Utc::now(),
+ chrono_english::Dialect::Uk,
+ );
- if after.is_err() || h.timestamp.lt(&after.unwrap()) {
- return false;
- }
+ if after.is_err() || h.timestamp.lt(&after.unwrap()) {
+ return false;
}
+ }
- true
- })
- .map(std::borrow::ToOwned::to_owned)
- .collect();
+ true
+ });
- super::history::print_list(&results, human, cmd_only);
+ let stream = futures::stream::iter(results.into_iter().map(Result::<_, ()>::Ok).rev());
+ super::history::print_list(stream, human, cmd_only)
+ .await
+ .unwrap();
}
Ok(())
diff --git a/src/command/stats.rs b/src/command/stats.rs
index 742202ae..5e3a67c2 100644
--- a/src/command/stats.rs
+++ b/src/command/stats.rs
@@ -6,6 +6,7 @@ use chrono_english::parse_date_string;
use cli_table::{format::Justify, print_stdout, Cell, Style, Table};
use eyre::{eyre, Result};
+use futures::TryStreamExt;
use structopt::StructOpt;
use atuin_client::database::Database;
@@ -95,8 +96,7 @@ impl Cmd {
}
Self::All => {
- let history = db.list(None, false).await?;
-
+ let history: Vec<History> = db.list(None, false).try_collect().await?;
compute_stats(&history)?;
Ok(())