summaryrefslogtreecommitdiffstats
path: root/src/fzf.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/fzf.rs')
-rw-r--r--src/fzf.rs255
1 files changed, 120 insertions, 135 deletions
diff --git a/src/fzf.rs b/src/fzf.rs
index a1fa27d..c2c0249 100644
--- a/src/fzf.rs
+++ b/src/fzf.rs
@@ -1,25 +1,130 @@
-use super::errors::{Failure, SadResult, SadnessFrom};
+use super::subprocess::stream_into;
use super::subprocess::SubprocessCommand;
-use super::types::Task;
-use async_channel::{bounded, Receiver, Sender};
+use super::types::{Abort, Fail};
use futures::future::try_join;
-use std::{collections::HashMap, env, path::PathBuf, process::Stdio};
+use std::{collections::HashMap, env, path::PathBuf, process::Stdio, sync::Arc};
use tokio::{
- io::{self, AsyncWriteExt, BufWriter},
+ io::{AsyncWriteExt, BufWriter, ErrorKind},
process::Command,
- select, task,
+ select,
+ sync::mpsc::Receiver,
+ task::{spawn, JoinHandle},
};
use which::which;
-pub fn run_fzf(
+async fn reset_term() -> Result<(), Fail> {
+ if let Ok(path) = which("tput") {
+ let status = Command::new(&path)
+ .kill_on_drop(true)
+ .stdin(Stdio::null())
+ .arg("reset")
+ .status()
+ .await
+ .map_err(|e| Fail::IO(path, e.kind()))?;
+
+ if status.success() {
+ return Ok(());
+ }
+ }
+ if let Ok(path) = which("reset") {
+ let status = Command::new(&path)
+ .kill_on_drop(true)
+ .stdin(Stdio::null())
+ .status()
+ .await
+ .map_err(|e| Fail::IO(path, e.kind()))?;
+ if status.success() {
+ return Ok(());
+ }
+ }
+ Err(Fail::IO(PathBuf::from("reset"), ErrorKind::NotFound))
+}
+
+fn run_fzf(abort: &Arc<Abort>, cmd: SubprocessCommand, stream: Receiver<String>) -> JoinHandle<()> {
+ let abort = abort.clone();
+
+ spawn(async move {
+ let subprocess = Command::new(&cmd.prog)
+ .kill_on_drop(true)
+ .args(&cmd.args)
+ .envs(&cmd.env)
+ .stdin(Stdio::piped())
+ .spawn();
+
+ match subprocess {
+ Err(err) => {
+ abort.send(Fail::IO(cmd.prog, err.kind())).await;
+ }
+ Ok(mut child) => {
+ let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin");
+
+ let abort_1 = abort.clone();
+ let p1 = cmd.prog.clone();
+ let handle_in = spawn(async move {
+ stream_into(&abort_1, p1.clone(), &mut stdin, stream).await;
+ if let Err(err) = stdin.shutdown().await {
+ abort_1.send(Fail::IO(p1, err.kind())).await;
+ }
+ });
+
+ let abort_2 = abort.clone();
+ let p2 = cmd.prog.clone();
+ let handle_child = spawn(async move {
+ select! {
+ _ = abort_2.notified() => {
+ match child.kill().await {
+ Err(err) => {
+ abort_2.send(Fail::IO(p2, err.kind())).await;
+ },
+ _ => {
+ if let Err(err) = reset_term().await {
+ abort_2.send(err).await;
+ }
+ }
+ }
+ },
+ rhs = child.wait() => {
+ match rhs {
+ Ok(status) => {
+ match status.code() {
+ Some(0) | Some(1) | None => (),
+ Some(130) => {
+ abort_2.send(Fail::Interrupt).await;
+ }
+ Some(c) => {
+ abort_2.send(Fail::BadExit(p2, c)).await;
+ if let Err(err) = reset_term().await {
+ abort_2.send(err).await;
+ }
+ }
+ }
+ }
+ Err(err) => {
+ abort_2.send(Fail::IO(p2, err.kind())).await;
+ }
+ }
+ }
+ }
+ });
+
+ if let Err(err) = try_join(handle_child, handle_in).await {
+ abort.send(err.into()).await;
+ }
+ }
+ }
+ })
+}
+
+pub fn stream_fzf(
+ abort: &Arc<Abort>,
bin: PathBuf,
args: Vec<String>,
- stream: Receiver<SadResult<String>>,
-) -> (Task, Receiver<SadResult<String>>) {
+ stream: Receiver<String>,
+) -> JoinHandle<()> {
let sad = env::current_exe()
.or_else(|_| which("sad".to_owned()))
.map(|p| format!("{}", p.display()))
- .unwrap_or("sad".to_owned());
+ .unwrap_or_else(|_| "sad".to_owned());
let preview_args = env::args().skip(1).collect::<Vec<_>>().join("\x04");
let execute = format!(
@@ -42,132 +147,12 @@ pub fn run_fzf(
arguments.extend(args);
let mut env = HashMap::new();
env.insert("SHELL".to_owned(), sad);
+ // WARN -- versions of FZF only work with C locale.
+ env.insert("LC_ALL".to_owned(), "C".to_owned());
let cmd = SubprocessCommand {
- program: bin,
- arguments,
+ prog: bin,
+ args: arguments,
env,
};
- stream_fzf(&cmd, stream)
-}
-
-fn stream_fzf(
- cmd: &SubprocessCommand,
- stream: Receiver<SadResult<String>>,
-) -> (Task, Receiver<SadResult<String>>) {
- let (tx, rx) = bounded::<SadResult<String>>(1);
- let (tix, rix) = bounded::<Failure>(1);
- let ta = Sender::clone(&tx);
-
- let subprocess = Command::new(&cmd.program)
- .args(&cmd.arguments)
- .envs(&cmd.env)
- .kill_on_drop(true)
- .stdin(Stdio::piped())
- .spawn();
-
- let mut child = match subprocess.into_sadness() {
- Ok(child) => child,
- Err(err) => {
- let handle = task::spawn(async move { tx.send(Err(err)).await.expect("<CHANNEL>") });
- return (handle, rx);
- }
- };
-
- let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin");
-
- let handle_in = task::spawn(async move {
- while let Ok(print) = stream.recv().await {
- match print {
- Ok(val) => {
- if let Err(err) = stdin.write(val.as_bytes()).await.into_sadness() {
- tix.send(err).await.expect("<CHAN>")
- }
- }
- Err(err) => {
- tix.send(err).await.expect("<CHANNEL>");
- break;
- }
- }
- }
- if let Err(err) = stdin.shutdown().await {
- tix.send(err.into()).await.expect("<CHANNEL>")
- }
- });
-
- let handle_kill = task::spawn(async move {
- match rix.recv().await {
- Ok(err) => Some(err),
- Err(_) => None,
- }
- });
-
- let handle_child = task::spawn(async move {
- select! {
- lhs = child.wait() => {
- match lhs {
- Ok(status) => process_status_code(status.code(), tx).await,
- Err(err) => tx.send(Err(err.into())).await.expect("<CHANNEL>")
- }
- },
- rhs = handle_kill => {
- match rhs {
- Ok(Some(err)) => {
- let err = combine_err(err, child.kill().await.into_sadness());
- let err = combine_err(err, child.wait().await.into_sadness());
- let err = combine_err(err, reset_term().await);
- tx.send(Err(err)).await.expect("<CHAN>")
- },
- Ok(None) => match child.wait().await.into_sadness() {
- Err(err) => tx.send(Err(err)).await.expect("<CHANNEL>"),
- Ok(status) => process_status_code(status.code(), tx).await,
- }
- Err(err) => tx.send(Err(err.into())).await.expect("<CHANNEL>")
- }
- }
- }
- });
-
- let handle = task::spawn(async move {
- if let Err(err) = try_join(handle_child, handle_in).await {
- ta.send(Err(err.into())).await.expect("<CHAN>")
- }
- });
-
- (handle, rx)
-}
-
-fn combine_err<T>(err: Failure, res: SadResult<T>) -> Failure {
- match res {
- Ok(_) => err,
- Err(e) => Failure::Compound(Box::new(err), Box::new(e)),
- }
-}
-
-async fn process_status_code(code: Option<i32>, tx: Sender<SadResult<String>>) {
- match code {
- Some(0) | Some(1) | None => {}
- Some(130) => tx.send(Err(Failure::Interrupt)).await.expect("<CHANNEL>"),
- Some(c) => tx
- .send(Err(Failure::Fzf(format!("Error exit - {}", c))))
- .await
- .expect("<CHANNEL>"),
- }
-}
-
-async fn reset_term() -> SadResult<()> {
- io::stdout().flush().await.into_sadness()?;
- io::stderr().flush().await.into_sadness()?;
- if which("tput").is_ok() {
- Command::new("tput")
- .arg("reset")
- .status()
- .await
- .into_sadness()?;
- Ok(())
- } else if which("reset").is_ok() {
- Command::new("reset").status().await.into_sadness()?;
- Ok(())
- } else {
- Err(Failure::Fzf("Unable to clear screen".to_owned()))
- }
+ run_fzf(abort, cmd, stream)
}