summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock56
-rw-r--r--Cargo.toml1
-rw-r--r--src/fzf.rs138
-rw-r--r--src/main.rs2
-rw-r--r--src/output.rs56
-rw-r--r--src/subprocess.rs125
6 files changed, 144 insertions, 234 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 5325711..e13d2fc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -84,19 +84,6 @@ dependencies = [
]
[[package]]
-name = "async-channel"
-version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3"
-dependencies = [
- "concurrent-queue",
- "event-listener",
- "event-listener-strategy",
- "futures-core",
- "pin-project-lite",
-]
-
-[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -195,21 +182,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
-name = "concurrent-queue"
-version = "2.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363"
-dependencies = [
- "crossbeam-utils",
-]
-
-[[package]]
-name = "crossbeam-utils"
-version = "0.8.19"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
-
-[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -232,27 +204,6 @@ dependencies = [
]
[[package]]
-name = "event-listener"
-version = "5.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91"
-dependencies = [
- "concurrent-queue",
- "parking",
- "pin-project-lite",
-]
-
-[[package]]
-name = "event-listener-strategy"
-version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291"
-dependencies = [
- "event-listener",
- "pin-project-lite",
-]
-
-[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -453,12 +404,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
-name = "parking"
-version = "2.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
-
-[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -574,7 +519,6 @@ version = "0.4.24"
dependencies = [
"aho-corasick",
"ansi_term",
- "async-channel",
"clap",
"difflib",
"futures",
diff --git a/Cargo.toml b/Cargo.toml
index b015ddf..d6a6af3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,7 +9,6 @@ version = "0.4.24"
[dependencies]
aho-corasick = { version = "*" }
ansi_term = { version = "*" }
-async-channel = { version = "*" }
clap = { version = "*", features = ["derive", "wrap_help"] }
difflib = { version = "*" }
futures = { version = "*" }
diff --git a/src/fzf.rs b/src/fzf.rs
index 2335375..e57ba0b 100644
--- a/src/fzf.rs
+++ b/src/fzf.rs
@@ -2,7 +2,7 @@ use {
super::{
argparse::Mode,
subprocess::{stream_into, SubprocCommand},
- types::{Abort, Fail},
+ types::Fail,
},
futures::future::try_join,
std::{
@@ -51,83 +51,81 @@ async fn reset_term() -> Result<(), Fail> {
Err(Fail::IO(PathBuf::from("reset"), ErrorKind::NotFound))
}
-fn run_fzf(abort: &Arc<Abort>, cmd: SubprocCommand, stream: Receiver<OsString>) -> JoinHandle<()> {
- let abort = abort.clone();
-
+fn run_fzf(cmd: SubprocCommand, stream: Receiver<OsString>) -> JoinHandle<()> {
spawn(async move {
- let subprocess = Command::new(&cmd.prog)
- .kill_on_drop(true)
- .args(&cmd.args)
- .envs(&cmd.env)
- .stdin(Stdio::piped())
- .spawn();
+ todo!()
+ //let subprocess = Command::new(&cmd.prog)
+ // .kill_on_drop(true)
+ // .args(&cmd.args)
+ // .envs(&cmd.env)
+ // .stdin(Stdio::piped())
+ // .spawn();
- match subprocess {
- Err(err) => {
- abort.send(Fail::IO(cmd.prog, err.kind())).await;
- }
- Ok(mut child) => {
- let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin");
+ //match subprocess {
+ // Err(err) => {
+ // abort.send(Fail::IO(cmd.prog, err.kind())).await;
+ // }
+ // Ok(mut child) => {
+ // let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin");
- let abort_1 = abort.clone();
- let p1 = cmd.prog.clone();
- let handle_in = spawn(async move {
- stream_into(&abort_1, p1.clone(), &mut stdin, stream).await;
- if let Err(err) = stdin.shutdown().await {
- abort_1.send(Fail::IO(p1, err.kind())).await;
- }
- });
+ // let abort_1 = abort.clone();
+ // let p1 = cmd.prog.clone();
+ // let handle_in = spawn(async move {
+ // stream_into(&abort_1, p1.clone(), &mut stdin, stream).await;
+ // if let Err(err) = stdin.shutdown().await {
+ // abort_1.send(Fail::IO(p1, err.kind())).await;
+ // }
+ // });
- let abort_2 = abort.clone();
- let p2 = cmd.prog.clone();
- let handle_child = spawn(async move {
- select! {
- () = abort_2.notified() => {
- match child.kill().await {
- Err(err) => {
- abort_2.send(Fail::IO(p2, err.kind())).await;
- },
- _ => {
- if let Err(err) = reset_term().await {
- abort_2.send(err).await;
- }
- }
- }
- },
- rhs = child.wait() => {
- match rhs {
- Ok(status) => {
- match status.code() {
- Some(0 | 1) | None => (),
- Some(130) => {
- abort_2.send(Fail::Interrupt).await;
- }
- Some(c) => {
- abort_2.send(Fail::BadExit(p2, c)).await;
- if let Err(err) = reset_term().await {
- abort_2.send(err).await;
- }
- }
- }
- }
- Err(err) => {
- abort_2.send(Fail::IO(p2, err.kind())).await;
- }
- }
- }
- }
- });
+ // let abort_2 = abort.clone();
+ // let p2 = cmd.prog.clone();
+ // let handle_child = spawn(async move {
+ // select! {
+ // () = abort_2.notified() => {
+ // match child.kill().await {
+ // Err(err) => {
+ // abort_2.send(Fail::IO(p2, err.kind())).await;
+ // },
+ // _ => {
+ // if let Err(err) = reset_term().await {
+ // abort_2.send(err).await;
+ // }
+ // }
+ // }
+ // },
+ // rhs = child.wait() => {
+ // match rhs {
+ // Ok(status) => {
+ // match status.code() {
+ // Some(0 | 1) | None => (),
+ // Some(130) => {
+ // abort_2.send(Fail::Interrupt).await;
+ // }
+ // Some(c) => {
+ // abort_2.send(Fail::BadExit(p2, c)).await;
+ // if let Err(err) = reset_term().await {
+ // abort_2.send(err).await;
+ // }
+ // }
+ // }
+ // }
+ // Err(err) => {
+ // abort_2.send(Fail::IO(p2, err.kind())).await;
+ // }
+ // }
+ // }
+ // }
+ // });
- if let Err(err) = try_join(handle_child, handle_in).await {
- abort.send(err.into()).await;
- }
- }
- }
+ // if let Err(err) = try_join(handle_child, handle_in).await {
+ // abort.send(err.into()).await;
+ // }
+ // }
+ //}
})
}
pub fn stream_fzf_proc(
- abort: &Arc<Abort>,
bin: PathBuf,
args: Vec<String>,
stream: Receiver<OsString>,
@@ -166,5 +164,5 @@ pub fn stream_fzf_proc(
args: arguments,
env: fzf_env,
};
- run_fzf(abort, cmd, stream)
+ run_fzf(cmd, stream)
}
diff --git a/src/main.rs b/src/main.rs
index e932871..f44dba4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -49,7 +49,7 @@ async fn run(threads: usize) -> Result<(), Fail> {
async move { displace(&opts, input).await }
})
.try_buffer_unordered(threads);
- let sink = stream_sink(&opts);
+ let sink = stream_sink(&opts, trans_stream);
Ok(())
}
diff --git a/src/output.rs b/src/output.rs
index 4ded17b..aeba3ed 100644
--- a/src/output.rs
+++ b/src/output.rs
@@ -2,64 +2,28 @@ use {
super::{
argparse::{Action, Options, Printer},
fzf::stream_fzf_proc,
- subprocess::stream_subproc,
+ subprocess::{stream_into, stream_subproc},
types::Fail,
},
- futures::{
- future::ready,
- sink::{unfold, Sink, SinkExt},
- stream::{Stream, StreamExt, TryStreamExt},
- },
- std::{ffi::OsString, path::PathBuf, sync::Arc},
+ futures::stream::{Stream, StreamExt, TryStreamExt},
+ std::{ffi::OsString, marker::Unpin, path::PathBuf, sync::Arc},
tokio::io::{self, AsyncWrite, AsyncWriteExt, BufWriter},
};
-fn stream_into<W>(
- path: PathBuf,
- writer: BufWriter<W>,
-) -> impl Sink<Result<OsString, Fail>, Error = Fail>
-where
- W: AsyncWrite + Send + Unpin,
-{
- unfold((writer, path), |mut s, line: Result<OsString, Fail>| async {
- match line {
- Err(e) => Err(e),
- Ok(print) => {
- #[cfg(target_family = "unix")]
- let bytes = {
- use std::os::unix::ffi::OsStrExt;
- print.as_bytes()
- };
- #[cfg(target_family = "windows")]
- let bytes = {
- let tmp = print.to_string_lossy();
- tmp.as_bytes()
- };
- s.0
- .write_all(bytes)
- .await
- .map_err(|e| Fail::IO(s.1.clone(), e.kind()))?;
- Ok(s)
- }
- }
- })
-}
-
-pub fn stream_sink(opts: &Options) -> impl Sink<Result<OsString, Fail>, Error = Fail> {
+pub fn stream_sink(
+ opts: &Options,
+ stream: impl Stream<Item = Result<OsString, Fail>> + Unpin,
+) -> Box<dyn Stream<Item = Result<(), Fail>>> {
match (&opts.action, &opts.printer) {
(Action::FzfPreview(fzf_p, fzf_a), _) => {
- //stream_fzf_proc(abort, fzf_p.clone(), fzf_a.clone(), stream)
-
- todo!()
- }
- (_, Printer::Pager(cmd)) => {
- //stream_subproc(abort, cmd.clone(), stream)
+ //stream_fzf_proc( fzf_p, fzf_a, stream)
todo!()
}
+ (_, Printer::Pager(cmd)) => Box::new(stream_subproc(cmd.clone(), stream)),
(_, Printer::Stdout) => {
let stdout = BufWriter::new(io::stdout());
- stream_into(PathBuf::from("/dev/stdout"), stdout)
+ Box::new(stream_into(PathBuf::from("/dev/stdout"), stdout, stream))
}
}
}
diff --git a/src/subprocess.rs b/src/subprocess.rs
index 5227ec9..427613c 100644
--- a/src/subprocess.rs
+++ b/src/subprocess.rs
@@ -1,10 +1,12 @@
use {
- super::types::{Abort, Fail},
+ super::types::Fail,
futures::{
- future::{select, try_join, Either},
- pin_mut,
+ future::{ready, select, try_join, Either},
+ stream::{once, try_unfold, Stream, StreamExt},
+ },
+ std::{
+ collections::HashMap, ffi::OsString, marker::Unpin, path::PathBuf, process::Stdio, sync::Arc,
},
- std::{collections::HashMap, ffi::OsString, path::PathBuf, process::Stdio, sync::Arc},
tokio::{
io::{AsyncWrite, AsyncWriteExt, BufWriter},
process::Command,
@@ -20,79 +22,82 @@ pub struct SubprocCommand {
pub env: HashMap<String, String>,
}
-pub async fn stream_into(
- abort: &Arc<Abort>,
+pub fn stream_into(
path: PathBuf,
- writer: &mut BufWriter<impl AsyncWrite + Send + Unpin>,
- mut stream: Receiver<OsString>,
-) {
- loop {
- let f1 = abort.notified();
- let f2 = stream.recv();
- pin_mut!(f1);
- pin_mut!(f2);
- match select(f1, f2).await {
- Either::Right((Some(print), _)) => {
+ writer: impl AsyncWrite + Send + Unpin,
+ stream: impl Stream<Item = Result<OsString, Fail>> + Unpin,
+) -> impl Stream<Item = Result<(), Fail>>
+where
+{
+ let buf = BufWriter::new(writer);
+ try_unfold((buf, stream, path), |mut s| async {
+ match s.1.next().await {
+ None => Ok(None),
+ Some(Err(e)) => Err(e),
+ Some(Ok(print)) => {
#[cfg(target_family = "unix")]
let bytes = {
use std::os::unix::ffi::OsStrExt;
print.as_bytes()
};
#[cfg(target_family = "windows")]
- let tmp = print.to_string_lossy();
- #[cfg(target_family = "windows")]
- let bytes = tmp.as_bytes();
- if let Err(err) = writer.write_all(bytes).await {
- abort.send(Fail::IO(path, err.kind())).await;
- break;
- }
+ let bytes = {
+ let tmp = print.to_string_lossy();
+ tmp.as_bytes()
+ };
+ s.0
+ .write_all(bytes)
+ .await
+ .map_err(|e| Fail::IO(s.2.clone(), e.kind()))?;
+ Ok(Some(((), s)))
}
- _ => break,
}
- }
+ })
}
pub fn stream_subproc(
- abort: &Arc<Abort>,
cmd: SubprocCommand,
- stream: Receiver<OsString>,
-) -> JoinHandle<()> {
- let abort = abort.clone();
+ stream: impl Stream<Item = Result<OsString, Fail>>,
+) -> Box<dyn Stream<Item = Result<(), Fail>>> {
+ let subprocess = Command::new(&cmd.prog)
+ .kill_on_drop(true)
+ .args(&cmd.args)
+ .envs(&cmd.env)
+ .stdin(Stdio::piped())
+ .spawn();
- spawn(async move {
- let subprocess = Command::new(&cmd.prog)
- .kill_on_drop(true)
- .args(&cmd.args)
- .envs(&cmd.env)
- .stdin(Stdio::piped())
- .spawn();
+ match subprocess {
+ Err(e) => {
+ let err = Fail::IO(cmd.prog, e.kind());
+ Box::new(once(ready(Err(err))))
+ }
+ Ok(mut child) => {
+ todo!()
+ //let mut stdin = child
+ // .stdin
+ // .take()
+ // .map(BufWriter::new)
+ // .expect("child process stdin");
- match subprocess {
- Err(err) => abort.send(Fail::IO(cmd.prog, err.kind())).await,
- Ok(mut child) => {
- let mut stdin = child.stdin.take().map(BufWriter::new).expect("nil stdin");
+ //stream_into( p1.clone(), stdin);
- let abort_1 = abort.clone();
- let p1 = cmd.prog.clone();
- let handle_in = spawn(async move {
- stream_into(&abort_1, p1.clone(), &mut stdin, stream).await;
- if let Err(err) = stdin.shutdown().await {
- abort_1.send(Fail::IO(p1, err.kind())).await;
- }
- });
+ //let p1 = cmd.prog.clone();
+ //let handle_in = spawn(async move {
+ // .await;
+ // if let Err(err) = stdin.shutdown().await {
+ // abort_1.send(Fail::IO(p1, err.kind())).await;
+ // }
+ //});
- let abort_2 = abort.clone();
- let p2 = cmd.prog.clone();
- let handle_child = spawn(async move {
- if let Err(err) = child.wait().await {
- abort_2.send(Fail::IO(p2, err.kind())).await;
- }
- });
+ //let handle_child = spawn(async move {
+ // if let Err(err) = child.wait().await {
+ // abort_2.send(Fail::IO(p2, err.kind())).await;
+ // }
+ //});
- if let Err(err) = try_join(handle_child, handle_in).await {
- abort.send(err.into()).await;
- }
- }
+ //if let Err(err) = try_join(handle_child, handle_in).await {
+ // abort.send(err.into()).await;
+ //}
}
- })
+ }
}