diff options
author | Antoine Büsch <antoine.busch@gmail.com> | 2018-11-14 20:36:14 +1100 |
---|---|---|
committer | doug tangren <d.tangren@gmail.com> | 2018-11-14 18:36:14 +0900 |
commit | 79d65c286025c551a775c0964d168e6feb4b3409 (patch) | |
tree | 34b49a0f97f6f851f47711be1cad0c002b8b78f7 /src/tty.rs | |
parent | 29bd95b42cd2b3c364f0be1f3e07e4b654e0ccf3 (diff) |
Async api (#128)
* Refactored Transport for better async use
Still a bit rough, but it now builds a big future using combinators. It
still does one `Runtime::block_on()` to keep the existing API, but this
is a first up before making the whole API async.
* Migrate most APIs to be Future-based
I still need to finish a few of the more tricky ones that I've commented
out for now, but most of it compiles and some examples work. In
particular, `Docker::stats()` now properly returns an async stream of
stats.
* Fix events and containerinspect examples
* Fix imageinspect, images, info and top examples
* Fix containercreate, imagedelete and imagepull examples
* Fix more examples
* Add back debug statement in Transport::request
* De-glob imports in examples
* Remove unused imports in examples
* Fix NetworkCreateOptions serialization
* Add back error message extraction in Transport
* Fix Container::create serialization of options
* Add containerdelete example
* Simplify result
* Fix some error handling to remove unwrap()
* Fix Image::export()
* Fix imagebuild example
* Add adapter from Stream of Chunks to AsyncRead
Having an `AsyncRead` is required to be able to use the `FramedRead` and
`Decoder` stuff from tokio_codec. This code is "borrowed" from
https:/github.com/ferristseng/rust-ipfs-api though should probably be
moved to its own crate or to tokio_codec.
* Fix Container::logs()
It now properly demuxes stdout/stderr, and returns a `Stream<Item =
TtyLine>`.
* Fix Container::export()
* Use LineCodec for streaming JSON
Although in my limited testing it seemed to work fine, there is no
guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON
structure seems to be serialized on one line, so use LineCodec to turn
the body into a stream of lines, then deserialize over this.
* Fix serialization of ExecContainerOptions
* Fix Container::exec() (kind of...)
* Simplify deserialisation in Image::delete()
* Small clean-ups
* More clean ups
* Fix rustdoc + remove extraneous "extern crate"
* Fix doc example
* Fix formatting
Diffstat (limited to 'src/tty.rs')
-rw-r--r-- | src/tty.rs | 151 |
1 files changed, 87 insertions, 64 deletions
@@ -1,86 +1,109 @@ use byteorder::{BigEndian, ReadBytesExt}; +use bytes::BytesMut; +use errors::Error; use std::io::Cursor; -use std::io::Read; +use tokio_codec::Decoder; #[derive(Debug)] -pub enum TtyLine { - StdOut(String), - StdErr(String), +pub struct TtyLine { + pub stream_type: StreamType, + pub data: String, } -pub struct Tty { - pub stdout: String, - pub stderr: String, +#[derive(Debug, Clone, Copy)] +pub enum StreamType { + StdOut, + StdErr, } -// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach -impl Tty { - pub fn new(stream: impl Read) -> Tty { - let mut stdout: Vec<String> = vec![]; - let mut stderr: Vec<String> = vec![]; +/// Represent the current state of the decoding of a TTY frame +enum TtyDecoderState { + /// We have yet to read a frame header + WaitingHeader, + /// We have read a header and extracted the payload size and stream type, + /// and are now waiting to read the corresponding payload + WaitingPayload(usize, StreamType), +} - let lines = demux(stream); - for line in lines { - match line { - TtyLine::StdOut(s) => stdout.push(s), - TtyLine::StdErr(s) => stderr.push(s), - } - } +pub struct TtyDecoder { + state: TtyDecoderState, +} - Tty { - stdout: stdout.concat(), - stderr: stderr.concat(), +impl TtyDecoder { + pub fn new() -> Self { + Self { + state: TtyDecoderState::WaitingHeader, } } } -/// Used to demux the output of Docker log, but still keep lines from stdout and stderr interlaced -/// in the right order. -pub struct InterlacedTty { - pub lines: Vec<TtyLine>, -} +impl Decoder for TtyDecoder { + type Item = TtyLine; + type Error = Error; -// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach -impl InterlacedTty { - pub fn new(stream: impl Read) -> InterlacedTty { - let lines = demux(stream); + fn decode( + &mut self, + src: &mut BytesMut, + ) -> Result<Option<Self::Item>, Self::Error> { + loop { + match self.state { + TtyDecoderState::WaitingHeader => { + if src.len() < 8 { + trace!("Not enough data to read a header"); + return Ok(None); + } else { + trace!("Reading header"); + let header_bytes = src.split_to(8); + let payload_size: Vec<u8> = header_bytes[4..8].to_vec(); + let stream_type = match header_bytes[0] { + 0 => { + return Err(Error::InvalidResponse( + "Unsupported stream of type stdin".to_string(), + )) + } + 1 => StreamType::StdOut, + 2 => StreamType::StdErr, + n => { + return Err(Error::InvalidResponse(format!( + "Unsupported stream of type {}", + n + ))) + } + }; - InterlacedTty { lines } - } -} + let length = + Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize; + trace!( + "Read header: length = {}, line_type = {:?}", + length, + stream_type + ); + // We've successfully read a header, now we wait for the payload + self.state = TtyDecoderState::WaitingPayload(length, stream_type); + continue; + } + } + TtyDecoderState::WaitingPayload(len, stream_type) => { + if src.len() < len { + trace!( + "Not enough data to read payload. Need {} but only {} available", + len, + src.len() + ); + return Ok(None); + } else { + trace!("Reading payload"); + let payload = src.split_to(len); + let data = String::from_utf8_lossy(&payload).trim().to_string(); + let tty_line = TtyLine { stream_type, data }; -fn demux(mut stream: impl Read) -> Vec<TtyLine> { - let mut lines: Vec<TtyLine> = vec![]; - loop { - // 8 byte header [ STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4 ] - let mut header = [0; 8]; - match stream.read_exact(&mut header) { - Ok(_) => { - let payload_size: Vec<u8> = header[4..8].to_vec(); - let mut buffer = - vec![0; Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize]; - match stream.read_exact(&mut buffer) { - Ok(_) => { - match header[0] { - // stdin, unhandled - 0 => break, - // stdout - 1 => lines.push(TtyLine::StdOut( - String::from_utf8_lossy(&buffer).trim().to_string(), - )), - // stderr - 2 => lines.push(TtyLine::StdErr( - String::from_utf8_lossy(&buffer).trim().to_string(), - )), - //unhandled - _ => break, - } + // We've successfully read a full frame, now we go back to waiting for the next + // header + self.state = TtyDecoderState::WaitingHeader; + return Ok(Some(tty_line)); } - Err(_) => break, - }; + } } - Err(_) => break, } } - lines } |