summaryrefslogtreecommitdiffstats
path: root/src/subprocess.rs
blob: e4cfecc63ee30ac18846e1a45007eb95626e8e21 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use {
  super::types::Fail,
  futures::{
    future::ready,
    stream::{once, select, try_unfold, Stream, StreamExt},
  },
  std::{collections::HashMap, ffi::OsString, marker::Unpin, path::PathBuf, process::Stdio},
  tokio::{
    io::{AsyncWrite, AsyncWriteExt, BufWriter},
    process::Command,
  },
};

#[derive(Clone, Debug)]
pub struct SubprocCommand {
  pub prog: PathBuf,
  pub args: Vec<String>,
  pub env: HashMap<String, String>,
}

pub fn stream_into(
  path: PathBuf,
  writer: impl AsyncWrite + Send + Unpin,
  stream: impl Stream<Item = Result<OsString, Fail>> + Send + Unpin,
) -> impl Stream<Item = Result<(), Fail>> + Send
where
{
  let buf = BufWriter::new(writer);
  try_unfold((stream, buf, path), |mut s| async {
    match s.0.next().await {
      None => {
        s.1
          .shutdown()
          .await
          .map_err(|e| Fail::IO(s.2.clone(), e.kind()))?;
        Ok(None)
      }
      Some(Err(e)) => {
        let _ = s.1.shutdown().await;
        Err(e)
      }
      Some(Ok(print)) => {
        #[cfg(target_family = "unix")]
        let bytes = {
          use std::os::unix::ffi::OsStrExt;
          print.as_bytes()
        };
        #[cfg(target_family = "windows")]
        let bytes = {
          let tmp = print.to_string_lossy();
          tmp.as_bytes()
        };
        s.1
          .write_all(bytes)
          .await
          .map_err(|e| Fail::IO(s.2.clone(), e.kind()))?;
        Ok(Some(((), s)))
      }
    }
  })
}

pub fn stream_subproc<'a>(
  cmd: SubprocCommand,
  stream: impl Stream<Item = Result<OsString, Fail>> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Result<(), Fail>> + Send + 'a> {
  let subprocess = Command::new(&cmd.prog)
    .kill_on_drop(true)
    .args(&cmd.args)
    .envs(&cmd.env)
    .stdin(Stdio::piped())
    .spawn();

  match subprocess {
    Err(e) => {
      let err = Fail::IO(cmd.prog, e.kind());
      Box::new(once(ready(Err(err))))
    }
    Ok(mut child) => {
      let stdin = child.stdin.take().expect("child process stdin");
      let out = stream_into(cmd.prog.clone(), stdin, stream);
      let die = once(async move {
        match child.wait().await {
          Err(e) => Err(Fail::IO(cmd.prog, e.kind())),
          Ok(status) if status.success() => Ok(()),
          Ok(status) => {
            let code = status.code().unwrap_or(1);
            Err(Fail::BadExit(cmd.prog, code))
          }
        }
      });
      Box::new(select(out, die))
    }
  }
}