diff options
Diffstat (limited to 'src/tty.rs')
-rw-r--r-- | src/tty.rs | 372 |
1 files changed, 133 insertions, 239 deletions
@@ -1,278 +1,172 @@ -use crate::errors::Error; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use bytes::BytesMut; -use futures::{self, Async}; -use hyper::rt::{Future, Stream}; -use log::trace; -use std::io::{self, Cursor}; -use tokio_codec::Decoder; -use tokio_io::{AsyncRead, AsyncWrite}; - -#[derive(Debug)] -pub struct Chunk { - pub stream_type: StreamType, - pub data: Vec<u8>, -} - -#[derive(Debug, Clone, Copy)] -pub enum StreamType { - StdIn, - StdOut, - StdErr, -} - -/// A multiplexed stream. -pub struct Multiplexed { - stdin: Box<dyn AsyncWrite>, - chunks: Box<dyn futures::Stream<Item = Chunk, Error = crate::Error>>, -} - -pub struct MultiplexedBlocking { - stdin: Box<dyn AsyncWrite>, - chunks: Box<dyn Iterator<Item = Result<Chunk, crate::Error>>>, -} - -/// Represent the current state of the decoding of a TTY frame -enum TtyDecoderState { - /// We have yet to read a frame header - WaitingHeader, - /// We have read a header and extracted the payload size and stream type, - /// and are now waiting to read the corresponding payload - WaitingPayload(usize, StreamType), -} - -pub struct TtyDecoder { - state: TtyDecoderState, -} - -impl Chunk { - /// Interprets the raw bytes as a string. - /// - /// Returns `None` if the raw bytes do not represent - /// a valid UTF-8 string. - pub fn as_string(&self) -> Option<String> { - String::from_utf8(self.data.clone()).ok() - } - - /// Unconditionally interprets the raw bytes as a string. - /// - /// Inserts placeholder symbols for all non-character bytes. - pub fn as_string_lossy(&self) -> String { - String::from_utf8_lossy(&self.data).into_owned() +//! Types for working with docker TTY streams + +use crate::{Error, Result}; +use bytes::{BigEndian, ByteOrder}; +use futures_util::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite}, + stream::{Stream, TryStreamExt}, +}; +use pin_project::pin_project; +use std::io; + +/// An enum representing a chunk of TTY text streamed from a Docker container. +/// +/// For convenience, this type can deref to the contained `Vec<u8>`. +#[derive(Debug, Clone)] +pub enum TtyChunk { + StdIn(Vec<u8>), + StdOut(Vec<u8>), + StdErr(Vec<u8>), +} + +impl From<TtyChunk> for Vec<u8> { + fn from(tty_chunk: TtyChunk) -> Self { + match tty_chunk { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, + } } } -impl TtyDecoder { - pub fn new() -> Self { - Self { - state: TtyDecoderState::WaitingHeader, +impl AsRef<Vec<u8>> for TtyChunk { + fn as_ref(&self) -> &Vec<u8> { + match self { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } -impl Default for TtyDecoder { - fn default() -> Self { - Self::new() +impl std::ops::Deref for TtyChunk { + type Target = Vec<u8>; + fn deref(&self) -> &Self::Target { + self.as_ref() } } -impl Decoder for TtyDecoder { - type Item = Chunk; - type Error = Error; - - fn decode( - &mut self, - src: &mut BytesMut, - ) -> Result<Option<Self::Item>, Self::Error> { - loop { - match self.state { - TtyDecoderState::WaitingHeader => { - if src.len() < 8 { - trace!("Not enough data to read a header"); - return Ok(None); - } else { - trace!("Reading header"); - let header_bytes = src.split_to(8); - let payload_size: Vec<u8> = header_bytes[4..8].to_vec(); - let stream_type = match header_bytes[0] { - 0 => { - return Err(Error::InvalidResponse( - "Unsupported stream of type stdin".to_string(), - )); - } - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => { - return Err(Error::InvalidResponse(format!( - "Unsupported stream of type {}", - n - ))); - } - }; - - let length = - Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize; - trace!( - "Read header: length = {}, stream_type = {:?}", - length, - stream_type - ); - // We've successfully read a header, now we wait for the payload - self.state = TtyDecoderState::WaitingPayload(length, stream_type); - continue; - } - } - TtyDecoderState::WaitingPayload(len, stream_type) => { - if src.len() < len { - trace!( - "Not enough data to read payload. Need {} but only {} available", - len, - src.len() - ); - return Ok(None); - } else { - trace!("Reading payload"); - let data = src.split_to(len)[..].to_owned(); - let tty_chunk = Chunk { stream_type, data }; - - // We've successfully read a full frame, now we go back to waiting for the next - // header - self.state = TtyDecoderState::WaitingHeader; - return Ok(Some(tty_chunk)); - } - } - } +impl std::ops::DerefMut for TtyChunk { + fn deref_mut(&mut self) -> &mut Vec<u8> { + match self { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } -impl Multiplexed { - /// Create a multiplexed stream. - pub(crate) fn new<T>(stream: T) -> Multiplexed - where - T: AsyncRead + AsyncWrite + 'static, - { - let (reader, stdin) = stream.split(); - Multiplexed { - chunks: Box::new(chunks(reader)), - stdin: Box::new(stdin), - } - } +async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)> +where + S: AsyncRead + Unpin, +{ + let mut header_bytes = vec![0u8; 8]; - pub fn wait(self) -> MultiplexedBlocking { - MultiplexedBlocking { - stdin: self.stdin, - chunks: Box::new(self.chunks.wait()), - } + match stream.read_exact(&mut header_bytes).await { + Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None, + Err(e) => return Some((Err(Error::IO(e)), stream)), + _ => (), } -} - -impl futures::Stream for Multiplexed { - type Item = Chunk; - type Error = crate::Error; - fn poll(&mut self) -> Result<Async<Option<Chunk>>, crate::Error> { - self.chunks.poll() - } -} + let size_bytes = &header_bytes[4..]; + let data_length = BigEndian::read_u32(size_bytes); -impl Iterator for MultiplexedBlocking { - type Item = Result<Chunk, crate::Error>; + let mut data = vec![0u8; data_length as usize]; - fn next(&mut self) -> Option<Result<Chunk, crate::Error>> { - self.chunks.next() + if stream.read_exact(&mut data).await.is_err() { + return None; } -} - -macro_rules! delegate_io_write { - ($ty:ty) => { - impl io::Write for $ty { - fn write( - &mut self, - buf: &[u8], - ) -> Result<usize, io::Error> { - self.stdin.write(buf) - } - fn flush(&mut self) -> Result<(), io::Error> { - self.stdin.flush() - } - } + let chunk = match header_bytes[0] { + 0 => TtyChunk::StdIn(data), + 1 => TtyChunk::StdOut(data), + 2 => TtyChunk::StdErr(data), + n => panic!("invalid stream number from docker daemon: '{}'", n), }; -} -delegate_io_write!(Multiplexed); -delegate_io_write!(MultiplexedBlocking); + Some((Ok(chunk), stream)) +} -pub fn chunks<S>(stream: S) -> impl futures::Stream<Item = Chunk, Error = crate::Error> +pub(crate) fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>> where - S: AsyncRead, + S: Stream<Item = Result<hyper::body::Bytes>> + Unpin, { - let stream = futures::stream::unfold(stream, |stream| { - let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]); + let stream = hyper_chunk_stream + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .into_async_read(); - let fut = header_future.and_then(|(stream, header_bytes)| { - let size_bytes = &header_bytes[4..]; - let data_length = BigEndian::read_u32(size_bytes); - let stream_type = match header_bytes[0] { - 0 => StreamType::StdIn, - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => panic!("invalid stream number from docker daemon: '{}'", n), - }; + futures_util::stream::unfold(stream, decode_chunk) +} - ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]) - .map(move |(stream, data)| (Chunk { stream_type, data }, stream)) - }); - // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk, - // stream)). - // This is much better because it would allow us to swallow the unexpected eof and - // stop the stream much cleaner than writing a custom stream filter. - Some(fut) - }); +type TtyReader<'a> = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'a>>; +type TtyWriter<'a> = Pin<Box<dyn AsyncWrite + Send + 'a>>; - util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof) - .map_err(crate::Error::from) +/// TTY multiplexer returned by the `attach` method. +/// +/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin. +#[pin_project] +pub struct Multiplexer<'a> { + #[pin] + reader: TtyReader<'a>, + #[pin] + writer: TtyWriter<'a>, } -mod util { - use futures::{Async, Stream}; +impl<'a> Multiplexer<'a> { + pub(crate) fn new<T>(tcp_connection: T) -> Self + where + T: AsyncRead + AsyncWrite + Send + 'a, + { + let (reader, writer) = tcp_connection.split(); - pub struct StopOnError<S, F> { - stream: S, - f: F, + Self { + reader: Box::pin(futures_util::stream::unfold(reader, |reader| { + decode_chunk(reader) + })), + writer: Box::pin(writer), + } } +} - pub fn stop_on_err<S, F>( - stream: S, - f: F, - ) -> StopOnError<S, F> - where - S: Stream, - F: FnMut(&S::Error) -> bool, - { - StopOnError { stream, f } +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +impl<'a> Stream for Multiplexer<'a> { + type Item = Result<TtyChunk>; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Self::Item>> { + self.project().reader.poll_next(cx) } +} - impl<S, F> Stream for StopOnError<S, F> - where - S: Stream, - F: FnMut(&S::Error) -> bool, - { - type Item = S::Item; - type Error = S::Error; +impl<'a> AsyncWrite for Multiplexer<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + self.project().writer.poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.project().writer.poll_flush(cx) + } + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<io::Result<()>> { + self.project().writer.poll_close(cx) + } +} - fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> { - match self.stream.poll() { - Err(e) => { - if (self.f)(&e) { - Err(e) - } else { - Ok(Async::Ready(None)) - } - } - a => a, - } - } +impl<'a> Multiplexer<'a> { + /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts + pub fn split( + self + ) -> ( + impl Stream<Item = Result<TtyChunk>> + 'a, + impl AsyncWrite + Send + 'a, + ) { + (self.reader, self.writer) } } |