summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
blob: 96c72b5a0667976b556efd6590604ca6029d15dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use byteorder::{BigEndian, ReadBytesExt};
use bytes::BytesMut;
use errors::Error;
use std::io::Cursor;
use tokio_codec::Decoder;

/// One decoded frame of output: the payload text together with the stream it
/// was read from.
#[derive(Debug, Clone)]
pub struct TtyLine {
    /// Stream (stdout or stderr) named in the frame header.
    pub stream_type: StreamType,
    /// Frame payload, lossily decoded as UTF-8 with surrounding whitespace
    /// trimmed by the decoder.
    pub data: String,
}

/// Identifies which output stream a decoded frame belongs to.
///
/// The type is a plain fieldless enum, so it is cheap to copy, compare and
/// hash; this lets callers filter frames with `==` or use it as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamType {
    /// Standard output (stream type byte `1` in a frame header).
    StdOut,
    /// Standard error (stream type byte `2` in a frame header).
    StdErr,
}

/// Represents the current state of the decoding of a TTY frame.
///
/// The decoder alternates between the two variants: it starts in
/// `WaitingHeader`, moves to `WaitingPayload` once a header has been parsed,
/// and returns to `WaitingHeader` after the payload has been emitted.
enum TtyDecoderState {
    /// We have yet to read a frame header
    WaitingHeader,
    /// We have read a header and extracted the payload size and stream type,
    /// and are now waiting to read the corresponding payload.
    ///
    /// Fields, in order: the payload length in bytes, and the stream the
    /// payload belongs to.
    WaitingPayload(usize, StreamType),
}

/// Stateful decoder that turns a byte stream of header-prefixed frames into
/// `TtyLine` values.
///
/// The state is kept between calls to `decode` so that a frame whose header
/// and payload arrive in separate reads can still be reassembled.
pub struct TtyDecoder {
    // Which part of a frame (header or payload) the decoder expects next.
    state: TtyDecoderState,
}

impl TtyDecoder {
    /// Creates a decoder that is waiting for the first frame header.
    pub fn new() -> Self {
        Self {
            state: TtyDecoderState::WaitingHeader,
        }
    }
}

impl Default for TtyDecoder {
    /// Equivalent to `TtyDecoder::new()`.
    fn default() -> Self {
        Self::new()
    }
}

impl Decoder for TtyDecoder {
    type Item = TtyLine;
    type Error = Error;

    /// Attempts to decode one frame from `src`.
    ///
    /// A frame is an 8-byte header followed by a payload:
    ///
    /// * byte 0 is the stream type (`1` = stdout, `2` = stderr);
    /// * bytes 1..4 are ignored;
    /// * bytes 4..8 are the payload length as a big-endian `u32`.
    ///
    /// Returns `Ok(None)` when `src` does not yet hold a complete header or
    /// payload; the decoder remembers how far it got and resumes on the next
    /// call. Returns `Ok(Some(line))` once a whole frame has been consumed.
    /// The payload is decoded lossily as UTF-8 and trimmed of surrounding
    /// whitespace.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidResponse` when the stream type byte is `0`
    /// (stdin) or any value other than `1` or `2`. The offending header has
    /// already been removed from `src` at that point.
    fn decode(
        &mut self,
        src: &mut BytesMut,
    ) -> Result<Option<Self::Item>, Self::Error> {
        loop {
            match self.state {
                TtyDecoderState::WaitingHeader => {
                    if src.len() < 8 {
                        trace!("Not enough data to read a header");
                        return Ok(None);
                    }

                    trace!("Reading header");
                    let header_bytes = src.split_to(8);
                    let stream_type = match header_bytes[0] {
                        0 => {
                            return Err(Error::InvalidResponse(
                                "Unsupported stream of type stdin".to_string(),
                            ))
                        }
                        1 => StreamType::StdOut,
                        2 => StreamType::StdErr,
                        n => {
                            return Err(Error::InvalidResponse(format!(
                                "Unsupported stream of type {}",
                                n
                            )))
                        }
                    };

                    // Read the length straight from the header slice; no
                    // intermediate copy is needed. The slice is exactly four
                    // bytes long, so reading a u32 from it cannot fail.
                    let length = Cursor::new(&header_bytes[4..8])
                        .read_u32::<BigEndian>()
                        .expect("header length field is exactly 4 bytes")
                        as usize;
                    trace!(
                        "Read header: length = {}, line_type = {:?}",
                        length,
                        stream_type
                    );
                    // We've successfully read a header, now we wait for the payload.
                    // Loop again: the payload may already be in the buffer.
                    self.state = TtyDecoderState::WaitingPayload(length, stream_type);
                }
                TtyDecoderState::WaitingPayload(len, stream_type) => {
                    if src.len() < len {
                        trace!(
                            "Not enough data to read payload. Need {} but only {} available",
                            len,
                            src.len()
                        );
                        return Ok(None);
                    }

                    trace!("Reading payload");
                    let payload = src.split_to(len);
                    let data = String::from_utf8_lossy(&payload).trim().to_string();

                    // We've successfully read a full frame, now we go back to waiting for the
                    // next header
                    self.state = TtyDecoderState::WaitingHeader;
                    return Ok(Some(TtyLine { stream_type, data }));
                }
            }
        }
    }
}