diff options
author | Dylan McKay <me@dylanmckay.io> | 2018-12-22 23:29:45 +1300 |
---|---|---|
committer | doug tangren <d.tangren@gmail.com> | 2018-12-22 19:29:45 +0900 |
commit | 6b5f0c0f9ddfac9c052210c5dbf3224020646127 (patch) | |
tree | 7310447a251e66e5d061f5da00b07b6ce498fc8a /src/tty.rs | |
parent | 79d65c286025c551a775c0964d168e6feb4b3409 (diff) |
Support interactive stdin/stdout streams (#136)
* Support interactive stdin/stdout streams
This adds support for streaming stdin, stderr, and stdout independently
to a running container.
The underlying API is futures-based, meaning the code is implemented
asynchronously. A synchronous API is also exposed, which is implemented
by simply waiting on the asynchronous API futures.
This also modifies the existing Tty logic so that the storage type of
the data is a Vec<u8> rather than a String. This is also how the Rust
standard library persists data from the standard streams. In my
particular application, I'm using stdin/stdout as the communication
method between a container and a host application. In it, a byte-based protocol is
used.
Streaming works by performing a TCP upgrade; upgrading a higher-level
HTTP connection to a lower-level TCP byte stream upon agreement with the
server. Docker will automatically upgrade HTTP container log requests to
TCP byte streams of a custom std{in,out,err} multiplexing protocol if
the client requests it with the 'Connection: Upgrade' header.
* Return an error rather than panic when Docker refuses to upgrade to TCP
* Add interpret-as-string accessors to tty::Chunk
Also updates the examples to use them.
Diffstat (limited to 'src/tty.rs')
-rw-r--r-- | src/tty.rs | 168 |
1 files changed, 159 insertions, 9 deletions
@@ -1,21 +1,38 @@ -use byteorder::{BigEndian, ReadBytesExt}; +use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use bytes::BytesMut; use errors::Error; use std::io::Cursor; use tokio_codec::Decoder; +use futures::{self, Async}; +use hyper::rt::{Stream, Future}; +use tokio_io::{AsyncRead, AsyncWrite}; +use std::io; + #[derive(Debug)] -pub struct TtyLine { +pub struct Chunk { pub stream_type: StreamType, - pub data: String, + pub data: Vec<u8>, } #[derive(Debug, Clone, Copy)] pub enum StreamType { + StdIn, StdOut, StdErr, } +/// A multiplexed stream. +pub struct Multiplexed { + stdin: Box<AsyncWrite>, + chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>, +} + +pub struct MultiplexedBlocking { + stdin: Box<AsyncWrite>, + chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>, +} + /// Represent the current state of the decoding of a TTY frame enum TtyDecoderState { /// We have yet to read a frame header @@ -29,6 +46,23 @@ pub struct TtyDecoder { state: TtyDecoderState, } +impl Chunk { + /// Interprets the raw bytes as a string. + /// + /// Returns `None` if the raw bytes do not represent + /// a valid UTF-8 string. + pub fn as_string(&self) -> Option<String> { + String::from_utf8(self.data.clone()).ok() + } + + /// Unconditionally interprets the raw bytes as a string. + /// + /// Inserts placeholder symbols for all non-character bytes. 
+ pub fn as_string_lossy(&self) -> String { + String::from_utf8_lossy(&self.data).into_owned() + } +} + impl TtyDecoder { pub fn new() -> Self { Self { @@ -38,7 +72,7 @@ impl TtyDecoder { } impl Decoder for TtyDecoder { - type Item = TtyLine; + type Item = Chunk; type Error = Error; fn decode( @@ -74,7 +108,7 @@ impl Decoder for TtyDecoder { let length = Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize; trace!( - "Read header: length = {}, line_type = {:?}", + "Read header: length = {}, stream_type = {:?}", length, stream_type ); @@ -93,17 +127,133 @@ impl Decoder for TtyDecoder { return Ok(None); } else { trace!("Reading payload"); - let payload = src.split_to(len); - let data = String::from_utf8_lossy(&payload).trim().to_string(); - let tty_line = TtyLine { stream_type, data }; + let data = src.split_to(len)[..].to_owned(); + let tty_chunk = Chunk { stream_type, data }; // We've successfully read a full frame, now we go back to waiting for the next // header self.state = TtyDecoderState::WaitingHeader; - return Ok(Some(tty_line)); + return Ok(Some(tty_chunk)); } } } } } } + +impl Multiplexed { + /// Create a multiplexed stream. + pub(crate) fn new<T>(stream: T) -> Multiplexed + where T: AsyncRead + AsyncWrite + 'static { + let (reader, stdin) = stream.split(); + Multiplexed { + chunks: Box::new(chunks(reader)), + stdin: Box::new(stdin), + } + } + + pub fn wait(self) -> MultiplexedBlocking { + MultiplexedBlocking { + stdin: self.stdin, + chunks: Box::new(self.chunks.wait()), + } + } +} + +impl futures::Stream for Multiplexed { + type Item = Chunk; + type Error = ::Error; + + fn poll(&mut self) -> Result<Async<Option<Chunk>>, ::Error> { + self.chunks.poll() + } +} + +impl Iterator for MultiplexedBlocking { + type Item = Result<Chunk, ::Error>; + + fn next(&mut self) -> Option<Result<Chunk, ::Error>> { + self.chunks.next() + } +} + +macro_rules! 
delegate_io_write { + ($ty:ty) => { + impl io::Write for $ty { + fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> { + self.stdin.write(buf) + } + + fn flush(&mut self) -> Result<(), io::Error> { + self.stdin.flush() + } + } + }; +} + +delegate_io_write!(Multiplexed); +delegate_io_write!(MultiplexedBlocking); + +pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error> + where S: AsyncRead { + + let stream = futures::stream::unfold(stream, |stream| { + let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]); + + let fut = header_future + .and_then(|(stream, header_bytes)| { + let size_bytes = &header_bytes[4..]; + let data_length = BigEndian::read_u32(size_bytes); + let stream_type = match header_bytes[0] { + 0 => StreamType::StdIn, + 1 => StreamType::StdOut, + 2 => StreamType::StdErr, + n => panic!("invalid stream number from docker daemon: '{}'", n) + }; + + ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| { + (Chunk { stream_type, data }, stream) + }) + }); + // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk, + // stream)). + // This is much better because it would allow us to swallow the unexpected eof and + // stop the stream much cleaner than writing a custom stream filter. 
+ Some(fut) + }); + + util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof) + .map_err(|e| ::Error::from(e)) +} + +mod util { + use futures::{Stream, Async}; + + pub struct StopOnError<S, F> { + stream: S, + f: F, + } + + pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F> + where S: Stream, + F: FnMut(&S::Error) -> bool, + { + StopOnError { stream, f } + } + + impl<S, F> Stream for StopOnError<S, F> + where S: Stream, + F: FnMut(&S::Error) -> bool, + { + type Item = S::Item; + type Error = S::Error; + + fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> { + match self.stream.poll() { + Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) }, + a => a, + } + } + } +} + |