diff options
author | root <github@bigly.dog> | 2024-03-20 23:55:41 -0700 |
---|---|---|
committer | root <github@bigly.dog> | 2024-03-21 00:05:28 -0700 |
commit | cc8e0506c3fc82c80d473339aba81633ec1a403f (patch) | |
tree | 224e1e3af5785d4cc13747a1522b5a33d92c0d2f | |
parent | c4403961d77cb62c4fd200abb22c958146f02c39 (diff) |
mv
-rw-r--r-- | src/displace.rs | 6 | ||||
-rw-r--r-- | src/input.rs | 4 | ||||
-rw-r--r-- | src/main.rs | 62 |
3 files changed, 14 insertions, 58 deletions
diff --git a/src/displace.rs b/src/displace.rs index aa720f5..ea5a2fb 100644 --- a/src/displace.rs +++ b/src/displace.rs @@ -28,8 +28,8 @@ impl LineIn { } } -pub async fn displace(opts: &Arc<Options>, payload: LineIn) -> Result<OsString, Fail> { - let path = payload.path().clone(); +pub async fn displace(opts: &Arc<Options>, input: LineIn) -> Result<OsString, Fail> { + let path = input.path().clone(); let name = opts .cwd .as_ref() @@ -49,7 +49,7 @@ pub async fn displace(opts: &Arc<Options>, payload: LineIn) -> Result<OsString, if *before == after { Ok(OsString::default()) } else { - let print = match (&opts.action, payload) { + let print = match (&opts.action, input) { (Action::Preview, LineIn::Entire(_)) => { spawn_blocking(move || udiff(None, o2.unified, &name, &before, &after)).await? } diff --git a/src/input.rs b/src/input.rs index ca3e091..ea26e00 100644 --- a/src/input.rs +++ b/src/input.rs @@ -68,7 +68,7 @@ fn p_line(line: &str) -> Result<DiffLine, Fail> { Ok(DiffLine(path, range)) } -async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail>>> { +async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail>> + Send> { let patch = patch.to_owned(); let fd = match File::open(&patch).await { @@ -171,7 +171,7 @@ fn stream_stdin(use_nul: bool) -> impl Stream<Item = Result<LineIn, Fail>> { pub async fn stream_in( mode: &Mode, args: &Arguments, -) -> Box<dyn Stream<Item = Result<LineIn, Fail>>> { +) -> Box<dyn Stream<Item = Result<LineIn, Fail>> + Send> { match mode { Mode::Initial if io::stdin().is_terminal() => { let err = Fail::ArgumentError("/dev/stdin connected to tty".to_owned()); diff --git a/src/main.rs b/src/main.rs index e1c4712..5f5c978 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,12 +19,8 @@ mod udiff_spec; use { ansi_term::Colour, argparse::{parse_args, parse_opts, Options}, - async_channel::Receiver as MPMCR, displace::displace, - futures::{ - future::{select, try_join3, try_join_all, Either}, - stream::{once, Stream, StreamExt}, - }, + futures::stream::{BoxStream, StreamExt, TryStreamExt}, input::{stream_in, LineIn}, output::stream_out, std::{ @@ -34,61 +30,21 @@ use { sync::Arc, thread::available_parallelism, }, - tokio::{ - runtime::Builder, - sync::mpsc::{self, Receiver}, - task::{spawn, JoinHandle}, - }, + tokio::runtime::Builder, types::{Abort, Fail}, }; -fn stream_trans( - threads: usize, - opts: &Options, - stream: Box<dyn Stream<Item = Result<LineIn, Fail>>>, -) -> impl Stream<Item = Result<OsString, Fail>> { - //let (tx, rx) = mpsc::channel::<OsString>(1); - - //let handles = (1..=threads * 2) - // .map(|_| { - // let abort = abort.clone(); - // let stream = stream.clone(); - // let opts = a_opts.clone(); - // let tx = tx.clone(); - - // spawn(async move { - // loop { - // let f1 = abort.notified(); - // let f2 = stream.recv(); - // pin_mut!(f1); - // pin_mut!(f2); - - // match select(f1, f2).await { - // Either::Left(_) | Either::Right((Err(_), _)) => break, - // Either::Right((Ok(payload), _)) => match displace(&opts, payload).await { - // Ok(displaced) => { - // if tx.send(displaced).await.is_err() { - // break; - // } - // } - // Err(err) => { - // abort.send(err).await; - // break; - // } - // }, - // } - // } - // }) - // }) - // .collect::<Vec<_>>(); - once(async { Err(Fail::Join) }) -} - async fn run(threads: usize) -> Result<(), Fail> { let (mode, args) = parse_args(); let input_stream = stream_in(&mode, &args).await; let opts = parse_opts(mode, args)?; - let trans_stream = stream_trans(threads, &opts, input_stream); + let options = Arc::new(opts); + let trans_stream = BoxStream::from(input_stream) + .map_ok(move |input| { + let opts = options.clone(); + async move { displace(&opts, input).await } + }) + .try_buffer_unordered(threads); //let h_3 = stream_out(abort, &opts, trans_stream); //try_join3(h_1, h_2, h_3).await?; Ok(()) |