summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorroot <github@bigly.dog>2024-03-20 23:55:41 -0700
committerroot <github@bigly.dog>2024-03-21 00:05:28 -0700
commitcc8e0506c3fc82c80d473339aba81633ec1a403f (patch)
tree224e1e3af5785d4cc13747a1522b5a33d92c0d2f
parentc4403961d77cb62c4fd200abb22c958146f02c39 (diff)
mv
-rw-r--r--src/displace.rs6
-rw-r--r--src/input.rs4
-rw-r--r--src/main.rs62
3 files changed, 14 insertions, 58 deletions
diff --git a/src/displace.rs b/src/displace.rs
index aa720f5..ea5a2fb 100644
--- a/src/displace.rs
+++ b/src/displace.rs
@@ -28,8 +28,8 @@ impl LineIn {
}
}
-pub async fn displace(opts: &Arc<Options>, payload: LineIn) -> Result<OsString, Fail> {
- let path = payload.path().clone();
+pub async fn displace(opts: &Arc<Options>, input: LineIn) -> Result<OsString, Fail> {
+ let path = input.path().clone();
let name = opts
.cwd
.as_ref()
@@ -49,7 +49,7 @@ pub async fn displace(opts: &Arc<Options>, payload: LineIn) -> Result<OsString,
if *before == after {
Ok(OsString::default())
} else {
- let print = match (&opts.action, payload) {
+ let print = match (&opts.action, input) {
(Action::Preview, LineIn::Entire(_)) => {
spawn_blocking(move || udiff(None, o2.unified, &name, &before, &after)).await?
}
diff --git a/src/input.rs b/src/input.rs
index ca3e091..ea26e00 100644
--- a/src/input.rs
+++ b/src/input.rs
@@ -68,7 +68,7 @@ fn p_line(line: &str) -> Result<DiffLine, Fail> {
Ok(DiffLine(path, range))
}
-async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail>>> {
+async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail>> + Send> {
let patch = patch.to_owned();
let fd = match File::open(&patch).await {
@@ -171,7 +171,7 @@ fn stream_stdin(use_nul: bool) -> impl Stream<Item = Result<LineIn, Fail>> {
pub async fn stream_in(
mode: &Mode,
args: &Arguments,
-) -> Box<dyn Stream<Item = Result<LineIn, Fail>>> {
+) -> Box<dyn Stream<Item = Result<LineIn, Fail>> + Send> {
match mode {
Mode::Initial if io::stdin().is_terminal() => {
let err = Fail::ArgumentError("/dev/stdin connected to tty".to_owned());
diff --git a/src/main.rs b/src/main.rs
index e1c4712..5f5c978 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,12 +19,8 @@ mod udiff_spec;
use {
ansi_term::Colour,
argparse::{parse_args, parse_opts, Options},
- async_channel::Receiver as MPMCR,
displace::displace,
- futures::{
- future::{select, try_join3, try_join_all, Either},
- stream::{once, Stream, StreamExt},
- },
+ futures::stream::{BoxStream, StreamExt, TryStreamExt},
input::{stream_in, LineIn},
output::stream_out,
std::{
@@ -34,61 +30,21 @@ use {
sync::Arc,
thread::available_parallelism,
},
- tokio::{
- runtime::Builder,
- sync::mpsc::{self, Receiver},
- task::{spawn, JoinHandle},
- },
+ tokio::runtime::Builder,
types::{Abort, Fail},
};
-fn stream_trans(
- threads: usize,
- opts: &Options,
- stream: Box<dyn Stream<Item = Result<LineIn, Fail>>>,
-) -> impl Stream<Item = Result<OsString, Fail>> {
- //let (tx, rx) = mpsc::channel::<OsString>(1);
-
- //let handles = (1..=threads * 2)
- // .map(|_| {
- // let abort = abort.clone();
- // let stream = stream.clone();
- // let opts = a_opts.clone();
- // let tx = tx.clone();
-
- // spawn(async move {
- // loop {
- // let f1 = abort.notified();
- // let f2 = stream.recv();
- // pin_mut!(f1);
- // pin_mut!(f2);
-
- // match select(f1, f2).await {
- // Either::Left(_) | Either::Right((Err(_), _)) => break,
- // Either::Right((Ok(payload), _)) => match displace(&opts, payload).await {
- // Ok(displaced) => {
- // if tx.send(displaced).await.is_err() {
- // break;
- // }
- // }
- // Err(err) => {
- // abort.send(err).await;
- // break;
- // }
- // },
- // }
- // }
- // })
- // })
- // .collect::<Vec<_>>();
- once(async { Err(Fail::Join) })
-}
-
async fn run(threads: usize) -> Result<(), Fail> {
let (mode, args) = parse_args();
let input_stream = stream_in(&mode, &args).await;
let opts = parse_opts(mode, args)?;
- let trans_stream = stream_trans(threads, &opts, input_stream);
+ let options = Arc::new(opts);
+ let trans_stream = BoxStream::from(input_stream)
+ .map_ok(move |input| {
+ let opts = options.clone();
+ async move { displace(&opts, input).await }
+ })
+ .try_buffer_unordered(threads);
//let h_3 = stream_out(abort, &opts, trans_stream);
//try_join3(h_1, h_2, h_3).await?;
Ok(())