From 6cd1d7f93bd6f150341582a1b54087cefffdbf87 Mon Sep 17 00:00:00 2001 From: "Eli W. Hunter" <42009212+elihunter173@users.noreply.github.com> Date: Thu, 23 Jul 2020 23:54:12 -0400 Subject: Async/Await Support (continuation of #191) (#229) * it builds! * remove unused dependencies * bump dependencies * reimplement 'exec' endpoint * update a few more examples * update remaining examples * fix doc tests, remove unused 'read' module * remove feature-gated async closures * split futures dependency to just 'futures-util' * update version and readme * make functions accepting Body generic over Into again * update changelog * reinstate 'unix-socket' feature * reinstate 'attach' endpoint * fix clippy lints * fix documentation typo * fix container copyfrom/into implementations * add convenience methods for TtyChunk struct * remove 'main' from code example to silence clippy lint * Update hyper to 0.13.1 * Add Send bounds to TtyWriter * Appease clippy * Fix examples * Update issue in changelog Co-authored-by: Daniel Eades Co-authored-by: Marc Schreiber --- src/tty.rs | 372 ++++++++++++++++++++++--------------------------------------- 1 file changed, 133 insertions(+), 239 deletions(-) (limited to 'src/tty.rs') diff --git a/src/tty.rs b/src/tty.rs index a4ab27f..a26846f 100644 --- a/src/tty.rs +++ b/src/tty.rs @@ -1,278 +1,172 @@ -use crate::errors::Error; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use bytes::BytesMut; -use futures::{self, Async}; -use hyper::rt::{Future, Stream}; -use log::trace; -use std::io::{self, Cursor}; -use tokio_codec::Decoder; -use tokio_io::{AsyncRead, AsyncWrite}; - -#[derive(Debug)] -pub struct Chunk { - pub stream_type: StreamType, - pub data: Vec, -} - -#[derive(Debug, Clone, Copy)] -pub enum StreamType { - StdIn, - StdOut, - StdErr, -} - -/// A multiplexed stream. -pub struct Multiplexed { - stdin: Box, - chunks: Box>, -} - -pub struct MultiplexedBlocking { - stdin: Box, - chunks: Box>>, -} - -/// Represent the current state of the decoding of a TTY frame -enum TtyDecoderState { - /// We have yet to read a frame header - WaitingHeader, - /// We have read a header and extracted the payload size and stream type, - /// and are now waiting to read the corresponding payload - WaitingPayload(usize, StreamType), -} - -pub struct TtyDecoder { - state: TtyDecoderState, -} - -impl Chunk { - /// Interprets the raw bytes as a string. - /// - /// Returns `None` if the raw bytes do not represent - /// a valid UTF-8 string. - pub fn as_string(&self) -> Option { - String::from_utf8(self.data.clone()).ok() - } - - /// Unconditionally interprets the raw bytes as a string. - /// - /// Inserts placeholder symbols for all non-character bytes. - pub fn as_string_lossy(&self) -> String { - String::from_utf8_lossy(&self.data).into_owned() +//! Types for working with docker TTY streams + +use crate::{Error, Result}; +use bytes::{BigEndian, ByteOrder}; +use futures_util::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite}, + stream::{Stream, TryStreamExt}, +}; +use pin_project::pin_project; +use std::io; + +/// An enum representing a chunk of TTY text streamed from a Docker container. +/// +/// For convenience, this type can deref to the contained `Vec`. +#[derive(Debug, Clone)] +pub enum TtyChunk { + StdIn(Vec), + StdOut(Vec), + StdErr(Vec), +} + +impl From for Vec { + fn from(tty_chunk: TtyChunk) -> Self { + match tty_chunk { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, + } } } -impl TtyDecoder { - pub fn new() -> Self { - Self { - state: TtyDecoderState::WaitingHeader, +impl AsRef> for TtyChunk { + fn as_ref(&self) -> &Vec { + match self { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } -impl Default for TtyDecoder { - fn default() -> Self { - Self::new() +impl std::ops::Deref for TtyChunk { + type Target = Vec; + fn deref(&self) -> &Self::Target { + self.as_ref() } } -impl Decoder for TtyDecoder { - type Item = Chunk; - type Error = Error; - - fn decode( - &mut self, - src: &mut BytesMut, - ) -> Result, Self::Error> { - loop { - match self.state { - TtyDecoderState::WaitingHeader => { - if src.len() < 8 { - trace!("Not enough data to read a header"); - return Ok(None); - } else { - trace!("Reading header"); - let header_bytes = src.split_to(8); - let payload_size: Vec = header_bytes[4..8].to_vec(); - let stream_type = match header_bytes[0] { - 0 => { - return Err(Error::InvalidResponse( - "Unsupported stream of type stdin".to_string(), - )); - } - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => { - return Err(Error::InvalidResponse(format!( - "Unsupported stream of type {}", - n - ))); - } - }; - - let length = - Cursor::new(&payload_size).read_u32::().unwrap() as usize; - trace!( - "Read header: length = {}, stream_type = {:?}", - length, - stream_type - ); - // We've successfully read a header, now we wait for the payload - self.state = TtyDecoderState::WaitingPayload(length, stream_type); - continue; - } - } - TtyDecoderState::WaitingPayload(len, stream_type) => { - if src.len() < len { - trace!( - "Not enough data to read payload. Need {} but only {} available", - len, - src.len() - ); - return Ok(None); - } else { - trace!("Reading payload"); - let data = src.split_to(len)[..].to_owned(); - let tty_chunk = Chunk { stream_type, data }; - - // We've successfully read a full frame, now we go back to waiting for the next - // header - self.state = TtyDecoderState::WaitingHeader; - return Ok(Some(tty_chunk)); - } - } - } +impl std::ops::DerefMut for TtyChunk { + fn deref_mut(&mut self) -> &mut Vec { + match self { + TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes, } } } -impl Multiplexed { - /// Create a multiplexed stream. - pub(crate) fn new(stream: T) -> Multiplexed - where - T: AsyncRead + AsyncWrite + 'static, - { - let (reader, stdin) = stream.split(); - Multiplexed { - chunks: Box::new(chunks(reader)), - stdin: Box::new(stdin), - } - } +async fn decode_chunk(mut stream: S) -> Option<(Result, S)> +where + S: AsyncRead + Unpin, +{ + let mut header_bytes = vec![0u8; 8]; - pub fn wait(self) -> MultiplexedBlocking { - MultiplexedBlocking { - stdin: self.stdin, - chunks: Box::new(self.chunks.wait()), - } + match stream.read_exact(&mut header_bytes).await { + Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None, + Err(e) => return Some((Err(Error::IO(e)), stream)), + _ => (), } -} - -impl futures::Stream for Multiplexed { - type Item = Chunk; - type Error = crate::Error; - fn poll(&mut self) -> Result>, crate::Error> { - self.chunks.poll() - } -} + let size_bytes = &header_bytes[4..]; + let data_length = BigEndian::read_u32(size_bytes); -impl Iterator for MultiplexedBlocking { - type Item = Result; + let mut data = vec![0u8; data_length as usize]; - fn next(&mut self) -> Option> { - self.chunks.next() + if stream.read_exact(&mut data).await.is_err() { + return None; } -} - -macro_rules! delegate_io_write { - ($ty:ty) => { - impl io::Write for $ty { - fn write( - &mut self, - buf: &[u8], - ) -> Result { - self.stdin.write(buf) - } - fn flush(&mut self) -> Result<(), io::Error> { - self.stdin.flush() - } - } + let chunk = match header_bytes[0] { + 0 => TtyChunk::StdIn(data), + 1 => TtyChunk::StdOut(data), + 2 => TtyChunk::StdErr(data), + n => panic!("invalid stream number from docker daemon: '{}'", n), }; -} -delegate_io_write!(Multiplexed); -delegate_io_write!(MultiplexedBlocking); + Some((Ok(chunk), stream)) +} -pub fn chunks(stream: S) -> impl futures::Stream +pub(crate) fn decode(hyper_chunk_stream: S) -> impl Stream> where - S: AsyncRead, + S: Stream> + Unpin, { - let stream = futures::stream::unfold(stream, |stream| { - let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]); + let stream = hyper_chunk_stream + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .into_async_read(); - let fut = header_future.and_then(|(stream, header_bytes)| { - let size_bytes = &header_bytes[4..]; - let data_length = BigEndian::read_u32(size_bytes); - let stream_type = match header_bytes[0] { - 0 => StreamType::StdIn, - 1 => StreamType::StdOut, - 2 => StreamType::StdErr, - n => panic!("invalid stream number from docker daemon: '{}'", n), - }; + futures_util::stream::unfold(stream, decode_chunk) +} - ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]) - .map(move |(stream, data)| (Chunk { stream_type, data }, stream)) - }); - // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk, - // stream)). - // This is much better because it would allow us to swallow the unexpected eof and - // stop the stream much cleaner than writing a custom stream filter. - Some(fut) - }); +type TtyReader<'a> = Pin> + Send + 'a>>; +type TtyWriter<'a> = Pin>; - util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof) - .map_err(crate::Error::from) +/// TTY multiplexer returned by the `attach` method. +/// +/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin. +#[pin_project] +pub struct Multiplexer<'a> { + #[pin] + reader: TtyReader<'a>, + #[pin] + writer: TtyWriter<'a>, } -mod util { - use futures::{Async, Stream}; +impl<'a> Multiplexer<'a> { + pub(crate) fn new(tcp_connection: T) -> Self + where + T: AsyncRead + AsyncWrite + Send + 'a, + { + let (reader, writer) = tcp_connection.split(); - pub struct StopOnError { - stream: S, - f: F, + Self { + reader: Box::pin(futures_util::stream::unfold(reader, |reader| { + decode_chunk(reader) + })), + writer: Box::pin(writer), + } } +} - pub fn stop_on_err( - stream: S, - f: F, - ) -> StopOnError - where - S: Stream, - F: FnMut(&S::Error) -> bool, - { - StopOnError { stream, f } +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +impl<'a> Stream for Multiplexer<'a> { + type Item = Result; + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().reader.poll_next(cx) } +} - impl Stream for StopOnError - where - S: Stream, - F: FnMut(&S::Error) -> bool, - { - type Item = S::Item; - type Error = S::Error; +impl<'a> AsyncWrite for Multiplexer<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().writer.poll_write(cx, buf) + } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().writer.poll_flush(cx) + } + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().writer.poll_close(cx) + } +} - fn poll(&mut self) -> Result>, S::Error> { - match self.stream.poll() { - Err(e) => { - if (self.f)(&e) { - Err(e) - } else { - Ok(Async::Ready(None)) - } - } - a => a, - } - } +impl<'a> Multiplexer<'a> { + /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts + pub fn split( + self + ) -> ( + impl Stream> + 'a, + impl AsyncWrite + Send + 'a, + ) { + (self.reader, self.writer) } } -- cgit v1.2.3