summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/tty.rs')
-rw-r--r--src/tty.rs168
1 files changed, 159 insertions, 9 deletions
diff --git a/src/tty.rs b/src/tty.rs
index 96c72b5..a1c1865 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,21 +1,38 @@
-use byteorder::{BigEndian, ReadBytesExt};
+use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::BytesMut;
use errors::Error;
use std::io::Cursor;
use tokio_codec::Decoder;
+use futures::{self, Async};
+use hyper::rt::{Stream, Future};
+use tokio_io::{AsyncRead, AsyncWrite};
+use std::io;
+
#[derive(Debug)]
-pub struct TtyLine {
+pub struct Chunk {
pub stream_type: StreamType,
- pub data: String,
+ pub data: Vec<u8>,
}
#[derive(Debug, Clone, Copy)]
pub enum StreamType {
+ StdIn,
StdOut,
StdErr,
}
+/// A multiplexed stream.
+pub struct Multiplexed {
+ stdin: Box<AsyncWrite>,
+ chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>,
+}
+
+pub struct MultiplexedBlocking {
+ stdin: Box<AsyncWrite>,
+ chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>,
+}
+
/// Represent the current state of the decoding of a TTY frame
enum TtyDecoderState {
/// We have yet to read a frame header
@@ -29,6 +46,23 @@ pub struct TtyDecoder {
state: TtyDecoderState,
}
+impl Chunk {
+ /// Interprets the raw bytes as a string.
+ ///
+ /// Returns `None` if the raw bytes do not represent
+ /// a valid UTF-8 string.
+ pub fn as_string(&self) -> Option<String> {
+ String::from_utf8(self.data.clone()).ok()
+ }
+
+ /// Unconditionally interprets the raw bytes as a string.
+ ///
+ /// Inserts placeholder symbols for all non-character bytes.
+ pub fn as_string_lossy(&self) -> String {
+ String::from_utf8_lossy(&self.data).into_owned()
+ }
+}
+
impl TtyDecoder {
pub fn new() -> Self {
Self {
@@ -38,7 +72,7 @@ impl TtyDecoder {
}
impl Decoder for TtyDecoder {
- type Item = TtyLine;
+ type Item = Chunk;
type Error = Error;
fn decode(
@@ -74,7 +108,7 @@ impl Decoder for TtyDecoder {
let length =
Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
trace!(
- "Read header: length = {}, line_type = {:?}",
+ "Read header: length = {}, stream_type = {:?}",
length,
stream_type
);
@@ -93,17 +127,133 @@ impl Decoder for TtyDecoder {
return Ok(None);
} else {
trace!("Reading payload");
- let payload = src.split_to(len);
- let data = String::from_utf8_lossy(&payload).trim().to_string();
- let tty_line = TtyLine { stream_type, data };
+ let data = src.split_to(len)[..].to_owned();
+ let tty_chunk = Chunk { stream_type, data };
// We've successfully read a full frame, now we go back to waiting for the next
// header
self.state = TtyDecoderState::WaitingHeader;
- return Ok(Some(tty_line));
+ return Ok(Some(tty_chunk));
}
}
}
}
}
}
+
+impl Multiplexed {
+ /// Create a multiplexed stream.
+ pub(crate) fn new<T>(stream: T) -> Multiplexed
+ where T: AsyncRead + AsyncWrite + 'static {
+ let (reader, stdin) = stream.split();
+ Multiplexed {
+ chunks: Box::new(chunks(reader)),
+ stdin: Box::new(stdin),
+ }
+ }
+
+ pub fn wait(self) -> MultiplexedBlocking {
+ MultiplexedBlocking {
+ stdin: self.stdin,
+ chunks: Box::new(self.chunks.wait()),
+ }
+ }
+}
+
+impl futures::Stream for Multiplexed {
+ type Item = Chunk;
+ type Error = ::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<Chunk>>, ::Error> {
+ self.chunks.poll()
+ }
+}
+
+impl Iterator for MultiplexedBlocking {
+ type Item = Result<Chunk, ::Error>;
+
+ fn next(&mut self) -> Option<Result<Chunk, ::Error>> {
+ self.chunks.next()
+ }
+}
+
+macro_rules! delegate_io_write {
+ ($ty:ty) => {
+ impl io::Write for $ty {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
+ self.stdin.write(buf)
+ }
+
+ fn flush(&mut self) -> Result<(), io::Error> {
+ self.stdin.flush()
+ }
+ }
+ };
+}
+
+delegate_io_write!(Multiplexed);
+delegate_io_write!(MultiplexedBlocking);
+
+pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
+ where S: AsyncRead {
+
+ let stream = futures::stream::unfold(stream, |stream| {
+ let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
+
+ let fut = header_future
+ .and_then(|(stream, header_bytes)| {
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
+ let stream_type = match header_bytes[0] {
+ 0 => StreamType::StdIn,
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => panic!("invalid stream number from docker daemon: '{}'", n)
+ };
+
+ ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| {
+ (Chunk { stream_type, data }, stream)
+ })
+ });
+ // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
+ // stream)).
+ // This is much better because it would allow us to swallow the unexpected eof and
+ // stop the stream much cleaner than writing a custom stream filter.
+ Some(fut)
+ });
+
+ util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof)
+ .map_err(|e| ::Error::from(e))
+}
+
+mod util {
+ use futures::{Stream, Async};
+
+ pub struct StopOnError<S, F> {
+ stream: S,
+ f: F,
+ }
+
+ pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> bool,
+ {
+ StopOnError { stream, f }
+ }
+
+ impl<S, F> Stream for StopOnError<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> bool,
+ {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
+ match self.stream.poll() {
+ Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) },
+ a => a,
+ }
+ }
+ }
+}
+