summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/tty.rs')
-rw-r--r--src/tty.rs372
1 files changed, 133 insertions, 239 deletions
diff --git a/src/tty.rs b/src/tty.rs
index a4ab27f..a26846f 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,278 +1,172 @@
-use crate::errors::Error;
-use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
-use bytes::BytesMut;
-use futures::{self, Async};
-use hyper::rt::{Future, Stream};
-use log::trace;
-use std::io::{self, Cursor};
-use tokio_codec::Decoder;
-use tokio_io::{AsyncRead, AsyncWrite};
-
-#[derive(Debug)]
-pub struct Chunk {
- pub stream_type: StreamType,
- pub data: Vec<u8>,
-}
-
-#[derive(Debug, Clone, Copy)]
-pub enum StreamType {
- StdIn,
- StdOut,
- StdErr,
-}
-
-/// A multiplexed stream.
-pub struct Multiplexed {
- stdin: Box<dyn AsyncWrite>,
- chunks: Box<dyn futures::Stream<Item = Chunk, Error = crate::Error>>,
-}
-
-pub struct MultiplexedBlocking {
- stdin: Box<dyn AsyncWrite>,
- chunks: Box<dyn Iterator<Item = Result<Chunk, crate::Error>>>,
-}
-
-/// Represent the current state of the decoding of a TTY frame
-enum TtyDecoderState {
- /// We have yet to read a frame header
- WaitingHeader,
- /// We have read a header and extracted the payload size and stream type,
- /// and are now waiting to read the corresponding payload
- WaitingPayload(usize, StreamType),
-}
-
-pub struct TtyDecoder {
- state: TtyDecoderState,
-}
-
-impl Chunk {
- /// Interprets the raw bytes as a string.
- ///
- /// Returns `None` if the raw bytes do not represent
- /// a valid UTF-8 string.
- pub fn as_string(&self) -> Option<String> {
- String::from_utf8(self.data.clone()).ok()
- }
-
- /// Unconditionally interprets the raw bytes as a string.
- ///
- /// Inserts placeholder symbols for all non-character bytes.
- pub fn as_string_lossy(&self) -> String {
- String::from_utf8_lossy(&self.data).into_owned()
+//! Types for working with docker TTY streams
+
+use crate::{Error, Result};
+use bytes::{BigEndian, ByteOrder};
+use futures_util::{
+ io::{AsyncRead, AsyncReadExt, AsyncWrite},
+ stream::{Stream, TryStreamExt},
+};
+use pin_project::pin_project;
+use std::io;
+
+/// An enum representing a chunk of TTY text streamed from a Docker container.
+///
+/// For convenience, this type can deref to the contained `Vec<u8>`.
+#[derive(Debug, Clone)]
+pub enum TtyChunk {
+ StdIn(Vec<u8>),
+ StdOut(Vec<u8>),
+ StdErr(Vec<u8>),
+}
+
+impl From<TtyChunk> for Vec<u8> {
+ fn from(tty_chunk: TtyChunk) -> Self {
+ match tty_chunk {
+ TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
+ }
}
}
-impl TtyDecoder {
- pub fn new() -> Self {
- Self {
- state: TtyDecoderState::WaitingHeader,
+impl AsRef<Vec<u8>> for TtyChunk {
+ fn as_ref(&self) -> &Vec<u8> {
+ match self {
+ TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
}
}
}
-impl Default for TtyDecoder {
- fn default() -> Self {
- Self::new()
+impl std::ops::Deref for TtyChunk {
+ type Target = Vec<u8>;
+ fn deref(&self) -> &Self::Target {
+ self.as_ref()
}
}
-impl Decoder for TtyDecoder {
- type Item = Chunk;
- type Error = Error;
-
- fn decode(
- &mut self,
- src: &mut BytesMut,
- ) -> Result<Option<Self::Item>, Self::Error> {
- loop {
- match self.state {
- TtyDecoderState::WaitingHeader => {
- if src.len() < 8 {
- trace!("Not enough data to read a header");
- return Ok(None);
- } else {
- trace!("Reading header");
- let header_bytes = src.split_to(8);
- let payload_size: Vec<u8> = header_bytes[4..8].to_vec();
- let stream_type = match header_bytes[0] {
- 0 => {
- return Err(Error::InvalidResponse(
- "Unsupported stream of type stdin".to_string(),
- ));
- }
- 1 => StreamType::StdOut,
- 2 => StreamType::StdErr,
- n => {
- return Err(Error::InvalidResponse(format!(
- "Unsupported stream of type {}",
- n
- )));
- }
- };
-
- let length =
- Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
- trace!(
- "Read header: length = {}, stream_type = {:?}",
- length,
- stream_type
- );
- // We've successfully read a header, now we wait for the payload
- self.state = TtyDecoderState::WaitingPayload(length, stream_type);
- continue;
- }
- }
- TtyDecoderState::WaitingPayload(len, stream_type) => {
- if src.len() < len {
- trace!(
- "Not enough data to read payload. Need {} but only {} available",
- len,
- src.len()
- );
- return Ok(None);
- } else {
- trace!("Reading payload");
- let data = src.split_to(len)[..].to_owned();
- let tty_chunk = Chunk { stream_type, data };
-
- // We've successfully read a full frame, now we go back to waiting for the next
- // header
- self.state = TtyDecoderState::WaitingHeader;
- return Ok(Some(tty_chunk));
- }
- }
- }
+impl std::ops::DerefMut for TtyChunk {
+ fn deref_mut(&mut self) -> &mut Vec<u8> {
+ match self {
+ TtyChunk::StdIn(bytes) | TtyChunk::StdOut(bytes) | TtyChunk::StdErr(bytes) => bytes,
}
}
}
-impl Multiplexed {
- /// Create a multiplexed stream.
- pub(crate) fn new<T>(stream: T) -> Multiplexed
- where
- T: AsyncRead + AsyncWrite + 'static,
- {
- let (reader, stdin) = stream.split();
- Multiplexed {
- chunks: Box::new(chunks(reader)),
- stdin: Box::new(stdin),
- }
- }
+async fn decode_chunk<S>(mut stream: S) -> Option<(Result<TtyChunk>, S)>
+where
+ S: AsyncRead + Unpin,
+{
+ let mut header_bytes = vec![0u8; 8];
- pub fn wait(self) -> MultiplexedBlocking {
- MultiplexedBlocking {
- stdin: self.stdin,
- chunks: Box::new(self.chunks.wait()),
- }
+ match stream.read_exact(&mut header_bytes).await {
+ Err(e) if e.kind() == futures_util::io::ErrorKind::UnexpectedEof => return None,
+ Err(e) => return Some((Err(Error::IO(e)), stream)),
+ _ => (),
}
-}
-
-impl futures::Stream for Multiplexed {
- type Item = Chunk;
- type Error = crate::Error;
- fn poll(&mut self) -> Result<Async<Option<Chunk>>, crate::Error> {
- self.chunks.poll()
- }
-}
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
-impl Iterator for MultiplexedBlocking {
- type Item = Result<Chunk, crate::Error>;
+ let mut data = vec![0u8; data_length as usize];
- fn next(&mut self) -> Option<Result<Chunk, crate::Error>> {
- self.chunks.next()
+ if stream.read_exact(&mut data).await.is_err() {
+ return None;
}
-}
-
-macro_rules! delegate_io_write {
- ($ty:ty) => {
- impl io::Write for $ty {
- fn write(
- &mut self,
- buf: &[u8],
- ) -> Result<usize, io::Error> {
- self.stdin.write(buf)
- }
- fn flush(&mut self) -> Result<(), io::Error> {
- self.stdin.flush()
- }
- }
+ let chunk = match header_bytes[0] {
+ 0 => TtyChunk::StdIn(data),
+ 1 => TtyChunk::StdOut(data),
+ 2 => TtyChunk::StdErr(data),
+ n => panic!("invalid stream number from docker daemon: '{}'", n),
};
-}
-delegate_io_write!(Multiplexed);
-delegate_io_write!(MultiplexedBlocking);
+ Some((Ok(chunk), stream))
+}
-pub fn chunks<S>(stream: S) -> impl futures::Stream<Item = Chunk, Error = crate::Error>
+pub(crate) fn decode<S>(hyper_chunk_stream: S) -> impl Stream<Item = Result<TtyChunk>>
where
- S: AsyncRead,
+ S: Stream<Item = Result<hyper::body::Bytes>> + Unpin,
{
- let stream = futures::stream::unfold(stream, |stream| {
- let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
+ let stream = hyper_chunk_stream
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+ .into_async_read();
- let fut = header_future.and_then(|(stream, header_bytes)| {
- let size_bytes = &header_bytes[4..];
- let data_length = BigEndian::read_u32(size_bytes);
- let stream_type = match header_bytes[0] {
- 0 => StreamType::StdIn,
- 1 => StreamType::StdOut,
- 2 => StreamType::StdErr,
- n => panic!("invalid stream number from docker daemon: '{}'", n),
- };
+ futures_util::stream::unfold(stream, decode_chunk)
+}
- ::tokio_io::io::read_exact(stream, vec![0; data_length as usize])
- .map(move |(stream, data)| (Chunk { stream_type, data }, stream))
- });
- // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
- // stream)).
- // This is much better because it would allow us to swallow the unexpected eof and
- // stop the stream much cleaner than writing a custom stream filter.
- Some(fut)
- });
+type TtyReader<'a> = Pin<Box<dyn Stream<Item = Result<TtyChunk>> + Send + 'a>>;
+type TtyWriter<'a> = Pin<Box<dyn AsyncWrite + Send + 'a>>;
- util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof)
- .map_err(crate::Error::from)
+/// TTY multiplexer returned by the `attach` method.
+///
+/// This object can emit a stream of `TtyChunk`s and also implements `AsyncWrite` for streaming bytes to Stdin.
+#[pin_project]
+pub struct Multiplexer<'a> {
+ #[pin]
+ reader: TtyReader<'a>,
+ #[pin]
+ writer: TtyWriter<'a>,
}
-mod util {
- use futures::{Async, Stream};
+impl<'a> Multiplexer<'a> {
+ pub(crate) fn new<T>(tcp_connection: T) -> Self
+ where
+ T: AsyncRead + AsyncWrite + Send + 'a,
+ {
+ let (reader, writer) = tcp_connection.split();
- pub struct StopOnError<S, F> {
- stream: S,
- f: F,
+ Self {
+ reader: Box::pin(futures_util::stream::unfold(reader, |reader| {
+ decode_chunk(reader)
+ })),
+ writer: Box::pin(writer),
+ }
}
+}
- pub fn stop_on_err<S, F>(
- stream: S,
- f: F,
- ) -> StopOnError<S, F>
- where
- S: Stream,
- F: FnMut(&S::Error) -> bool,
- {
- StopOnError { stream, f }
+use std::{
+ pin::Pin,
+ task::{Context, Poll},
+};
+
+impl<'a> Stream for Multiplexer<'a> {
+ type Item = Result<TtyChunk>;
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
+ self.project().reader.poll_next(cx)
}
+}
- impl<S, F> Stream for StopOnError<S, F>
- where
- S: Stream,
- F: FnMut(&S::Error) -> bool,
- {
- type Item = S::Item;
- type Error = S::Error;
+impl<'a> AsyncWrite for Multiplexer<'a> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ self.project().writer.poll_write(cx, buf)
+ }
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().writer.poll_flush(cx)
+ }
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<io::Result<()>> {
+ self.project().writer.poll_close(cx)
+ }
+}
- fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
- match self.stream.poll() {
- Err(e) => {
- if (self.f)(&e) {
- Err(e)
- } else {
- Ok(Async::Ready(None))
- }
- }
- a => a,
- }
- }
+impl<'a> Multiplexer<'a> {
+ /// Split the `Multiplexer` into the component `Stream` and `AsyncWrite` parts
+ pub fn split(
+ self
+ ) -> (
+ impl Stream<Item = Result<TtyChunk>> + 'a,
+ impl AsyncWrite + Send + 'a,
+ ) {
+ (self.reader, self.writer)
}
}