diff options
Diffstat (limited to 'src/tty.rs')
-rw-r--r-- | src/tty.rs | 151 |
1 files changed, 87 insertions, 64 deletions
@@ -1,86 +1,109 @@ use byteorder::{BigEndian, ReadBytesExt}; +use bytes::BytesMut; +use errors::Error; use std::io::Cursor; -use std::io::Read; +use tokio_codec::Decoder; #[derive(Debug)] -pub enum TtyLine { - StdOut(String), - StdErr(String), +pub struct TtyLine { + pub stream_type: StreamType, + pub data: String, } -pub struct Tty { - pub stdout: String, - pub stderr: String, +#[derive(Debug, Clone, Copy)] +pub enum StreamType { + StdOut, + StdErr, } -// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach -impl Tty { - pub fn new(stream: impl Read) -> Tty { - let mut stdout: Vec<String> = vec![]; - let mut stderr: Vec<String> = vec![]; +/// Represent the current state of the decoding of a TTY frame +enum TtyDecoderState { + /// We have yet to read a frame header + WaitingHeader, + /// We have read a header and extracted the payload size and stream type, + /// and are now waiting to read the corresponding payload + WaitingPayload(usize, StreamType), +} - let lines = demux(stream); - for line in lines { - match line { - TtyLine::StdOut(s) => stdout.push(s), - TtyLine::StdErr(s) => stderr.push(s), - } - } +pub struct TtyDecoder { + state: TtyDecoderState, +} - Tty { - stdout: stdout.concat(), - stderr: stderr.concat(), +impl TtyDecoder { + pub fn new() -> Self { + Self { + state: TtyDecoderState::WaitingHeader, } } } -/// Used to demux the output of Docker log, but still keep lines from stdout and stderr interlaced -/// in the right order. -pub struct InterlacedTty { - pub lines: Vec<TtyLine>, -} +impl Decoder for TtyDecoder { + type Item = TtyLine; + type Error = Error; -// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach -impl InterlacedTty { - pub fn new(stream: impl Read) -> InterlacedTty { - let lines = demux(stream); + fn decode( + &mut self, + src: &mut BytesMut, + ) -> Result<Option<Self::Item>, Self::Error> { + loop { + match self.state { + TtyDecoderState::WaitingHeader => { + if src.len() < 8 { + trace!("Not enough data to read a header"); + return Ok(None); + } else { + trace!("Reading header"); + let header_bytes = src.split_to(8); + let payload_size: Vec<u8> = header_bytes[4..8].to_vec(); + let stream_type = match header_bytes[0] { + 0 => { + return Err(Error::InvalidResponse( + "Unsupported stream of type stdin".to_string(), + )) + } + 1 => StreamType::StdOut, + 2 => StreamType::StdErr, + n => { + return Err(Error::InvalidResponse(format!( + "Unsupported stream of type {}", + n + ))) + } + }; - InterlacedTty { lines } - } -} + let length = + Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize; + trace!( + "Read header: length = {}, line_type = {:?}", + length, + stream_type + ); + // We've successfully read a header, now we wait for the payload + self.state = TtyDecoderState::WaitingPayload(length, stream_type); + continue; + } + } + TtyDecoderState::WaitingPayload(len, stream_type) => { + if src.len() < len { + trace!( + "Not enough data to read payload. Need {} but only {} available", + len, + src.len() + ); + return Ok(None); + } else { + trace!("Reading payload"); + let payload = src.split_to(len); + let data = String::from_utf8_lossy(&payload).trim().to_string(); + let tty_line = TtyLine { stream_type, data }; -fn demux(mut stream: impl Read) -> Vec<TtyLine> { - let mut lines: Vec<TtyLine> = vec![]; - loop { - // 8 byte header [ STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4 ] - let mut header = [0; 8]; - match stream.read_exact(&mut header) { - Ok(_) => { - let payload_size: Vec<u8> = header[4..8].to_vec(); - let mut buffer = - vec![0; Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize]; - match stream.read_exact(&mut buffer) { - Ok(_) => { - match header[0] { - // stdin, unhandled - 0 => break, - // stdout - 1 => lines.push(TtyLine::StdOut( - String::from_utf8_lossy(&buffer).trim().to_string(), - )), - // stderr - 2 => lines.push(TtyLine::StdErr( - String::from_utf8_lossy(&buffer).trim().to_string(), - )), - //unhandled - _ => break, - } + // We've successfully read a full frame, now we go back to waiting for the next + // header + self.state = TtyDecoderState::WaitingHeader; + return Ok(Some(tty_line)); } - Err(_) => break, - }; + } } - Err(_) => break, } } - lines } |