diff options
Diffstat (limited to 'src/transport.rs')
-rw-r--r-- | src/transport.rs | 58 |
1 files changed, 55 insertions, 3 deletions
diff --git a/src/transport.rs b/src/transport.rs index 2c62f58..55844c3 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -17,6 +17,7 @@ use hyperlocal::Uri as DomainUri; use mime::Mime; use serde_json; use std::fmt; +use tokio_io::{AsyncRead, AsyncWrite}; pub fn tar() -> Mime { "application/tar".parse().unwrap() @@ -91,7 +92,7 @@ impl Transport { B: Into<Body>, { let req = self - .build_request(method, endpoint, body) + .build_request(method, endpoint, body, |_| ()) .expect("Failed to build request!"); self.send_request(req) @@ -139,11 +140,13 @@ impl Transport { method: Method, endpoint: &str, body: Option<(B, Mime)>, + f: impl FnOnce(&mut ::http::request::Builder), ) -> Result<Request<Body>> where - B: Into<Body>, - { + B: Into<Body> { let mut builder = Request::builder(); + f(&mut builder); + let req = match *self { Transport::Tcp { ref host, .. } => { builder.method(method).uri(&format!("{}{}", host, endpoint)) @@ -182,6 +185,55 @@ impl Transport { req.map_err(Error::Hyper) } + /// Makes an HTTP request, upgrading the connection to a TCP + /// stream on success. + /// + /// This method can be used for operations such as viewing + /// docker container logs interactively. + pub fn stream_upgrade<B>( + &self, + method: Method, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> impl Future<Item = impl AsyncRead + AsyncWrite, Error = Error> + where + B: Into<Body> + { + match self { + Transport::Tcp { .. } | Transport::EncryptedTcp { .. } => (), + _ => panic!("connection streaming is only supported over TCP"), + }; + + let req = self.build_request(method, endpoint, body, |builder| { + builder.header(header::CONNECTION, "Upgrade") + .header(header::UPGRADE, "tcp"); + }).expect("Failed to build request!"); + + self.send_request(req).and_then(|res| { + match res.status() { + StatusCode::SWITCHING_PROTOCOLS => Ok(res), + _ => Err(Error::ConnectionNotUpgraded), + } + }).and_then(|res| { + res.into_body() + .on_upgrade() + .from_err() + }) + } + + pub fn stream_upgrade_multiplexed<B>( + &self, + method: Method, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> impl Future<Item = ::tty::Multiplexed, Error = Error> + where + B: Into<Body> + 'static + { + self.stream_upgrade(method, endpoint, body) + .map(|u| ::tty::Multiplexed::new(u)) + } + /// Extract the error message content from an HTTP response that /// contains a Docker JSON error structure. fn get_error_message(body: &str) -> Option<String> { |