summaryrefslogtreecommitdiffstats
path: root/src/transport.rs
diff options
context:
space:
mode:
authorDylan McKay <me@dylanmckay.io>2018-12-22 23:29:45 +1300
committerdoug tangren <d.tangren@gmail.com>2018-12-22 19:29:45 +0900
commit6b5f0c0f9ddfac9c052210c5dbf3224020646127 (patch)
tree7310447a251e66e5d061f5da00b07b6ce498fc8a /src/transport.rs
parent79d65c286025c551a775c0964d168e6feb4b3409 (diff)
Support interactive stdin/stdout streams (#136)
* Support interactive stdin/stdout streams This adds support for streaming stdin, stderr, and stdout independently to a running container. The underlying API is futures-based, meaning the code is implemented asynchronously. A synchronous API is also exposed, which is implemented by simply waiting on the asynchronous API futures. This also modifies the existing Tty logic so that the storage type of the data is a Vec<u8> rather than a String. This is also how the Rust standard library persists data from the standard streams. In my particular application, I'm using stdin/stdout as the communication method between a container a host application. In it, a byte-based protocol is used. Streaming works by performing a TCP upgrade; upgrading a higher-level HTTP connection to a lower-level TCP byte stream upon agreement with the server. Docker will automatically upgrade HTTP container log requests to TCP byte streams of a custom std{in,out,err} multiplexing protocol if the client requests it with the 'Connection: Upgrade' header. * Return an error rather than panic when Docker refuses to upgrade to TCP * Add interpret-as-string accessors to tty::Chunk Also updates the examples to use them.
Diffstat (limited to 'src/transport.rs')
-rw-r--r--src/transport.rs58
1 files changed, 55 insertions, 3 deletions
diff --git a/src/transport.rs b/src/transport.rs
index 2c62f58..55844c3 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -17,6 +17,7 @@ use hyperlocal::Uri as DomainUri;
use mime::Mime;
use serde_json;
use std::fmt;
+use tokio_io::{AsyncRead, AsyncWrite};
pub fn tar() -> Mime {
"application/tar".parse().unwrap()
@@ -91,7 +92,7 @@ impl Transport {
B: Into<Body>,
{
let req = self
- .build_request(method, endpoint, body)
+ .build_request(method, endpoint, body, |_| ())
.expect("Failed to build request!");
self.send_request(req)
@@ -139,11 +140,13 @@ impl Transport {
method: Method,
endpoint: &str,
body: Option<(B, Mime)>,
+ f: impl FnOnce(&mut ::http::request::Builder),
) -> Result<Request<Body>>
where
- B: Into<Body>,
- {
+ B: Into<Body> {
let mut builder = Request::builder();
+ f(&mut builder);
+
let req = match *self {
Transport::Tcp { ref host, .. } => {
builder.method(method).uri(&format!("{}{}", host, endpoint))
@@ -182,6 +185,55 @@ impl Transport {
req.map_err(Error::Hyper)
}
+ /// Makes an HTTP request, upgrading the connection to a TCP
+ /// stream on success.
+ ///
+ /// This method can be used for operations such as viewing
+ /// docker container logs interactively.
+ pub fn stream_upgrade<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = impl AsyncRead + AsyncWrite, Error = Error>
+ where
+ B: Into<Body>
+ {
+ match self {
+ Transport::Tcp { .. } | Transport::EncryptedTcp { .. } => (),
+ _ => panic!("connection streaming is only supported over TCP"),
+ };
+
+ let req = self.build_request(method, endpoint, body, |builder| {
+ builder.header(header::CONNECTION, "Upgrade")
+ .header(header::UPGRADE, "tcp");
+ }).expect("Failed to build request!");
+
+ self.send_request(req).and_then(|res| {
+ match res.status() {
+ StatusCode::SWITCHING_PROTOCOLS => Ok(res),
+ _ => Err(Error::ConnectionNotUpgraded),
+ }
+ }).and_then(|res| {
+ res.into_body()
+ .on_upgrade()
+ .from_err()
+ })
+ }
+
+ pub fn stream_upgrade_multiplexed<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = ::tty::Multiplexed, Error = Error>
+ where
+ B: Into<Body> + 'static
+ {
+ self.stream_upgrade(method, endpoint, body)
+ .map(|u| ::tty::Multiplexed::new(u))
+ }
+
/// Extract the error message content from an HTTP response that
/// contains a Docker JSON error structure.
fn get_error_message(body: &str) -> Option<String> {