summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDylan McKay <me@dylanmckay.io>2018-12-22 23:29:45 +1300
committerdoug tangren <d.tangren@gmail.com>2018-12-22 19:29:45 +0900
commit6b5f0c0f9ddfac9c052210c5dbf3224020646127 (patch)
tree7310447a251e66e5d061f5da00b07b6ce498fc8a
parent79d65c286025c551a775c0964d168e6feb4b3409 (diff)
Support interactive stdin/stdout streams (#136)
* Support interactive stdin/stdout streams This adds support for streaming stdin, stderr, and stdout independently to a running container. The underlying API is futures-based, meaning the code is implemented asynchronously. A synchronous API is also exposed, which is implemented by simply waiting on the asynchronous API futures. This also modifies the existing Tty logic so that the storage type of the data is a Vec<u8> rather than a String. This is also how the Rust standard library persists data from the standard streams. In my particular application, I'm using stdin/stdout as the communication method between a container a host application. In it, a byte-based protocol is used. Streaming works by performing a TCP upgrade; upgrading a higher-level HTTP connection to a lower-level TCP byte stream upon agreement with the server. Docker will automatically upgrade HTTP container log requests to TCP byte streams of a custom std{in,out,err} multiplexing protocol if the client requests it with the 'Connection: Upgrade' header. * Return an error rather than panic when Docker refuses to upgrade to TCP * Add interpret-as-string accessors to tty::Chunk Also updates the examples to use them.
-rw-r--r--examples/containerexec.rs9
-rw-r--r--examples/logs.rs9
-rw-r--r--src/builder.rs25
-rw-r--r--src/errors.rs2
-rw-r--r--src/lib.rs31
-rw-r--r--src/transport.rs58
-rw-r--r--src/tty.rs168
7 files changed, 279 insertions, 23 deletions
diff --git a/examples/containerexec.rs b/examples/containerexec.rs
index 0b3575b..0bfd640 100644
--- a/examples/containerexec.rs
+++ b/examples/containerexec.rs
@@ -25,10 +25,11 @@ fn main() {
.containers()
.get(&id)
.exec(&options)
- .for_each(|line| {
- match line.stream_type {
- StreamType::StdOut => println!("Stdout: {}", line.data),
- StreamType::StdErr => eprintln!("Stderr: {}", line.data),
+ .for_each(|chunk| {
+ match chunk.stream_type {
+ StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()),
+ StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()),
+ StreamType::StdIn => unreachable!(),
}
Ok(())
})
diff --git a/examples/logs.rs b/examples/logs.rs
index b35b0a3..6fad7d6 100644
--- a/examples/logs.rs
+++ b/examples/logs.rs
@@ -14,10 +14,11 @@ fn main() {
.containers()
.get(&id)
.logs(&LogsOptions::builder().stdout(true).stderr(true).build())
- .for_each(|line| {
- match line.stream_type {
- StreamType::StdOut => println!("Stdout: {}", line.data),
- StreamType::StdErr => eprintln!("Stderr: {}", line.data),
+ .for_each(|chunk| {
+ match chunk.stream_type {
+ StreamType::StdOut => println!("Stdout: {}", chunk.as_string_lossy()),
+ StreamType::StdErr => eprintln!("Stderr: {}", chunk.as_string_lossy()),
+ StreamType::StdIn => unreachable!(),
}
Ok(())
})
diff --git a/src/builder.rs b/src/builder.rs
index 27a92e4..ceabc28 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -489,6 +489,31 @@ impl ContainerOptionsBuilder {
self
}
+ /// Whether to attach to `stdin`.
+ pub fn attach_stdin(&mut self, attach: bool) -> &mut Self {
+ self.params.insert("AttachStdin", json!(attach));
+ self.params.insert("OpenStdin", json!(attach));
+ self
+ }
+
+ /// Whether to attach to `stdout`.
+ pub fn attach_stdout(&mut self, attach: bool) -> &mut Self {
+ self.params.insert("AttachStdout", json!(attach));
+ self
+ }
+
+ /// Whether to attach to `stderr`.
+ pub fn attach_stderr(&mut self, attach: bool) -> &mut Self {
+ self.params.insert("AttachStderr", json!(attach));
+ self
+ }
+
+ /// Whether standard streams should be attached to a TTY.
+ pub fn tty(&mut self, tty: bool) -> &mut Self {
+ self.params.insert("Tty", json!(tty));
+ self
+ }
+
pub fn extra_hosts(
&mut self,
hosts: Vec<&str>,
diff --git a/src/errors.rs b/src/errors.rs
index 36be662..fa408c2 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -17,6 +17,7 @@ pub enum Error {
Encoding(FromUtf8Error),
InvalidResponse(String),
Fault { code: StatusCode, message: String },
+ ConnectionNotUpgraded,
}
impl From<SerdeError> for Error {
@@ -59,6 +60,7 @@ impl fmt::Display for Error {
write!(f, "Response doesn't have the expected format: {}", cause)
}
Error::Fault { code, .. } => write!(f, "{}", code),
+ Error::ConnectionNotUpgraded => write!(f, "expected the docker host to upgrade the HTTP connection but it did not"),
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 19f0cff..2c7c0da 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -81,7 +81,7 @@ use std::path::Path;
use std::time::Duration;
use tokio_codec::{FramedRead, LinesCodec};
use transport::{tar, Transport};
-use tty::{TtyDecoder, TtyLine};
+use tty::TtyDecoder;
use url::form_urlencoded;
/// Represents the result of all docker operations
@@ -295,7 +295,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> impl Stream<Item = TtyLine, Error = Error> {
+ ) -> impl Stream<Item = tty::Chunk, Error = Error> {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
@@ -307,6 +307,21 @@ impl<'a, 'b> Container<'a, 'b> {
FramedRead::new(chunk_stream, decoder)
}
+ /// Attaches to a running container, returning a stream that can
+ /// be used to interact with the standard IO streams.
+ pub fn attach(&self)
+ -> impl Future<Item = tty::Multiplexed, Error = Error> {
+ self.docker.stream_post_upgrade_multiplexed::<Body>(
+ &format!("/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", self.id),
+ None)
+ }
+
+ /// Attaches to a running container, returning a stream that can
+ /// be used to interact with the standard IO streams.
+ pub fn attach_blocking(&self) -> Result<tty::MultiplexedBlocking> {
+ self.attach().map(|s| s.wait()).wait()
+ }
+
/// Returns a set of changes made to the container instance
pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> {
self.docker
@@ -452,7 +467,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn exec(
&self,
opts: &ExecContainerOptions,
- ) -> impl Stream<Item = TtyLine, Error = Error> {
+ ) -> impl Stream<Item = tty::Chunk, Error = Error> {
let data = opts.serialize().unwrap(); // TODO fixme
let bytes = data.into_bytes();
let docker2 = self.docker.clone();
@@ -903,4 +918,14 @@ impl Docker {
self.transport
.stream_chunks::<Body>(Method::GET, endpoint, None)
}
+
+ fn stream_post_upgrade_multiplexed<B>(
+ &self,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = tty::Multiplexed, Error = Error>
+ where
+ B: Into<Body> + 'static {
+ self.transport.stream_upgrade_multiplexed(Method::POST, endpoint, body)
+ }
}
diff --git a/src/transport.rs b/src/transport.rs
index 2c62f58..55844c3 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -17,6 +17,7 @@ use hyperlocal::Uri as DomainUri;
use mime::Mime;
use serde_json;
use std::fmt;
+use tokio_io::{AsyncRead, AsyncWrite};
pub fn tar() -> Mime {
"application/tar".parse().unwrap()
@@ -91,7 +92,7 @@ impl Transport {
B: Into<Body>,
{
let req = self
- .build_request(method, endpoint, body)
+ .build_request(method, endpoint, body, |_| ())
.expect("Failed to build request!");
self.send_request(req)
@@ -139,11 +140,13 @@ impl Transport {
method: Method,
endpoint: &str,
body: Option<(B, Mime)>,
+ f: impl FnOnce(&mut ::http::request::Builder),
) -> Result<Request<Body>>
where
- B: Into<Body>,
- {
+ B: Into<Body> {
let mut builder = Request::builder();
+ f(&mut builder);
+
let req = match *self {
Transport::Tcp { ref host, .. } => {
builder.method(method).uri(&format!("{}{}", host, endpoint))
@@ -182,6 +185,55 @@ impl Transport {
req.map_err(Error::Hyper)
}
+ /// Makes an HTTP request, upgrading the connection to a TCP
+ /// stream on success.
+ ///
+ /// This method can be used for operations such as viewing
+ /// docker container logs interactively.
+ pub fn stream_upgrade<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = impl AsyncRead + AsyncWrite, Error = Error>
+ where
+ B: Into<Body>
+ {
+ match self {
+ Transport::Tcp { .. } | Transport::EncryptedTcp { .. } => (),
+ _ => panic!("connection streaming is only supported over TCP"),
+ };
+
+ let req = self.build_request(method, endpoint, body, |builder| {
+ builder.header(header::CONNECTION, "Upgrade")
+ .header(header::UPGRADE, "tcp");
+ }).expect("Failed to build request!");
+
+ self.send_request(req).and_then(|res| {
+ match res.status() {
+ StatusCode::SWITCHING_PROTOCOLS => Ok(res),
+ _ => Err(Error::ConnectionNotUpgraded),
+ }
+ }).and_then(|res| {
+ res.into_body()
+ .on_upgrade()
+ .from_err()
+ })
+ }
+
+ pub fn stream_upgrade_multiplexed<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = ::tty::Multiplexed, Error = Error>
+ where
+ B: Into<Body> + 'static
+ {
+ self.stream_upgrade(method, endpoint, body)
+ .map(|u| ::tty::Multiplexed::new(u))
+ }
+
/// Extract the error message content from an HTTP response that
/// contains a Docker JSON error structure.
fn get_error_message(body: &str) -> Option<String> {
diff --git a/src/tty.rs b/src/tty.rs
index 96c72b5..a1c1865 100644
--- a/src/tty.rs
+++ b/src/tty.rs
@@ -1,21 +1,38 @@
-use byteorder::{BigEndian, ReadBytesExt};
+use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::BytesMut;
use errors::Error;
use std::io::Cursor;
use tokio_codec::Decoder;
+use futures::{self, Async};
+use hyper::rt::{Stream, Future};
+use tokio_io::{AsyncRead, AsyncWrite};
+use std::io;
+
#[derive(Debug)]
-pub struct TtyLine {
+pub struct Chunk {
pub stream_type: StreamType,
- pub data: String,
+ pub data: Vec<u8>,
}
#[derive(Debug, Clone, Copy)]
pub enum StreamType {
+ StdIn,
StdOut,
StdErr,
}
+/// A multiplexed stream.
+pub struct Multiplexed {
+ stdin: Box<AsyncWrite>,
+ chunks: Box<futures::Stream<Item=Chunk, Error=::Error>>,
+}
+
+pub struct MultiplexedBlocking {
+ stdin: Box<AsyncWrite>,
+ chunks: Box<Iterator<Item=Result<Chunk, ::Error>>>,
+}
+
/// Represent the current state of the decoding of a TTY frame
enum TtyDecoderState {
/// We have yet to read a frame header
@@ -29,6 +46,23 @@ pub struct TtyDecoder {
state: TtyDecoderState,
}
+impl Chunk {
+ /// Interprets the raw bytes as a string.
+ ///
+ /// Returns `None` if the raw bytes do not represent
+ /// a valid UTF-8 string.
+ pub fn as_string(&self) -> Option<String> {
+ String::from_utf8(self.data.clone()).ok()
+ }
+
+ /// Unconditionally interprets the raw bytes as a string.
+ ///
+ /// Inserts placeholder symbols for all non-character bytes.
+ pub fn as_string_lossy(&self) -> String {
+ String::from_utf8_lossy(&self.data).into_owned()
+ }
+}
+
impl TtyDecoder {
pub fn new() -> Self {
Self {
@@ -38,7 +72,7 @@ impl TtyDecoder {
}
impl Decoder for TtyDecoder {
- type Item = TtyLine;
+ type Item = Chunk;
type Error = Error;
fn decode(
@@ -74,7 +108,7 @@ impl Decoder for TtyDecoder {
let length =
Cursor::new(&payload_size).read_u32::<BigEndian>().unwrap() as usize;
trace!(
- "Read header: length = {}, line_type = {:?}",
+ "Read header: length = {}, stream_type = {:?}",
length,
stream_type
);
@@ -93,17 +127,133 @@ impl Decoder for TtyDecoder {
return Ok(None);
} else {
trace!("Reading payload");
- let payload = src.split_to(len);
- let data = String::from_utf8_lossy(&payload).trim().to_string();
- let tty_line = TtyLine { stream_type, data };
+ let data = src.split_to(len)[..].to_owned();
+ let tty_chunk = Chunk { stream_type, data };
// We've successfully read a full frame, now we go back to waiting for the next
// header
self.state = TtyDecoderState::WaitingHeader;
- return Ok(Some(tty_line));
+ return Ok(Some(tty_chunk));
}
}
}
}
}
}
+
+impl Multiplexed {
+ /// Create a multiplexed stream.
+ pub(crate) fn new<T>(stream: T) -> Multiplexed
+ where T: AsyncRead + AsyncWrite + 'static {
+ let (reader, stdin) = stream.split();
+ Multiplexed {
+ chunks: Box::new(chunks(reader)),
+ stdin: Box::new(stdin),
+ }
+ }
+
+ pub fn wait(self) -> MultiplexedBlocking {
+ MultiplexedBlocking {
+ stdin: self.stdin,
+ chunks: Box::new(self.chunks.wait()),
+ }
+ }
+}
+
+impl futures::Stream for Multiplexed {
+ type Item = Chunk;
+ type Error = ::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<Chunk>>, ::Error> {
+ self.chunks.poll()
+ }
+}
+
+impl Iterator for MultiplexedBlocking {
+ type Item = Result<Chunk, ::Error>;
+
+ fn next(&mut self) -> Option<Result<Chunk, ::Error>> {
+ self.chunks.next()
+ }
+}
+
+macro_rules! delegate_io_write {
+ ($ty:ty) => {
+ impl io::Write for $ty {
+ fn write(&mut self, buf: &[u8]) -> Result<usize, io::Error> {
+ self.stdin.write(buf)
+ }
+
+ fn flush(&mut self) -> Result<(), io::Error> {
+ self.stdin.flush()
+ }
+ }
+ };
+}
+
+delegate_io_write!(Multiplexed);
+delegate_io_write!(MultiplexedBlocking);
+
+pub fn chunks<S>(stream: S) -> impl futures::Stream<Item=Chunk, Error=::Error>
+ where S: AsyncRead {
+
+ let stream = futures::stream::unfold(stream, |stream| {
+ let header_future = ::tokio_io::io::read_exact(stream, vec![0; 8]);
+
+ let fut = header_future
+ .and_then(|(stream, header_bytes)| {
+ let size_bytes = &header_bytes[4..];
+ let data_length = BigEndian::read_u32(size_bytes);
+ let stream_type = match header_bytes[0] {
+ 0 => StreamType::StdIn,
+ 1 => StreamType::StdOut,
+ 2 => StreamType::StdErr,
+ n => panic!("invalid stream number from docker daemon: '{}'", n)
+ };
+
+ ::tokio_io::io::read_exact(stream, vec![0; data_length as usize]).map(move |(stream, data)| {
+ (Chunk { stream_type, data }, stream)
+ })
+ });
+ // FIXME: when updated to futures 0.2, the future itself returns the Option((Chunk,
+ // stream)).
+ // This is much better because it would allow us to swallow the unexpected eof and
+ // stop the stream much cleaner than writing a custom stream filter.
+ Some(fut)
+ });
+
+ util::stop_on_err(stream, |e| e.kind() != io::ErrorKind::UnexpectedEof)
+ .map_err(|e| ::Error::from(e))
+}
+
+mod util {
+ use futures::{Stream, Async};
+
+ pub struct StopOnError<S, F> {
+ stream: S,
+ f: F,
+ }
+
+ pub fn stop_on_err<S, F>(stream: S, f: F) -> StopOnError<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> bool,
+ {
+ StopOnError { stream, f }
+ }
+
+ impl<S, F> Stream for StopOnError<S, F>
+ where S: Stream,
+ F: FnMut(&S::Error) -> bool,
+ {
+ type Item = S::Item;
+ type Error = S::Error;
+
+ fn poll(&mut self) -> Result<Async<Option<S::Item>>, S::Error> {
+ match self.stream.poll() {
+ Err(e) => if (self.f)(&e) { Err(e) } else { Ok(Async::Ready(None)) },
+ a => a,
+ }
+ }
+ }
+}
+