diff options
-rw-r--r-- | Cargo.lock | 56 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/fzf.rs | 138 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/output.rs | 56 | ||||
-rw-r--r-- | src/subprocess.rs | 125 |
6 files changed, 144 insertions, 234 deletions
@@ -84,19 +84,6 @@ dependencies = [ ] [[package]] -name = "async-channel" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" -dependencies = [ - "concurrent-queue", - "event-listener", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - -[[package]] name = "autocfg" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -195,21 +182,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] -name = "concurrent-queue" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" - -[[package]] name = "difflib" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -232,27 +204,6 @@ dependencies = [ ] [[package]] -name = "event-listener" -version = "5.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" -dependencies = [ - "event-listener", - "pin-project-lite", -] - -[[package]] name = "futures" version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -453,12 +404,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - -[[package]] name = "parking_lot" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -574,7 +519,6 @@ version = "0.4.24" dependencies = [ "aho-corasick", "ansi_term", - "async-channel", "clap", "difflib", "futures", @@ -9,7 +9,6 @@ version = "0.4.24" [dependencies] aho-corasick = { version = "*" } ansi_term = { version = "*" } -async-channel = { version = "*" } clap = { version = "*", features = ["derive", "wrap_help"] } difflib = { version = "*" } futures = { version = "*" } @@ -2,7 +2,7 @@ use { super::{ argparse::Mode, subprocess::{stream_into, SubprocCommand}, - types::{Abort, Fail}, + types::Fail, }, futures::future::try_join, std::{ @@ -51,83 +51,81 @@ async fn reset_term() -> Result<(), Fail> { Err(Fail::IO(PathBuf::from("reset"), ErrorKind::NotFound)) } -fn run_fzf(abort: &Arc<Abort>, cmd: SubprocCommand, stream: Receiver<OsString>) -> JoinHandle<()> { - let abort = abort.clone(); - +fn run_fzf(cmd: SubprocCommand, stream: Receiver<OsString>) -> JoinHandle<()> { spawn(async move { - let subprocess = Command::new(&cmd.prog) - .kill_on_drop(true) - .args(&cmd.args) - .envs(&cmd.env) - .stdin(Stdio::piped()) - .spawn(); + todo!() + //let subprocess = Command::new(&cmd.prog) + // .kill_on_drop(true) + // .args(&cmd.args) + // .envs(&cmd.env) + // .stdin(Stdio::piped()) + // .spawn(); - match subprocess { - Err(err) => { - abort.send(Fail::IO(cmd.prog, err.kind())).await; - } - Ok(mut child) => { - let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin"); + //match subprocess { + // Err(err) => { + // abort.send(Fail::IO(cmd.prog, err.kind())).await; + // } + // Ok(mut child) => { + // let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin"); - let abort_1 = abort.clone(); - let p1 = cmd.prog.clone(); - let handle_in = spawn(async move { - stream_into(&abort_1, p1.clone(), &mut stdin, stream).await; - if let Err(err) = stdin.shutdown().await { - abort_1.send(Fail::IO(p1, err.kind())).await; - } - }); + // let abort_1 = abort.clone(); + // let p1 = cmd.prog.clone(); + // let handle_in = spawn(async move { + // stream_into(&abort_1, p1.clone(), &mut stdin, stream).await; + // if let Err(err) = stdin.shutdown().await { + // abort_1.send(Fail::IO(p1, err.kind())).await; + // } + // }); - let abort_2 = abort.clone(); - let p2 = cmd.prog.clone(); - let handle_child = spawn(async move { - select! { - () = abort_2.notified() => { - match child.kill().await { - Err(err) => { - abort_2.send(Fail::IO(p2, err.kind())).await; - }, - _ => { - if let Err(err) = reset_term().await { - abort_2.send(err).await; - } - } - } - }, - rhs = child.wait() => { - match rhs { - Ok(status) => { - match status.code() { - Some(0 | 1) | None => (), - Some(130) => { - abort_2.send(Fail::Interrupt).await; - } - Some(c) => { - abort_2.send(Fail::BadExit(p2, c)).await; - if let Err(err) = reset_term().await { - abort_2.send(err).await; - } - } - } - } - Err(err) => { - abort_2.send(Fail::IO(p2, err.kind())).await; - } - } - } - } - }); + // let abort_2 = abort.clone(); + // let p2 = cmd.prog.clone(); + // let handle_child = spawn(async move { + // select! { + // () = abort_2.notified() => { + // match child.kill().await { + // Err(err) => { + // abort_2.send(Fail::IO(p2, err.kind())).await; + // }, + // _ => { + // if let Err(err) = reset_term().await { + // abort_2.send(err).await; + // } + // } + // } + // }, + // rhs = child.wait() => { + // match rhs { + // Ok(status) => { + // match status.code() { + // Some(0 | 1) | None => (), + // Some(130) => { + // abort_2.send(Fail::Interrupt).await; + // } + // Some(c) => { + // abort_2.send(Fail::BadExit(p2, c)).await; + // if let Err(err) = reset_term().await { + // abort_2.send(err).await; + // } + // } + // } + // } + // Err(err) => { + // abort_2.send(Fail::IO(p2, err.kind())).await; + // } + // } + // } + // } + // }); - if let Err(err) = try_join(handle_child, handle_in).await { - abort.send(err.into()).await; - } - } - } + // if let Err(err) = try_join(handle_child, handle_in).await { + // abort.send(err.into()).await; + // } + // } + //} }) } pub fn stream_fzf_proc( - abort: &Arc<Abort>, bin: PathBuf, args: Vec<String>, stream: Receiver<OsString>, @@ -166,5 +164,5 @@ pub fn stream_fzf_proc( args: arguments, env: fzf_env, }; - run_fzf(abort, cmd, stream) + run_fzf(cmd, stream) } diff --git a/src/main.rs b/src/main.rs index e932871..f44dba4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -49,7 +49,7 @@ async fn run(threads: usize) -> Result<(), Fail> { async move { displace(&opts, input).await } }) .try_buffer_unordered(threads); - let sink = stream_sink(&opts); + let sink = stream_sink(&opts, trans_stream); Ok(()) } diff --git a/src/output.rs b/src/output.rs index 4ded17b..aeba3ed 100644 --- a/src/output.rs +++ b/src/output.rs @@ -2,64 +2,28 @@ use { super::{ argparse::{Action, Options, Printer}, fzf::stream_fzf_proc, - subprocess::stream_subproc, + subprocess::{stream_into, stream_subproc}, types::Fail, }, - futures::{ - future::ready, - sink::{unfold, Sink, SinkExt}, - stream::{Stream, StreamExt, TryStreamExt}, - }, - std::{ffi::OsString, path::PathBuf, sync::Arc}, + futures::stream::{Stream, StreamExt, TryStreamExt}, + std::{ffi::OsString, marker::Unpin, path::PathBuf, sync::Arc}, tokio::io::{self, AsyncWrite, AsyncWriteExt, BufWriter}, }; -fn stream_into<W>( - path: PathBuf, - writer: BufWriter<W>, -) -> impl Sink<Result<OsString, Fail>, Error = Fail> -where - W: AsyncWrite + Send + Unpin, -{ - unfold((writer, path), |mut s, line: Result<OsString, Fail>| async { - match line { - Err(e) => Err(e), - Ok(print) => { - #[cfg(target_family = "unix")] - let bytes = { - use std::os::unix::ffi::OsStrExt; - print.as_bytes() - }; - #[cfg(target_family = "windows")] - let bytes = { - let tmp = print.to_string_lossy(); - tmp.as_bytes() - }; - s.0 - .write_all(bytes) - .await - .map_err(|e| Fail::IO(s.1.clone(), e.kind()))?; - Ok(s) - } - } - }) -} - -pub fn stream_sink(opts: &Options) -> impl Sink<Result<OsString, Fail>, Error = Fail> { +pub fn stream_sink( + opts: &Options, + stream: impl Stream<Item = Result<OsString, Fail>> + Unpin, +) -> Box<dyn Stream<Item = Result<(), Fail>>> { match (&opts.action, &opts.printer) { (Action::FzfPreview(fzf_p, fzf_a), _) => { - //stream_fzf_proc(abort, fzf_p.clone(), fzf_a.clone(), stream) - - todo!() - } - (_, Printer::Pager(cmd)) => { - //stream_subproc(abort, cmd.clone(), stream) + //stream_fzf_proc( fzf_p, fzf_a, stream) todo!() } + (_, Printer::Pager(cmd)) => Box::new(stream_subproc(cmd.clone(), stream)), (_, Printer::Stdout) => { let stdout = BufWriter::new(io::stdout()); - stream_into(PathBuf::from("/dev/stdout"), stdout) + Box::new(stream_into(PathBuf::from("/dev/stdout"), stdout, stream)) } } } diff --git a/src/subprocess.rs b/src/subprocess.rs index 5227ec9..427613c 100644 --- a/src/subprocess.rs +++ b/src/subprocess.rs @@ -1,10 +1,12 @@ use { - super::types::{Abort, Fail}, + super::types::Fail, futures::{ - future::{select, try_join, Either}, - pin_mut, + future::{ready, select, try_join, Either}, + stream::{once, try_unfold, Stream, StreamExt}, + }, + std::{ + collections::HashMap, ffi::OsString, marker::Unpin, path::PathBuf, process::Stdio, sync::Arc, }, - std::{collections::HashMap, ffi::OsString, path::PathBuf, process::Stdio, sync::Arc}, tokio::{ io::{AsyncWrite, AsyncWriteExt, BufWriter}, process::Command, @@ -20,79 +22,82 @@ pub struct SubprocCommand { pub env: HashMap<String, String>, } -pub async fn stream_into( - abort: &Arc<Abort>, +pub fn stream_into( path: PathBuf, - writer: &mut BufWriter<impl AsyncWrite + Send + Unpin>, - mut stream: Receiver<OsString>, -) { - loop { - let f1 = abort.notified(); - let f2 = stream.recv(); - pin_mut!(f1); - pin_mut!(f2); - match select(f1, f2).await { - Either::Right((Some(print), _)) => { + writer: impl AsyncWrite + Send + Unpin, + stream: impl Stream<Item = Result<OsString, Fail>> + Unpin, +) -> impl Stream<Item = Result<(), Fail>> +where +{ + let buf = BufWriter::new(writer); + try_unfold((buf, stream, path), |mut s| async { + match s.1.next().await { + None => Ok(None), + Some(Err(e)) => Err(e), + Some(Ok(print)) => { #[cfg(target_family = "unix")] let bytes = { use std::os::unix::ffi::OsStrExt; print.as_bytes() }; #[cfg(target_family = "windows")] - let tmp = print.to_string_lossy(); - #[cfg(target_family = "windows")] - let bytes = tmp.as_bytes(); - if let Err(err) = writer.write_all(bytes).await { - abort.send(Fail::IO(path, err.kind())).await; - break; - } + let bytes = { + let tmp = print.to_string_lossy(); + tmp.as_bytes() + }; + s.0 + .write_all(bytes) + .await + .map_err(|e| Fail::IO(s.2.clone(), e.kind()))?; + Ok(Some(((), s))) } - _ => break, } - } + }) } pub fn stream_subproc( - abort: &Arc<Abort>, cmd: SubprocCommand, - stream: Receiver<OsString>, -) -> JoinHandle<()> { - let abort = abort.clone(); + stream: impl Stream<Item = Result<OsString, Fail>>, +) -> Box<dyn Stream<Item = Result<(), Fail>>> { + let subprocess = Command::new(&cmd.prog) + .kill_on_drop(true) + .args(&cmd.args) + .envs(&cmd.env) + .stdin(Stdio::piped()) + .spawn(); - spawn(async move { - let subprocess = Command::new(&cmd.prog) - .kill_on_drop(true) - .args(&cmd.args) - .envs(&cmd.env) - .stdin(Stdio::piped()) - .spawn(); + match subprocess { + Err(e) => { + let err = Fail::IO(cmd.prog, e.kind()); + Box::new(once(ready(Err(err)))) + } + Ok(mut child) => { + todo!() + //let mut stdin = child + // .stdin + // .take() + // .map(BufWriter::new) + // .expect("child process stdin"); - match subprocess { - Err(err) => abort.send(Fail::IO(cmd.prog, err.kind())).await, - Ok(mut child) => { - let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin"); + //stream_into( p1.clone(), stdin); - let abort_1 = abort.clone(); - let p1 = cmd.prog.clone(); - let handle_in = spawn(async move { - stream_into(&abort_1, p1.clone(), &mut stdin, stream).await; - if let Err(err) = stdin.shutdown().await { - abort_1.send(Fail::IO(p1, err.kind())).await; - } - }); + //let p1 = cmd.prog.clone(); + //let handle_in = spawn(async move { + // .await; + // if let Err(err) = stdin.shutdown().await { + // abort_1.send(Fail::IO(p1, err.kind())).await; + // } + //}); - let abort_2 = abort.clone(); - let p2 = cmd.prog.clone(); - let handle_child = spawn(async move { - if let Err(err) = child.wait().await { - abort_2.send(Fail::IO(p2, err.kind())).await; - } - }); + //let handle_child = spawn(async move { + // if let Err(err) = child.wait().await { + // abort_2.send(Fail::IO(p2, err.kind())).await; + // } + //}); - if let Err(err) = try_join(handle_child, handle_in).await { - abort.send(err.into()).await; - } - } + //if let Err(err) = try_join(handle_child, handle_in).await { + // abort.send(err.into()).await; + //} } - }) + } } |