1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio_test::assert_ok;
use futures::future::{self, FutureExt};
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
/// Builds a [`Command`] for the `test-cat` helper binary with stdin and
/// stdout both piped.
///
/// The helper is located relative to the currently running executable: the
/// executable's own file name is stripped, one further `deps` directory level
/// is stripped if present, and `test-cat` is appended to what remains.
///
/// # Panics
///
/// Panics if the path of the current executable cannot be determined.
fn cat() -> Command {
    let mut helper_path = env::current_exe().unwrap();

    // Drop the file name of the running executable, leaving its directory.
    helper_path.pop();

    // Presumably Cargo places test executables in a `deps` subdirectory while
    // the helper binary sits one level above it; step up when that is the case.
    if helper_path.ends_with("deps") {
        helper_path.pop();
    }

    helper_path.push("test-cat");

    let mut command = Command::new(helper_path);
    command.stdin(Stdio::piped());
    command.stdout(Stdio::piped());
    command
}
/// Feeds `n` numbered lines to a spawned cat-like child and checks that the
/// child echoes them back unchanged.
///
/// Three pieces of work are driven concurrently:
///
/// - a writer that sends `line 0` through `line {n - 1}` (each terminated by
///   a newline) to the child's stdin and then drops the stdin handle,
/// - a reader that expects exactly those `n` lines, in order, on the child's
///   stdout, followed by end-of-stream,
/// - the child itself, which is awaited for its exit status.
///
/// # Returns
///
/// The result of awaiting the child.
///
/// # Panics
///
/// Panics if the child's stdin or stdout handle is missing, if a write to
/// stdin fails, if reading a line fails, if stdout ends (or yields an empty
/// line) before `n` lines were read, if a line differs from what was written,
/// or if non-empty data follows the `n`-th line.
async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
    let mut stdin = cat.stdin.take().unwrap();
    let stdout = cat.stdout.take().unwrap();

    // Produce n lines on the child's stdout by writing them to its stdin.
    let write = async {
        for i in 0..n {
            let bytes = format!("line {}\n", i).into_bytes();
            stdin.write_all(&bytes).await.unwrap();
        }

        // Drop the handle once everything is written; closing stdin signals
        // end of input to the child.
        drop(stdin);
    };

    let read = async {
        let mut lines = BufReader::new(stdout).lines();

        // The first `n` lines must be exactly what the writer sent, in order.
        for i in 0..n {
            let line = match lines.next_line().await {
                Ok(Some(line)) if !line.is_empty() => line,
                // End-of-stream or an empty line before all `n` lines arrived.
                Ok(_) => panic!("broken pipe"),
                Err(e) => panic!("failed to read line: {}", e),
            };

            assert_eq!(format!("line {}", i), line);
        }

        // Attempt one more read to make sure the stream ends after `n` lines.
        // End-of-stream, an empty line, or a read error all count as "no more
        // data" here, matching the leniency of mapping read errors to an empty
        // line. NOTE(review): presumably some platforms report a closed pipe
        // as an error rather than EOF -- confirm before tightening this.
        match lines.next_line().await {
            Ok(Some(extra)) if !extra.is_empty() => panic!("extraneous data"),
            _ => {}
        }
    };

    // Compose reading and writing concurrently.
    future::join3(write, read, cat)
        .map(|(_, _, status)| status)
        .await
}
/// Streams a large number of lines through a cat-like child process.
///
/// What this exercises:
///
/// - The 10000 lines written are meant to add up to more bytes than a typical
///   OS buffer holds, so the child can only be fed to completion if its
///   stdout is consumed concurrently; otherwise this would deadlock.
///
/// - Validation of what is read back from the child is delegated to
///   `feed_cat`.
///
/// - The child exits with status code 0.
#[tokio::test]
async fn feed_a_lot() {
    const LINE_COUNT: usize = 10000;

    let spawned = cat().spawn().unwrap();
    let exit = feed_cat(spawned, LINE_COUNT).await;

    assert_eq!(exit.unwrap().code(), Some(0));
}
/// Verifies that `wait_with_output` returns the bytes the child wrote to its
/// stdout, an empty stderr buffer, and a successful exit status.
#[tokio::test]
async fn wait_with_output_captures() {
    let write_bytes = b"1234";

    let mut child = cat().spawn().unwrap();
    let mut child_stdin = child.stdin.take().unwrap();

    // Send the payload, then drop the handle so the child's input ends.
    child_stdin.write_all(write_bytes).await.unwrap();
    drop(child_stdin);

    // Wait for the child to finish and collect what it produced.
    let output = child.wait_with_output().await.unwrap();

    assert!(output.status.success());
    assert_eq!(output.stdout, write_bytes);
    assert_eq!(output.stderr.len(), 0);
}
/// Verifies that `status()` completes for a command configured with piped
/// stdio.
#[tokio::test]
async fn status_closes_any_pipes() {
    // `cat()` asks for piped stdin and stdout, so a pipe is opened between
    // parent and child. If `status()` did not make sure those handles get
    // closed, we would end up blocking forever (and time out).
    let status_future = cat().status();
    let outcome = status_future.await;

    assert_ok!(outcome);
}
|