diff options
author | Dylan McKay <me@dylanmckay.io> | 2018-12-22 23:29:45 +1300 |
---|---|---|
committer | doug tangren <d.tangren@gmail.com> | 2018-12-22 19:29:45 +0900 |
commit | 6b5f0c0f9ddfac9c052210c5dbf3224020646127 (patch) | |
tree | 7310447a251e66e5d061f5da00b07b6ce498fc8a /src/lib.rs | |
parent | 79d65c286025c551a775c0964d168e6feb4b3409 (diff) |
Support interactive stdin/stdout streams (#136)
* Support interactive stdin/stdout streams
This adds support for streaming stdin, stderr, and stdout independently
to a running container.
The underlying API is futures-based, meaning the code is implemented
asynchronously. A synchronous API is also exposed, which is implemented
by simply waiting on the asynchronous API futures.
This also modifies the existing Tty logic so that the storage type of
the data is a Vec<u8> rather than a String. This is also how the Rust
standard library persists data from the standard streams. In my
particular application, I'm using stdin/stdout as the communication
method between a container a host application. In it, a byte-based protocol is
used.
Streaming works by performing a TCP upgrade; upgrading a higher-level
HTTP connection to a lower-level TCP byte stream upon agreement with the
server. Docker will automatically upgrade HTTP container log requests to
TCP byte streams of a custom std{in,out,err} multiplexing protocol if
the client requests it with the 'Connection: Upgrade' header.
* Return an error rather than panic when Docker refuses to upgrade to TCP
* Add interpret-as-string accessors to tty::Chunk
Also updates the examples to use them.
Diffstat (limited to 'src/lib.rs')
-rw-r--r-- | src/lib.rs | 31 |
1 files changed, 28 insertions, 3 deletions
@@ -81,7 +81,7 @@ use std::path::Path; use std::time::Duration; use tokio_codec::{FramedRead, LinesCodec}; use transport::{tar, Transport}; -use tty::{TtyDecoder, TtyLine}; +use tty::TtyDecoder; use url::form_urlencoded; /// Represents the result of all docker operations @@ -295,7 +295,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn logs( &self, opts: &LogsOptions, - ) -> impl Stream<Item = TtyLine, Error = Error> { + ) -> impl Stream<Item = tty::Chunk, Error = Error> { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) @@ -307,6 +307,21 @@ impl<'a, 'b> Container<'a, 'b> { FramedRead::new(chunk_stream, decoder) } + /// Attaches to a running container, returning a stream that can + /// be used to interact with the standard IO streams. + pub fn attach(&self) + -> impl Future<Item = tty::Multiplexed, Error = Error> { + self.docker.stream_post_upgrade_multiplexed::<Body>( + &format!("/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", self.id), + None) + } + + /// Attaches to a running container, returning a stream that can + /// be used to interact with the standard IO streams. + pub fn attach_blocking(&self) -> Result<tty::MultiplexedBlocking> { + self.attach().map(|s| s.wait()).wait() + } + /// Returns a set of changes made to the container instance pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> { self.docker @@ -452,7 +467,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn exec( &self, opts: &ExecContainerOptions, - ) -> impl Stream<Item = TtyLine, Error = Error> { + ) -> impl Stream<Item = tty::Chunk, Error = Error> { let data = opts.serialize().unwrap(); // TODO fixme let bytes = data.into_bytes(); let docker2 = self.docker.clone(); @@ -903,4 +918,14 @@ impl Docker { self.transport .stream_chunks::<Body>(Method::GET, endpoint, None) } + + fn stream_post_upgrade_multiplexed<B>( + &self, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> impl Future<Item = tty::Multiplexed, Error = Error> + where + B: Into<Body> + 'static { + self.transport.stream_upgrade_multiplexed(Method::POST, endpoint, body) + } } |