diff options
Diffstat (limited to 'src/transport.rs')
-rw-r--r-- | src/transport.rs | 202 |
1 files changed, 86 insertions, 116 deletions
diff --git a/src/transport.rs b/src/transport.rs index 9df6d4a..2c62f58 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,27 +1,22 @@ //! Transports for communicating with the docker daemon -extern crate hyper; -#[cfg(feature = "unix-socket")] -extern crate hyperlocal; - use self::super::{Error, Result}; -use hyper::client::{Client, HttpConnector}; -use hyper::header; -use hyper::rt::Stream; -use hyper::Body; -use hyper::{Method, Request, Response, StatusCode}; +use futures::{ + future::{self, Either}, + Future, IntoFuture, Stream, +}; +use hyper::{ + client::{Client, HttpConnector}, + header, Body, Chunk, Method, Request, StatusCode, +}; use hyper_openssl::HttpsConnector; #[cfg(feature = "unix-socket")] use hyperlocal::UnixConnector; #[cfg(feature = "unix-socket")] use hyperlocal::Uri as DomainUri; use mime::Mime; -use serde_json::{self, Value}; -use std::cell::{RefCell, RefMut}; +use serde_json; use std::fmt; -use std::io::Read; -use std::io::{BufReader, Cursor}; -use tokio::runtime::Runtime; pub fn tar() -> Mime { "application/tar".parse().unwrap() @@ -29,24 +24,22 @@ pub fn tar() -> Mime { /// Transports are types which define the means of communication /// with the docker daemon +#[derive(Clone)] pub enum Transport { /// A network tcp interface Tcp { client: Client<HttpConnector>, - runtime: RefCell<Runtime>, host: String, }, /// TCP/TLS EncryptedTcp { client: Client<HttpsConnector<HttpConnector>>, - runtime: RefCell<Runtime>, host: String, }, /// A Unix domain socket #[cfg(feature = "unix-socket")] Unix { client: Client<UnixConnector>, - runtime: RefCell<Runtime>, path: String, }, } @@ -66,20 +59,78 @@ impl fmt::Debug for Transport { } impl Transport { + /// Make a request and return the whole response in a `String` pub fn request<B>( &self, method: Method, endpoint: &str, body: Option<(B, Mime)>, - ) -> Result<String> + ) -> impl Future<Item = String, Error = Error> + where + B: Into<Body>, + { + let endpoint = endpoint.to_string(); + self.stream_chunks(method, &endpoint, body) + .concat2() + .and_then(|v| { + String::from_utf8(v.to_vec()) + .map_err(Error::Encoding) + .into_future() + }) + .inspect(move |body| debug!("{} raw response: {}", endpoint, body)) + } + + /// Make a request and return a `Stream` of `Chunks` as they are returned. + pub fn stream_chunks<B>( + &self, + method: Method, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> impl Stream<Item = Chunk, Error = Error> where B: Into<Body>, { - let mut res = self.stream(method, endpoint, body)?; - let mut body = String::new(); - res.read_to_string(&mut body)?; - debug!("{} raw response: {}", endpoint, body); - Ok(body) + let req = self + .build_request(method, endpoint, body) + .expect("Failed to build request!"); + + self.send_request(req) + .and_then(|res| { + let status = res.status(); + match status { + // Success case: pass on the response + StatusCode::OK + | StatusCode::CREATED + | StatusCode::SWITCHING_PROTOCOLS + | StatusCode::NO_CONTENT => Either::A(future::ok(res)), + // Error case: parse the body to try to extract the error message + _ => Either::B( + res.into_body() + .concat2() + .map_err(Error::Hyper) + .and_then(|v| { + String::from_utf8(v.into_iter().collect::<Vec<u8>>()) + .map_err(Error::Encoding) + }) + .and_then(move |body| { + future::err(Error::Fault { + code: status, + message: Self::get_error_message(&body).unwrap_or_else(|| { + status + .canonical_reason() + .unwrap_or_else(|| "unknown error code") + .to_owned() + }), + }) + }), + ), + } + }) + .map(|r| { + // Convert the response body into a stream of chunks + r.into_body().map_err(Error::Hyper) + }) + .flatten_stream() } /// Builds an HTTP request. @@ -116,71 +167,11 @@ impl Transport { } } - pub fn stream<B>( - &self, - method: Method, - endpoint: &str, - body: Option<(B, Mime)>, - ) -> Result<Box<Read>> - where - B: Into<Body>, - { - let req = self.build_request(method, endpoint, body)?; - let res = self.send_request(req)?; - - match res.status() { - StatusCode::OK | StatusCode::CREATED | StatusCode::SWITCHING_PROTOCOLS => { - let chunk = self.runtime().block_on(res.into_body().concat2())?; - Ok(Box::new(Cursor::new( - chunk.into_iter().collect::<Vec<u8>>(), - ))) - } - StatusCode::NO_CONTENT => Ok(Box::new(BufReader::new("".as_bytes()))), - // todo: constantize these - StatusCode::BAD_REQUEST => Err(Error::Fault { - code: res.status(), - message: self - .get_error_message(res) - .unwrap_or_else(|| "bad parameter".to_owned()), - }), - StatusCode::NOT_FOUND => Err(Error::Fault { - code: res.status(), - message: self - .get_error_message(res) - .unwrap_or_else(|| "not found".to_owned()), - }), - StatusCode::NOT_MODIFIED => Err(Error::Fault { - code: res.status(), - message: self - .get_error_message(res) - .unwrap_or_else(|| "not modified".to_owned()), - }), - StatusCode::NOT_ACCEPTABLE => Err(Error::Fault { - code: res.status(), - message: self - .get_error_message(res) - .unwrap_or_else(|| "not acceptable".to_owned()), - }), - StatusCode::CONFLICT => Err(Error::Fault { - code: res.status(), - message: self - .get_error_message(res) - .unwrap_or_else(|| "conflict found".to_owned()), - }), - StatusCode::INTERNAL_SERVER_ERROR => Err(Error::Fault { - code: res.status(), - message: self - .get_error_message(res) - .unwrap_or_else(|| "internal server error".to_owned()), - }), - _ => unreachable!(), - } - } - + /// Send the given request to the docker daemon and return a Future of the response. fn send_request( &self, req: Request<hyper::Body>, - ) -> Result<hyper::Response<Body>> { + ) -> impl Future<Item = hyper::Response<Body>, Error = Error> { let req = match self { Transport::Tcp { ref client, .. } => client.request(req), Transport::EncryptedTcp { ref client, .. } => client.request(req), @@ -188,40 +179,19 @@ impl Transport { Transport::Unix { ref client, .. } => client.request(req), }; - self.runtime().block_on(req).map_err(Error::Hyper) - } - - fn runtime(&self) -> RefMut<Runtime> { - match self { - Transport::Tcp { ref runtime, .. } => runtime.borrow_mut(), - Transport::EncryptedTcp { ref runtime, .. } => runtime.borrow_mut(), - #[cfg(feature = "unix-socket")] - Transport::Unix { ref runtime, .. } => runtime.borrow_mut(), - } + req.map_err(Error::Hyper) } /// Extract the error message content from an HTTP response that /// contains a Docker JSON error structure. - fn get_error_message( - &self, - res: Response<Body>, - ) -> Option<String> { - let chunk = match self.runtime().block_on(res.into_body().concat2()) { - Ok(c) => c, - Err(..) => return None, - }; - - match String::from_utf8(chunk.into_iter().collect()) { - Ok(output) => { - let json_response = serde_json::from_str::<Value>(output.as_str()).ok(); - json_response - .as_ref() - .and_then(|x| x.as_object()) - .and_then(|x| x.get("message")) - .and_then(|x| x.as_str()) - .map(|x| x.to_owned()) - } - Err(..) => None, - } + fn get_error_message(body: &str) -> Option<String> { + serde_json::from_str::<ErrorResponse>(body) + .map(|e| e.message) + .ok() } } + +#[derive(Serialize, Deserialize)] +struct ErrorResponse { + message: String, +} |