summaryrefslogtreecommitdiffstats
path: root/tests-integration/tests/process_stdio.rs
blob: 8bf2c14985340dbe08f44fb59560de3f837326ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command};
use tokio_test::assert_ok;

use futures::future::{self, FutureExt};
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};

/// Builds a [`Command`] that launches the `test-cat` helper binary with both
/// its stdin and stdout connected to pipes.
///
/// The helper is looked up relative to the currently running test executable:
/// the executable's file name is stripped, a trailing `deps` directory (if
/// present) is stripped as well, and `test-cat` is appended to what remains.
///
/// # Panics
///
/// Panics if the path of the current executable cannot be determined.
fn cat() -> Command {
    let mut helper_path = env::current_exe().unwrap();

    // Drop the test binary's own file name, leaving its directory.
    helper_path.pop();

    // Step out of a trailing `deps` directory when the test binary lives in one.
    if helper_path.ends_with("deps") {
        helper_path.pop();
    }

    helper_path.push("test-cat");

    let mut command = Command::new(helper_path);
    command.stdin(Stdio::piped());
    command.stdout(Stdio::piped());
    command
}

/// Feeds `n` numbered lines (`line 0`, `line 1`, ...) to the child's stdin
/// while concurrently reading them back from its stdout, and resolves to the
/// child's exit status.
///
/// Writing, reading and waiting on the child are driven concurrently so that
/// neither pipe can fill up and stall the other side.
///
/// The reader verifies two things:
///
/// - the first `n` lines read back are exactly the lines that were written,
///   in order;
/// - the stream ends after those `n` lines (no extra output).
///
/// # Panics
///
/// Panics if the child's stdin or stdout is not piped, if a write to the
/// child fails, if the stream ends before `n` lines were read
/// (`"broken pipe"`), if a line differs from what was written, or if more
/// data follows the `n`-th line (`"extraneous data"`).
async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
    let mut stdin = cat.stdin.take().unwrap();
    let stdout = cat.stdout.take().unwrap();

    // Produce n lines on the child's stdout.
    let write = async {
        for i in 0..n {
            let bytes = format!("line {}\n", i).into_bytes();
            stdin.write_all(&bytes).await.unwrap();
        }

        // Close our end of the pipe so the child observes EOF on its stdin.
        drop(stdin);
    };

    let read = async {
        let mut reader = BufReader::new(stdout).lines();

        // Read back exactly `n` lines. A read error is treated the same as
        // end-of-stream: either way the pipe closed before all data arrived.
        for i in 0..n {
            let line = reader
                .next_line()
                .await
                .unwrap_or(None)
                .expect("broken pipe");

            assert_eq!(format!("line {}", i), line);
        }

        // One more read must report end-of-stream; anything else means the
        // child produced more output than it was fed.
        let trailing = reader.next_line().await.unwrap_or(None);
        assert!(trailing.is_none(), "extraneous data: {:?}", trailing);
    };

    // Compose reading and writing concurrently.
    future::join3(write, read, cat)
        .map(|(_, _, status)| status)
        .await
}

/// Check for the following properties when feeding stdin and
/// consuming stdout of a cat-like process:
///
/// - A number of lines that amounts to a number of bytes exceeding a
///   typical OS buffer size can be fed to the child without
///   deadlock. This tests that we also consume the stdout
///   concurrently; otherwise this would deadlock.
///
/// - We read the same lines from the child that we fed it.
///
/// - The child does produce EOF on stdout after the last line.
#[tokio::test]
async fn feed_a_lot() {
    // Spawn the helper and push 10000 lines through it; the run only counts
    // as successful if the child then exits with status code 0.
    let exit = feed_cat(cat().spawn().unwrap(), 10000).await.unwrap();
    assert_eq!(exit.code(), Some(0));
}

/// Writes a few bytes to the child's stdin, closes it, and checks that
/// `wait_with_output` reports a successful exit with those same bytes
/// captured on stdout and nothing captured on stderr.
#[tokio::test]
async fn wait_with_output_captures() {
    let mut child = cat().spawn().unwrap();
    let mut stdin = child.stdin.take().unwrap();

    let payload = b"1234";

    let exchange = async {
        stdin.write_all(payload).await?;
        // Close stdin before waiting so the child sees EOF on its input.
        drop(stdin);
        child.wait_with_output().await
    };

    let captured = exchange.await.unwrap();

    assert!(captured.status.success());
    assert_eq!(captured.stdout, payload);
    assert!(captured.stderr.is_empty());
}

#[tokio::test]
async fn status_closes_any_pipes() {
    // Cat will open a pipe between the parent and child.
    // If `status_async` doesn't ensure the handles are closed,
    // we would end up blocking forever (and time out).
    let child = cat().status();

    assert_ok!(child.await);
}