summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/tty.rs')
-rw-r--r--src/tty.rs151
1 files changed, 87 insertions, 64 deletions
diff --git a/src/tty.rs b/src/tty.rs
index 903a507..96c72b5 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,86 +1,109 @@
use byteorder::{BigEndian, ReadBytesExt};
+use bytes::BytesMut;
+use errors::Error;
use std::io::Cursor;
-use std::io::Read;
+use tokio_codec::Decoder;
#[derive(Debug)]
-pub enum TtyLine {
- StdOut(String),
- StdErr(String),
+pub struct TtyLine {
+ pub stream_type: StreamType,
+ pub data: String,
}
-pub struct Tty {
- pub stdout: String,
- pub stderr: String,
+#[derive(Debug, Clone, Copy)]
+pub enum StreamType {
+ StdOut,
+ StdErr,
}
-// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach
-impl Tty {
- pub fn new(stream: impl Read) -> Tty {
- let mut stdout: Vec<String> = vec![];
- let mut stderr: Vec<String> = vec![];
+/// Represent the current state of the decoding of a TTY frame
+enum TtyDecoderState {
+ /// We have yet to read a frame header
+ WaitingHeader,
+ /// We have read a header and extracted the payload size and stream type,
+ /// and are now waiting to read the corresponding payload
+ WaitingPayload(usize, StreamType),
+}
- let lines = demux(stream);
- for line in lines {
- match line {
- TtyLine::StdOut(s) => stdout.push(s),
- TtyLine::StdErr(s) => stderr.push(s),
- }
- }
+pub struct TtyDecoder {
+ state: TtyDecoderState,
+}
- Tty {
- stdout: stdout.concat(),
- stderr: stderr.concat(),
+impl TtyDecoder {
+ pub fn new() -> Self {
+ Self {
+ state: TtyDecoderState::WaitingHeader,
}
}
}
-/// Used to demux the output of Docker log, but still keep lines from stdout and stderr interlaced
-/// in the right order.
-pub struct InterlacedTty {
- pub lines: Vec<TtyLine>,
-}
+impl Decoder for TtyDecoder {
+ type Item = TtyLine;
+ type Error = Error;
-// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach
-impl InterlacedTty {
- pub fn new(stream: impl Read) -> InterlacedTty {
- let lines = demux(stream);
+ fn decode(
+ &mut self,
+ src: &mut BytesMut,
+ ) -> Result<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.state {
+ TtyDecoderState::WaitingHeader => {
+ if src.len() < 8 {
+ trace!("Not enough data to read a header");
+ return Ok(None);
+ } else {
+ trace!("Reading header");
+ let header_bytes = src.split_to(8);
+ let payload_size: Vec<u8> = header_bytes[4..8].to_vec();
+ let stream_type = match header_bytes[0] {
+ 0 => {
+ return Err(Error::InvalidResponse(
+ "Unsupported stream of type stdin".to_string(),
+ ))
+ }
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => {
+ return Err(Error::InvalidResponse(format!(
+ "Unsupported stream of type {}",
+ n
+ )))
+ }
+ };
- InterlacedTty { lines }
- }
-}
+ let length =
+ Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
+ trace!(
+ "Read header: length = {}, line_type = {:?}",
+ length,
+ stream_type
+ );
+ // We've successfully read a header, now we wait for the payload
+ self.state = TtyDecoderState::WaitingPayload(length, stream_type);
+ continue;
+ }
+ }
+ TtyDecoderState::WaitingPayload(len, stream_type) => {
+ if src.len() < len {
+ trace!(
+ "Not enough data to read payload. Need {} but only {} available",
+ len,
+ src.len()
+ );
+ return Ok(None);
+ } else {
+ trace!("Reading payload");
+ let payload = src.split_to(len);
+ let data = String::from_utf8_lossy(&payload).trim().to_string();
+ let tty_line = TtyLine { stream_type, data };
-fn demux(mut stream: impl Read) -> Vec<TtyLine> {
- let mut lines: Vec<TtyLine> = vec![];
- loop {
- // 8 byte header [ STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4 ]
- let mut header = [0; 8];
- match stream.read_exact(&mut header) {
- Ok(_) => {
- let payload_size: Vec<u8> = header[4..8].to_vec();
- let mut buffer =
- vec![0; Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize];
- match stream.read_exact(&mut buffer) {
- Ok(_) => {
- match header[0] {
- // stdin, unhandled
- 0 => break,
- // stdout
- 1 => lines.push(TtyLine::StdOut(
- String::from_utf8_lossy(&buffer).trim().to_string(),
- )),
- // stderr
- 2 => lines.push(TtyLine::StdErr(
- String::from_utf8_lossy(&buffer).trim().to_string(),
- )),
- //unhandled
- _ => break,
- }
+ // We've successfully read a full frame, now we go back to waiting for the next
+ // header
+ self.state = TtyDecoderState::WaitingHeader;
+ return Ok(Some(tty_line));
}
- Err(_) => break,
- };
+ }
}
- Err(_) => break,
}
}
- lines
}