diff options
author | Dylan McKay <me@dylanmckay.io> | 2018-08-06 19:17:22 +1200 |
---|---|---|
committer | Dylan McKay <me@dylanmckay.io> | 2018-08-07 19:14:37 +1200 |
commit | 1b8e837ba25fd3eb4a042bb41083804de7c8c283 (patch) | |
tree | b03cec3b9d27c57780e5a3f9aea24668d8c99931 /src/transport.rs | |
parent | 52ece44740f8ea80d11f269be99706d6e2a4bd13 (diff) |
Update to Hyper 0.12
Diffstat (limited to 'src/transport.rs')
-rw-r--r-- | src/transport.rs | 205 |
1 files changed, 120 insertions, 85 deletions
diff --git a/src/transport.rs b/src/transport.rs index 2aa9599..8ba1586 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,57 +1,59 @@ //! Transports for communicating with the docker daemon extern crate hyper; +extern crate tokio_core; -use self::hyper::buffer::BufReader; -use self::hyper::header::ContentType; -use self::hyper::status::StatusCode; use self::super::{Error, Result}; -use hyper::Client; -use hyper::client::Body; -use hyper::client::response::Response; +use hyper::Body; +use hyper::client::{Client, HttpConnector}; +use hyper::{Method, Request, Response, StatusCode}; use hyper::header; -use hyper::method::Method; -use hyper::mime; -use hyperlocal::DomainUrl; +use hyper_openssl::HttpsConnector; +use hyper::rt::{Future, Stream}; +use hyperlocal::Uri as DomainUri; +use hyperlocal::UnixConnector; +use mime::Mime; use rustc_serialize::json; use std::fmt; -use std::io::Read; - -pub fn tar() -> ContentType { - ContentType(mime::Mime( - mime::TopLevel::Application, - mime::SubLevel::Ext(String::from("tar")), - vec![], - )) +use std::io::{BufReader, Cursor}; +use std::io::{Read, Write}; + +trait InteractiveStream : Read + Write { } + +pub fn tar() -> Mime { + "application/tar".parse().unwrap() } /// Transports are types which define the means of communication /// with the docker daemon pub enum Transport { /// A network tcp interface - Tcp { client: Client, host: String }, + Tcp { client: Client<HttpConnector>, host: String }, + /// TCP/TLS. + EncryptedTcp { client: Client<HttpsConnector<HttpConnector>>, host: String }, /// A Unix domain socket - Unix { client: Client, path: String }, + Unix { client: Client<UnixConnector>, path: String }, } impl fmt::Debug for Transport { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Transport::Tcp { ref host, .. } => write!(f, "Tcp({})", host), + Transport::EncryptedTcp { ref host, .. } => write!(f, "EncryptedTcp({})", host), Transport::Unix { ref path, .. } => write!(f, "Unix({})", path), } } } impl Transport { - pub fn request<'a, B>( - &'a self, + pub fn request<B>( + &self, method: Method, endpoint: &str, - body: Option<(B, ContentType)>, + body: Option<(B, Mime)>, ) -> Result<String> where - B: Into<Body<'a>>, + B: Into<Body>, { let mut res = self.stream(method, endpoint, body)?; let mut body = String::new(); @@ -60,95 +62,110 @@ impl Transport { Ok(body) } - pub fn stream<'c, B>( - &'c self, + /// Builds an HTTP request. + fn build_request<B>( + &self, method: Method, endpoint: &str, - body: Option<(B, ContentType)>, - ) -> Result<Box<Read>> + body: Option<(B, Mime)>, + ) -> Result<Request<Body>> where - B: Into<Body<'c>>, + B: Into<Body>, { - let headers = { - let mut headers = header::Headers::new(); - headers.set(header::Host { - hostname: "".to_owned(), - port: None, - }); - headers - }; + let mut builder = Request::builder(); let req = match *self { Transport::Tcp { - ref client, - ref host, - } => client.request(method, &format!("{}{}", host, endpoint)[..]), + ref host, .. + } => builder.method(method).uri(&format!("{}{}", host, endpoint)), + Transport::EncryptedTcp { + ref host, .. + } => builder.method(method).uri(&format!("{}{}", host, endpoint)), Transport::Unix { - ref client, - ref path, - } => client.request(method, DomainUrl::new(&path, endpoint)), - }.headers(headers); - - let embodied = match body { - Some((b, c)) => req.header(c).body(b), - _ => req, + ref path, .. + } => { + let uri: hyper::Uri = DomainUri::new(&path, endpoint).into(); + builder.method(method).uri(&uri.to_string()) + }, }; - let mut res = embodied.send()?; - match res.status { - StatusCode::Ok | - StatusCode::Created | - StatusCode::SwitchingProtocols => Ok(Box::new(res)), - StatusCode::NoContent => Ok( + let req = req.header(header::HOST, ""); + + match body { + Some((b, c)) => Ok(req.header(header::CONTENT_TYPE, &c.to_string()[..]).body(b.into())?), + _ => Ok(req.body(Body::empty())?), + } + } + + pub fn stream<B>( + &self, + method: Method, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> Result<Box<Read>> + where + B: Into<Body>, + { + let req = self.build_request(method, endpoint, body)?; + let res = self.send_request(req)?; + + match res.status() { + StatusCode::OK | + StatusCode::CREATED | + StatusCode::SWITCHING_PROTOCOLS => { + let chunk = res.into_body().concat2().wait()?; + Ok(Box::new(Cursor::new(chunk.into_iter().collect::<Vec<u8>>()))) + }, + StatusCode::NO_CONTENT => Ok( Box::new(BufReader::new("".as_bytes())), ), // todo: constantize these - StatusCode::BadRequest => { + StatusCode::BAD_REQUEST => { Err(Error::Fault { - code: res.status, - message: get_error_message(&mut res).unwrap_or( + code: res.status(), + message: get_error_message(res).unwrap_or( "bad parameter" .to_owned(), ), }) } - StatusCode::NotFound => { + StatusCode::NOT_FOUND => { Err(Error::Fault { - code: res.status, - message: get_error_message(&mut res).unwrap_or( + code: res.status(), + message: get_error_message(res).unwrap_or( "not found".to_owned(), ), }) } - StatusCode::NotModified => { + StatusCode::NOT_MODIFIED => { Err(Error::Fault { - code: res.status, - message: get_error_message(&mut res).unwrap_or( + code: res.status(), + message: get_error_message(res).unwrap_or( "not modified" .to_owned(), ), }) }, - StatusCode::NotAcceptable => { + StatusCode::NOT_ACCEPTABLE => { Err(Error::Fault { - code: res.status, - message: get_error_message(&mut res).unwrap_or( + code: res.status(), + message: get_error_message(res).unwrap_or( "not acceptable" .to_owned(), ), }) } - StatusCode::Conflict => { + StatusCode::CONFLICT => { Err(Error::Fault { - code: res.status, - message: get_error_message(&mut res).unwrap_or( + code: res.status(), + message: get_error_message(res).unwrap_or( "conflict found" .to_owned(), ), }) } - StatusCode::InternalServerError => { + StatusCode::INTERNAL_SERVER_ERROR => { Err(Error::Fault { - code: res.status, - message: get_error_message(&mut res).unwrap_or( + code: res.status(), + message: get_error_message(res).unwrap_or( "internal server error" .to_owned(), ), @@ -157,23 +174,41 @@ impl Transport { _ => unreachable!(), } } + + fn send_request(&self, req: Request<hyper::Body>) -> Result<hyper::Response<Body>> { + use self::tokio_core::reactor; + + let mut core = reactor::Core::new().unwrap(); + Ok(core.run(match self { + Transport::Tcp { ref client, .. } => client.request(req), + Transport::EncryptedTcp { ref client, .. } => client.request(req), + Transport::Unix { ref client, .. } => client.request(req), + })?) + } } /// Extract the error message content from an HTTP response that /// contains a Docker JSON error structure. -fn get_error_message(res: &mut Response) -> Option<String> { - let mut output = String::new(); - if res.read_to_string(&mut output).is_ok() { - let json_response = json::Json::from_str(output.as_str()).ok(); - let message = json_response - .as_ref() - .and_then(|x| x.as_object()) - .and_then(|x| x.get("message")) - .and_then(|x| x.as_string()) - .map(|x| x.to_owned()); - - message - } else { - None +fn get_error_message(res: Response<Body>) -> Option<String> { + let chunk = match res.into_body().concat2().wait() { + Ok(c) => c, + Err(..) => return None, + }; + + match String::from_utf8(chunk.into_iter().collect()) { + Ok(output) => { + let json_response = json::Json::from_str(output.as_str()).ok(); + let message = json_response + .as_ref() + .and_then(|x| x.as_object()) + .and_then(|x| x.get("message")) + .and_then(|x| x.as_string()) + .map(|x| x.to_owned()); + + message + } + Err(..) => { + None + }, } } |