summaryrefslogtreecommitdiffstats
path: root/src/transport.rs
diff options
context:
space:
mode:
authorDylan McKay <me@dylanmckay.io>2018-08-06 19:17:22 +1200
committerDylan McKay <me@dylanmckay.io>2018-08-07 19:14:37 +1200
commit1b8e837ba25fd3eb4a042bb41083804de7c8c283 (patch)
treeb03cec3b9d27c57780e5a3f9aea24668d8c99931 /src/transport.rs
parent52ece44740f8ea80d11f269be99706d6e2a4bd13 (diff)
Update to Hyper 0.12
Diffstat (limited to 'src/transport.rs')
-rw-r--r--src/transport.rs205
1 files changed, 120 insertions, 85 deletions
diff --git a/src/transport.rs b/src/transport.rs
index 2aa9599..8ba1586 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -1,57 +1,59 @@
//! Transports for communicating with the docker daemon
extern crate hyper;
+extern crate tokio_core;
-use self::hyper::buffer::BufReader;
-use self::hyper::header::ContentType;
-use self::hyper::status::StatusCode;
use self::super::{Error, Result};
-use hyper::Client;
-use hyper::client::Body;
-use hyper::client::response::Response;
+use hyper::Body;
+use hyper::client::{Client, HttpConnector};
+use hyper::{Method, Request, Response, StatusCode};
use hyper::header;
-use hyper::method::Method;
-use hyper::mime;
-use hyperlocal::DomainUrl;
+use hyper_openssl::HttpsConnector;
+use hyper::rt::{Future, Stream};
+use hyperlocal::Uri as DomainUri;
+use hyperlocal::UnixConnector;
+use mime::Mime;
use rustc_serialize::json;
use std::fmt;
-use std::io::Read;
-
-pub fn tar() -> ContentType {
- ContentType(mime::Mime(
- mime::TopLevel::Application,
- mime::SubLevel::Ext(String::from("tar")),
- vec![],
- ))
+use std::io::{BufReader, Cursor};
+use std::io::{Read, Write};
+
+trait InteractiveStream : Read + Write { }
+
+pub fn tar() -> Mime {
+ "application/tar".parse().unwrap()
}
/// Transports are types which define the means of communication
/// with the docker daemon
pub enum Transport {
/// A network tcp interface
- Tcp { client: Client, host: String },
+ Tcp { client: Client<HttpConnector>, host: String },
+ /// TCP/TLS.
+ EncryptedTcp { client: Client<HttpsConnector<HttpConnector>>, host: String },
/// A Unix domain socket
- Unix { client: Client, path: String },
+ Unix { client: Client<UnixConnector>, path: String },
}
impl fmt::Debug for Transport {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Transport::Tcp { ref host, .. } => write!(f, "Tcp({})", host),
+ Transport::EncryptedTcp { ref host, .. } => write!(f, "EncryptedTcp({})", host),
Transport::Unix { ref path, .. } => write!(f, "Unix({})", path),
}
}
}
impl Transport {
- pub fn request<'a, B>(
- &'a self,
+ pub fn request<B>(
+ &self,
method: Method,
endpoint: &str,
- body: Option<(B, ContentType)>,
+ body: Option<(B, Mime)>,
) -> Result<String>
where
- B: Into<Body<'a>>,
+ B: Into<Body>,
{
let mut res = self.stream(method, endpoint, body)?;
let mut body = String::new();
@@ -60,95 +62,110 @@ impl Transport {
Ok(body)
}
- pub fn stream<'c, B>(
- &'c self,
+ /// Builds an HTTP request.
+ fn build_request<B>(
+ &self,
method: Method,
endpoint: &str,
- body: Option<(B, ContentType)>,
- ) -> Result<Box<Read>>
+ body: Option<(B, Mime)>,
+ ) -> Result<Request<Body>>
where
- B: Into<Body<'c>>,
+ B: Into<Body>,
{
- let headers = {
- let mut headers = header::Headers::new();
- headers.set(header::Host {
- hostname: "".to_owned(),
- port: None,
- });
- headers
- };
+ let mut builder = Request::builder();
let req = match *self {
Transport::Tcp {
- ref client,
- ref host,
- } => client.request(method, &format!("{}{}", host, endpoint)[..]),
+ ref host, ..
+ } => builder.method(method).uri(&format!("{}{}", host, endpoint)),
+ Transport::EncryptedTcp {
+ ref host, ..
+ } => builder.method(method).uri(&format!("{}{}", host, endpoint)),
Transport::Unix {
- ref client,
- ref path,
- } => client.request(method, DomainUrl::new(&path, endpoint)),
- }.headers(headers);
-
- let embodied = match body {
- Some((b, c)) => req.header(c).body(b),
- _ => req,
+ ref path, ..
+ } => {
+ let uri: hyper::Uri = DomainUri::new(&path, endpoint).into();
+ builder.method(method).uri(&uri.to_string())
+ },
};
- let mut res = embodied.send()?;
- match res.status {
- StatusCode::Ok |
- StatusCode::Created |
- StatusCode::SwitchingProtocols => Ok(Box::new(res)),
- StatusCode::NoContent => Ok(
+ let req = req.header(header::HOST, "");
+
+ match body {
+ Some((b, c)) => Ok(req.header(header::CONTENT_TYPE, &c.to_string()[..]).body(b.into())?),
+ _ => Ok(req.body(Body::empty())?),
+ }
+ }
+
+ pub fn stream<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> Result<Box<Read>>
+ where
+ B: Into<Body>,
+ {
+ let req = self.build_request(method, endpoint, body)?;
+ let res = self.send_request(req)?;
+
+ match res.status() {
+ StatusCode::OK |
+ StatusCode::CREATED |
+ StatusCode::SWITCHING_PROTOCOLS => {
+ let chunk = res.into_body().concat2().wait()?;
+ Ok(Box::new(Cursor::new(chunk.into_iter().collect::<Vec<u8>>())))
+ },
+ StatusCode::NO_CONTENT => Ok(
Box::new(BufReader::new("".as_bytes())),
),
// todo: constantize these
- StatusCode::BadRequest => {
+ StatusCode::BAD_REQUEST => {
Err(Error::Fault {
- code: res.status,
- message: get_error_message(&mut res).unwrap_or(
+ code: res.status(),
+ message: get_error_message(res).unwrap_or(
"bad parameter"
.to_owned(),
),
})
}
- StatusCode::NotFound => {
+ StatusCode::NOT_FOUND => {
Err(Error::Fault {
- code: res.status,
- message: get_error_message(&mut res).unwrap_or(
+ code: res.status(),
+ message: get_error_message(res).unwrap_or(
"not found".to_owned(),
),
})
}
- StatusCode::NotModified => {
+ StatusCode::NOT_MODIFIED => {
Err(Error::Fault {
- code: res.status,
- message: get_error_message(&mut res).unwrap_or(
+ code: res.status(),
+ message: get_error_message(res).unwrap_or(
"not modified"
.to_owned(),
),
})
},
- StatusCode::NotAcceptable => {
+ StatusCode::NOT_ACCEPTABLE => {
Err(Error::Fault {
- code: res.status,
- message: get_error_message(&mut res).unwrap_or(
+ code: res.status(),
+ message: get_error_message(res).unwrap_or(
"not acceptable"
.to_owned(),
),
})
}
- StatusCode::Conflict => {
+ StatusCode::CONFLICT => {
Err(Error::Fault {
- code: res.status,
- message: get_error_message(&mut res).unwrap_or(
+ code: res.status(),
+ message: get_error_message(res).unwrap_or(
"conflict found"
.to_owned(),
),
})
}
- StatusCode::InternalServerError => {
+ StatusCode::INTERNAL_SERVER_ERROR => {
Err(Error::Fault {
- code: res.status,
- message: get_error_message(&mut res).unwrap_or(
+ code: res.status(),
+ message: get_error_message(res).unwrap_or(
"internal server error"
.to_owned(),
),
@@ -157,23 +174,41 @@ impl Transport {
_ => unreachable!(),
}
}
+
+ fn send_request(&self, req: Request<hyper::Body>) -> Result<hyper::Response<Body>> {
+ use self::tokio_core::reactor;
+
+ let mut core = reactor::Core::new().unwrap();
+ Ok(core.run(match self {
+ Transport::Tcp { ref client, .. } => client.request(req),
+ Transport::EncryptedTcp { ref client, .. } => client.request(req),
+ Transport::Unix { ref client, .. } => client.request(req),
+ })?)
+ }
}
/// Extract the error message content from an HTTP response that
/// contains a Docker JSON error structure.
-fn get_error_message(res: &mut Response) -> Option<String> {
- let mut output = String::new();
- if res.read_to_string(&mut output).is_ok() {
- let json_response = json::Json::from_str(output.as_str()).ok();
- let message = json_response
- .as_ref()
- .and_then(|x| x.as_object())
- .and_then(|x| x.get("message"))
- .and_then(|x| x.as_string())
- .map(|x| x.to_owned());
-
- message
- } else {
- None
+fn get_error_message(res: Response<Body>) -> Option<String> {
+ let chunk = match res.into_body().concat2().wait() {
+ Ok(c) => c,
+ Err(..) => return None,
+ };
+
+ match String::from_utf8(chunk.into_iter().collect()) {
+ Ok(output) => {
+ let json_response = json::Json::from_str(output.as_str()).ok();
+ let message = json_response
+ .as_ref()
+ .and_then(|x| x.as_object())
+ .and_then(|x| x.get("message"))
+ .and_then(|x| x.as_string())
+ .map(|x| x.to_owned());
+
+ message
+ }
+ Err(..) => {
+ None
+ },
}
}