summaryrefslogtreecommitdiffstats
path: root/src/transport.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport.rs')
-rw-r--r--src/transport.rs202
1 files changed, 86 insertions, 116 deletions
diff --git a/src/transport.rs b/src/transport.rs
index 9df6d4a..2c62f58 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -1,27 +1,22 @@
//! Transports for communicating with the docker daemon
-extern crate hyper;
-#[cfg(feature = "unix-socket")]
-extern crate hyperlocal;
-
use self::super::{Error, Result};
-use hyper::client::{Client, HttpConnector};
-use hyper::header;
-use hyper::rt::Stream;
-use hyper::Body;
-use hyper::{Method, Request, Response, StatusCode};
+use futures::{
+ future::{self, Either},
+ Future, IntoFuture, Stream,
+};
+use hyper::{
+ client::{Client, HttpConnector},
+ header, Body, Chunk, Method, Request, StatusCode,
+};
use hyper_openssl::HttpsConnector;
#[cfg(feature = "unix-socket")]
use hyperlocal::UnixConnector;
#[cfg(feature = "unix-socket")]
use hyperlocal::Uri as DomainUri;
use mime::Mime;
-use serde_json::{self, Value};
-use std::cell::{RefCell, RefMut};
+use serde_json;
use std::fmt;
-use std::io::Read;
-use std::io::{BufReader, Cursor};
-use tokio::runtime::Runtime;
pub fn tar() -> Mime {
"application/tar".parse().unwrap()
@@ -29,24 +24,22 @@ pub fn tar() -> Mime {
/// Transports are types which define the means of communication
/// with the docker daemon
+#[derive(Clone)]
pub enum Transport {
/// A network tcp interface
Tcp {
client: Client<HttpConnector>,
- runtime: RefCell<Runtime>,
host: String,
},
/// TCP/TLS
EncryptedTcp {
client: Client<HttpsConnector<HttpConnector>>,
- runtime: RefCell<Runtime>,
host: String,
},
/// A Unix domain socket
#[cfg(feature = "unix-socket")]
Unix {
client: Client<UnixConnector>,
- runtime: RefCell<Runtime>,
path: String,
},
}
@@ -66,20 +59,78 @@ impl fmt::Debug for Transport {
}
impl Transport {
+ /// Make a request and return the whole response in a `String`
pub fn request<B>(
&self,
method: Method,
endpoint: &str,
body: Option<(B, Mime)>,
- ) -> Result<String>
+ ) -> impl Future<Item = String, Error = Error>
+ where
+ B: Into<Body>,
+ {
+ let endpoint = endpoint.to_string();
+ self.stream_chunks(method, &endpoint, body)
+ .concat2()
+ .and_then(|v| {
+ String::from_utf8(v.to_vec())
+ .map_err(Error::Encoding)
+ .into_future()
+ })
+ .inspect(move |body| debug!("{} raw response: {}", endpoint, body))
+ }
+
+ /// Make a request and return a `Stream` of `Chunks` as they are returned.
+ pub fn stream_chunks<B>(
+ &self,
+ method: Method,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Stream<Item = Chunk, Error = Error>
where
B: Into<Body>,
{
- let mut res = self.stream(method, endpoint, body)?;
- let mut body = String::new();
- res.read_to_string(&mut body)?;
- debug!("{} raw response: {}", endpoint, body);
- Ok(body)
+ let req = self
+ .build_request(method, endpoint, body)
+ .expect("Failed to build request!");
+
+ self.send_request(req)
+ .and_then(|res| {
+ let status = res.status();
+ match status {
+ // Success case: pass on the response
+ StatusCode::OK
+ | StatusCode::CREATED
+ | StatusCode::SWITCHING_PROTOCOLS
+ | StatusCode::NO_CONTENT => Either::A(future::ok(res)),
+ // Error case: parse the body to try to extract the error message
+ _ => Either::B(
+ res.into_body()
+ .concat2()
+ .map_err(Error::Hyper)
+ .and_then(|v| {
+ String::from_utf8(v.into_iter().collect::<Vec<u8>>())
+ .map_err(Error::Encoding)
+ })
+ .and_then(move |body| {
+ future::err(Error::Fault {
+ code: status,
+ message: Self::get_error_message(&body).unwrap_or_else(|| {
+ status
+ .canonical_reason()
+ .unwrap_or_else(|| "unknown error code")
+ .to_owned()
+ }),
+ })
+ }),
+ ),
+ }
+ })
+ .map(|r| {
+ // Convert the response body into a stream of chunks
+ r.into_body().map_err(Error::Hyper)
+ })
+ .flatten_stream()
}
/// Builds an HTTP request.
@@ -116,71 +167,11 @@ impl Transport {
}
}
- pub fn stream<B>(
- &self,
- method: Method,
- endpoint: &str,
- body: Option<(B, Mime)>,
- ) -> Result<Box<Read>>
- where
- B: Into<Body>,
- {
- let req = self.build_request(method, endpoint, body)?;
- let res = self.send_request(req)?;
-
- match res.status() {
- StatusCode::OK | StatusCode::CREATED | StatusCode::SWITCHING_PROTOCOLS => {
- let chunk = self.runtime().block_on(res.into_body().concat2())?;
- Ok(Box::new(Cursor::new(
- chunk.into_iter().collect::<Vec<u8>>(),
- )))
- }
- StatusCode::NO_CONTENT => Ok(Box::new(BufReader::new("".as_bytes()))),
- // todo: constantize these
- StatusCode::BAD_REQUEST => Err(Error::Fault {
- code: res.status(),
- message: self
- .get_error_message(res)
- .unwrap_or_else(|| "bad parameter".to_owned()),
- }),
- StatusCode::NOT_FOUND => Err(Error::Fault {
- code: res.status(),
- message: self
- .get_error_message(res)
- .unwrap_or_else(|| "not found".to_owned()),
- }),
- StatusCode::NOT_MODIFIED => Err(Error::Fault {
- code: res.status(),
- message: self
- .get_error_message(res)
- .unwrap_or_else(|| "not modified".to_owned()),
- }),
- StatusCode::NOT_ACCEPTABLE => Err(Error::Fault {
- code: res.status(),
- message: self
- .get_error_message(res)
- .unwrap_or_else(|| "not acceptable".to_owned()),
- }),
- StatusCode::CONFLICT => Err(Error::Fault {
- code: res.status(),
- message: self
- .get_error_message(res)
- .unwrap_or_else(|| "conflict found".to_owned()),
- }),
- StatusCode::INTERNAL_SERVER_ERROR => Err(Error::Fault {
- code: res.status(),
- message: self
- .get_error_message(res)
- .unwrap_or_else(|| "internal server error".to_owned()),
- }),
- _ => unreachable!(),
- }
- }
-
+ /// Send the given request to the docker daemon and return a Future of the response.
fn send_request(
&self,
req: Request<hyper::Body>,
- ) -> Result<hyper::Response<Body>> {
+ ) -> impl Future<Item = hyper::Response<Body>, Error = Error> {
let req = match self {
Transport::Tcp { ref client, .. } => client.request(req),
Transport::EncryptedTcp { ref client, .. } => client.request(req),
@@ -188,40 +179,19 @@ impl Transport {
Transport::Unix { ref client, .. } => client.request(req),
};
- self.runtime().block_on(req).map_err(Error::Hyper)
- }
-
- fn runtime(&self) -> RefMut<Runtime> {
- match self {
- Transport::Tcp { ref runtime, .. } => runtime.borrow_mut(),
- Transport::EncryptedTcp { ref runtime, .. } => runtime.borrow_mut(),
- #[cfg(feature = "unix-socket")]
- Transport::Unix { ref runtime, .. } => runtime.borrow_mut(),
- }
+ req.map_err(Error::Hyper)
}
/// Extract the error message content from an HTTP response that
/// contains a Docker JSON error structure.
- fn get_error_message(
- &self,
- res: Response<Body>,
- ) -> Option<String> {
- let chunk = match self.runtime().block_on(res.into_body().concat2()) {
- Ok(c) => c,
- Err(..) => return None,
- };
-
- match String::from_utf8(chunk.into_iter().collect()) {
- Ok(output) => {
- let json_response = serde_json::from_str::<Value>(output.as_str()).ok();
- json_response
- .as_ref()
- .and_then(|x| x.as_object())
- .and_then(|x| x.get("message"))
- .and_then(|x| x.as_str())
- .map(|x| x.to_owned())
- }
- Err(..) => None,
- }
+ fn get_error_message(body: &str) -> Option<String> {
+ serde_json::from_str::<ErrorResponse>(body)
+ .map(|e| e.message)
+ .ok()
}
}
+
+#[derive(Serialize, Deserialize)]
+struct ErrorResponse {
+ message: String,
+}