diff options
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/lib.rs | 15 | ||||
-rw-r--r-- | src/transport.rs | 64 |
3 files changed, 56 insertions, 25 deletions
@@ -23,7 +23,7 @@ mime = "0.3" openssl = "0.10" rustc-serialize = "0.3" tar = "0.4" -tokio-core = "0.1" +tokio = "0.1" url = "1.7" serde = "1.0" serde_derive = "1.0" @@ -30,6 +30,7 @@ extern crate byteorder; #[macro_use] extern crate serde_derive; extern crate serde; +extern crate tokio; pub mod builder; pub mod rep; @@ -63,6 +64,7 @@ use std::env; use std::io::prelude::*; use std::iter::IntoIterator; use std::path::Path; +use std::cell::RefCell; use std::time::Duration; use transport::{Transport, tar}; use tty::Tty; @@ -641,7 +643,10 @@ impl Docker { where S: Into<String> { Docker { transport: Transport::Unix { - client: Client::builder().build(UnixConnector), + client: Client::builder() + .keep_alive(false) + .build(UnixConnector), + runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), path: socket_path.into(), }, } @@ -660,6 +665,7 @@ impl Docker { Docker { transport: Transport::Unix { client: Client::builder().build(UnixConnector), + runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), path: host.path().to_owned(), }, } @@ -701,12 +707,17 @@ impl Docker { Docker { transport: Transport::EncryptedTcp { client: Client::builder().build(connector), + runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), host: tcp_host_str, }, } } else { Docker { - transport: Transport::Tcp { client: Client::new(), host: tcp_host_str }, + transport: Transport::Tcp { + client: Client::new(), + runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), + host: tcp_host_str + }, } } } diff --git a/src/transport.rs b/src/transport.rs index 8ba1586..6f66f86 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,7 +1,6 @@ //! Transports for communicating with the docker daemon extern crate hyper; -extern crate tokio_core; use self::super::{Error, Result}; use hyper::Body; @@ -15,8 +14,10 @@ use hyperlocal::UnixConnector; use mime::Mime; use rustc_serialize::json; use std::fmt; +use std::cell::{RefMut, RefCell}; use std::io::{BufReader, Cursor}; use std::io::{Read, Write}; +use tokio::runtime::Runtime; trait InteractiveStream : Read + Write { } @@ -28,11 +29,23 @@ pub fn tar() -> Mime { /// with the docker daemon pub enum Transport { /// A network tcp interface - Tcp { client: Client<HttpConnector>, host: String }, - /// TCP/TLS. - EncryptedTcp { client: Client<HttpsConnector<HttpConnector>>, host: String }, + Tcp { + client: Client<HttpConnector>, + runtime: RefCell<Runtime>, + host: String, + }, + /// TCP/TLS.w + EncryptedTcp { + client: Client<HttpsConnector<HttpConnector>>, + runtime: RefCell<Runtime>, + host: String, + }, /// A Unix domain socket - Unix { client: Client<UnixConnector>, path: String }, + Unix { + client: Client<UnixConnector>, + runtime: RefCell<Runtime>, + path: String, + }, } impl fmt::Debug for Transport { @@ -74,23 +87,23 @@ impl Transport { { let mut builder = Request::builder(); let req = match *self { - Transport::Tcp { - ref host, .. - } => builder.method(method).uri(&format!("{}{}", host, endpoint)), - Transport::EncryptedTcp { - ref host, .. - } => builder.method(method).uri(&format!("{}{}", host, endpoint)), - Transport::Unix { - ref path, .. - } => { + Transport::Tcp { ref host, .. } => { + builder.method(method).uri(&format!("{}{}", host, endpoint)) + } + Transport::EncryptedTcp { ref host, .. } => { + builder.method(method).uri(&format!("{}{}", host, endpoint)) + } + Transport::Unix { ref path, .. } => { let uri: hyper::Uri = DomainUri::new(&path, endpoint).into(); builder.method(method).uri(&uri.to_string()) - }, + } }; let req = req.header(header::HOST, ""); match body { - Some((b, c)) => Ok(req.header(header::CONTENT_TYPE, &c.to_string()[..]).body(b.into())?), + Some((b, c)) => Ok(req + .header(header::CONTENT_TYPE, &c.to_string()[..]) + .body(b.into())?), _ => Ok(req.body(Body::empty())?), } } @@ -111,7 +124,7 @@ impl Transport { StatusCode::OK | StatusCode::CREATED | StatusCode::SWITCHING_PROTOCOLS => { - let chunk = res.into_body().concat2().wait()?; + let chunk = self.runtime().block_on(res.into_body().concat2())?; Ok(Box::new(Cursor::new(chunk.into_iter().collect::<Vec<u8>>()))) }, StatusCode::NO_CONTENT => Ok( @@ -176,14 +189,21 @@ impl Transport { } fn send_request(&self, req: Request<hyper::Body>) -> Result<hyper::Response<Body>> { - use self::tokio_core::reactor; - - let mut core = reactor::Core::new().unwrap(); - Ok(core.run(match self { + let req = match self { Transport::Tcp { ref client, .. } => client.request(req), Transport::EncryptedTcp { ref client, .. } => client.request(req), Transport::Unix { ref client, .. } => client.request(req), - })?) + }; + + self.runtime().block_on(req).map_err(Error::Hyper) + } + + fn runtime(&self) -> RefMut<Runtime> { + match self { + Transport::Tcp { ref runtime, .. } => runtime.borrow_mut(), + Transport::EncryptedTcp { ref runtime, .. } => runtime.borrow_mut(), + Transport::Unix { ref runtime, .. } => runtime.borrow_mut(), + } } } |