summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
diff options
context:
space:
mode:
authorAntoine Büsch <antoine.busch@gmail.com>2018-11-14 20:36:14 +1100
committerdoug tangren <d.tangren@gmail.com>2018-11-14 18:36:14 +0900
commit79d65c286025c551a775c0964d168e6feb4b3409 (patch)
tree34b49a0f97f6f851f47711be1cad0c002b8b78f7 /src/tty.rs
parent29bd95b42cd2b3c364f0be1f3e07e4b654e0ccf3 (diff)
Async api (#128)
* Refactored Transport for better async use Still a bit rough, but it now builds a big future using combinators. It still does one `Runtime::block_on()` to keep the existing API, but this is a first up before making the whole API async. * Migrate most APIs to be Future-based I still need to finish a few of the more tricky ones that I've commented out for now, but most of it compiles and some examples work. In particular, `Docker::stats()` now properly returns an async stream of stats. * Fix events and containerinspect examples * Fix imageinspect, images, info and top examples * Fix containercreate, imagedelete and imagepull examples * Fix more examples * Add back debug statement in Transport::request * De-glob imports in examples * Remove unused imports in examples * Fix NetworkCreateOptions serialization * Add back error message extraction in Transport * Fix Container::create serialization of options * Add containerdelete example * Simplify result * Fix some error handling to remove unwrap() * Fix Image::export() * Fix imagebuild example * Add adapter from Stream of Chunks to AsyncRead Having an `AsyncRead` is required to be able to use the `FramedRead` and `Decoder` stuff from tokio_codec. This code is "borrowed" from https:/github.com/ferristseng/rust-ipfs-api though should probably be moved to its own crate or to tokio_codec. * Fix Container::logs() It now properly demuxes stdout/stderr, and returns a `Stream<Item = TtyLine>`. * Fix Container::export() * Use LineCodec for streaming JSON Although in my limited testing it seemed to work fine, there is no guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON structure seems to be serialized on one line, so use LineCodec to turn the body into a stream of lines, then deserialize over this. * Fix serialization of ExecContainerOptions * Fix Container::exec() (kind of...) * Simplify deserialisation in Image::delete() * Small clean-ups * More clean ups * Fix rustdoc + remove extraneous "extern crate" * Fix doc example * Fix formatting
Diffstat (limited to 'src/tty.rs')
-rw-r--r--src/tty.rs151
1 files changed, 87 insertions, 64 deletions
diff --git a/src/tty.rs b/src/tty.rs
index 903a507..96c72b5 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,86 +1,109 @@
use byteorder::{BigEndian, ReadBytesExt};
+use bytes::BytesMut;
+use errors::Error;
use std::io::Cursor;
-use std::io::Read;
+use tokio_codec::Decoder;
#[derive(Debug)]
-pub enum TtyLine {
- StdOut(String),
- StdErr(String),
+pub struct TtyLine {
+ pub stream_type: StreamType,
+ pub data: String,
}
-pub struct Tty {
- pub stdout: String,
- pub stderr: String,
+#[derive(Debug, Clone, Copy)]
+pub enum StreamType {
+ StdOut,
+ StdErr,
}
-// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach
-impl Tty {
- pub fn new(stream: impl Read) -> Tty {
- let mut stdout: Vec<String> = vec![];
- let mut stderr: Vec<String> = vec![];
+/// Represent the current state of the decoding of a TTY frame
+enum TtyDecoderState {
+ /// We have yet to read a frame header
+ WaitingHeader,
+ /// We have read a header and extracted the payload size and stream type,
+ /// and are now waiting to read the corresponding payload
+ WaitingPayload(usize, StreamType),
+}
- let lines = demux(stream);
- for line in lines {
- match line {
- TtyLine::StdOut(s) => stdout.push(s),
- TtyLine::StdErr(s) => stderr.push(s),
- }
- }
+pub struct TtyDecoder {
+ state: TtyDecoderState,
+}
- Tty {
- stdout: stdout.concat(),
- stderr: stderr.concat(),
+impl TtyDecoder {
+ pub fn new() -> Self {
+ Self {
+ state: TtyDecoderState::WaitingHeader,
}
}
}
-/// Used to demux the output of Docker log, but still keep lines from stdout and stderr interlaced
-/// in the right order.
-pub struct InterlacedTty {
- pub lines: Vec<TtyLine>,
-}
+impl Decoder for TtyDecoder {
+ type Item = TtyLine;
+ type Error = Error;
-// https://docs.docker.com/engine/api/v1.26/#operation/ContainerAttach
-impl InterlacedTty {
- pub fn new(stream: impl Read) -> InterlacedTty {
- let lines = demux(stream);
+ fn decode(
+ &mut self,
+ src: &mut BytesMut,
+ ) -> Result<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.state {
+ TtyDecoderState::WaitingHeader => {
+ if src.len() < 8 {
+ trace!("Not enough data to read a header");
+ return Ok(None);
+ } else {
+ trace!("Reading header");
+ let header_bytes = src.split_to(8);
+ let payload_size: Vec<u8> = header_bytes[4..8].to_vec();
+ let stream_type = match header_bytes[0] {
+ 0 => {
+ return Err(Error::InvalidResponse(
+ "Unsupported stream of type stdin".to_string(),
+ ))
+ }
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => {
+ return Err(Error::InvalidResponse(format!(
+ "Unsupported stream of type {}",
+ n
+ )))
+ }
+ };
- InterlacedTty { lines }
- }
-}
+ let length =
+ Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
+ trace!(
+ "Read header: length = {}, line_type = {:?}",
+ length,
+ stream_type
+ );
+ // We've successfully read a header, now we wait for the payload
+ self.state = TtyDecoderState::WaitingPayload(length, stream_type);
+ continue;
+ }
+ }
+ TtyDecoderState::WaitingPayload(len, stream_type) => {
+ if src.len() < len {
+ trace!(
+ "Not enough data to read payload. Need {} but only {} available",
+ len,
+ src.len()
+ );
+ return Ok(None);
+ } else {
+ trace!("Reading payload");
+ let payload = src.split_to(len);
+ let data = String::from_utf8_lossy(&payload).trim().to_string();
+ let tty_line = TtyLine { stream_type, data };
-fn demux(mut stream: impl Read) -> Vec<TtyLine> {
- let mut lines: Vec<TtyLine> = vec![];
- loop {
- // 8 byte header [ STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4 ]
- let mut header = [0; 8];
- match stream.read_exact(&mut header) {
- Ok(_) => {
- let payload_size: Vec<u8> = header[4..8].to_vec();
- let mut buffer =
- vec![0; Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize];
- match stream.read_exact(&mut buffer) {
- Ok(_) => {
- match header[0] {
- // stdin, unhandled
- 0 => break,
- // stdout
- 1 => lines.push(TtyLine::StdOut(
- String::from_utf8_lossy(&buffer).trim().to_string(),
- )),
- // stderr
- 2 => lines.push(TtyLine::StdErr(
- String::from_utf8_lossy(&buffer).trim().to_string(),
- )),
- //unhandled
- _ => break,
- }
+ // We've successfully read a full frame, now we go back to waiting for the next
+ // header
+ self.state = TtyDecoderState::WaitingHeader;
+ return Ok(Some(tty_line));
}
- Err(_) => break,
- };
+ }
}
- Err(_) => break,
}
}
- lines
}