summaryrefslogtreecommitdiffstats
path: root/src/tty.rs
diff options
context:
space:
mode:
authorDylan McKay <me@dylanmckay.io>2018-12-22 23:29:45 +1300
committerdoug tangren <d.tangren@gmail.com>2018-12-22 19:29:45 +0900
commit6b5f0c0f9ddfac9c052210c5dbf3224020646127 (patch)
tree7310447a251e66e5d061f5da00b07b6ce498fc8a /src/tty.rs
parent79d65c286025c551a775c0964d168e6feb4b3409 (diff)
Support interactive stdin/stdout streams (#136)
* Support interactive stdin/stdout streams This adds support for streaming stdin, stderr, and stdout independently to a running container. The underlying API is futures-based, meaning the code is implemented asynchronously. A synchronous API is also exposed, which is implemented by simply waiting on the asynchronous API futures. This also modifies the existing Tty logic so that the storage type of the data is a Vec<u8> rather than a String. This is also how the Rust standard library persists data from the standard streams. In my particular application, I'm using stdin/stdout as the communication method between a container a host application. In it, a byte-based protocol is used. Streaming works by performing a TCP upgrade; upgrading a higher-level HTTP connection to a lower-level TCP byte stream upon agreement with the server. Docker will automatically upgrade HTTP container log requests to TCP byte streams of a custom std{in,out,err} multiplexing protocol if the client requests it with the 'Connection: Upgrade' header. * Return an error rather than panic when Docker refuses to upgrade to TCP * Add interpret-as-string accessors to tty::Chunk Also updates the examples to use them.
Diffstat (limited to 'src/tty.rs')
-rw-r--r--src/tty.rs168
1 files changed, 159 insertions, 9 deletions
diff --git a/src/tty.rs b/src/tty.rs
index 96c72b5..a1c1865 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,21 +1,38 @@
-use byteorder::{BigEndian, ReadBytesExt};
+use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::BytesMut;
use errors::Error;
use std::io::Cursor;
use tokio_codec::Decoder;
+use futures::{self, Async};
+use hyper::rt::{Stream, Future};
+use tokio_io::{AsyncRead, AsyncWrite};
+use std::io;
+
#[derive(Debug)]
-pub struct TtyLine {
+pub struct Chunk {
pub stream_type: StreamType,
- pub data: String,
+ pub data: Vec<u8>,
}
#[derive(Debug, Clone, Copy)]
pub enum StreamType {
+ StdIn,
StdOut,
StdErr,
}
+/// A multiplexed stream.
+pub struct Multiplexed {
+ stdin: Box<AsyncWrite>,
+ chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>,
+}
+
+pub struct MultiplexedBlocking {
+ stdin: Box<AsyncWrite>,
+ chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>,
+}
+
/// Represent the current state of the decoding of a TTY frame
enum TtyDecoderState {
/// We have yet to read a frame header
@@ -29,6 +46,23 @@ pub struct TtyDecoder {
state: TtyDecoderState,
}
+impl Chunk {
+ /// Interprets the raw bytes as a string.
+ ///
+ /// Returns `None` if the raw bytes do not represent
+ /// a valid UTF-8 string.
+ pub fn as_string(&self) -> Option<String> {
+ String::from_utf8(self.data.clone()).ok()
+ }
+
+ /// Unconditionally interprets the raw bytes as a string.
+ ///
+ /// Inserts placeholder symbols for all non-character bytes.
+ pub fn as_string_lossy(&self) -> String {
+ String::from_utf8_lossy(&self.data).into_owned()
+ }
+}
+
impl TtyDecoder {
pub fn new() -> Self {
Self {
@@ -38,7 +72,7 @@ impl TtyDecoder {
}
impl Decoder for TtyDecoder {
- type Item = TtyLine;
+ type Item = Chunk;
type Error = Error;
fn decode(
@@ -74,7 +108,7 @@ impl Decoder for TtyDecoder {
let length =
Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
trace!(
- "Read header: length = {}, line_type = {:?}",
+ "Read header: length = {}, stream_type = {:?}",
length,
stream_type
);
@@ -93,17 +127,133 @@ impl Decoder for TtyDecoder {
return Ok(None);
} else {
trace!("Reading payload");
- let payload = src.split_to(len);
- let data = String::from_utf8_lossy(&payload).trim().to_string();
- let tty_line = TtyLine { stream_type, data };
+ let data = src.split_to(len)[..].to_owned();
+ let tty_chunk = Chunk { stream_type, data };
// We've successfully read a full frame, now we go back to waiting for the next
// header
self.state = TtyDecoderState::WaitingHeader;
- return Ok(Some(tty_line));
+ return Ok(Some(tty_chunk));
}
}
}
}
}
}
+
+impl Multiplexed {
+ /// Create a multiplexed stream.
+ pub(crate) fn new<T>(stream: T) -> Multiplexed
+ where T: AsyncRead + AsyncWrite + 'static {
+ let (reader, stdin) = stream.split();
+ Multiplexed {
+ chunks: Box::new(chunks(reader)),
+ stdin: Box::new(stdin),
+ }
+ }
+
+ pub fn wait(self) -> MultiplexedBlocking {
+ MultiplexedBlocking {
+ stdin: self.stdin,
+ chunks: Box::new(self.chunks.wait()),
+ }
+ }
+}
+
+impl futures::Stream for Multiplexed {
+ type Item = Chunk;
+ type Error = ::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<Chunk>>, ::Error> {
+ self.chunks.poll()
+ }
+}
+
+impl Iterator for MultiplexedBlocking {
+ type Item = Result<Chunk, ::Error>;
+
+ fn next(&mut self) -> Option<Result<Chunk, ::Error>> {
+ self.chunks.next()
+ }
+}
+
+macro_rules! delegate_io_write {
+ ($ty:ty) => {
+ impl io::Write for $ty {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
+ self.stdin.write(buf)
+ }
+
+ fn flush(&mut self) -> Result<(), io::Error> {
+ self.stdin.flush()
+ }
+ }
+ };
+}
+
+delegate_io_write!(Multiplexed);
+delegate_io_write!(MultiplexedBlocking);
+
+pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
+ where S: AsyncRead {
+
+ let stream = futures::stream::unfold(stream, |stream| {
+ let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
+
+ let fut = header_future
+ .and_then(|(stream, header_bytes)| {
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
+ let stream_type = match header_bytes[0] {
+ 0 => StreamType::StdIn,
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => panic!("invalid stream number from docker daemon: '{}'", n)
+ };
+
+ ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| {
+ (Chunk { stream_type, data }, stream)
+ })
+ });
+ // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
+ // stream)).
+ // This is much better because it would allow us to swallow the unexpected eof and
+ // stop the stream much cleaner than writing a custom stream filter.
+ Some(fut)
+ });
+
+ util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof)
+ .map_err(|e| ::Error::from(e))
+}
+
+mod util {
+ use futures::{Stream, Async};
+
+ pub struct StopOnError<S, F> {
+ stream: S,
+ f: F,
+ }
+
+ pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> bool,
+ {
+ StopOnError { stream, f }
+ }
+
+ impl<S, F> Stream for StopOnError<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> bool,
+ {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
+ match self.stream.poll() {
+ Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) },
+ a => a,
+ }
+ }
+ }
+}
+