diff options
author | Antoine Büsch <antoine.busch@gmail.com> | 2018-11-14 20:36:14 +1100 |
---|---|---|
committer | doug tangren <d.tangren@gmail.com> | 2018-11-14 18:36:14 +0900 |
commit | 79d65c286025c551a775c0964d168e6feb4b3409 (patch) | |
tree | 34b49a0f97f6f851f47711be1cad0c002b8b78f7 /src | |
parent | 29bd95b42cd2b3c364f0be1f3e07e4b654e0ccf3 (diff) |
Async api (#128)
* Refactored Transport for better async use
Still a bit rough, but it now builds a big future using combinators. It
still does one `Runtime::block_on()` to keep the existing API, but this
is a first up before making the whole API async.
* Migrate most APIs to be Future-based
I still need to finish a few of the more tricky ones that I've commented
out for now, but most of it compiles and some examples work. In
particular, `Docker::stats()` now properly returns an async stream of
stats.
* Fix events and containerinspect examples
* Fix imageinspect, images, info and top examples
* Fix containercreate, imagedelete and imagepull examples
* Fix more examples
* Add back debug statement in Transport::request
* De-glob imports in examples
* Remove unused imports in examples
* Fix NetworkCreateOptions serialization
* Add back error message extraction in Transport
* Fix Container::create serialization of options
* Add containerdelete example
* Simplify result
* Fix some error handling to remove unwrap()
* Fix Image::export()
* Fix imagebuild example
* Add adapter from Stream of Chunks to AsyncRead
Having an `AsyncRead` is required to be able to use the `FramedRead` and
`Decoder` stuff from tokio_codec. This code is "borrowed" from
https:/github.com/ferristseng/rust-ipfs-api though should probably be
moved to its own crate or to tokio_codec.
* Fix Container::logs()
It now properly demuxes stdout/stderr, and returns a `Stream<Item =
TtyLine>`.
* Fix Container::export()
* Use LineCodec for streaming JSON
Although in my limited testing it seemed to work fine, there is no
guarantee that 1 chunk == 1 piece of valid JSON. However, each JSON
structure seems to be serialized on one line, so use LineCodec to turn
the body into a stream of lines, then deserialize over this.
* Fix serialization of ExecContainerOptions
* Fix Container::exec() (kind of...)
* Simplify deserialisation in Image::delete()
* Small clean-ups
* More clean ups
* Fix rustdoc + remove extraneous "extern crate"
* Fix doc example
* Fix formatting
Diffstat (limited to 'src')
-rw-r--r-- | src/builder.rs | 33 | ||||
-rw-r--r-- | src/errors.rs | 7 | ||||
-rw-r--r-- | src/lib.rs | 434 | ||||
-rw-r--r-- | src/read.rs | 103 | ||||
-rw-r--r-- | src/rep.rs | 9 | ||||
-rw-r--r-- | src/transport.rs | 202 | ||||
-rw-r--r-- | src/tty.rs | 151 |
7 files changed, 556 insertions, 383 deletions
diff --git a/src/builder.rs b/src/builder.rs index 05576f4..27a92e4 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -373,7 +373,7 @@ impl ContainerOptions { /// serialize options as a string. returns None if no options are defined pub fn serialize(&self) -> Result<String> { - Ok(serde_json::to_string(&self.to_json())?) + serde_json::to_string(&self.to_json()).map_err(Error::from) } fn to_json(&self) -> Value { @@ -600,7 +600,23 @@ impl ExecContainerOptions { /// serialize options as a string. returns None if no options are defined pub fn serialize(&self) -> Result<String> { - Ok(serde_json::to_string(self)?) + let mut body = serde_json::Map::new(); + + for (k, v) in &self.params { + body.insert( + k.to_string(), + serde_json::to_value(v).map_err(Error::SerdeJsonError)?, + ); + } + + for (k, v) in &self.params_bool { + body.insert( + k.to_string(), + serde_json::to_value(v).map_err(Error::SerdeJsonError)?, + ); + } + + serde_json::to_string(&body).map_err(Error::from) } } @@ -873,7 +889,7 @@ impl LogsOptionsBuilder { self } - /// how_many can either by "all" or a to_string() of the number + /// how_many can either be "all" or a to_string() of the number pub fn tail( &mut self, how_many: &str, @@ -1067,15 +1083,22 @@ impl NetworkCreateOptions { NetworkCreateOptionsBuilder::new(name) } + fn to_json(&self) -> Value { + let mut body = serde_json::Map::new(); + self.parse_from(&self.params, &mut body); + self.parse_from(&self.params_hash, &mut body); + Value::Object(body) + } + /// serialize options as a string. returns None if no options are defined pub fn serialize(&self) -> Result<String> { - serde_json::to_string(self).map_err(Error::from) + serde_json::to_string(&self.to_json()).map_err(Error::from) } pub fn parse_from<'a, K, V>( &self, params: &'a HashMap<K, V>, - body: &mut BTreeMap<String, Value>, + body: &mut serde_json::Map<String, Value>, ) where &'a HashMap<K, V>: IntoIterator, K: ToString + Eq + Hash, diff --git a/src/errors.rs b/src/errors.rs index 5af8c44..36be662 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -6,6 +6,7 @@ use serde_json::Error as SerdeError; use std::error::Error as StdError; use std::fmt; use std::io::Error as IoError; +use std::string::FromUtf8Error; #[derive(Debug)] pub enum Error { @@ -13,6 +14,8 @@ pub enum Error { Hyper(hyper::Error), Http(http::Error), IO(IoError), + Encoding(FromUtf8Error), + InvalidResponse(String), Fault { code: StatusCode, message: String }, } @@ -51,6 +54,10 @@ impl fmt::Display for Error { Error::Http(ref err) => err.fmt(f), Error::Hyper(ref err) => err.fmt(f), Error::IO(ref err) => err.fmt(f), + Error::Encoding(ref err) => err.fmt(f), + Error::InvalidResponse(ref cause) => { + write!(f, "Response doesn't have the expected format: {}", cause) + } Error::Fault { code, .. } => write!(f, "{}", code), } } @@ -4,19 +4,27 @@ //! //! ```no_run //! extern crate shiplift; +//! extern crate tokio; +//! +//! use tokio::prelude::Future; //! //! let docker = shiplift::Docker::new(); -//! let images = docker.images().list(&Default::default()).unwrap(); -//! println!("docker images in stock"); -//! for i in images { -//! println!("{:?}", i.repo_tags); -//! } +//! let fut = docker.images().list(&Default::default()).map(|images| { +//! println!("docker images in stock"); +//! for i in images { +//! println!("{:?}", i.repo_tags); +//! } +//! }).map_err(|e| eprintln!("Something bad happened! {}", e)); +//! +//! tokio::run(fut); //! ``` #[macro_use] extern crate log; extern crate byteorder; +extern crate bytes; extern crate flate2; +extern crate futures; extern crate http; extern crate hyper; extern crate hyper_openssl; @@ -32,9 +40,12 @@ extern crate serde; #[macro_use] extern crate serde_json; extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; pub mod builder; pub mod errors; +pub mod read; pub mod rep; pub mod transport; pub mod tty; @@ -47,6 +58,7 @@ pub use builder::{ LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions, RmContainerOptions, }; pub use errors::Error; +use futures::{future::Either, Future, IntoFuture, Stream}; use hyper::client::HttpConnector; use hyper::Body; use hyper::{Client, Method, Uri}; @@ -55,6 +67,7 @@ use hyper_openssl::HttpsConnector; use hyperlocal::UnixConnector; use mime::Mime; use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; +use read::StreamReader; use rep::Image as ImageRep; use rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit, History, @@ -63,19 +76,19 @@ use rep::{ use rep::{NetworkCreateInfo, NetworkDetails as NetworkInfo}; use serde_json::Value; use std::borrow::Cow; -use std::cell::RefCell; use std::env; -use std::io::prelude::*; use std::path::Path; use std::time::Duration; +use tokio_codec::{FramedRead, LinesCodec}; use transport::{tar, Transport}; -use tty::Tty; +use tty::{TtyDecoder, TtyLine}; use url::form_urlencoded; /// Represents the result of all docker operations pub type Result<T> = std::result::Result<T, Error>; /// Entrypoint interface for communicating with docker daemon +#[derive(Clone)] pub struct Docker { transport: Transport, } @@ -102,55 +115,28 @@ impl<'a, 'b> Image<'a, 'b> { } /// Inspects a named image's details - pub fn inspect(&self) -> Result<ImageDetails> { - let raw = self - .docker - .get(&format!("/images/{}/json", self.name)[..])?; - Ok(serde_json::from_str::<ImageDetails>(&raw)?) + pub fn inspect(&self) -> impl Future<Item = ImageDetails, Error = Error> { + self.docker + .get_json(&format!("/images/{}/json", self.name)[..]) } /// Lists the history of the images set of changes - pub fn history(&self) -> Result<Vec<History>> { - let raw = self - .docker - .get(&format!("/images/{}/history", self.name)[..])?; - Ok(serde_json::from_str::<Vec<History>>(&raw)?) - } - - /// Delete's an image - pub fn delete(&self) -> Result<Vec<Status>> { - let raw = self.docker.delete(&format!("/images/{}", self.name)[..])?; - Ok(match serde_json::from_str(&raw)? { - Value::Array(ref xs) => xs.iter().map(|j| { - let obj = j.as_object().expect("expected json object"); - obj.get("Untagged") - .map(|sha| { - Status::Untagged( - sha.as_str() - .expect("expected Untagged to be a string") - .to_owned(), - ) - }) - .or_else(|| { - obj.get("Deleted").map(|sha| { - Status::Deleted( - sha.as_str() - .expect("expected Deleted to be a string") - .to_owned(), - ) - }) - }) - .expect("expected Untagged or Deleted") - }), - _ => unreachable!(), - } - .collect()) + pub fn history(&self) -> impl Future<Item = Vec<History>, Error = Error> { + self.docker + .get_json(&format!("/images/{}/history", self.name)[..]) + } + + /// Deletes an image + pub fn delete(&self) -> impl Future<Item = Vec<Status>, Error = Error> { + self.docker + .delete_json::<Vec<Status>>(&format!("/images/{}", self.name)[..]) } /// Export this image to a tarball - pub fn export(&self) -> Result<Box<Read>> { + pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> { self.docker .stream_get(&format!("/images/{}/get", self.name)[..]) + .map(|c| c.to_vec()) } } @@ -169,7 +155,7 @@ impl<'a> Images<'a> { pub fn build( &self, opts: &BuildOptions, - ) -> Result<Vec<Value>> { + ) -> impl Stream<Item = Value, Error = Error> { let mut path = vec!["/build".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) @@ -177,29 +163,31 @@ impl<'a> Images<'a> { let mut bytes = vec![]; - tarball::dir(&mut bytes, &opts.path[..])?; - - self.docker - .stream_post(&path.join("?"), Some((Body::from(bytes), tar()))) - .and_then(|r| { - serde_json::Deserializer::from_reader(r) - .into_iter::<Value>() - .map(|res| res.map_err(Error::from)) - .collect() - }) + match tarball::dir(&mut bytes, &opts.path[..]) { + Ok(_) => Box::new( + self.docker + .stream_post(&path.join("?"), Some((Body::from(bytes), tar()))) + .and_then(|bytes| { + serde_json::from_slice::<'_, Value>(&bytes[..]) + .map_err(Error::from) + .into_future() + }), + ) as Box<Stream<Item = Value, Error = Error> + Send>, + Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) + as Box<Stream<Item = Value, Error = Error> + Send>, + } } /// Lists the docker images on the current docker host pub fn list( &self, opts: &ImageListOptions, - ) -> Result<Vec<ImageRep>> { + ) -> impl Future<Item = Vec<ImageRep>, Error = Error> { let mut path = vec!["/images/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - let raw = self.docker.get(&path.join("?"))?; - Ok(serde_json::from_str::<Vec<ImageRep>>(&raw)?) + self.docker.get_json::<Vec<ImageRep>>(&path.join("?")) } /// Returns a reference to a set of operations available for a named image @@ -214,31 +202,26 @@ impl<'a> Images<'a> { pub fn search( &self, term: &str, - ) -> Result<Vec<SearchResult>> { + ) -> impl Future<Item = Vec<SearchResult>, Error = Error> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("term", term) .finish(); - let raw = self.docker.get(&format!("/images/search?{}", query)[..])?; - Ok(serde_json::from_str::<Vec<SearchResult>>(&raw)?) + self.docker + .get_json::<Vec<SearchResult>>(&format!("/images/search?{}", query)[..]) } /// Pull and create a new docker images from an existing image pub fn pull( &self, opts: &PullOptions, - ) -> Result<Vec<Value>> { + ) -> impl Stream<Item = Value, Error = Error> { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } self.docker .stream_post::<Body>(&path.join("?"), None) - .and_then(|r| { - serde_json::Deserializer::from_reader(r) - .into_iter::<Value>() - .map(|res| res.map_err(Error::from)) - .collect() - }) + .and_then(|r| serde_json::from_slice::<Value>(&r[..]).map_err(Error::from)) } /// exports a collection of named images, @@ -246,13 +229,14 @@ impl<'a> Images<'a> { pub fn export( &self, names: Vec<&str>, - ) -> Result<Box<Read>> { + ) -> impl Stream<Item = Vec<u8>, Error = Error> { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) .finish(); self.docker .stream_get(&format!("/images/get?{}", query)[..]) + .map(|c| c.to_vec()) } // pub fn import(self, tarball: Box<Read>) -> Result<()> { @@ -287,18 +271,16 @@ impl<'a, 'b> Container<'a, 'b> { } /// Inspects the current docker container instance's details - pub fn inspect(&self) -> Result<ContainerDetails> { - let raw = self - .docker - .get(&format!("/containers/{}/json", self.id)[..])?; - Ok(serde_json::from_str::<ContainerDetails>(&raw)?) + pub fn inspect(&self) -> impl Future<Item = ContainerDetails, Error = Error> { + self.docker + .get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..]) } /// Returns a `top` view of information about the container process pub fn top( &self, psargs: Option<&str>, - ) -> Result<Top> { + ) -> impl Future<Item = Top, Error = Error> { let mut path = vec![format!("/containers/{}/top", self.id)]; if let Some(ref args) = psargs { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -306,46 +288,57 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - let raw = self.docker.get(&path.join("?"))?; - - Ok(serde_json::from_str::<Top>(&raw)?) + self.docker.get_json(&path.join("?")) } /// Returns a stream of logs emitted but the container instance pub fn logs( &self, opts: &LogsOptions, - ) -> Result<Box<Read>> { + ) -> impl Stream<Item = TtyLine, Error = Error> { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.stream_get(&path.join("?")) + + let decoder = TtyDecoder::new(); + let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?"))); + + FramedRead::new(chunk_stream, decoder) } /// Returns a set of changes made to the container instance - pub fn changes(&self) -> Result<Vec<Change>> { - let raw = self - .docker - .get(&format!("/containers/{}/changes", self.id)[..])?; - Ok(serde_json::from_str::<Vec<Change>>(&raw)?) + pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> { + self.docker + .get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..]) } /// Exports the current docker container into a tarball - pub fn export(&self) -> Result<Box<Read>> { + pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> { self.docker .stream_get(&format!("/containers/{}/export", self.id)[..]) + .map(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance - pub fn stats(&self) -> Result<Vec<Stats>> { - self.docker - .stream_get(&format!("/containers/{}/stats", self.id)[..]) - .and_then(|r| serde_json::from_reader::<_, Vec<Stats>>(r).map_err(Error::from)) + pub fn stats(&self) -> impl Stream<Item = Stats, Error = Error> { + let decoder = LinesCodec::new(); + let stream_of_chunks = StreamReader::new( + self.docker + .stream_get(&format!("/containers/{}/stats", self.id)[..]), + ); + + FramedRead::new(stream_of_chunks, decoder) + .map_err(Error::IO) + .and_then(|s| { + serde_json::from_str::<Stats>(&s) + .map_err(Error::SerdeJsonError) + .into_future() + }) } /// Start the container instance - pub fn start(&self) -> Result<()> { + pub fn start(&self) -> impl Future<Item = (), Error = Error> { self.docker .post::<Body>(&format!("/containers/{}/start", self.id)[..], None) .map(|_| ()) @@ -355,7 +348,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn stop( &self, wait: Option<Duration>, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { let mut path = vec![format!("/containers/{}/stop", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -371,7 +364,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn restart( &self, wait: Option<Duration>, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { let mut path = vec![format!("/containers/{}/restart", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -386,7 +379,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn kill( &self, signal: Option<&str>, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { let mut path = vec![format!("/containers/{}/kill", self.id)]; if let Some(sig) = signal { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -401,7 +394,7 @@ impl<'a, 'b> Container<'a, 'b> { pub fn rename( &self, name: &str, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(); @@ -414,31 +407,29 @@ impl<'a, 'b> Container<'a, 'b> { } /// Pause the container instance - pub fn pause(&self) -> Result<()> { + pub fn pause(&self) -> impl Future<Item = (), Error = Error> { self.docker .post::<Body>(&format!("/containers/{}/pause", self.id)[..], None) .map(|_| ()) } /// Unpause the container instance - pub fn unpause(&self) -> Result<()> { + pub fn unpause(&self) -> impl Future<Item = (), Error = Error> { self.docker .post::<Body>(&format!("/containers/{}/unpause", self.id)[..], None) .map(|_| ()) } /// Wait until the container stops - pub fn wait(&self) -> Result<Exit> { - let raw = self - .docker - .post::<Body>(&format!("/containers/{}/wait", self.id)[..], None)?; - Ok(serde_json::from_str::<Exit>(&raw)?) + pub fn wait(&self) -> impl Future<Item = Exit, Error = Error> { + self.docker + .post_json::<Body, Exit>(&format!("/containers/{}/wait", self.id)[..], None) } /// Delete the container instance /// /// Use remove instead to use the force/v options. - pub fn delete(&self) -> Result<()> { + pub fn delete(&self) -> impl Future<Item = (), Error = Error> { self.docker .delete(&format!("/containers/{}", self.id)[..]) .map(|_| ()) @@ -448,51 +439,48 @@ impl<'a, 'b> Container<'a, 'b> { pub fn remove( &self, opts: RmContainerOptions, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { let mut path = vec![format!("/containers/{}", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.delete(&path.join("?"))?; - Ok(()) + self.docker.delete(&path.join("?")).map(|_| ()) } + // TODO(abusch) fix this /// Exec the specified command in the container pub fn exec( &self, opts: &ExecContainerOptions, - ) -> Result<Tty> { - let data = opts.serialize()?; + ) -> impl Stream<Item = TtyLine, Error = Error> { + let data = opts.serialize().unwrap(); // TODO fixme let bytes = data.into_bytes(); - match self.docker.post( - &format!("/containers/{}/exec", self.id)[..], - Some((bytes, mime::APPLICATION_JSON)), - ) { - Err(e) => Err(e), - Ok(res) => { + let docker2 = self.docker.clone(); + self.docker + .post( + &format!("/containers/{}/exec", self.id)[..], + Some((bytes, mime::APPLICATION_JSON)), + ) + .map(move |res| { let data = "{}"; - let mut bytes = data.as_bytes(); - let json: Value = serde_json::from_str(res.as_str())?; - - if let Value::Object(ref obj) = json { - self.docker - .stream_post( - &format!( - "/exec/{}/start", - obj - .get("Id") - .unwrap() - .as_str() - .unwrap() - )[..], - Some((bytes, mime::APPLICATION_JSON)), - ).map(Tty::new) - } else { - // TODO - panic!() - } - } - } + let bytes = data.as_bytes(); + let id = serde_json::from_str::<Value>(res.as_str()) + .ok() + .and_then(|v| { + v.as_object() + .and_then(|v| v.get("Id")) + .and_then(|v| v.as_str().map(|v| v.to_string())) + }) + .unwrap(); // TODO fixme + + let decoder = TtyDecoder::new(); + let chunk_stream = StreamReader::new(docker2.stream_post( + &format!("/exec/{}/start", id)[..], + Some((bytes, mime::APPLICATION_JSON)), + )); + FramedRead::new(chunk_stream, decoder) + }) + .flatten_stream() } // todo attach, attach/ws, copy, archive @@ -513,13 +501,12 @@ impl<'a> Containers<'a> { pub fn list( &self, opts: &ContainerListOptions, - ) -> Result<Vec<ContainerRep>> { + ) -> impl Future<Item = Vec<ContainerRep>, Error = Error> { let mut path = vec!["/containers/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) } - let raw = self.docker.get(&path.join("?"))?; - Ok(serde_json::from_str::<Vec<ContainerRep>>(&raw)?) + self.docker.get_json::<Vec<ContainerRep>>(&path.join("?")) } /// Returns a reference to a set of operations available to a specific container instance @@ -534,8 +521,12 @@ impl<'a> Containers<'a> { pub fn create( &self, opts: &ContainerOptions, - ) -> Result<ContainerCreateInfo> { - let data = opts.serialize()?; + ) -> impl Future<Item = ContainerCreateInfo, Error = Error> { + let data = match opts.serialize() { + Ok(data) => data, + Err(e) => return Either::A(futures::future::err(e)), + }; + let bytes = data.into_bytes(); let mut path = vec!["/containers/create".to_owned()]; @@ -547,10 +538,10 @@ impl<'a> Containers<'a> { ); } - let raw = self - .docker - .post(&path.join("?"), Some((bytes, mime::APPLICATION_JSON)))?; - Ok(serde_json::from_str::<ContainerCreateInfo>(&raw)?) + Either::B( + self.docker + .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), + ) } } @@ -569,13 +560,12 @@ impl<'a> Networks<'a> { pub fn list( &self, opts: &NetworkListOptions, - ) -> Result<Vec<NetworkInfo>> { + ) -> impl Future<Item = Vec<NetworkInfo>, Error = Error> { let mut path = vec!["/networks".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - let raw = self.docker.get(&path.join("?"))?; - Ok(serde_json::from_str::<Vec<NetworkInfo>>(&raw)?) + self.docker.get_json(&path.join("?")) } /// Returns a reference to a set of operations available to a specific network instance @@ -586,18 +576,22 @@ impl<'a> Networks<'a> { Network::new(self.docker, id) } + /// Create a new Network instance pub fn create( &self, opts: &NetworkCreateOptions, - ) -> Result<NetworkCreateInfo> { - let data = opts.serialize()?; + ) -> impl Future<Item = NetworkCreateInfo, Error = Error> { + let data = match opts.serialize() { + Ok(data) => data, + Err(e) => return Either::A(futures::future::err(e)), + }; let bytes = data.into_bytes(); let path = vec!["/networks/create".to_owned()]; - let raw = self - .docker - .post(&path.join("?"), Some((bytes, mime::APPLICATION_JSON)))?; - Ok(serde_json::from_str::<NetworkCreateInfo>(&raw)?) + Either::B( + self.docker + .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), + ) } } @@ -628,13 +622,12 @@ impl<'a, 'b> Network<'a, 'b> { } /// Inspects the current docker network instance's details - pub fn inspect(&self) -> Result<NetworkInfo> { - let raw = self.docker.get(&format!("/networks/{}", self.id)[..])?; - Ok(serde_json::from_str::<NetworkInfo>(&raw)?) + pub fn inspect(&self) -> impl Future<Item = NetworkInfo, Error = Error> { + self.docker.get_json(&format!("/networks/{}", self.id)[..]) } /// Delete the network instance - pub fn delete(&self) -> Result<()> { + pub fn delete(&self) -> impl Future<Item = (), Error = Error> { self.docker .delete(&format!("/networks/{}", self.id)[..]) .map(|_| ()) @@ -644,7 +637,7 @@ impl<'a, 'b> Network<'a, 'b> { pub fn connect( &self, opts: &ContainerConnectionOptions, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { self.do_connection("connect", opts) } @@ -652,7 +645,7 @@ impl<'a, 'b> Network<'a, 'b> { pub fn disconnect( &self, opts: &ContainerConnectionOptions, - ) -> Result<()> { + ) -> impl Future<Item = (), Error = Error> { self.do_connection("disconnect", opts) } @@ -660,16 +653,21 @@ impl<'a, 'b> Network<'a, 'b> { &self, segment: &str, opts: &ContainerConnectionOptions, - ) -> Result<()> { - let data = opts.serialize()?; + ) -> impl Future<Item = (), Error = Error> { + let data = match opts.serialize() { + Ok(data) => data, + Err(e) => return Either::A(futures::future::err(e)), + }; let bytes = data.into_bytes(); - self.docker - .post( - &format!("/networks/{}/{}", self.id, segment)[..], - Some((bytes, mime::APPLICATION_JSON)), - ) - .map(|_| ()) + Either::B( + self.docker + .post( + &format!("/networks/{}/{}", self.id, segment)[..], + Some((bytes, mime::APPLICATION_JSON)), + ) + .map(|_| ()), + ) } } @@ -700,7 +698,6 @@ impl Docker { Docker { transport: Transport::Unix { client: Client::builder().keep_alive(false).build(UnixConnector), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), path: socket_path.into(), }, } @@ -720,7 +717,6 @@ impl Docker { Some("unix") => Docker { transport: Transport::Unix { client: Client::builder().build(UnixConnector), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), path: host.path().to_owned(), }, }, @@ -755,7 +751,6 @@ impl Docker { transport: Transport::EncryptedTcp { client: Client::builder() .build(HttpsConnector::with_connector(http, connector).unwrap()), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), host: tcp_host_str, }, } @@ -763,7 +758,6 @@ impl Docker { Docker { transport: Transport::Tcp { client: Client::builder().build(http), - runtime: RefCell::new(tokio::runtime::Runtime::new().unwrap()), host: tcp_host_str, }, } @@ -787,76 +781,126 @@ impl Docker { } /// Returns version information associated with the docker daemon - pub fn version(&self) -> Result<Version> { - let raw = self.get("/version")?; - Ok(serde_json::from_str::<Version>(&raw)?) + pub fn version(&self) -> impl Future<Item = Version, Error = Error> { + self.get_json("/version") } /// Returns information associated with the docker daemon - pub fn info(&self) -> Result<Info> { - let raw = self.get("/info")?; - Ok(serde_json::from_str::<Info>(&raw)?) + pub fn info(&self) -> impl Future<Item = Info, Error = Error> { + self.get_json("/info") } /// Returns a simple ping response indicating the docker daemon is accessible - pub fn ping(&self) -> Result<String> { + pub fn ping(&self) -> impl Future<Item = String, Error = Error> { self.get("/_ping") } - /// Returns an interator over streamed docker events + /// Returns a stream of docker events pub fn events( &self, opts: &EventsOptions, - ) -> Result<Vec<Event>> { + ) -> impl Stream<Item = Event, Error = Error> { let mut path = vec!["/events".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.stream_get(&path.join("?")[..]) - .and_then(|r| serde_json::from_reader::<_, Vec<Event>>(r).map_err(Error::from)) + let stream_of_chunks = self.stream_get(&path.join("?")[..]); + let reader = StreamReader::new(stream_of_chunks); + FramedRead::new(reader, LinesCodec::new()) + .map_err(Error::IO) + .and_then(|line| serde_json::from_str::<Event>(&line).map_err(Error::from)) } + // + // Utility functions to make requests + // + fn get( &self, endpoint: &str, - ) -> Result<String> { + ) -> impl Future<Item = String, Error = Error> { self.transport.request::<Body>(Method::GET, endpoint, None) } + fn get_json<T: serde::de::DeserializeOwned>( + &self, + endpoint: &str, + ) -> impl Future<Item = T, Error = Error> { + self.transport + .request::<Body>(Method::GET, endpoint, None) + .and_then(|v| { + serde_json::from_str::<T>(&v) + .map_err(Error::SerdeJsonError) + .into_future() + }) + } + fn post<B>( &self, endpoint: &str, body: Option<(B, Mime)>, - ) -> Result<String> + ) -> impl Future<Item = String, Error = Error> where B: Into<Body>, { self.transport.request(Method::POST, endpoint, body) } + fn post_json<B, T>( + &self, + endpoint: &str, + body: Option<(B, Mime)>, + ) -> impl Future<Item = T, Error = Error> + where + B: Into<Body>, + T: serde::de::DeserializeOwned, + { + self.transport + .request(Method::POST, endpoint, body) + .and_then(|v| { |