summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml2
-rw-r--r--src/lib.rs15
-rw-r--r--src/transport.rs64
3 files changed, 56 insertions, 25 deletions
diff --git a/Cargo.toml b/Cargo.toml
index abfc899..9f0b352 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,7 +23,7 @@ mime = "0.3"
openssl = "0.10"
rustc-serialize = "0.3"
tar = "0.4"
-tokio-core = "0.1"
+tokio = "0.1"
url = "1.7"
serde = "1.0"
serde_derive = "1.0"
diff --git a/src/lib.rs b/src/lib.rs
index 987a1a4..c42dc60 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,6 +30,7 @@ extern crate byteorder;
#[macro_use]
extern crate serde_derive;
extern crate serde;
+extern crate tokio;
pub mod builder;
pub mod rep;
@@ -63,6 +64,7 @@ use std::env;
use std::io::prelude::*;
use std::iter::IntoIterator;
use std::path::Path;
+use std::cell::RefCell;
use std::time::Duration;
use transport::{Transport, tar};
use tty::Tty;
@@ -641,7 +643,10 @@ impl Docker {
where S: Into<String> {
Docker {
transport: Transport::Unix {
- client: Client::builder().build(UnixConnector),
+ client: Client::builder()
+ .keep_alive(false)
+ .build(UnixConnector),
+ runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
path: socket_path.into(),
},
}
@@ -660,6 +665,7 @@ impl Docker {
Docker {
transport: Transport::Unix {
client: Client::builder().build(UnixConnector),
+ runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
path: host.path().to_owned(),
},
}
@@ -701,12 +707,17 @@ impl Docker {
Docker {
transport: Transport::EncryptedTcp {
client: Client::builder().build(connector),
+ runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
host: tcp_host_str,
},
}
} else {
Docker {
- transport: Transport::Tcp { client: Client::new(), host: tcp_host_str },
+ transport: Transport::Tcp {
+ client: Client::new(),
+ runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
+ host: tcp_host_str
+ },
}
}
}
diff --git a/src/transport.rs b/src/transport.rs
index 8ba1586..6f66f86 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -1,7 +1,6 @@
//! Transports for communicating with the docker daemon
extern crate hyper;
-extern crate tokio_core;
use self::super::{Error, Result};
use hyper::Body;
@@ -15,8 +14,10 @@ use hyperlocal::UnixConnector;
use mime::Mime;
use rustc_serialize::json;
use std::fmt;
+use std::cell::{RefMut, RefCell};
use std::io::{BufReader, Cursor};
use std::io::{Read, Write};
+use tokio::runtime::Runtime;
trait InteractiveStream : Read + Write { }
@@ -28,11 +29,23 @@ pub fn tar() -> Mime {
/// with the docker daemon
pub enum Transport {
/// A network tcp interface
- Tcp { client: Client<HttpConnector>, host: String },
- /// TCP/TLS.
- EncryptedTcp { client: Client<HttpsConnector<HttpConnector>>, host: String },
+ Tcp {
+ client: Client<HttpConnector>,
+ runtime: RefCell<Runtime>,
+ host: String,
+ },
+ /// TCP/TLS.w
+ EncryptedTcp {
+ client: Client<HttpsConnector<HttpConnector>>,
+ runtime: RefCell<Runtime>,
+ host: String,
+ },
/// A Unix domain socket
- Unix { client: Client<UnixConnector>, path: String },
+ Unix {
+ client: Client<UnixConnector>,
+ runtime: RefCell<Runtime>,
+ path: String,
+ },
}
impl fmt::Debug for Transport {
@@ -74,23 +87,23 @@ impl Transport {
{
let mut builder = Request::builder();
let req = match *self {
- Transport::Tcp {
- ref host, ..
- } => builder.method(method).uri(&format!("{}{}", host, endpoint)),
- Transport::EncryptedTcp {
- ref host, ..
- } => builder.method(method).uri(&format!("{}{}", host, endpoint)),
- Transport::Unix {
- ref path, ..
- } => {
+ Transport::Tcp { ref host, .. } => {
+ builder.method(method).uri(&format!("{}{}", host, endpoint))
+ }
+ Transport::EncryptedTcp { ref host, .. } => {
+ builder.method(method).uri(&format!("{}{}", host, endpoint))
+ }
+ Transport::Unix { ref path, .. } => {
let uri: hyper::Uri = DomainUri::new(&path, endpoint).into();
builder.method(method).uri(&uri.to_string())
- },
+ }
};
let req = req.header(header::HOST, "");
match body {
- Some((b, c)) => Ok(req.header(header::CONTENT_TYPE, &c.to_string()[..]).body(b.into())?),
+ Some((b, c)) => Ok(req
+ .header(header::CONTENT_TYPE, &c.to_string()[..])
+ .body(b.into())?),
_ => Ok(req.body(Body::empty())?),
}
}
@@ -111,7 +124,7 @@ impl Transport {
StatusCode::OK |
StatusCode::CREATED |
StatusCode::SWITCHING_PROTOCOLS => {
- let chunk = res.into_body().concat2().wait()?;
+ let chunk = self.runtime().block_on(res.into_body().concat2())?;
Ok(Box::new(Cursor::new(chunk.into_iter().collect::<Vec<u8>>())))
},
StatusCode::NO_CONTENT => Ok(
@@ -176,14 +189,21 @@ impl Transport {
}
fn send_request(&self, req: Request<hyper::Body>) -> Result<hyper::Response<Body>> {
- use self::tokio_core::reactor;
-
- let mut core = reactor::Core::new().unwrap();
- Ok(core.run(match self {
+ let req = match self {
Transport::Tcp { ref client, .. } => client.request(req),
Transport::EncryptedTcp { ref client, .. } => client.request(req),
Transport::Unix { ref client, .. } => client.request(req),
- })?)
+ };
+
+ self.runtime().block_on(req).map_err(Error::Hyper)
+ }
+
+ fn runtime(&self) -> RefMut<Runtime> {
+ match self {
+ Transport::Tcp { ref runtime, .. } => runtime.borrow_mut(),
+ Transport::EncryptedTcp { ref runtime, .. } => runtime.borrow_mut(),
+ Transport::Unix { ref runtime, .. } => runtime.borrow_mut(),
+ }
}
}