//! Types for working with docker TTY streams use crate::{Error, Result}; use futures_util::{ io::{AsyncRead, AsyncReadExt, AsyncWrite}, stream::{Stream, TryStreamExt}, }; use pin_project::pin_project; use std::{convert::TryInto, io}; /// An enum representing a chunk of TTY text streamed from a Docker container. /// /// For convenience, this type can deref to the contained `Vec`. #[derive(Debug, Clone)] pub enum TtyChunk { StdIn(Vec), StdOut(Vec), StdErr(Vec), } impl From for Vec { fn from(tty_chunk: TtyChunk) -> Self { match tty_chunk { TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } impl AsRef> for TtyChunk { fn as_ref(&self) -> &Vec { match self { TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } impl std::ops::Deref for TtyChunk { type Target = Vec; fn deref(&self) -> &Self::Target { self.as_ref() } } impl std::ops::DerefMut for TtyChunk { fn deref_mut(&mut self) -> &mut Vec { match self { TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } async fn decode_chunk(mut stream: S) -> Option<(Result, S)> where S: AsyncRead + Unpin, { let mut header_bytes = [0u8; 8]; match stream.read_exact(&mut header_bytes).await { Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None, Err(e) => return Some((Err(Error::IO(e)), stream)), _ => (), } let size_bytes = &header_bytes[4..]; let data_length = u32::from_be_bytes(size_bytes.try_into().unwrap()); let mut data = vec![0u8; data_length as usize]; if stream.read_exact(&mut data).await.is_err() { return None; } let chunk = match header_bytes[0] { 0 => TtyChunk::StdIn(data), 1 => TtyChunk::StdOut(data), 2 => TtyChunk::StdErr(data), n => panic!("invalid stream number from docker daemon: '{}'", n), }; Some((Ok(chunk), stream)) } pub(crate) fn decode(hyper_chunk_stream: S) -> impl Stream> where S: Stream> + Unpin, { let stream = hyper_chunk_stream .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .into_async_read(); futures_util::stream::unfold(stream, decode_chunk) } type TtyReader<'a> = Pin> + Send + 'a>>; type TtyWriter<'a> = Pin>; /// TTY multiplexer returned by the `attach` method. /// /// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin. #[pin_project] pub struct Multiplexer<'a> { #[pin] reader: TtyReader<'a>, #[pin] writer: TtyWriter<'a>, } impl<'a> Multiplexer<'a> { pub(crate) fn new(tcp_connection: T) -> Self where T: AsyncRead + AsyncWrite + Send + 'a, { let (reader, writer) = tcp_connection.split(); Self { reader: Box::pin(futures_util::stream::unfold(reader, |reader| { decode_chunk(reader) })), writer: Box::pin(writer), } } } use std::{ pin::Pin, task::{Context, Poll}, }; impl<'a> Stream for Multiplexer<'a> { type Item = Result; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { self.project().reader.poll_next(cx) } } impl<'a> AsyncWrite for Multiplexer<'a> { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.project().writer.poll_write(cx, buf) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { self.project().writer.poll_flush(cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { self.project().writer.poll_close(cx) } } impl<'a> Multiplexer<'a> { /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts pub fn split( self ) -> ( impl Stream> + 'a, impl AsyncWrite + Send + 'a, ) { (self.reader, self.writer) } }