diff options
author | root <github@bigly.dog> | 2024-03-20 23:29:03 -0700 |
---|---|---|
committer | root <github@bigly.dog> | 2024-03-20 23:29:03 -0700 |
commit | c4403961d77cb62c4fd200abb22c958146f02c39 (patch) | |
tree | 17da871d3f6704b4b127ab161bf5e086b4237082 | |
parent | 5353f177c4360e076821e73686264cbc94c6b4ee (diff) |
compile again
-rw-r--r-- | src/input.rs | 13 | ||||
-rw-r--r-- | src/main.rs | 85 |
2 files changed, 48 insertions, 50 deletions
diff --git a/src/input.rs b/src/input.rs index b5efba7..ca3e091 100644 --- a/src/input.rs +++ b/src/input.rs @@ -87,7 +87,14 @@ async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail let mut buf = Vec::default(); match s.0.read_until(b'\0', &mut buf).await { Err(err) => Err(Fail::IO(s.1.to_owned(), err.kind())), - Ok(0) => Ok(None), + Ok(0) if s.3.is_empty() => Ok(None), + Ok(0) => { + let path = s.2; + let ranges = s.3; + s.2 = PathBuf::new(); + s.3 = HashSet::new(); + Ok(Some((Some(LineIn::Piecewise(path, ranges)), s))) + } Ok(_) => { buf.pop(); let line = @@ -108,7 +115,7 @@ async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail }, ); - return Box::new(stream.try_filter_map(|x| ready(Ok(x)))); + Box::new(stream.try_filter_map(|x| ready(Ok(x)))) } fn u8_pathbuf(v8: Vec<u8>) -> PathBuf { @@ -158,7 +165,7 @@ fn stream_stdin(use_nul: bool) -> impl Stream<Item = Result<LineIn, Fail>> { } }); - return stream.try_filter_map(|x| ready(Ok(x))); + stream.try_filter_map(|x| ready(Ok(x))) } pub async fn stream_in( diff --git a/src/main.rs b/src/main.rs index 901f496..e1c4712 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use { displace::displace, futures::{ future::{select, try_join3, try_join_all, Either}, - pin_mut, + stream::{once, Stream, StreamExt}, }, input::{stream_in, LineIn}, output::stream_out, @@ -43,61 +43,52 @@ use { }; fn stream_trans( - abort: &Arc<Abort>, threads: usize, opts: &Options, - stream: &MPMCR<LineIn>, -) -> (JoinHandle<()>, Receiver<OsString>) { - let a_opts = Arc::new(opts.clone()); - let (tx, rx) = mpsc::channel::<OsString>(1); + stream: Box<dyn Stream<Item = Result<LineIn, Fail>>>, +) -> impl Stream<Item = Result<OsString, Fail>> { + //let (tx, rx) = mpsc::channel::<OsString>(1); - let handles = (1..=threads * 2) - .map(|_| { - let abort = abort.clone(); - let stream = stream.clone(); - let opts = a_opts.clone(); - let tx = tx.clone(); + //let handles = (1..=threads * 2) + // .map(|_| { + // let abort = abort.clone(); + // let stream = stream.clone(); + // let opts = a_opts.clone(); + // let tx = tx.clone(); - spawn(async move { - loop { - let f1 = abort.notified(); - let f2 = stream.recv(); - pin_mut!(f1); - pin_mut!(f2); + // spawn(async move { + // loop { + // let f1 = abort.notified(); + // let f2 = stream.recv(); + // pin_mut!(f1); + // pin_mut!(f2); - match select(f1, f2).await { - Either::Left(_) | Either::Right((Err(_), _)) => break, - Either::Right((Ok(payload), _)) => match displace(&opts, payload).await { - Ok(displaced) => { - if tx.send(displaced).await.is_err() { - break; - } - } - Err(err) => { - abort.send(err).await; - break; - } - }, - } - } - }) - }) - .collect::<Vec<_>>(); - - let abort = abort.clone(); - let handle = spawn(async move { - if let Err(err) = try_join_all(handles).await { - abort.send(err.into()).await; - } - }); - (handle, rx) + // match select(f1, f2).await { + // Either::Left(_) | Either::Right((Err(_), _)) => break, + // Either::Right((Ok(payload), _)) => match displace(&opts, payload).await { + // Ok(displaced) => { + // if tx.send(displaced).await.is_err() { + // break; + // } + // } + // Err(err) => { + // abort.send(err).await; + // break; + // } + // }, + // } + // } + // }) + // }) + // .collect::<Vec<_>>(); + once(async { Err(Fail::Join) }) } -async fn run(abort: &Arc<Abort>, threads: usize) -> Result<(), Fail> { +async fn run(threads: usize) -> Result<(), Fail> { let (mode, args) = parse_args(); let input_stream = stream_in(&mode, &args).await; let opts = parse_opts(mode, args)?; - //let (h_2, trans_stream) = stream_trans(abort, threads, &opts, &input_stream); + let trans_stream = stream_trans(threads, &opts, input_stream); //let h_3 = stream_out(abort, &opts, trans_stream); //try_join3(h_1, h_2, h_3).await?; Ok(()) @@ -113,7 +104,7 @@ fn main() -> impl Termination { let errors = rt.block_on(async { let abort = Abort::new(); - if let Err(err) = run(&abort, threads).await { + if let Err(err) = run(threads).await { let mut errs = abort.fin().await; errs.push(err); errs |