summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAntoine Büsch <antoine.busch@gmail.com>2018-11-14 20:36:14 +1100
committerdoug tangren <d.tangren@gmail.com>2018-11-14 18:36:14 +0900
commit79d65c286025c551a775c0964d168e6feb4b3409 (patch)
tree34b49a0f97f6f851f47711be1cad0c002b8b78f7 /src
parent29bd95b42cd2b3c364f0be1f3e07e4b654e0ccf3 (diff)
Async api (#128)
* Refactored Transport for better async use Still a bit rough, but it now builds a big future using combinators. It still does one `Runtime::block_on()` to keep the existing API, but this is a first up before making the whole API async. * Migrate most APIs to be Future-based I still need to finish a few of the more tricky ones that I've commented out for now, but most of it compiles and some examples work. In particular, `Docker::stats()` now properly returns an async stream of stats. * Fix events and containerinspect examples * Fix imageinspect, images, info and top examples * Fix containercreate, imagedelete and imagepull examples * Fix more examples * Add back debug statement in Transport::request * De-glob imports in examples * Remove unused imports in examples * Fix NetworkCreateOptions serialization * Add back error message extraction in Transport * Fix Container::create serialization of options * Add containerdelete example * Simplify result * Fix some error handling to remove unwrap() * Fix Image::export() * Fix imagebuild example * Add adapter from Stream of Chunks to AsyncRead Having an `AsyncRead` is required to be able to use the `FramedRead` and `Decoder` stuff from tokio_codec. This code is "borrowed" from https:/github.com/ferristseng/rust-ipfs-api though should probably be moved to its own crate or to tokio_codec. * Fix Container::logs() It now properly demuxes stdout/stderr, and returns a `Stream<Item = TtyLine>`. * Fix Container::export() * Use LineCodec for streaming JSON Although in my limited testing it seemed to work fine, there is no guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON structure seems to be serialized on one line, so use LineCodec to turn the body into a stream of lines, then deserialize over this. * Fix serialization of ExecContainerOptions * Fix Container::exec() (kind of...) * Simplify deserialisation in Image::delete() * Small clean-ups * More clean ups * Fix rustdoc + remove extraneous "extern crate" * Fix doc example * Fix formatting
Diffstat (limited to 'src')
-rw-r--r--src/builder.rs33
-rw-r--r--src/errors.rs7
-rw-r--r--src/lib.rs434
-rw-r--r--src/read.rs103
-rw-r--r--src/rep.rs9
-rw-r--r--src/transport.rs202
-rw-r--r--src/tty.rs151
7 files changed, 556 insertions, 383 deletions
diff --git a/src/builder.rs b/src/builder.rs
index 05576f4..27a92e4 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -373,7 +373,7 @@ impl ContainerOptions {
/// serialize options as a string. returns None if no options are defined
pub fn serialize(&self) -> Result<String> {
- Ok(serde_json::to_string(&self.to_json())?)
+ serde_json::to_string(&self.to_json()).map_err(Error::from)
}
fn to_json(&self) -> Value {
@@ -600,7 +600,23 @@ impl ExecContainerOptions {
/// serialize options as a string. returns None if no options are defined
pub fn serialize(&self) -> Result<String> {
- Ok(serde_json::to_string(self)?)
+ let mut body = serde_json::Map::new();
+
+ for (k, v) in &self.params {
+ body.insert(
+ k.to_string(),
+ serde_json::to_value(v).map_err(Error::SerdeJsonError)?,
+ );
+ }
+
+ for (k, v) in &self.params_bool {
+ body.insert(
+ k.to_string(),
+ serde_json::to_value(v).map_err(Error::SerdeJsonError)?,
+ );
+ }
+
+ serde_json::to_string(&body).map_err(Error::from)
}
}
@@ -873,7 +889,7 @@ impl LogsOptionsBuilder {
self
}
- /// how_many can either by "all" or a to_string() of the number
+ /// how_many can either be "all" or a to_string() of the number
pub fn tail(
&mut self,
how_many: &str,
@@ -1067,15 +1083,22 @@ impl NetworkCreateOptions {
NetworkCreateOptionsBuilder::new(name)
}
+ fn to_json(&self) -> Value {
+ let mut body = serde_json::Map::new();
+ self.parse_from(&self.params, &mut body);
+ self.parse_from(&self.params_hash, &mut body);
+ Value::Object(body)
+ }
+
/// serialize options as a string. returns None if no options are defined
pub fn serialize(&self) -> Result<String> {
- serde_json::to_string(self).map_err(Error::from)
+ serde_json::to_string(&self.to_json()).map_err(Error::from)
}
pub fn parse_from<'a, K, V>(
&self,
params: &'a HashMap<K, V>,
- body: &mut BTreeMap<String, Value>,
+ body: &mut serde_json::Map<String, Value>,
) where
&'a HashMap<K, V>: IntoIterator,
K: ToString + Eq + Hash,
diff --git a/src/errors.rs b/src/errors.rs
index 5af8c44..36be662 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -6,6 +6,7 @@ use serde_json::Error as SerdeError;
use std::error::Error as StdError;
use std::fmt;
use std::io::Error as IoError;
+use std::string::FromUtf8Error;
#[derive(Debug)]
pub enum Error {
@@ -13,6 +14,8 @@ pub enum Error {
Hyper(hyper::Error),
Http(http::Error),
IO(IoError),
+ Encoding(FromUtf8Error),
+ InvalidResponse(String),
Fault { code: StatusCode, message: String },
}
@@ -51,6 +54,10 @@ impl fmt::Display for Error {
Error::Http(ref err) => err.fmt(f),
Error::Hyper(ref err) => err.fmt(f),
Error::IO(ref err) => err.fmt(f),
+ Error::Encoding(ref err) => err.fmt(f),
+ Error::InvalidResponse(ref cause) => {
+ write!(f, "Response doesn't have the expected format: {}", cause)
+ }
Error::Fault { code, .. } => write!(f, "{}", code),
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 6823ea3..19f0cff 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,19 +4,27 @@
//!
//! ```no_run
//! extern crate shiplift;
+//! extern crate tokio;
+//!
+//! use tokio::prelude::Future;
//!
//! let docker = shiplift::Docker::new();
-//! let images = docker.images().list(&Default::default()).unwrap();
-//! println!("docker images in stock");
-//! for i in images {
-//! println!("{:?}", i.repo_tags);
-//! }
+//! let fut = docker.images().list(&Default::default()).map(|images| {
+//! println!("docker images in stock");
+//! for i in images {
+//! println!("{:?}", i.repo_tags);
+//! }
+//! }).map_err(|e| eprintln!("Something bad happened! {}", e));
+//!
+//! tokio::run(fut);
//! ```
#[macro_use]
extern crate log;
extern crate byteorder;
+extern crate bytes;
extern crate flate2;
+extern crate futures;
extern crate http;
extern crate hyper;
extern crate hyper_openssl;
@@ -32,9 +40,12 @@ extern crate serde;
#[macro_use]
extern crate serde_json;
extern crate tokio;
+extern crate tokio_codec;
+extern crate tokio_io;
pub mod builder;
pub mod errors;
+pub mod read;
pub mod rep;
pub mod transport;
pub mod tty;
@@ -47,6 +58,7 @@ pub use builder::{
LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions, RmContainerOptions,
};
pub use errors::Error;
+use futures::{future::Either, Future, IntoFuture, Stream};
use hyper::client::HttpConnector;
use hyper::Body;
use hyper::{Client, Method, Uri};
@@ -55,6 +67,7 @@ use hyper_openssl::HttpsConnector;
use hyperlocal::UnixConnector;
use mime::Mime;
use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
+use read::StreamReader;
use rep::Image as ImageRep;
use rep::{
Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit, History,
@@ -63,19 +76,19 @@ use rep::{
use rep::{NetworkCreateInfo, NetworkDetails as NetworkInfo};
use serde_json::Value;
use std::borrow::Cow;
-use std::cell::RefCell;
use std::env;
-use std::io::prelude::*;
use std::path::Path;
use std::time::Duration;
+use tokio_codec::{FramedRead, LinesCodec};
use transport::{tar, Transport};
-use tty::Tty;
+use tty::{TtyDecoder, TtyLine};
use url::form_urlencoded;
/// Represents the result of all docker operations
pub type Result<T> = std::result::Result<T, Error>;
/// Entrypoint interface for communicating with docker daemon
+#[derive(Clone)]
pub struct Docker {
transport: Transport,
}
@@ -102,55 +115,28 @@ impl<'a, 'b> Image<'a, 'b> {
}
/// Inspects a named image's details
- pub fn inspect(&self) -> Result<ImageDetails> {
- let raw = self
- .docker
- .get(&format!("/images/{}/json", self.name)[..])?;
- Ok(serde_json::from_str::<ImageDetails>(&raw)?)
+ pub fn inspect(&self) -> impl Future<Item = ImageDetails, Error = Error> {
+ self.docker
+ .get_json(&format!("/images/{}/json", self.name)[..])
}
/// Lists the history of the images set of changes
- pub fn history(&self) -> Result<Vec<History>> {
- let raw = self
- .docker
- .get(&format!("/images/{}/history", self.name)[..])?;
- Ok(serde_json::from_str::<Vec<History>>(&raw)?)
- }
-
- /// Delete's an image
- pub fn delete(&self) -> Result<Vec<Status>> {
- let raw = self.docker.delete(&format!("/images/{}", self.name)[..])?;
- Ok(match serde_json::from_str(&raw)? {
- Value::Array(ref xs) => xs.iter().map(|j| {
- let obj = j.as_object().expect("expected json object");
- obj.get("Untagged")
- .map(|sha| {
- Status::Untagged(
- sha.as_str()
- .expect("expected Untagged to be a string")
- .to_owned(),
- )
- })
- .or_else(|| {
- obj.get("Deleted").map(|sha| {
- Status::Deleted(
- sha.as_str()
- .expect("expected Deleted to be a string")
- .to_owned(),
- )
- })
- })
- .expect("expected Untagged or Deleted")
- }),
- _ => unreachable!(),
- }
- .collect())
+ pub fn history(&self) -> impl Future<Item = Vec<History>, Error = Error> {
+ self.docker
+ .get_json(&format!("/images/{}/history", self.name)[..])
+ }
+
+ /// Deletes an image
+ pub fn delete(&self) -> impl Future<Item = Vec<Status>, Error = Error> {
+ self.docker
+ .delete_json::<Vec<Status>>(&format!("/images/{}", self.name)[..])
}
/// Export this image to a tarball
- pub fn export(&self) -> Result<Box<Read>> {
+ pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> {
self.docker
.stream_get(&format!("/images/{}/get", self.name)[..])
+ .map(|c| c.to_vec())
}
}
@@ -169,7 +155,7 @@ impl<'a> Images<'a> {
pub fn build(
&self,
opts: &BuildOptions,
- ) -> Result<Vec<Value>> {
+ ) -> impl Stream<Item = Value, Error = Error> {
let mut path = vec!["/build".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query)
@@ -177,29 +163,31 @@ impl<'a> Images<'a> {
let mut bytes = vec![];
- tarball::dir(&mut bytes, &opts.path[..])?;
-
- self.docker
- .stream_post(&path.join("?"), Some((Body::from(bytes), tar())))
- .and_then(|r| {
- serde_json::Deserializer::from_reader(r)
- .into_iter::<Value>()
- .map(|res| res.map_err(Error::from))
- .collect()
- })
+ match tarball::dir(&mut bytes, &opts.path[..]) {
+ Ok(_) => Box::new(
+ self.docker
+ .stream_post(&path.join("?"), Some((Body::from(bytes), tar())))
+ .and_then(|bytes| {
+ serde_json::from_slice::<'_, Value>(&bytes[..])
+ .map_err(Error::from)
+ .into_future()
+ }),
+ ) as Box<Stream<Item = Value, Error = Error> + Send>,
+ Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
+ as Box<Stream<Item = Value, Error = Error> + Send>,
+ }
}
/// Lists the docker images on the current docker host
pub fn list(
&self,
opts: &ImageListOptions,
- ) -> Result<Vec<ImageRep>> {
+ ) -> impl Future<Item = Vec<ImageRep>, Error = Error> {
let mut path = vec!["/images/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- let raw = self.docker.get(&path.join("?"))?;
- Ok(serde_json::from_str::<Vec<ImageRep>>(&raw)?)
+ self.docker.get_json::<Vec<ImageRep>>(&path.join("?"))
}
/// Returns a reference to a set of operations available for a named image
@@ -214,31 +202,26 @@ impl<'a> Images<'a> {
pub fn search(
&self,
term: &str,
- ) -> Result<Vec<SearchResult>> {
+ ) -> impl Future<Item = Vec<SearchResult>, Error = Error> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("term", term)
.finish();
- let raw = self.docker.get(&format!("/images/search?{}", query)[..])?;
- Ok(serde_json::from_str::<Vec<SearchResult>>(&raw)?)
+ self.docker
+ .get_json::<Vec<SearchResult>>(&format!("/images/search?{}", query)[..])
}
/// Pull and create a new docker images from an existing image
pub fn pull(
&self,
opts: &PullOptions,
- ) -> Result<Vec<Value>> {
+ ) -> impl Stream<Item = Value, Error = Error> {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
self.docker
.stream_post::<Body>(&path.join("?"), None)
- .and_then(|r| {
- serde_json::Deserializer::from_reader(r)
- .into_iter::<Value>()
- .map(|res| res.map_err(Error::from))
- .collect()
- })
+ .and_then(|r| serde_json::from_slice::<Value>(&r[..]).map_err(Error::from))
}
/// exports a collection of named images,
@@ -246,13 +229,14 @@ impl<'a> Images<'a> {
pub fn export(
&self,
names: Vec<&str>,
- ) -> Result<Box<Read>> {
+ ) -> impl Stream<Item = Vec<u8>, Error = Error> {
let params = names.iter().map(|n| ("names", *n));
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
self.docker
.stream_get(&format!("/images/get?{}", query)[..])
+ .map(|c| c.to_vec())
}
// pub fn import(self, tarball: Box<Read>) -> Result<()> {
@@ -287,18 +271,16 @@ impl<'a, 'b> Container<'a, 'b> {
}
/// Inspects the current docker container instance's details
- pub fn inspect(&self) -> Result<ContainerDetails> {
- let raw = self
- .docker
- .get(&format!("/containers/{}/json", self.id)[..])?;
- Ok(serde_json::from_str::<ContainerDetails>(&raw)?)
+ pub fn inspect(&self) -> impl Future<Item = ContainerDetails, Error = Error> {
+ self.docker
+ .get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..])
}
/// Returns a `top` view of information about the container process
pub fn top(
&self,
psargs: Option<&str>,
- ) -> Result<Top> {
+ ) -> impl Future<Item = Top, Error = Error> {
let mut path = vec![format!("/containers/{}/top", self.id)];
if let Some(ref args) = psargs {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -306,46 +288,57 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- let raw = self.docker.get(&path.join("?"))?;
-
- Ok(serde_json::from_str::<Top>(&raw)?)
+ self.docker.get_json(&path.join("?"))
}
/// Returns a stream of logs emitted but the container instance
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> Result<Box<Read>> {
+ ) -> impl Stream<Item = TtyLine, Error = Error> {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.stream_get(&path.join("?"))
+
+ let decoder = TtyDecoder::new();
+ let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?")));
+
+ FramedRead::new(chunk_stream, decoder)
}
/// Returns a set of changes made to the container instance
- pub fn changes(&self) -> Result<Vec<Change>> {
- let raw = self
- .docker
- .get(&format!("/containers/{}/changes", self.id)[..])?;
- Ok(serde_json::from_str::<Vec<Change>>(&raw)?)
+ pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> {
+ self.docker
+ .get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..])
}
/// Exports the current docker container into a tarball
- pub fn export(&self) -> Result<Box<Read>> {
+ pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> {
self.docker
.stream_get(&format!("/containers/{}/export", self.id)[..])
+ .map(|c| c.to_vec())
}
/// Returns a stream of stats specific to this container instance
- pub fn stats(&self) -> Result<Vec<Stats>> {
- self.docker
- .stream_get(&format!("/containers/{}/stats", self.id)[..])
- .and_then(|r| serde_json::from_reader::<_, Vec<Stats>>(r).map_err(Error::from))
+ pub fn stats(&self) -> impl Stream<Item = Stats, Error = Error> {
+ let decoder = LinesCodec::new();
+ let stream_of_chunks = StreamReader::new(
+ self.docker
+ .stream_get(&format!("/containers/{}/stats", self.id)[..]),
+ );
+
+ FramedRead::new(stream_of_chunks, decoder)
+ .map_err(Error::IO)
+ .and_then(|s| {
+ serde_json::from_str::<Stats>(&s)
+ .map_err(Error::SerdeJsonError)
+ .into_future()
+ })
}
/// Start the container instance
- pub fn start(&self) -> Result<()> {
+ pub fn start(&self) -> impl Future<Item = (), Error = Error> {
self.docker
.post::<Body>(&format!("/containers/{}/start", self.id)[..], None)
.map(|_| ())
@@ -355,7 +348,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn stop(
&self,
wait: Option<Duration>,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
let mut path = vec![format!("/containers/{}/stop", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -371,7 +364,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn restart(
&self,
wait: Option<Duration>,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
let mut path = vec![format!("/containers/{}/restart", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -386,7 +379,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn kill(
&self,
signal: Option<&str>,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
let mut path = vec![format!("/containers/{}/kill", self.id)];
if let Some(sig) = signal {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -401,7 +394,7 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn rename(
&self,
name: &str,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("name", name)
.finish();
@@ -414,31 +407,29 @@ impl<'a, 'b> Container<'a, 'b> {
}
/// Pause the container instance
- pub fn pause(&self) -> Result<()> {
+ pub fn pause(&self) -> impl Future<Item = (), Error = Error> {
self.docker
.post::<Body>(&format!("/containers/{}/pause", self.id)[..], None)
.map(|_| ())
}
/// Unpause the container instance
- pub fn unpause(&self) -> Result<()> {
+ pub fn unpause(&self) -> impl Future<Item = (), Error = Error> {
self.docker
.post::<Body>(&format!("/containers/{}/unpause", self.id)[..], None)
.map(|_| ())
}
/// Wait until the container stops
- pub fn wait(&self) -> Result<Exit> {
- let raw = self
- .docker
- .post::<Body>(&format!("/containers/{}/wait", self.id)[..], None)?;
- Ok(serde_json::from_str::<Exit>(&raw)?)
+ pub fn wait(&self) -> impl Future<Item = Exit, Error = Error> {
+ self.docker
+ .post_json::<Body, Exit>(&format!("/containers/{}/wait", self.id)[..], None)
}
/// Delete the container instance
///
/// Use remove instead to use the force/v options.
- pub fn delete(&self) -> Result<()> {
+ pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
self.docker
.delete(&format!("/containers/{}", self.id)[..])
.map(|_| ())
@@ -448,51 +439,48 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn remove(
&self,
opts: RmContainerOptions,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
let mut path = vec![format!("/containers/{}", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.delete(&path.join("?"))?;
- Ok(())
+ self.docker.delete(&path.join("?")).map(|_| ())
}
+ // TODO(abusch) fix this
/// Exec the specified command in the container
pub fn exec(
&self,
opts: &ExecContainerOptions,
- ) -> Result<Tty> {
- let data = opts.serialize()?;
+ ) -> impl Stream<Item = TtyLine, Error = Error> {
+ let data = opts.serialize().unwrap(); // TODO fixme
let bytes = data.into_bytes();
- match self.docker.post(
- &format!("/containers/{}/exec", self.id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- ) {
- Err(e) => Err(e),
- Ok(res) => {
+ let docker2 = self.docker.clone();
+ self.docker
+ .post(
+ &format!("/containers/{}/exec", self.id)[..],
+ Some((bytes, mime::APPLICATION_JSON)),
+ )
+ .map(move |res| {
let data = "{}";
- let mut bytes = data.as_bytes();
- let json: Value = serde_json::from_str(res.as_str())?;
-
- if let Value::Object(ref obj) = json {
- self.docker
- .stream_post(
- &format!(
- "/exec/{}/start",
- obj
- .get("Id")
- .unwrap()
- .as_str()
- .unwrap()
- )[..],
- Some((bytes, mime::APPLICATION_JSON)),
- ).map(Tty::new)
- } else {
- // TODO
- panic!()
- }
- }
- }
+ let bytes = data.as_bytes();
+ let id = serde_json::from_str::<Value>(res.as_str())
+ .ok()
+ .and_then(|v| {
+ v.as_object()
+ .and_then(|v| v.get("Id"))
+ .and_then(|v| v.as_str().map(|v| v.to_string()))
+ })
+ .unwrap(); // TODO fixme
+
+ let decoder = TtyDecoder::new();
+ let chunk_stream = StreamReader::new(docker2.stream_post(
+ &format!("/exec/{}/start", id)[..],
+ Some((bytes, mime::APPLICATION_JSON)),
+ ));
+ FramedRead::new(chunk_stream, decoder)
+ })
+ .flatten_stream()
}
// todo attach, attach/ws, copy, archive
@@ -513,13 +501,12 @@ impl<'a> Containers<'a> {
pub fn list(
&self,
opts: &ContainerListOptions,
- ) -> Result<Vec<ContainerRep>> {
+ ) -> impl Future<Item = Vec<ContainerRep>, Error = Error> {
let mut path = vec!["/containers/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query)
}
- let raw = self.docker.get(&path.join("?"))?;
- Ok(serde_json::from_str::<Vec<ContainerRep>>(&raw)?)
+ self.docker.get_json::<Vec<ContainerRep>>(&path.join("?"))
}
/// Returns a reference to a set of operations available to a specific container instance
@@ -534,8 +521,12 @@ impl<'a> Containers<'a> {
pub fn create(
&self,
opts: &ContainerOptions,
- ) -> Result<ContainerCreateInfo> {
- let data = opts.serialize()?;
+ ) -> impl Future<Item = ContainerCreateInfo, Error = Error> {
+ let data = match opts.serialize() {
+ Ok(data) => data,
+ Err(e) => return Either::A(futures::future::err(e)),
+ };
+
let bytes = data.into_bytes();
let mut path = vec!["/containers/create".to_owned()];
@@ -547,10 +538,10 @@ impl<'a> Containers<'a> {
);
}
- let raw = self
- .docker
- .post(&path.join("?"), Some((bytes, mime::APPLICATION_JSON)))?;
- Ok(serde_json::from_str::<ContainerCreateInfo>(&raw)?)
+ Either::B(
+ self.docker
+ .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
+ )
}
}
@@ -569,13 +560,12 @@ impl<'a> Networks<'a> {
pub fn list(
&self,
opts: &NetworkListOptions,
- ) -> Result<Vec<NetworkInfo>> {
+ ) -> impl Future<Item = Vec<NetworkInfo>, Error = Error> {
let mut path = vec!["/networks".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- let raw = self.docker.get(&path.join("?"))?;
- Ok(serde_json::from_str::<Vec<NetworkInfo>>(&raw)?)
+ self.docker.get_json(&path.join("?"))
}
/// Returns a reference to a set of operations available to a specific network instance
@@ -586,18 +576,22 @@ impl<'a> Networks<'a> {
Network::new(self.docker, id)
}
+ /// Create a new Network instance
pub fn create(
&self,
opts: &NetworkCreateOptions,
- ) -> Result<NetworkCreateInfo> {
- let data = opts.serialize()?;
+ ) -> impl Future<Item = NetworkCreateInfo, Error = Error> {
+ let data = match opts.serialize() {
+ Ok(data) => data,
+ Err(e) => return Either::A(futures::future::err(e)),
+ };
let bytes = data.into_bytes();
let path = vec!["/networks/create".to_owned()];
- let raw = self
- .docker
- .post(&path.join("?"), Some((bytes, mime::APPLICATION_JSON)))?;
- Ok(serde_json::from_str::<NetworkCreateInfo>(&raw)?)
+ Either::B(
+ self.docker
+ .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
+ )
}
}
@@ -628,13 +622,12 @@ impl<'a, 'b> Network<'a, 'b> {
}
/// Inspects the current docker network instance's details
- pub fn inspect(&self) -> Result<NetworkInfo> {
- let raw = self.docker.get(&format!("/networks/{}", self.id)[..])?;
- Ok(serde_json::from_str::<NetworkInfo>(&raw)?)
+ pub fn inspect(&self) -> impl Future<Item = NetworkInfo, Error = Error> {
+ self.docker.get_json(&format!("/networks/{}", self.id)[..])
}
/// Delete the network instance
- pub fn delete(&self) -> Result<()> {
+ pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
self.docker
.delete(&format!("/networks/{}", self.id)[..])
.map(|_| ())
@@ -644,7 +637,7 @@ impl<'a, 'b> Network<'a, 'b> {
pub fn connect(
&self,
opts: &ContainerConnectionOptions,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
self.do_connection("connect", opts)
}
@@ -652,7 +645,7 @@ impl<'a, 'b> Network<'a, 'b> {
pub fn disconnect(
&self,
opts: &ContainerConnectionOptions,
- ) -> Result<()> {
+ ) -> impl Future<Item = (), Error = Error> {
self.do_connection("disconnect", opts)
}
@@ -660,16 +653,21 @@ impl<'a, 'b> Network<'a, 'b> {
&self,
segment: &str,
opts: &ContainerConnectionOptions,
- ) -> Result<()> {
- let data = opts.serialize()?;
+ ) -> impl Future<Item = (), Error = Error> {
+ let data = match opts.serialize() {
+ Ok(data) => data,
+ Err(e) => return Either::A(futures::future::err(e)),
+ };
let bytes = data.into_bytes();
- self.docker
- .post(
- &format!("/networks/{}/{}", self.id, segment)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- )
- .map(|_| ())
+ Either::B(
+ self.docker
+ .post(
+ &format!("/networks/{}/{}", self.id, segment)[..],
+ Some((bytes, mime::APPLICATION_JSON)),
+ )
+ .map(|_| ()),
+ )
}
}
@@ -700,7 +698,6 @@ impl Docker {
Docker {
transport: Transport::Unix {
client: Client::builder().keep_alive(false).build(UnixConnector),
- runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
path: socket_path.into(),
},
}
@@ -720,7 +717,6 @@ impl Docker {
Some("unix") => Docker {
transport: Transport::Unix {
client: Client::builder().build(UnixConnector),
- runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
path: host.path().to_owned(),
},
},
@@ -755,7 +751,6 @@ impl Docker {
transport: Transport::EncryptedTcp {
client: Client::builder()
.build(HttpsConnector::with_connector(http, connector).unwrap()),
- runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
host: tcp_host_str,
},
}
@@ -763,7 +758,6 @@ impl Docker {
Docker {
transport: Transport::Tcp {
client: Client::builder().build(http),
- runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()),
host: tcp_host_str,
},
}
@@ -787,76 +781,126 @@ impl Docker {
}
/// Returns version information associated with the docker daemon
- pub fn version(&self) -> Result<Version> {
- let raw = self.get("/version")?;
- Ok(serde_json::from_str::<Version>(&raw)?)
+ pub fn version(&self) -> impl Future<Item = Version, Error = Error> {
+ self.get_json("/version")
}
/// Returns information associated with the docker daemon
- pub fn info(&self) -> Result<Info> {
- let raw = self.get("/info")?;
- Ok(serde_json::from_str::<Info>(&raw)?)
+ pub fn info(&self) -> impl Future<Item = Info, Error = Error> {
+ self.get_json("/info")
}
/// Returns a simple ping response indicating the docker daemon is accessible
- pub fn ping(&self) -> Result<String> {
+ pub fn ping(&self) -> impl Future<Item = String, Error = Error> {
self.get("/_ping")
}
- /// Returns an interator over streamed docker events
+ /// Returns a stream of docker events
pub fn events(
&self,
opts: &EventsOptions,
- ) -> Result<Vec<Event>> {
+ ) -> impl Stream<Item = Event, Error = Error> {
let mut path = vec!["/events".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- self.stream_get(&path.join("?")[..])
- .and_then(|r| serde_json::from_reader::<_, Vec<Event>>(r).map_err(Error::from))
+ let stream_of_chunks = self.stream_get(&path.join("?")[..]);
+ let reader = StreamReader::new(stream_of_chunks);
+ FramedRead::new(reader, LinesCodec::new())
+ .map_err(Error::IO)
+ .and_then(|line| serde_json::from_str::<Event>(&line).map_err(Error::from))
}
+ //
+ // Utility functions to make requests
+ //
+
fn get(
&self,
endpoint: &str,
- ) -> Result<String> {
+ ) -> impl Future<Item = String, Error = Error> {
self.transport.request::<Body>(Method::GET, endpoint, None)
}
+ fn get_json<T: serde::de::DeserializeOwned>(
+ &self,
+ endpoint: &str,
+ ) -> impl Future<Item = T, Error = Error> {
+ self.transport
+ .request::<Body>(Method::GET, endpoint, None)
+ .and_then(|v| {
+ serde_json::from_str::<T>(&v)
+ .map_err(Error::SerdeJsonError)
+ .into_future()
+ })
+ }
+
fn post<B>(
&self,
endpoint: &str,
body: Option<(B, Mime)>,
- ) -> Result<String>
+ ) -> impl Future<Item = String, Error = Error>
where
B: Into<Body>,
{
self.transport.request(Method::POST, endpoint, body)
}
+ fn post_json<B, T>(
+ &self,
+ endpoint: &str,
+ body: Option<(B, Mime)>,
+ ) -> impl Future<Item = T, Error = Error>
+ where
+ B: Into<Body>,
+ T: serde::de::DeserializeOwned,
+ {
+ self.transport
+ .request(Method::POST, endpoint, body)
+ .and_then(|v| {