diff options
Diffstat (limited to 'src/lib.rs')
-rw-r--r-- | src/lib.rs | 793 |
1 files changed, 410 insertions, 383 deletions
@@ -3,22 +3,22 @@ //! # examples //! //! ```no_run -//! use tokio::prelude::Future; -//! +//! # async { //! let docker = shiplift::Docker::new(); -//! let fut = docker.images().list(&Default::default()).map(|images| { -//! println!("docker images in stock"); -//! for i in images { -//! println!("{:?}", i.repo_tags); -//! } -//! }).map_err(|e| eprintln!("Something bad happened! {}", e)); //! -//! tokio::run(fut); +//! match docker.images().list(&Default::default()).await { +//! Ok(images) => { +//! for image in images { +//! println!("{:?}", image.repo_tags); +//! } +//! }, +//! Err(e) => eprintln!("Something bad happened! {}", e), +//! } +//! # }; //! ``` pub mod builder; pub mod errors; -pub mod read; pub mod rep; pub mod transport; pub mod tty; @@ -35,7 +35,6 @@ pub use crate::{ errors::Error, }; use crate::{ - read::StreamReader, rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo, @@ -43,9 +42,14 @@ use crate::{ Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep, }, transport::{tar, Transport}, - tty::TtyDecoder, + tty::Multiplexer as TtyMultiPlexer, +}; +use futures_util::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, + TryFutureExt, TryStreamExt, }; -use futures::{future::Either, Future, IntoFuture, Stream}; +// use futures::{future::Either, Future, IntoFuture, Stream}; pub use hyper::Uri; use hyper::{client::HttpConnector, Body, Client, Method}; #[cfg(feature = "tls")] @@ -56,8 +60,7 @@ use mime::Mime; #[cfg(feature = "tls")] use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; use serde_json::Value; -use std::{borrow::Cow, env, io::Read, iter, path::Path, time::Duration}; -use tokio_codec::{FramedRead, LinesCodec}; +use std::{borrow::Cow, env, io, io::Read, iter, path::Path, time::Duration}; use url::form_urlencoded; /// Represents the result of all docker operations @@ -70,19 +73,19 @@ pub struct Docker { } /// Interface for accessing and manipulating a named docker image -pub struct Image<'a, 'b> { +pub struct Image<'a> { docker: &'a Docker, - name: Cow<'b, str>, + name: Cow<'a, str>, } -impl<'a, 'b> Image<'a, 'b> { +impl<'a> Image<'a> { /// Exports an interface for operations that may be performed against a named image pub fn new<S>( docker: &'a Docker, name: S, - ) -> Image<'a, 'b> + ) -> Self where - S: Into<Cow<'b, str>>, + S: Into<Cow<'a, str>>, { Image { docker, @@ -91,40 +94,46 @@ impl<'a, 'b> Image<'a, 'b> { } /// Inspects a named image's details - pub fn inspect(&self) -> impl Future<Item = ImageDetails, Error = Error> { + pub async fn inspect(&self) -> Result<ImageDetails> { self.docker .get_json(&format!("/images/{}/json", self.name)[..]) + .await } /// Lists the history of the images set of changes - pub fn history(&self) -> impl Future<Item = Vec<History>, Error = Error> { + pub async fn history(&self) -> Result<Vec<History>> { self.docker .get_json(&format!("/images/{}/history", self.name)[..]) + .await } /// Deletes an image - pub fn delete(&self) -> impl Future<Item = Vec<Status>, Error = Error> { + pub async fn delete(&self) -> Result<Vec<Status>> { self.docker .delete_json::<Vec<Status>>(&format!("/images/{}", self.name)[..]) + .await } /// Export this image to a tarball - pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> { - self.docker - .stream_get(&format!("/images/{}/get", self.name)[..]) - .map(|c| c.to_vec()) + pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + Unpin + 'a { + Box::pin( + self.docker + .stream_get(format!("/images/{}/get", self.name)) + .map_ok(|c| c.to_vec()), + ) } /// Adds a tag to an image - pub fn tag( + pub async fn tag( &self, opts: &TagOptions, - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let mut path = vec![format!("/images/{}/tag", self.name)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.post::<Body>(&path.join("?"), None).map(|_| ()) + let _ = self.docker.post(&path.join("?"), None).await?; + Ok(()) } } @@ -141,76 +150,74 @@ impl<'a> Images<'a> { /// Builds a new image build by reading a Dockerfile in a target directory pub fn build( - &self, - opts: &BuildOptions, - ) -> impl Stream<Item = Value, Error = Error> { - let mut path = vec!["/build".to_owned()]; - if let Some(query) = opts.serialize() { - path.push(query) - } + &'a self, + opts: &'a BuildOptions, + ) -> impl Stream<Item = Result<Value>> + Unpin + 'a { + Box::pin( + async move { + let mut path = vec!["/build".to_owned()]; + if let Some(query) = opts.serialize() { + path.push(query) + } + + let mut bytes = Vec::default(); + + tarball::dir(&mut bytes, &opts.path[..])?; + + let chunk_stream = self.docker.stream_post( + path.join("?"), + Some((Body::from(bytes), tar())), + None::<iter::Empty<_>>, + ); - let mut bytes = vec![]; - - match tarball::dir(&mut bytes, &opts.path[..]) { - Ok(_) => Box::new( - self.docker - .stream_post( - &path.join("?"), - Some((Body::from(bytes), tar())), - None::<iter::Empty<_>>, - ) - .map(|r| { - futures::stream::iter_result( - serde_json::Deserializer::from_slice(&r[..]) - .into_iter::<Value>() - .collect::<Vec<_>>(), - ) - .map_err(Error::from) - }) - .flatten(), - ) as Box<dyn Stream<Item = Value, Error = Error> + Send>, - Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) - as Box<dyn Stream<Item = Value, Error = Error> + Send>, - } + let value_stream = chunk_stream.and_then(|chunk| async move { + serde_json::from_slice(&chunk).map_err(Error::from) + }); + + Ok(value_stream) + } + .try_flatten_stream(), + ) } /// Lists the docker images on the current docker host - pub fn list( + pub async fn list( &self, opts: &ImageListOptions, - ) -> impl Future<Item = Vec<ImageRep>, Error = Error> { + ) -> Result<Vec<ImageRep>> { let mut path = vec!["/images/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.docker.get_json::<Vec<ImageRep>>(&path.join("?")) + self.docker.get_json::<Vec<ImageRep>>(&path.join("?")).await } /// Returns a reference to a set of operations available for a named image - pub fn get<'b>( + pub fn get( &self, - name: &'b str, - ) -> Image<'a, 'b> { + name: &'a str, + ) -> Image<'a> { Image::new(self.docker, name) } /// Search for docker images by term - pub fn search( + pub async fn search( &self, term: &str, - ) -> impl Future<Item = Vec<SearchResult>, Error = Error> { + ) -> Result<Vec<SearchResult>> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("term", term) .finish(); self.docker .get_json::<Vec<SearchResult>>(&format!("/images/search?{}", query)[..]) + .await } /// Pull and create a new docker images from an existing image pub fn pull( &self, opts: &PullOptions, - ) -> impl Stream<Item = Value, Error = Error> { + ) -> impl Stream<Item = Result<Value>> + Unpin + 'a { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); @@ -218,18 +225,15 @@ impl<'a> Images<'a> { let headers = opts .auth_header() .map(|a| iter::once(("X-Registry-Auth", a))); - self.docker - .stream_post::<Body, _>(&path.join("?"), None, headers) - // todo: give this a proper enum type - .map(|r| { - futures::stream::iter_result( - serde_json::Deserializer::from_slice(&r[..]) - .into_iter::<Value>() - .collect::<Vec<_>>(), - ) - .map_err(Error::from) - }) - .flatten() + + Box::pin( + self.docker + .stream_post(path.join("?"), None, headers) + .and_then(move |chunk| { + // todo: give this a proper enum type + futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from)) + }), + ) } /// exports a collection of named images, @@ -237,14 +241,14 @@ impl<'a> Images<'a> { pub fn export( &self, names: Vec<&str>, - ) -> impl Stream<Item = Vec<u8>, Error = Error> { + ) -> impl Stream<Item = Result<Vec<u8>>> + 'a { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) .finish(); self.docker - .stream_get(&format!("/images/get?{}", query)[..]) - .map(|c| c.to_vec()) + .stream_get(format!("/images/get?{}", query)) + .map_ok(|c| c.to_vec()) } /// imports an image or set of images from a given tarball source @@ -252,43 +256,44 @@ impl<'a> Images<'a> { pub fn import( self, mut tarball: Box<dyn Read>, - ) -> impl Stream<Item = Value, Error = Error> { - let mut bytes = Vec::new(); - - match tarball.read_to_end(&mut bytes) { - Ok(_) => Box::new( - self.docker - .stream_post( - "/images/load", - Some((Body::from(bytes), tar())), - None::<iter::Empty<_>>, - ) - .and_then(|bytes| { - serde_json::from_slice::<'_, Value>(&bytes[..]) - .map_err(Error::from) - .into_future() - }), - ) as Box<dyn Stream<Item = Value, Error = Error> + Send>, - Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream()) - as Box<dyn Stream<Item = Value, Error = Error> + Send>, - } + ) -> impl Stream<Item = Result<Value>> + Unpin + 'a { + Box::pin( + async move { + let mut bytes = Vec::default(); + + tarball.read_to_end(&mut bytes)?; + + let chunk_stream = self.docker.stream_post( + "/images/load", + Some((Body::from(bytes), tar())), + None::<iter::Empty<_>>, + ); + + let value_stream = chunk_stream.and_then(|chunk| async move { + serde_json::from_slice(&chunk).map_err(Error::from) + }); + + Ok(value_stream) + } + .try_flatten_stream(), + ) } } /// Interface for accessing and manipulating a docker container -pub struct Container<'a, 'b> { +pub struct Container<'a> { docker: &'a Docker, - id: Cow<'b, str>, + id: Cow<'a, str>, } -impl<'a, 'b> Container<'a, 'b> { +impl<'a> Container<'a> { /// Exports an interface exposing operations against a container instance pub fn new<S>( docker: &'a Docker, id: S, - ) -> Container<'a, 'b> + ) -> Self where - S: Into<Cow<'b, str>>, + S: Into<Cow<'a, str>>, { Container { docker, @@ -302,16 +307,17 @@ impl<'a, 'b> Container<'a, 'b> { } /// Inspects the current docker container instance's details - pub fn inspect(&self) -> impl Future<Item = ContainerDetails, Error = Error> { + pub async fn inspect(&self) -> Result<ContainerDetails> { self.docker .get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..]) + .await } /// Returns a `top` view of information about the container process - pub fn top( + pub async fn top( &self, psargs: Option<&str>, - ) -> impl Future<Item = Top, Error = Error> { + ) -> Result<Top> { let mut path = vec![format!("/containers/{}/top", self.id)]; if let Some(ref args) = psargs { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -319,85 +325,95 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.get_json(&path.join("?")) + self.docker.get_json(&path.join("?")).await } /// Returns a stream of logs emitted but the container instance pub fn logs( &self, opts: &LogsOptions, - ) -> impl Stream<Item = tty::Chunk, Error = Error> { + ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - let decoder = TtyDecoder::new(); - let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?"))); + let stream = Box::pin(self.docker.stream_get(path.join("?"))); - FramedRead::new(chunk_stream, decoder) + Box::pin(tty::decode(stream)) } - /// Attaches to a running container, returning a stream that can - /// be used to interact with the standard IO streams. - pub fn attach(&self) -> impl Future<Item = tty::Multiplexed, Error = Error> { - self.docker.stream_post_upgrade_multiplexed::<Body>( - &format!( - "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", - self.id - ), - None, - ) + /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin. + async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'a> { + self.docker + .stream_post_upgrade( + format!( + "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", + self.id + ), + None, + ) + .await } - /// Attaches to a running container, returning a stream that can - /// be used to interact with the standard IO streams. - pub fn attach_blocking(&self) -> Result<tty::MultiplexedBlocking> { - self.attach().map(|s| s.wait()).wait() + /// Attaches a `[TtyMultiplexer]` to the container. + /// + /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin. + /// + /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method + pub async fn attach(&self) -> Result<TtyMultiPlexer<'a>> { + let tcp_stream = self.attach_raw().await?; + + Ok(TtyMultiPlexer::new(tcp_stream)) } /// Returns a set of changes made to the container instance - pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> { + pub async fn changes(&self) -> Result<Vec<Change>> { self.docker .get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..]) + .await } /// Exports the current docker container into a tarball - pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> { + pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'a { self.docker - .stream_get(&format!("/containers/{}/export", self.id)[..]) - .map(|c| c.to_vec()) + .stream_get(format!("/containers/{}/export", self.id)) + .map_ok(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance - pub fn stats(&self) -> impl Stream<Item = Stats, Error = Error> { - let decoder = LinesCodec::new(); - let stream_of_chunks = StreamReader::new( - self.docker - .stream_get(&format!("/containers/{}/stats", self.id)[..]), - ); + pub fn stats(&'a self) -> impl Stream<Item = Result<Stats>> + Unpin + 'a { + let codec = futures_codec::LinesCodec {}; - FramedRead::new(stream_of_chunks, decoder) - .map_err(Error::IO) - .and_then(|s| { - serde_json::from_str::<Stats>(&s) - .map_err(Error::SerdeJsonError) - .into_future() - }) + let reader = Box::pin( + self.docker + .stream_get(format!("/containers/{}/stats", self.id)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ) + .into_async_read(); + + Box::pin( + futures_codec::FramedRead::new(reader, codec) + .map_err(Error::IO) + .and_then(|s: String| async move { + serde_json::from_str(&s).map_err(Error::SerdeJsonError) + }), + ) } /// Start the container instance - pub fn start(&self) -> impl Future<Item = (), Error = Error> { + pub async fn start(&self) -> Result<()> { self.docker - .post::<Body>(&format!("/containers/{}/start", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/start", self.id)[..], None) + .await?; + Ok(()) } /// Stop the container instance - pub fn stop( + pub async fn stop( &self, wait: Option<Duration>, - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/stop", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -406,14 +422,15 @@ impl<'a, 'b> Container<'a, 'b> { path.push(encoded) } - self.docker.post::<Body>(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Restart the container instance - pub fn restart( + pub async fn restart( &self, wait: Option<Duration>, - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/restart", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -421,14 +438,15 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.post::<Body>(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Kill the container instance - pub fn kill( + pub async fn kill( &self, signal: Option<&str>, - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let mut path = vec![format!("/containers/{}/kill", self.id)]; if let Some(sig) = signal { let encoded = form_urlencoded::Serializer::new(String::new()) @@ -436,101 +454,125 @@ impl<'a, 'b> Container<'a, 'b> { .finish(); path.push(encoded) } - self.docker.post::<Body>(&path.join("?"), None).map(|_| ()) + self.docker.post(&path.join("?"), None).await?; + Ok(()) } /// Rename the container instance - pub fn rename( + pub async fn rename( &self, name: &str, - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(); self.docker - .post::<Body>( + .post( &format!("/containers/{}/rename?{}", self.id, query)[..], None, ) - .map(|_| ()) + .await?; + Ok(()) } /// Pause the container instance - pub fn pause(&self) -> impl Future<Item = (), Error = Error> { + pub async fn pause(&self) -> Result<()> { self.docker - .post::<Body>(&format!("/containers/{}/pause", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/pause", self.id)[..], None) + .await?; + Ok(()) } /// Unpause the container instance - pub fn unpause(&self) -> impl Future<Item = (), Error = Error> { + pub async fn unpause(&self) -> Result<()> { self.docker - .post::<Body>(&format!("/containers/{}/unpause", self.id)[..], None) - .map(|_| ()) + .post(&format!("/containers/{}/unpause", self.id)[..], None) + .await?; + Ok(()) } /// Wait until the container stops - pub fn wait(&self) -> impl Future<Item = Exit, Error = Error> { + pub async fn wait(&self) -> Result<Exit> { self.docker - .post_json::<Body, Exit>(&format!("/containers/{}/wait", self.id)[..], None) + .post_json( + format!("/containers/{}/wait", self.id), + Option::<(Body, Mime)>::None, + ) + .await } /// Delete the container instance /// /// Use remove instead to use the force/v options. - pub fn delete(&self) -> impl Future<Item = (), Error = Error> { + pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/containers/{}", self.id)[..]) - .map(|_| ()) + .await?; + Ok(()) } /// Delete the container instance (todo: force/v) - pub fn remove( + pub async fn remove( &self, opts: RmContainerOptions, - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let mut path = vec![format!("/containers/{}", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.delete(&path.join("?")).map(|_| ()) + self.docker.delete(&path.join("?")).await?; + Ok(()) } - // TODO(abusch) fix this - /// Exec the specified command in the container - pub fn exec( + async fn exec_create( &self, opts: &ExecContainerOptions, - ) -> impl Stream<Item = tty::Chunk, Error = Error> { - let data = opts.serialize().unwrap(); // TODO fixme - let bytes = data.into_bytes(); - let docker2 = self.docker.clone(); - self.docker - .post( + ) -> Result<String> { + #[derive(serde::Deserialize)] + #[serde(rename_all = "PascalCase")] + struct Response { + id: String, + } + + let body: Body = opts.serialize()?.into(); + + let Response { id } = self + .docker + .post_json( &format!("/containers/{}/exec", self.id)[..], - Some((bytes, mime::APPLICATION_JSON)), + Some((body, mime::APPLICATION_JSON)), ) - .map(move |res| { - let data = "{}"; - let bytes = data.as_bytes(); - let id = serde_json::from_str::<Value>(res.as_str()) - .ok() - .and_then(|v| { - v.as_object() - .and_then(|v| v.get("Id")) - .and_then(|v| v.as_str().map(|v| v.to_string())) - }) - .unwrap(); // TODO fixme - - let decoder = TtyDecoder::new(); - let chunk_stream = StreamReader::new(docker2.stream_post( - &format!("/exec/{}/start", id)[..], - Some((bytes, mime::APPLICATION_JSON)), - None::<iter::Empty<_>>, - )); - FramedRead::new(chunk_stream, decoder) - }) - .flatten_stream() + .await?; + + Ok(id) + } + + fn exec_start( + &self, + id: String, + ) -> impl Stream<Item = Result<tty::TtyChunk>> + 'a { + let bytes: &[u8] = b"{}"; + + let stream = Box::pin(self.docker.stream_post( + format!("/exec/{}/start", id), + Some((bytes.into(), mime::APPLICATION_JSON)), + None::<iter::Empty<_>>, + )); + + tty::decode(stream) + } + + pub fn exec( + &'a self, + opts: &'a ExecContainerOptions, + ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a { + Box::pin( + async move { + let id = self.exec_create(opts).await?; + Ok(self.exec_start(id)) + } + .try_flatten_stream(), + ) } /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted @@ -544,24 +586,24 @@ impl<'a, 'b> Container<'a, 'b> { pub fn copy_from( &self, path: &Path, - ) -> impl Stream<Item = Vec<u8>, Error = Error> { + ) -> impl Stream<Item = Result<Vec<u8>>> + 'a { let path_arg = form_urlencoded::Serializer::new(String::new()) .append_pair("path", &path.to_string_lossy()) .finish(); - self.docker - .stream_get(&format!("/containers/{}/archive?{}", self.id, path_arg)) - .map(|c| c.to_vec()) + + let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg); + self.docker.stream_get(endpoint).map_ok(|c| c.to_vec()) } /// Copy a byte slice as file into (see `bytes`) the container. /// /// The file will be copied at the given location (see `path`) and will be owned by root /// with access mask 644. - pub fn copy_file_into<P: AsRef<Path>>( + pub async fn copy_file_into<P: AsRef<Path>>( &self, path: P, bytes: &[u8], - ) -> impl Future<Item = (), Error = Error> { + ) -> Result<()> { let path = path.as_ref(); let mut ar = tar::Builder::new(Vec::new()); @@ -588,9 +630,10 @@ impl<'a, 'b> Container<'a, 'b> { self.docker .put( &format!("/containers/{}/archive?{}", self.id, path_arg), - body, + body.map(|(body, mime)| (body.into(), mime)), ) - .map(|_| ()) + .await?; + Ok(()) } } @@ -606,36 +649,33 @@ impl<'a> Containers<'a> { } /// Lists the container instances on the docker host - pub fn list( + pub async fn list( &self, opts: &ContainerListOptions, - ) -> impl Future<Item = Vec<ContainerRep>, Error = Error> { + ) -> Result<Vec<ContainerRep>> { let mut path = vec!["/containers/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) } - self.docker.get_json::<Vec<ContainerRep>>(&path.join("?")) + self.docker + .get_json::<Vec<ContainerRep>>(&path.join("?")) + .await } /// Returns a reference to a set of operations available to a specific container instance - pub fn get<'b>( + pub fn get( &self, - name: &'b str, - ) -> Container<'a, 'b> { + name: &'a str, + ) -> Container<'a> { Container::new(self.docker, name) } /// Returns a builder interface for creating a new container instance - pub fn create( + pub async fn create( &self, opts: &ContainerOptions, - ) -> impl Future<Item = ContainerCreateInfo, Error = Error> { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - - let bytes = data.into_bytes(); + ) -> Result<ContainerCreateInfo> { + let body: Body = opts.serialize()?.into(); let mut path = vec!["/containers/create".to_owned()]; if let Some(ref name) = opts.name { @@ -646,10 +686,9 @@ impl<'a> Containers<'a> { ); } - Either::B( - self.docker - .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), - ) + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await } } @@ -665,15 +704,15 @@ impl<'a> Networks<'a> { } /// List the docker networks on the current docker host - pub fn list( + pub async fn list( &self, opts: &NetworkListOptions, - ) -> impl Future<Item = Vec<NetworkInfo>, Error = Error> { + ) -> Result<Vec<NetworkInfo>> { let mut path = vec!["/networks".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } - self.docker.get_json(&path.join("?")) + self.docker.get_json(&path.join("?")).await } /// Returns a reference to a set of operations available to a specific network instance @@ -685,21 +724,16 @@ impl<'a> Networks<'a> { } /// Create a new Network instance - pub fn create( + pub async fn create( &self, opts: &NetworkCreateOptions, - ) -> impl Future<Item = NetworkCreateInfo, Error = Error> { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - let bytes = data.into_bytes(); + ) -> Result<NetworkCreateInfo> { + let body: Body = opts.serialize()?.into(); let path = vec!["/networks/create".to_owned()]; - Either::B( - self.docker - .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), - ) + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await } } @@ -730,52 +764,50 @@ impl<'a, 'b> Network<'a, 'b> { } /// Inspects the current docker network instance's details - pub fn inspect(&self) -> impl Future<Item = NetworkInfo, Error = Error> { - self.docker.get_json(&format!("/networks/{}", self.id)[..]) + pub async fn inspect(&self) -> Result<NetworkInfo> { + self.docker + .get_json(&format!("/networks/{}", self.id)[..]) + .await } /// Delete the network instance - pub fn delete(&self) -> impl Future<Item = (), Error = Error> { + pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/networks/{}", self.id)[..]) - .map(|_| ()) + .await?; + Ok(()) } /// Connect container to network - pub fn connect( + pub async fn connect( &self, opts: &ContainerConnectionOptions, - ) -> impl Future<Item = (), Error = Error> { - self.do_connection("connect", opts) + ) -> Result<()> { + self.do_connection("connect", opts).await } /// Disconnect container to network - pub fn disconnect( + pub async fn disconnect( &self, opts: &ContainerConnectionOptions, - ) -> impl Future<Item = (), Error = Error> { - self.do_connection("disconnect", opts) + ) -> Result<()> { + self.do_connection("disconnect", opts).await } - fn do_connection( + async fn do_connection( &self, segment: &str, opts: &ContainerConnectionOptions, - ) -> impl Future<Item = (), Error = Error> { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - let bytes = data.into_bytes(); + ) -> Result<()> { + let body: Body = opts.serialize()?.into(); - Either::B( - self.docker - .post( - &format!("/networks/{}/{}", self.id, segment)[..], - Some((bytes, mime::APPLICATION_JSON)), - ) - .map(|_| ()), - ) + self.docker + .post( + &format!("/networks/{}/{}", self.id, segment)[..], + Some((body, mime::APPLICATION_JSON)), + ) + .await?; + Ok(()) } } @@ -790,34 +822,27 @@ impl<'a> Volumes<'a> { Volumes { docker } } - pub fn create( + pub async fn create( &self, opts: &VolumeCreateOptions, - ) -> impl Future<Item = VolumeCreateInfo, Error = Error> { - let data = match opts.serialize() { - Ok(data) => data, - Err(e) => return Either::A(futures::future::err(e)), - }; - - let bytes = data.into_bytes(); + ) -> Result<VolumeCreateInfo> { + let body: Body = opts.serialize()?.into(); let path = vec!["/volumes/create".to_owned()]; - Either::B( - self.docker - .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))), - ) + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await } /// Lists the docker volumes on the current docker host - pub fn list(&self) -> impl Future<Item = Vec<VolumeRep>, Error = Error> { + pub async fn list(&self) -> Result<Vec<VolumeRep>> { let path = vec!["/volumes".to_owned()]; - self.docker - .get_json::<VolumesRep>(&path.join("?")) - .map(|volumes: VolumesRep| match volumes.volumes { - Some(volumes) => volumes, - None => vec![], - }) + let volumes_rep = self.docker.get_json::<VolumesRep>(&path.join("?")).await?; + Ok(match volumes_rep.volumes { + Some(volumes) => volumes, + None => vec![], + }) } /// Returns a reference to a set of |