diff options
Diffstat (limited to 'src/fzf.rs')
-rw-r--r-- | src/fzf.rs | 255 |
1 files changed, 120 insertions, 135 deletions
@@ -1,25 +1,130 @@ -use super::errors::{Failure, SadResult, SadnessFrom}; +use super::subprocess::stream_into; use super::subprocess::SubprocessCommand; -use super::types::Task; -use async_channel::{bounded, Receiver, Sender}; +use super::types::{Abort, Fail}; use futures::future::try_join; -use std::{collections::HashMap, env, path::PathBuf, process::Stdio}; +use std::{collections::HashMap, env, path::PathBuf, process::Stdio, sync::Arc}; use tokio::{ - io::{self, AsyncWriteExt, BufWriter}, + io::{AsyncWriteExt, BufWriter, ErrorKind}, process::Command, - select, task, + select, + sync::mpsc::Receiver, + task::{spawn, JoinHandle}, }; use which::which; -pub fn run_fzf( +async fn reset_term() -> Result<(), Fail> { + if let Ok(path) = which("tput") { + let status = Command::new(&path) + .kill_on_drop(true) + .stdin(Stdio::null()) + .arg("reset") + .status() + .await + .map_err(|e| Fail::IO(path, e.kind()))?; + + if status.success() { + return Ok(()); + } + } + if let Ok(path) = which("reset") { + let status = Command::new(&path) + .kill_on_drop(true) + .stdin(Stdio::null()) + .status() + .await + .map_err(|e| Fail::IO(path, e.kind()))?; + if status.success() { + return Ok(()); + } + } + Err(Fail::IO(PathBuf::from("reset"), ErrorKind::NotFound)) +} + +fn run_fzf(abort: &Arc<Abort>, cmd: SubprocessCommand, stream: Receiver<String>) -> JoinHandle<()> { + let abort = abort.clone(); + + spawn(async move { + let subprocess = Command::new(&cmd.prog) + .kill_on_drop(true) + .args(&cmd.args) + .envs(&cmd.env) + .stdin(Stdio::piped()) + .spawn(); + + match subprocess { + Err(err) => { + abort.send(Fail::IO(cmd.prog, err.kind())).await; + } + Ok(mut child) => { + let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin"); + + let abort_1 = abort.clone(); + let p1 = cmd.prog.clone(); + let handle_in = spawn(async move { + stream_into(&abort_1, p1.clone(), &mut stdin, stream).await; + if let Err(err) = stdin.shutdown().await { + abort_1.send(Fail::IO(p1, err.kind())).await; + } + }); + + let abort_2 = abort.clone(); + let p2 = cmd.prog.clone(); + let handle_child = spawn(async move { + select! { + _ = abort_2.notified() => { + match child.kill().await { + Err(err) => { + abort_2.send(Fail::IO(p2, err.kind())).await; + }, + _ => { + if let Err(err) = reset_term().await { + abort_2.send(err).await; + } + } + } + }, + rhs = child.wait() => { + match rhs { + Ok(status) => { + match status.code() { + Some(0) | Some(1) | None => (), + Some(130) => { + abort_2.send(Fail::Interrupt).await; + } + Some(c) => { + abort_2.send(Fail::BadExit(p2, c)).await; + if let Err(err) = reset_term().await { + abort_2.send(err).await; + } + } + } + } + Err(err) => { + abort_2.send(Fail::IO(p2, err.kind())).await; + } + } + } + } + }); + + if let Err(err) = try_join(handle_child, handle_in).await { + abort.send(err.into()).await; + } + } + } + }) +} + +pub fn stream_fzf( + abort: &Arc<Abort>, bin: PathBuf, args: Vec<String>, - stream: Receiver<SadResult<String>>, -) -> (Task, Receiver<SadResult<String>>) { + stream: Receiver<String>, +) -> JoinHandle<()> { let sad = env::current_exe() .or_else(|_| which("sad".to_owned())) .map(|p| format!("{}", p.display())) - .unwrap_or("sad".to_owned()); + .unwrap_or_else(|_| "sad".to_owned()); let preview_args = env::args().skip(1).collect::<Vec<_>>().join("\x04"); let execute = format!( @@ -42,132 +147,12 @@ pub fn run_fzf( arguments.extend(args); let mut env = HashMap::new(); env.insert("SHELL".to_owned(), sad); + // WARN -- versions of FZF only work with C locale. + env.insert("LC_ALL".to_owned(), "C".to_owned()); let cmd = SubprocessCommand { - program: bin, - arguments, + prog: bin, + args: arguments, env, }; - stream_fzf(&cmd, stream) -} - -fn stream_fzf( - cmd: &SubprocessCommand, - stream: Receiver<SadResult<String>>, -) -> (Task, Receiver<SadResult<String>>) { - let (tx, rx) = bounded::<SadResult<String>>(1); - let (tix, rix) = bounded::<Failure>(1); - let ta = Sender::clone(&tx); - - let subprocess = Command::new(&cmd.program) - .args(&cmd.arguments) - .envs(&cmd.env) - .kill_on_drop(true) - .stdin(Stdio::piped()) - .spawn(); - - let mut child = match subprocess.into_sadness() { - Ok(child) => child, - Err(err) => { - let handle = task::spawn(async move { tx.send(Err(err)).await.expect("<CHANNEL>") }); - return (handle, rx); - } - }; - - let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin"); - - let handle_in = task::spawn(async move { - while let Ok(print) = stream.recv().await { - match print { - Ok(val) => { - if let Err(err) = stdin.write(val.as_bytes()).await.into_sadness() { - tix.send(err).await.expect("<CHAN>") - } - } - Err(err) => { - tix.send(err).await.expect("<CHANNEL>"); - break; - } - } - } - if let Err(err) = stdin.shutdown().await { - tix.send(err.into()).await.expect("<CHANNEL>") - } - }); - - let handle_kill = task::spawn(async move { - match rix.recv().await { - Ok(err) => Some(err), - Err(_) => None, - } - }); - - let handle_child = task::spawn(async move { - select! { - lhs = child.wait() => { - match lhs { - Ok(status) => process_status_code(status.code(), tx).await, - Err(err) => tx.send(Err(err.into())).await.expect("<CHANNEL>") - } - }, - rhs = handle_kill => { - match rhs { - Ok(Some(err)) => { - let err = combine_err(err, child.kill().await.into_sadness()); - let err = combine_err(err, child.wait().await.into_sadness()); - let err = combine_err(err, reset_term().await); - tx.send(Err(err)).await.expect("<CHAN>") - }, - Ok(None) => match child.wait().await.into_sadness() { - Err(err) => tx.send(Err(err)).await.expect("<CHANNEL>"), - Ok(status) => process_status_code(status.code(), tx).await, - } - Err(err) => tx.send(Err(err.into())).await.expect("<CHANNEL>") - } - } - } - }); - - let handle = task::spawn(async move { - if let Err(err) = try_join(handle_child, handle_in).await { - ta.send(Err(err.into())).await.expect("<CHAN>") - } - }); - - (handle, rx) -} - -fn combine_err<T>(err: Failure, res: SadResult<T>) -> Failure { - match res { - Ok(_) => err, - Err(e) => Failure::Compound(Box::new(err), Box::new(e)), - } -} - -async fn process_status_code(code: Option<i32>, tx: Sender<SadResult<String>>) { - match code { - Some(0) | Some(1) | None => {} - Some(130) => tx.send(Err(Failure::Interrupt)).await.expect("<CHANNEL>"), - Some(c) => tx - .send(Err(Failure::Fzf(format!("Error exit - {}", c)))) - .await - .expect("<CHANNEL>"), - } -} - -async fn reset_term() -> SadResult<()> { - io::stdout().flush().await.into_sadness()?; - io::stderr().flush().await.into_sadness()?; - if which("tput").is_ok() { - Command::new("tput") - .arg("reset") - .status() - .await - .into_sadness()?; - Ok(()) - } else if which("reset").is_ok() { - Command::new("reset").status().await.into_sadness()?; - Ok(()) - } else { - Err(Failure::Fzf("Unable to clear screen".to_owned())) - } + run_fzf(abort, cmd, stream) } |