1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
use {
super::types::Fail,
futures::{
future::ready,
stream::{once, select, try_unfold, Stream, StreamExt},
},
std::{collections::HashMap, ffi::OsString, marker::Unpin, path::PathBuf, process::Stdio},
tokio::{
io::{AsyncWrite, AsyncWriteExt, BufWriter},
process::Command,
},
};
/// Description of an external program to launch: executable, arguments and
/// extra environment variables.
///
/// Consumed by `stream_subproc`, which hands these fields to
/// `tokio::process::Command`.
#[derive(Clone, Debug)]
pub struct SubprocCommand {
/// Program to execute; also attached to `Fail` values so errors name the
/// program that caused them.
pub prog: PathBuf,
/// Arguments passed to the program, in order.
pub args: Vec<String>,
/// Environment variables set for the child via `Command::envs`.
pub env: HashMap<String, String>,
}
/// Pipes every chunk produced by `stream` into `writer` through a [`BufWriter`].
///
/// The returned stream yields `Ok(())` once for each chunk that was written.
/// When `stream` is exhausted the writer is shut down and the returned stream
/// ends.
///
/// `path` is not opened or written to; it is only used to label I/O errors.
///
/// # Errors
///
/// * A failed write, or a failed shutdown after the input ended, is reported
///   as `Fail::IO(path, kind)`.
/// * An `Err` item from `stream` is forwarded unchanged; the writer is shut
///   down first on a best-effort basis and any shutdown error is ignored so
///   the original failure is the one reported.
pub fn stream_into(
    path: PathBuf,
    writer: impl AsyncWrite + Send + Unpin,
    stream: impl Stream<Item = Result<OsString, Fail>> + Send + Unpin,
) -> impl Stream<Item = Result<(), Fail>> + Send {
    let buf = BufWriter::new(writer);
    // State threaded through the unfold: (input stream, buffered sink, label path).
    try_unfold((stream, buf, path), |(mut source, mut sink, path)| async move {
        match source.next().await {
            None => {
                sink.shutdown()
                    .await
                    .map_err(|e| Fail::IO(path.clone(), e.kind()))?;
                Ok(None)
            }
            Some(Err(e)) => {
                // Best effort only: the upstream error takes precedence.
                let _ = sink.shutdown().await;
                Err(e)
            }
            Some(Ok(print)) => {
                #[cfg(target_family = "unix")]
                let bytes = {
                    use std::os::unix::ffi::OsStrExt;
                    print.as_bytes()
                };
                // The lossy conversion has to be bound in this scope (not in
                // an inner block) so it outlives the `bytes` slice borrowed
                // from it; otherwise the Windows build fails to compile.
                #[cfg(target_family = "windows")]
                let lossy = print.to_string_lossy();
                #[cfg(target_family = "windows")]
                let bytes = lossy.as_bytes();

                sink.write_all(bytes)
                    .await
                    .map_err(|e| Fail::IO(path.clone(), e.kind()))?;
                Ok(Some(((), (source, sink, path))))
            }
        }
    })
}
pub fn stream_subproc<'a>(
cmd: SubprocCommand,
stream: impl Stream<Item = Result<OsString, Fail>> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Result<(), Fail>> + Send + 'a> {
let subprocess = Command::new(&cmd.prog)
.kill_on_drop(true)
.args(&cmd.args)
.envs(&cmd.env)
.stdin(Stdio::piped())
.spawn();
match subprocess {
Err(e) => {
let err = Fail::IO(cmd.prog, e.kind());
Box::new(once(ready(Err(err))))
}
Ok(mut child) => {
let stdin = child.stdin.take().expect("child process stdin");
let out = stream_into(cmd.prog.clone(), stdin, stream);
let die = once(async move {
match child.wait().await {
Err(e) => Err(Fail::IO(cmd.prog, e.kind())),
Ok(status) if status.success() => Ok(()),
Ok(status) => {
let code = status.code().unwrap_or(1);
Err(Fail::BadExit(cmd.prog, code))
}
}
});
Box::new(select(out, die))
}
}
}
|