summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorroot <github@bigly.dog>2024-03-20 23:29:03 -0700
committerroot <github@bigly.dog>2024-03-20 23:29:03 -0700
commitc4403961d77cb62c4fd200abb22c958146f02c39 (patch)
tree17da871d3f6704b4b127ab161bf5e086b4237082
parent5353f177c4360e076821e73686264cbc94c6b4ee (diff)
compile again
-rw-r--r--src/input.rs13
-rw-r--r--src/main.rs85
2 files changed, 48 insertions, 50 deletions
diff --git a/src/input.rs b/src/input.rs
index b5efba7..ca3e091 100644
--- a/src/input.rs
+++ b/src/input.rs
@@ -87,7 +87,14 @@ async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail
let mut buf = Vec::default();
match s.0.read_until(b'\0', &mut buf).await {
Err(err) => Err(Fail::IO(s.1.to_owned(), err.kind())),
- Ok(0) => Ok(None),
+ Ok(0) if s.3.is_empty() => Ok(None),
+ Ok(0) => {
+ let path = s.2;
+ let ranges = s.3;
+ s.2 = PathBuf::new();
+ s.3 = HashSet::new();
+ Ok(Some((Some(LineIn::Piecewise(path, ranges)), s)))
+ }
Ok(_) => {
buf.pop();
let line =
@@ -108,7 +115,7 @@ async fn stream_patch(patch: &Path) -> Box<dyn Stream<Item = Result<LineIn, Fail
},
);
- return Box::new(stream.try_filter_map(|x| ready(Ok(x))));
+ Box::new(stream.try_filter_map(|x| ready(Ok(x))))
}
fn u8_pathbuf(v8: Vec<u8>) -> PathBuf {
@@ -158,7 +165,7 @@ fn stream_stdin(use_nul: bool) -> impl Stream<Item = Result<LineIn, Fail>> {
}
});
- return stream.try_filter_map(|x| ready(Ok(x)));
+ stream.try_filter_map(|x| ready(Ok(x)))
}
pub async fn stream_in(
diff --git a/src/main.rs b/src/main.rs
index 901f496..e1c4712 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -23,7 +23,7 @@ use {
displace::displace,
futures::{
future::{select, try_join3, try_join_all, Either},
- pin_mut,
+ stream::{once, Stream, StreamExt},
},
input::{stream_in, LineIn},
output::stream_out,
@@ -43,61 +43,52 @@ use {
};
fn stream_trans(
- abort: &Arc<Abort>,
threads: usize,
opts: &Options,
- stream: &MPMCR<LineIn>,
-) -> (JoinHandle<()>, Receiver<OsString>) {
- let a_opts = Arc::new(opts.clone());
- let (tx, rx) = mpsc::channel::<OsString>(1);
+ stream: Box<dyn Stream<Item = Result<LineIn, Fail>>>,
+) -> impl Stream<Item = Result<OsString, Fail>> {
+ //let (tx, rx) = mpsc::channel::<OsString>(1);
- let handles = (1..=threads * 2)
- .map(|_| {
- let abort = abort.clone();
- let stream = stream.clone();
- let opts = a_opts.clone();
- let tx = tx.clone();
+ //let handles = (1..=threads * 2)
+ // .map(|_| {
+ // let abort = abort.clone();
+ // let stream = stream.clone();
+ // let opts = a_opts.clone();
+ // let tx = tx.clone();
- spawn(async move {
- loop {
- let f1 = abort.notified();
- let f2 = stream.recv();
- pin_mut!(f1);
- pin_mut!(f2);
+ // spawn(async move {
+ // loop {
+ // let f1 = abort.notified();
+ // let f2 = stream.recv();
+ // pin_mut!(f1);
+ // pin_mut!(f2);
- match select(f1, f2).await {
- Either::Left(_) | Either::Right((Err(_), _)) => break,
- Either::Right((Ok(payload), _)) => match displace(&opts, payload).await {
- Ok(displaced) => {
- if tx.send(displaced).await.is_err() {
- break;
- }
- }
- Err(err) => {
- abort.send(err).await;
- break;
- }
- },
- }
- }
- })
- })
- .collect::<Vec<_>>();
-
- let abort = abort.clone();
- let handle = spawn(async move {
- if let Err(err) = try_join_all(handles).await {
- abort.send(err.into()).await;
- }
- });
- (handle, rx)
+ // match select(f1, f2).await {
+ // Either::Left(_) | Either::Right((Err(_), _)) => break,
+ // Either::Right((Ok(payload), _)) => match displace(&opts, payload).await {
+ // Ok(displaced) => {
+ // if tx.send(displaced).await.is_err() {
+ // break;
+ // }
+ // }
+ // Err(err) => {
+ // abort.send(err).await;
+ // break;
+ // }
+ // },
+ // }
+ // }
+ // })
+ // })
+ // .collect::<Vec<_>>();
+ once(async { Err(Fail::Join) })
}
-async fn run(abort: &Arc<Abort>, threads: usize) -> Result<(), Fail> {
+async fn run(threads: usize) -> Result<(), Fail> {
let (mode, args) = parse_args();
let input_stream = stream_in(&mode, &args).await;
let opts = parse_opts(mode, args)?;
- //let (h_2, trans_stream) = stream_trans(abort, threads, &opts, &input_stream);
+ let trans_stream = stream_trans(threads, &opts, input_stream);
//let h_3 = stream_out(abort, &opts, trans_stream);
//try_join3(h_1, h_2, h_3).await?;
Ok(())
@@ -113,7 +104,7 @@ fn main() -> impl Termination {
let errors = rt.block_on(async {
let abort = Abort::new();
- if let Err(err) = run(&abort, threads).await {
+ if let Err(err) = run(threads).await {
let mut errs = abort.fin().await;
errs.push(err);
errs