summaryrefslogtreecommitdiffstats
path: root/src/transport.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport.rs')
-rw-r--r--src/transport.rs58
1 files changed, 55 insertions, 3 deletions
diff --git a/src/transport.rs b/src/transport.rs
index 2c62f58..55844c3 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -17,6 +17,7 @@ use hyperlocal::Uri as DomainUri;
use mime::Mime;
use serde_json;
use std::fmt;
+use tokio_io::{AsyncRead, AsyncWrite};
pub fn tar() -> Mime {
"application/tar".parse().unwrap()
@@ -91,7 +92,7 @@ impl Transport {
B: Into<Body>,
{
let req = self
- .build_request(method, endpoint, body)
+ .build_request(method, endpoint, body, |_| ())
.expect("Failed to build request!");
self.send_request(req)
@@ -139,11 +140,13 @@ impl Transport {
method: Method,
endpoint: &str,
body: Option<(B, Mime)>,
+ f: impl FnOnce(&mut ::http::request::Builder),
) -> Result<Request<Body>>
where
- B: Into<Body>,
- {
+ B: Into<Body> {
let mut builder = Request::builder();
+ f(&mut builder);
+
let req = match *self {
Transport::Tcp { ref host, .. } => {
builder.method(method).uri(&format!("{}{}", host, endpoint))
@@ -182,6 +185,55 @@ impl Transport {
req.map_err(Error::Hyper)
}
+ /// Makes an HTTP request, upgrading the connection to a TCP
+ /// stream on success.
+ ///
+ /// This method can be used for operations such as viewing
+ /// docker container logs interactively.
+ pub fn stream_upgrade<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = impl AsyncRead + AsyncWrite, Error = Error>
+ where
+ B: Into<Body>
+ {
+ match self {
+ Transport::Tcp { .. } | Transport::EncryptedTcp { .. } => (),
+ _ => panic!("connection streaming is only supported over TCP"),
+ };
+
+ let req = self.build_request(method, endpoint, body, |builder| {
+ builder.header(header::CONNECTION, "Upgrade")
+ .header(header::UPGRADE, "tcp");
+ }).expect("Failed to build request!");
+
+ self.send_request(req).and_then(|res| {
+ match res.status() {
+ StatusCode::SWITCHING_PROTOCOLS => Ok(res),
+ _ => Err(Error::ConnectionNotUpgraded),
+ }
+ }).and_then(|res| {
+ res.into_body()
+ .on_upgrade()
+ .from_err()
+ })
+ }
+
+ pub fn stream_upgrade_multiplexed<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = ::tty::Multiplexed, Error = Error>
+ where
+ B: Into<Body> + 'static
+ {
+ self.stream_upgrade(method, endpoint, body)
+ .map(|u| ::tty::Multiplexed::new(u))
+ }
+
/// Extract the error message content from an HTTP response that
/// contains a Docker JSON error structure.
fn get_error_message(body: &str) -> Option<String> {