summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorConrad Ludgate <conradludgate@gmail.com>2021-11-21 19:40:50 +0000
committerConrad Ludgate <conradludgate@gmail.com>2021-11-21 19:40:50 +0000
commit04c3f6cdc659c157d48e773539a4b3bc0851f9d5 (patch)
treeb7b36211860d0a04dd08f42cf1ed63a625352c17
parent6e8ec8689d85bf97e79455f7c3644daa5c8e02fd (diff)
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml1
-rw-r--r--atuin-client/Cargo.toml1
-rw-r--r--atuin-client/src/database.rs104
-rw-r--r--atuin-client/src/sync.rs8
-rw-r--r--src/command/history.rs50
-rw-r--r--src/command/import.rs16
-rw-r--r--src/command/search.rs24
-rw-r--r--src/command/stats.rs8
-rw-r--r--src/command/sync.rs4
10 files changed, 94 insertions, 124 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 94ca3e13..52afa9de 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -105,6 +105,7 @@ dependencies = [
"directories",
"eyre",
"fork",
+ "futures",
"humantime 2.1.0",
"indicatif",
"itertools",
@@ -134,6 +135,7 @@ dependencies = [
"directories",
"eyre",
"fern",
+ "futures",
"humantime 2.1.0",
"indicatif",
"itertools",
diff --git a/Cargo.toml b/Cargo.toml
index 259a3308..d1293bc1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -56,6 +56,7 @@ base64 = "0.13.0"
humantime = "2.1.0"
tabwriter = "1.2.1"
crossbeam-channel = "0.5.1"
+futures = "0.3"
[profile.release]
lto = "fat"
diff --git a/atuin-client/Cargo.toml b/atuin-client/Cargo.toml
index e0e50ac1..593fa86b 100644
--- a/atuin-client/Cargo.toml
+++ b/atuin-client/Cargo.toml
@@ -41,6 +41,7 @@ itertools = "0.10.1"
shellexpand = "2"
sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "uuid", "chrono", "sqlite" ] }
minspan = "0.1.1"
+futures = "0.3"
[dev-dependencies]
tokio-test = "*"
diff --git a/atuin-client/src/database.rs b/atuin-client/src/database.rs
index 8dff15fd..536b6811 100644
--- a/atuin-client/src/database.rs
+++ b/atuin-client/src/database.rs
@@ -1,11 +1,11 @@
use std::path::Path;
use std::str::FromStr;
-use async_trait::async_trait;
use chrono::prelude::*;
use chrono::Utc;
use eyre::Result;
+use futures::Stream;
use itertools::Itertools;
use sqlx::sqlite::{
@@ -17,36 +17,6 @@ use super::history::History;
use super::ordering;
use super::settings::SearchMode;
-#[async_trait]
-pub trait Database {
- async fn save(&mut self, h: &History) -> Result<()>;
- async fn save_bulk(&mut self, h: &[History]) -> Result<()>;
-
- async fn load(&self, id: &str) -> Result<History>;
- async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>>;
- async fn range(
- &self,
- from: chrono::DateTime<Utc>,
- to: chrono::DateTime<Utc>,
- ) -> Result<Vec<History>>;
-
- async fn update(&self, h: &History) -> Result<()>;
- async fn history_count(&self) -> Result<i64>;
-
- async fn first(&self) -> Result<History>;
- async fn last(&self) -> Result<History>;
- async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>>;
-
- async fn search(
- &self,
- limit: Option<i64>,
- search_mode: SearchMode,
- query: &str,
- ) -> Result<Vec<History>>;
-
- async fn query_history(&self, query: &str) -> Result<Vec<History>>;
-}
-
// Intended for use on a developer machine and not a sync server.
// TODO: implement IntoIterator
pub struct Sqlite {
@@ -103,7 +73,7 @@ impl Sqlite {
Ok(())
}
- fn query_history(row: SqliteRow) -> History {
+ fn query_history_row(row: SqliteRow) -> History {
History {
id: row.get("id"),
timestamp: Utc.timestamp_nanos(row.get("timestamp")),
@@ -117,9 +87,8 @@ impl Sqlite {
}
}
-#[async_trait]
-impl Database for Sqlite {
- async fn save(&mut self, h: &History) -> Result<()> {
+impl Sqlite {
+ pub async fn save(&mut self, h: &History) -> Result<()> {
debug!("saving history to sqlite");
let mut tx = self.pool.begin().await?;
@@ -129,7 +98,7 @@ impl Database for Sqlite {
Ok(())
}
- async fn save_bulk(&mut self, h: &[History]) -> Result<()> {
+ pub async fn save_bulk(&mut self, h: &[History]) -> Result<()> {
debug!("saving history to sqlite");
let mut tx = self.pool.begin().await?;
@@ -143,19 +112,19 @@ impl Database for Sqlite {
Ok(())
}
- async fn load(&self, id: &str) -> Result<History> {
+ pub async fn load(&self, id: &str) -> Result<History> {
debug!("loading history item {}", id);
let res = sqlx::query("select * from history where id = ?1")
.bind(id)
- .map(Self::query_history)
+ .map(Self::query_history_row)
.fetch_one(&self.pool)
.await?;
Ok(res)
}
- async fn update(&self, h: &History) -> Result<()> {
+ pub async fn update(&self, h: &History) -> Result<()> {
debug!("updating sqlite history");
sqlx::query(
@@ -178,14 +147,14 @@ impl Database for Sqlite {
}
// make a unique list, that only shows the *newest* version of things
- async fn list(&self, max: Option<usize>, unique: bool) -> Result<Vec<History>> {
+ pub fn list(&self, max: Option<usize>, unique: bool) -> String {
debug!("listing history");
// very likely vulnerable to SQL injection
// however, this is client side, and only used by the client, on their
// own data. They can just open the db file...
// otherwise building the query is awkward
- let query = format!(
+ format!(
"select * from history h
{}
order by timestamp desc
@@ -205,17 +174,14 @@ impl Database for Sqlite {
} else {
"".to_string()
}
- );
-
- let res = sqlx::query(query.as_str())
- .map(Self::query_history)
- .fetch_all(&self.pool)
- .await?;
+ )
- Ok(res)
+ // sqlx::query(query.as_str())
+ // .map(Self::query_history_row)
+ // .fetch(&self.pool)
}
- async fn range(
+ pub async fn range(
&self,
from: chrono::DateTime<Utc>,
to: chrono::DateTime<Utc>,
@@ -227,48 +193,41 @@ impl Database for Sqlite {
)
.bind(from)
.bind(to)
- .map(Self::query_history)
+ .map(Self::query_history_row)
.fetch_all(&self.pool)
.await?;
Ok(res)
}
- async fn first(&self) -> Result<History> {
+ pub async fn first(&self) -> Result<History> {
let res =
sqlx::query("select * from history where duration >= 0 order by timestamp asc limit 1")
- .map(Self::query_history)
+ .map(Self::query_history_row)
.fetch_one(&self.pool)
.await?;
Ok(res)
}
- async fn last(&self) -> Result<History> {
- let res = sqlx::query(
- "select * from history where duration >= 0 order by timestamp desc limit 1",
- )
- .map(Self::query_history)
- .fetch_one(&self.pool)
- .await?;
-
- Ok(res)
+ pub fn last(&self) -> &'static str {
+ "select * from history where duration >= 0 order by timestamp desc limit 1"
}
- async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> {
+ pub async fn before(&self, timestamp: chrono::DateTime<Utc>, count: i64) -> Result<Vec<History>> {
let res = sqlx::query(
"select * from history where timestamp < ?1 order by timestamp desc limit ?2",
)
.bind(timestamp.timestamp_nanos())
.bind(count)
- .map(Self::query_history)
+ .map(Self::query_history_row)
.fetch_all(&self.pool)
.await?;
Ok(res)
}
- async fn history_count(&self) -> Result<i64> {
+ pub async fn history_count(&self) -> Result<i64> {
let res: (i64,) = sqlx::query_as("select count(1) from history")
.fetch_one(&self.pool)
.await?;
@@ -276,7 +235,7 @@ impl Database for Sqlite {
Ok(res.0)
}
- async fn search(
+ pub async fn search(
&self,
limit: Option<i64>,
search_mode: SearchMode,
@@ -306,20 +265,17 @@ impl Database for Sqlite {
.as_str(),
)
.bind(query)
- .map(Self::query_history)
+ .map(Self::query_history_row)
.fetch_all(&self.pool)
.await?;
Ok(ordering::reorder_fuzzy(search_mode, orig_query, res))
}
- async fn query_history(&self, query: &str) -> Result<Vec<History>> {
- let res = sqlx::query(query)
- .map(Self::query_history)
- .fetch_all(&self.pool)
- .await?;
-
- Ok(res)
+ pub fn query_history<'q: 'e, 'e>(&'e self, query: &'q str) -> impl Stream<Item = Result<History, sqlx::Error>> +'e {
+ sqlx::query(query)
+ .map(Self::query_history_row)
+ .fetch(&self.pool)
}
}
@@ -327,7 +283,7 @@ impl Database for Sqlite {
mod test {
use super::*;
- async fn new_history_item(db: &mut impl Database, cmd: &str) -> Result<()> {
+ async fn new_history_item(db: &mut Sqlite, cmd: &str) -> Result<()> {
let history = History::new(
chrono::Utc::now(),
cmd.to_string(),
diff --git a/atuin-client/src/sync.rs b/atuin-client/src/sync.rs
index c1c02b0a..1175cd4f 100644
--- a/atuin-client/src/sync.rs
+++ b/atuin-client/src/sync.rs
@@ -6,7 +6,7 @@ use eyre::Result;
use atuin_common::{api::AddHistoryRequest, utils::hash_str};
use crate::api_client;
-use crate::database::Database;
+use crate::database::Sqlite;
use crate::encryption::{encrypt, load_encoded_key, load_key};
use crate::settings::{Settings, HISTORY_PAGE_SIZE};
@@ -24,7 +24,7 @@ use crate::settings::{Settings, HISTORY_PAGE_SIZE};
async fn sync_download(
force: bool,
client: &api_client::Client<'_>,
- db: &mut (impl Database + Send),
+ db: &mut Sqlite,
) -> Result<(i64, i64)> {
debug!("starting sync download");
@@ -80,7 +80,7 @@ async fn sync_upload(
settings: &Settings,
_force: bool,
client: &api_client::Client<'_>,
- db: &mut (impl Database + Send),
+ db: &mut Sqlite,
) -> Result<()> {
debug!("starting sync upload");
@@ -130,7 +130,7 @@ async fn sync_upload(
Ok(())
}
-pub async fn sync(settings: &Settings, force: bool, db: &mut (impl Database + Send)) -> Result<()> {
+pub async fn sync(settings: &Settings, force: bool, db: &mut Sqlite) -> Result<()> {
let client = api_client::Client::new(
&settings.sync_address,
&settings.session_token,
diff --git a/src/command/history.rs b/src/command/history.rs
index 4606b304..9795d672 100644
--- a/src/command/history.rs
+++ b/src/command/history.rs
@@ -3,10 +3,11 @@ use std::io::Write;
use std::time::Duration;
use eyre::Result;
+use futures::stream::{Stream, TryStreamExt};
use structopt::StructOpt;
use tabwriter::TabWriter;
-use atuin_client::database::Database;
+use atuin_client::database::Sqlite;
use atuin_client::history::History;
use atuin_client::settings::Settings;
use atuin_client::sync;
@@ -61,10 +62,10 @@ pub enum Cmd {
}
#[allow(clippy::cast_sign_loss)]
-pub fn print_list(h: &[History], human: bool, cmd_only: bool) {
+pub async fn print_list<E>(h: impl Stream<Item = Result<History, E>>, human: bool, cmd_only: bool) -> Result<(), E> {
let mut writer = TabWriter::new(std::io::stdout()).padding(2);
- let lines = h.iter().map(|h| {
+ let lines = h.map_ok(|h| {
if human {
let duration = humantime::format_duration(Duration::from_nanos(std::cmp::max(
h.duration, 0,
@@ -91,20 +92,26 @@ pub fn print_list(h: &[History], human: bool, cmd_only: bool) {
}
});
- for i in lines.rev() {
- writer
- .write_all(i.as_bytes())
- .expect("failed to write to tab writer");
- }
+ let fut = lines
+ .try_for_each(|i| {
+ writer
+ .write_all(i.as_bytes())
+ .expect("failed to write to tab writer");
+
+ futures::future::ready(Ok(()))
+ });
+ fut.await?;
writer.flush().expect("failed to flush tab writer");
+
+ Ok(())
}
impl Cmd {
pub async fn run(
&self,
settings: &Settings,
- db: &mut (impl Database + Send + Sync),
+ db: &mut Sqlite,
) -> Result<()> {
match self {
Self::Start { command: words } => {
@@ -171,33 +178,34 @@ impl Cmd {
None
};
- let history = match (session, cwd) {
- (None, None) => db.list(None, false).await?,
+ let query = match (session, cwd) {
+ (None, None) => db.list(None, false),
(None, Some(cwd)) => {
- let query = format!("select * from history where cwd = {};", cwd);
- db.query_history(&query).await?
+ format!("select * from history where cwd = {};", cwd)
}
(Some(session), None) => {
- let query = format!("select * from history where session = {};", session);
- db.query_history(&query).await?
+ format!("select * from history where session = {};", session)
}
(Some(session), Some(cwd)) => {
- let query = format!(
+ format!(
"select * from history where cwd = {} and session = {};",
cwd, session
- );
- db.query_history(&query).await?
+ )
}
};
- print_list(&history, *human, *cmd_only);
+ let history = db.query_history(&query);
+
+ print_list(history, *human, *cmd_only).await?;
Ok(())
}
Self::Last { human, cmd_only } => {
- let last = db.last().await?;
- print_list(&[last], *human, *cmd_only);
+ let last = db.last();
+ let history = db.query_history(last);
+
+ print_list(history, *human, *cmd_only).await?;
Ok(())
}
diff --git a/src/command/import.rs b/src/command/import.rs
index 53940abb..4fd5b31e 100644
--- a/src/command/import.rs
+++ b/src/command/import.rs
@@ -4,7 +4,7 @@ use eyre::{eyre, Result};
use structopt::StructOpt;
use atuin_client::import::{bash::Bash, zsh::Zsh};
-use atuin_client::{database::Database, import::Importer};
+use atuin_client::{database::Sqlite, import::Importer};
use atuin_client::{history::History, import::resh::Resh};
use indicatif::ProgressBar;
@@ -38,7 +38,7 @@ pub enum Cmd {
const BATCH_SIZE: usize = 100;
impl Cmd {
- pub async fn run(&self, db: &mut (impl Database + Send + Sync)) -> Result<()> {
+ pub async fn run(&self, db: &mut Sqlite) -> Result<()> {
println!(" Atuin ");
println!("======================");
println!(" \u{1f30d} ");
@@ -53,22 +53,22 @@ impl Cmd {
if shell.ends_with("/zsh") {
println!("Detected ZSH");
- import::<Zsh<_>, _>(db, BATCH_SIZE).await
+ import::<Zsh<_>>(db, BATCH_SIZE).await
} else {
println!("cannot import {} history", shell);
Ok(())
}
}
- Self::Zsh => import::<Zsh<_>, _>(db, BATCH_SIZE).await,
- Self::Bash => import::<Bash<_>, _>(db, BATCH_SIZE).await,
- Self::Resh => import::<Resh, _>(db, BATCH_SIZE).await,
+ Self::Zsh => import::<Zsh<_>>(db, BATCH_SIZE).await,
+ Self::Bash => import::<Bash<_>>(db, BATCH_SIZE).await,
+ Self::Resh => import::<Resh>(db, BATCH_SIZE).await,
}
}
}
-async fn import<I: Importer + Send, DB: Database + Send + Sync>(
- db: &mut DB,
+async fn import<I: Importer + Send>(
+ db: &mut Sqlite,
buf_size: usize,
) -> Result<()>
where
diff --git a/src/command/search.rs b/src/command/search.rs
index 00e11f52..6b6e50e3 100644
--- a/src/command/search.rs
+++ b/src/command/search.rs
@@ -14,7 +14,7 @@ use tui::{
};
use unicode_width::UnicodeWidthStr;
-use atuin_client::database::Database;
+use atuin_client::database::Sqlite;
use atuin_client::history::History;
use atuin_client::settings::{SearchMode, Settings};
@@ -151,13 +151,9 @@ impl State {
}
}
-async fn query_results(
- app: &mut State,
- search_mode: SearchMode,
- db: &mut (impl Database + Send + Sync),
-) -> Result<()> {
+async fn query_results(app: &mut State, search_mode: SearchMode, db: &mut Sqlite) -> Result<()> {
let results = match app.input.as_str() {
- "" => db.list(Some(200), true).await?,
+ // "" => db.list(Some(200), true).await?,
i => db.search(Some(200), search_mode, i).await?,
};
@@ -175,7 +171,7 @@ async fn query_results(
async fn key_handler(
input: Key,
search_mode: SearchMode,
- db: &mut (impl Database + Send + Sync),
+ db: &mut Sqlite,
app: &mut State,
) -> Option<String> {
match input {
@@ -314,7 +310,7 @@ fn draw<T: Backend>(f: &mut Frame<'_, T>, history_count: i64, app: &mut State) {
async fn select_history(
query: &[String],
search_mode: SearchMode,
- db: &mut (impl Database + Send + Sync),
+ db: &mut Sqlite,
) -> Result<String> {
let stdout = stdout().into_raw_mode()?;
let stdout = MouseTerminal::from(stdout);
@@ -361,7 +357,7 @@ pub async fn run(
after: Option<String>,
cmd_only: bool,
query: &[String],
- db: &mut (impl Database + Send + Sync),
+ db: &mut Sqlite,
) -> Result<()> {
let dir = if let Some(cwd) = cwd {
if cwd == "." {
@@ -443,7 +439,13 @@ pub async fn run(
.map(std::borrow::ToOwned::to_owned)
.collect();
- super::history::print_list(&results, human, cmd_only);
+ super::history::print_list(
+ futures::stream::iter(results.into_iter().map(Result::<_, ()>::Ok)),
+ human,
+ cmd_only,
+ )
+ .await
+ .unwrap();
}
Ok(())
diff --git a/src/command/stats.rs b/src/command/stats.rs
index 742202ae..cb5bb304 100644
--- a/src/command/stats.rs
+++ b/src/command/stats.rs
@@ -8,7 +8,7 @@ use cli_table::{format::Justify, print_stdout, Cell, Style, Table};
use eyre::{eyre, Result};
use structopt::StructOpt;
-use atuin_client::database::Database;
+use atuin_client::database::Sqlite;
use atuin_client::history::History;
use atuin_client::settings::Settings;
@@ -73,7 +73,7 @@ fn compute_stats(history: &[History]) -> Result<()> {
impl Cmd {
pub async fn run(
&self,
- db: &mut (impl Database + Send + Sync),
+ db: &mut Sqlite,
settings: &Settings,
) -> Result<()> {
match self {
@@ -95,9 +95,9 @@ impl Cmd {
}
Self::All => {
- let history = db.list(None, false).await?;
+ // let history = db.list(None, false).await?;
- compute_stats(&history)?;
+ // compute_stats(&history)?;
Ok(())
}
diff --git a/src/command/sync.rs b/src/command/sync.rs
index f8bfd5e2..0bceca26 100644
--- a/src/command/sync.rs
+++ b/src/command/sync.rs
@@ -1,13 +1,13 @@
use eyre::Result;
-use atuin_client::database::Database;
+use atuin_client::database::Sqlite;
use atuin_client::settings::Settings;
use atuin_client::sync;
pub async fn run(
settings: &Settings,
force: bool,
- db: &mut (impl Database + Send + Sync),
+ db: &mut Sqlite,
) -> Result<()> {
sync::sync(settings, force, db).await?;
println!(