summaryrefslogtreecommitdiffstats
path: root/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs793
1 files changed, 410 insertions, 383 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 16e5625..06b31a1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,22 +3,22 @@
//! # examples
//!
//! ```no_run
-//! use tokio::prelude::Future;
-//!
+//! # async {
//! let docker = shiplift::Docker::new();
-//! let fut = docker.images().list(&Default::default()).map(|images| {
-//! println!("docker images in stock");
-//! for i in images {
-//! println!("{:?}", i.repo_tags);
-//! }
-//! }).map_err(|e| eprintln!("Something bad happened! {}", e));
//!
-//! tokio::run(fut);
+//! match docker.images().list(&Default::default()).await {
+//! Ok(images) => {
+//! for image in images {
+//! println!("{:?}", image.repo_tags);
+//! }
+//! },
+//! Err(e) => eprintln!("Something bad happened! {}", e),
+//! }
+//! # };
//! ```
pub mod builder;
pub mod errors;
-pub mod read;
pub mod rep;
pub mod transport;
pub mod tty;
@@ -35,7 +35,6 @@ pub use crate::{
errors::Error,
};
use crate::{
- read::StreamReader,
rep::{
Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, Exit,
History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo,
@@ -43,9 +42,14 @@ use crate::{
Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep,
},
transport::{tar, Transport},
- tty::TtyDecoder,
+ tty::Multiplexer as TtyMultiPlexer,
+};
+use futures_util::{
+ io::{AsyncRead, AsyncWrite},
+ stream::Stream,
+ TryFutureExt, TryStreamExt,
};
-use futures::{future::Either, Future, IntoFuture, Stream};
+// use futures::{future::Either, Future, IntoFuture, Stream};
pub use hyper::Uri;
use hyper::{client::HttpConnector, Body, Client, Method};
#[cfg(feature = "tls")]
@@ -56,8 +60,7 @@ use mime::Mime;
#[cfg(feature = "tls")]
use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
use serde_json::Value;
-use std::{borrow::Cow, env, io::Read, iter, path::Path, time::Duration};
-use tokio_codec::{FramedRead, LinesCodec};
+use std::{borrow::Cow, env, io, io::Read, iter, path::Path, time::Duration};
use url::form_urlencoded;
/// Represents the result of all docker operations
@@ -70,19 +73,19 @@ pub struct Docker {
}
/// Interface for accessing and manipulating a named docker image
-pub struct Image<'a, 'b> {
+pub struct Image<'a> {
docker: &'a Docker,
- name: Cow<'b, str>,
+ name: Cow<'a, str>,
}
-impl<'a, 'b> Image<'a, 'b> {
+impl<'a> Image<'a> {
/// Exports an interface for operations that may be performed against a named image
pub fn new<S>(
docker: &'a Docker,
name: S,
- ) -> Image<'a, 'b>
+ ) -> Self
where
- S: Into<Cow<'b, str>>,
+ S: Into<Cow<'a, str>>,
{
Image {
docker,
@@ -91,40 +94,46 @@ impl<'a, 'b> Image<'a, 'b> {
}
/// Inspects a named image's details
- pub fn inspect(&self) -> impl Future<Item = ImageDetails, Error = Error> {
+ pub async fn inspect(&self) -> Result<ImageDetails> {
self.docker
.get_json(&format!("/images/{}/json", self.name)[..])
+ .await
}
/// Lists the history of the images set of changes
- pub fn history(&self) -> impl Future<Item = Vec<History>, Error = Error> {
+ pub async fn history(&self) -> Result<Vec<History>> {
self.docker
.get_json(&format!("/images/{}/history", self.name)[..])
+ .await
}
/// Deletes an image
- pub fn delete(&self) -> impl Future<Item = Vec<Status>, Error = Error> {
+ pub async fn delete(&self) -> Result<Vec<Status>> {
self.docker
.delete_json::<Vec<Status>>(&format!("/images/{}", self.name)[..])
+ .await
}
/// Export this image to a tarball
- pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> {
- self.docker
- .stream_get(&format!("/images/{}/get", self.name)[..])
- .map(|c| c.to_vec())
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + Unpin + 'a {
+ Box::pin(
+ self.docker
+ .stream_get(format!("/images/{}/get", self.name))
+ .map_ok(|c| c.to_vec()),
+ )
}
/// Adds a tag to an image
- pub fn tag(
+ pub async fn tag(
&self,
opts: &TagOptions,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/images/{}/tag", self.name)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ let _ = self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
}
@@ -141,76 +150,74 @@ impl<'a> Images<'a> {
/// Builds a new image build by reading a Dockerfile in a target directory
pub fn build(
- &self,
- opts: &BuildOptions,
- ) -> impl Stream<Item = Value, Error = Error> {
- let mut path = vec!["/build".to_owned()];
- if let Some(query) = opts.serialize() {
- path.push(query)
- }
+ &'a self,
+ opts: &'a BuildOptions,
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
+ Box::pin(
+ async move {
+ let mut path = vec!["/build".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+
+ let mut bytes = Vec::default();
+
+ tarball::dir(&mut bytes, &opts.path[..])?;
+
+ let chunk_stream = self.docker.stream_post(
+ path.join("?"),
+ Some((Body::from(bytes), tar())),
+ None::<iter::Empty<_>>,
+ );
- let mut bytes = vec![];
-
- match tarball::dir(&mut bytes, &opts.path[..]) {
- Ok(_) => Box::new(
- self.docker
- .stream_post(
- &path.join("?"),
- Some((Body::from(bytes), tar())),
- None::<iter::Empty<_>>,
- )
- .map(|r| {
- futures::stream::iter_result(
- serde_json::Deserializer::from_slice(&r[..])
- .into_iter::<Value>()
- .collect::<Vec<_>>(),
- )
- .map_err(Error::from)
- })
- .flatten(),
- ) as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
- as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- }
+ let value_stream = chunk_stream.and_then(|chunk| async move {
+ serde_json::from_slice(&chunk).map_err(Error::from)
+ });
+
+ Ok(value_stream)
+ }
+ .try_flatten_stream(),
+ )
}
/// Lists the docker images on the current docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &ImageListOptions,
- ) -> impl Future<Item = Vec<ImageRep>, Error = Error> {
+ ) -> Result<Vec<ImageRep>> {
let mut path = vec!["/images/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- self.docker.get_json::<Vec<ImageRep>>(&path.join("?"))
+ self.docker.get_json::<Vec<ImageRep>>(&path.join("?")).await
}
/// Returns a reference to a set of operations available for a named image
- pub fn get<'b>(
+ pub fn get(
&self,
- name: &'b str,
- ) -> Image<'a, 'b> {
+ name: &'a str,
+ ) -> Image<'a> {
Image::new(self.docker, name)
}
/// Search for docker images by term
- pub fn search(
+ pub async fn search(
&self,
term: &str,
- ) -> impl Future<Item = Vec<SearchResult>, Error = Error> {
+ ) -> Result<Vec<SearchResult>> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("term", term)
.finish();
self.docker
.get_json::<Vec<SearchResult>>(&format!("/images/search?{}", query)[..])
+ .await
}
/// Pull and create a new docker images from an existing image
pub fn pull(
&self,
opts: &PullOptions,
- ) -> impl Stream<Item = Value, Error = Error> {
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
let mut path = vec!["/images/create".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
@@ -218,18 +225,15 @@ impl<'a> Images<'a> {
let headers = opts
.auth_header()
.map(|a| iter::once(("X-Registry-Auth", a)));
- self.docker
- .stream_post::<Body, _>(&path.join("?"), None, headers)
- // todo: give this a proper enum type
- .map(|r| {
- futures::stream::iter_result(
- serde_json::Deserializer::from_slice(&r[..])
- .into_iter::<Value>()
- .collect::<Vec<_>>(),
- )
- .map_err(Error::from)
- })
- .flatten()
+
+ Box::pin(
+ self.docker
+ .stream_post(path.join("?"), None, headers)
+ .and_then(move |chunk| {
+ // todo: give this a proper enum type
+ futures_util::future::ready(serde_json::from_slice(&chunk).map_err(Error::from))
+ }),
+ )
}
/// exports a collection of named images,
@@ -237,14 +241,14 @@ impl<'a> Images<'a> {
pub fn export(
&self,
names: Vec<&str>,
- ) -> impl Stream<Item = Vec<u8>, Error = Error> {
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
let params = names.iter().map(|n| ("names", *n));
let query = form_urlencoded::Serializer::new(String::new())
.extend_pairs(params)
.finish();
self.docker
- .stream_get(&format!("/images/get?{}", query)[..])
- .map(|c| c.to_vec())
+ .stream_get(format!("/images/get?{}", query))
+ .map_ok(|c| c.to_vec())
}
/// imports an image or set of images from a given tarball source
@@ -252,43 +256,44 @@ impl<'a> Images<'a> {
pub fn import(
self,
mut tarball: Box<dyn Read>,
- ) -> impl Stream<Item = Value, Error = Error> {
- let mut bytes = Vec::new();
-
- match tarball.read_to_end(&mut bytes) {
- Ok(_) => Box::new(
- self.docker
- .stream_post(
- "/images/load",
- Some((Body::from(bytes), tar())),
- None::<iter::Empty<_>>,
- )
- .and_then(|bytes| {
- serde_json::from_slice::<'_, Value>(&bytes[..])
- .map_err(Error::from)
- .into_future()
- }),
- ) as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- Err(e) => Box::new(futures::future::err(Error::IO(e)).into_stream())
- as Box<dyn Stream<Item = Value, Error = Error> + Send>,
- }
+ ) -> impl Stream<Item = Result<Value>> + Unpin + 'a {
+ Box::pin(
+ async move {
+ let mut bytes = Vec::default();
+
+ tarball.read_to_end(&mut bytes)?;
+
+ let chunk_stream = self.docker.stream_post(
+ "/images/load",
+ Some((Body::from(bytes), tar())),
+ None::<iter::Empty<_>>,
+ );
+
+ let value_stream = chunk_stream.and_then(|chunk| async move {
+ serde_json::from_slice(&chunk).map_err(Error::from)
+ });
+
+ Ok(value_stream)
+ }
+ .try_flatten_stream(),
+ )
}
}
/// Interface for accessing and manipulating a docker container
-pub struct Container<'a, 'b> {
+pub struct Container<'a> {
docker: &'a Docker,
- id: Cow<'b, str>,
+ id: Cow<'a, str>,
}
-impl<'a, 'b> Container<'a, 'b> {
+impl<'a> Container<'a> {
/// Exports an interface exposing operations against a container instance
pub fn new<S>(
docker: &'a Docker,
id: S,
- ) -> Container<'a, 'b>
+ ) -> Self
where
- S: Into<Cow<'b, str>>,
+ S: Into<Cow<'a, str>>,
{
Container {
docker,
@@ -302,16 +307,17 @@ impl<'a, 'b> Container<'a, 'b> {
}
/// Inspects the current docker container instance's details
- pub fn inspect(&self) -> impl Future<Item = ContainerDetails, Error = Error> {
+ pub async fn inspect(&self) -> Result<ContainerDetails> {
self.docker
.get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..])
+ .await
}
/// Returns a `top` view of information about the container process
- pub fn top(
+ pub async fn top(
&self,
psargs: Option<&str>,
- ) -> impl Future<Item = Top, Error = Error> {
+ ) -> Result<Top> {
let mut path = vec![format!("/containers/{}/top", self.id)];
if let Some(ref args) = psargs {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -319,85 +325,95 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.get_json(&path.join("?"))
+ self.docker.get_json(&path.join("?")).await
}
/// Returns a stream of logs emitted but the container instance
pub fn logs(
&self,
opts: &LogsOptions,
- ) -> impl Stream<Item = tty::Chunk, Error = Error> {
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
let mut path = vec![format!("/containers/{}/logs", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- let decoder = TtyDecoder::new();
- let chunk_stream = StreamReader::new(self.docker.stream_get(&path.join("?")));
+ let stream = Box::pin(self.docker.stream_get(path.join("?")));
- FramedRead::new(chunk_stream, decoder)
+ Box::pin(tty::decode(stream))
}
- /// Attaches to a running container, returning a stream that can
- /// be used to interact with the standard IO streams.
- pub fn attach(&self) -> impl Future<Item = tty::Multiplexed, Error = Error> {
- self.docker.stream_post_upgrade_multiplexed::<Body>(
- &format!(
- "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
- self.id
- ),
- None,
- )
+ /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin.
+ async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'a> {
+ self.docker
+ .stream_post_upgrade(
+ format!(
+ "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
+ self.id
+ ),
+ None,
+ )
+ .await
}
- /// Attaches to a running container, returning a stream that can
- /// be used to interact with the standard IO streams.
- pub fn attach_blocking(&self) -> Result<tty::MultiplexedBlocking> {
- self.attach().map(|s| s.wait()).wait()
+ /// Attaches a `[TtyMultiplexer]` to the container.
+ ///
+ /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin.
+ ///
+ /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method
+ pub async fn attach(&self) -> Result<TtyMultiPlexer<'a>> {
+ let tcp_stream = self.attach_raw().await?;
+
+ Ok(TtyMultiPlexer::new(tcp_stream))
}
/// Returns a set of changes made to the container instance
- pub fn changes(&self) -> impl Future<Item = Vec<Change>, Error = Error> {
+ pub async fn changes(&self) -> Result<Vec<Change>> {
self.docker
.get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..])
+ .await
}
/// Exports the current docker container into a tarball
- pub fn export(&self) -> impl Stream<Item = Vec<u8>, Error = Error> {
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
self.docker
- .stream_get(&format!("/containers/{}/export", self.id)[..])
- .map(|c| c.to_vec())
+ .stream_get(format!("/containers/{}/export", self.id))
+ .map_ok(|c| c.to_vec())
}
/// Returns a stream of stats specific to this container instance
- pub fn stats(&self) -> impl Stream<Item = Stats, Error = Error> {
- let decoder = LinesCodec::new();
- let stream_of_chunks = StreamReader::new(
- self.docker
- .stream_get(&format!("/containers/{}/stats", self.id)[..]),
- );
+ pub fn stats(&'a self) -> impl Stream<Item = Result<Stats>> + Unpin + 'a {
+ let codec = futures_codec::LinesCodec {};
- FramedRead::new(stream_of_chunks, decoder)
- .map_err(Error::IO)
- .and_then(|s| {
- serde_json::from_str::<Stats>(&s)
- .map_err(Error::SerdeJsonError)
- .into_future()
- })
+ let reader = Box::pin(
+ self.docker
+ .stream_get(format!("/containers/{}/stats", self.id))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
}
/// Start the container instance
- pub fn start(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn start(&self) -> Result<()> {
self.docker
- .post::<Body>(&format!("/containers/{}/start", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/start", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Stop the container instance
- pub fn stop(
+ pub async fn stop(
&self,
wait: Option<Duration>,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/stop", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -406,14 +422,15 @@ impl<'a, 'b> Container<'a, 'b> {
path.push(encoded)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Restart the container instance
- pub fn restart(
+ pub async fn restart(
&self,
wait: Option<Duration>,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/restart", self.id)];
if let Some(w) = wait {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -421,14 +438,15 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Kill the container instance
- pub fn kill(
+ pub async fn kill(
&self,
signal: Option<&str>,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}/kill", self.id)];
if let Some(sig) = signal {
let encoded = form_urlencoded::Serializer::new(String::new())
@@ -436,101 +454,125 @@ impl<'a, 'b> Container<'a, 'b> {
.finish();
path.push(encoded)
}
- self.docker.post::<Body>(&path.join("?"), None).map(|_| ())
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
}
/// Rename the container instance
- pub fn rename(
+ pub async fn rename(
&self,
name: &str,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let query = form_urlencoded::Serializer::new(String::new())
.append_pair("name", name)
.finish();
self.docker
- .post::<Body>(
+ .post(
&format!("/containers/{}/rename?{}", self.id, query)[..],
None,
)
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Pause the container instance
- pub fn pause(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn pause(&self) -> Result<()> {
self.docker
- .post::<Body>(&format!("/containers/{}/pause", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/pause", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Unpause the container instance
- pub fn unpause(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn unpause(&self) -> Result<()> {
self.docker
- .post::<Body>(&format!("/containers/{}/unpause", self.id)[..], None)
- .map(|_| ())
+ .post(&format!("/containers/{}/unpause", self.id)[..], None)
+ .await?;
+ Ok(())
}
/// Wait until the container stops
- pub fn wait(&self) -> impl Future<Item = Exit, Error = Error> {
+ pub async fn wait(&self) -> Result<Exit> {
self.docker
- .post_json::<Body, Exit>(&format!("/containers/{}/wait", self.id)[..], None)
+ .post_json(
+ format!("/containers/{}/wait", self.id),
+ Option::<(Body, Mime)>::None,
+ )
+ .await
}
/// Delete the container instance
///
/// Use remove instead to use the force/v options.
- pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/containers/{}", self.id)[..])
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Delete the container instance (todo: force/v)
- pub fn remove(
+ pub async fn remove(
&self,
opts: RmContainerOptions,
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let mut path = vec![format!("/containers/{}", self.id)];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.delete(&path.join("?")).map(|_| ())
+ self.docker.delete(&path.join("?")).await?;
+ Ok(())
}
- // TODO(abusch) fix this
- /// Exec the specified command in the container
- pub fn exec(
+ async fn exec_create(
&self,
opts: &ExecContainerOptions,
- ) -> impl Stream<Item = tty::Chunk, Error = Error> {
- let data = opts.serialize().unwrap(); // TODO fixme
- let bytes = data.into_bytes();
- let docker2 = self.docker.clone();
- self.docker
- .post(
+ ) -> Result<String> {
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "PascalCase")]
+ struct Response {
+ id: String,
+ }
+
+ let body: Body = opts.serialize()?.into();
+
+ let Response { id } = self
+ .docker
+ .post_json(
&format!("/containers/{}/exec", self.id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
+ Some((body, mime::APPLICATION_JSON)),
)
- .map(move |res| {
- let data = "{}";
- let bytes = data.as_bytes();
- let id = serde_json::from_str::<Value>(res.as_str())
- .ok()
- .and_then(|v| {
- v.as_object()
- .and_then(|v| v.get("Id"))
- .and_then(|v| v.as_str().map(|v| v.to_string()))
- })
- .unwrap(); // TODO fixme
-
- let decoder = TtyDecoder::new();
- let chunk_stream = StreamReader::new(docker2.stream_post(
- &format!("/exec/{}/start", id)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- None::<iter::Empty<_>>,
- ));
- FramedRead::new(chunk_stream, decoder)
- })
- .flatten_stream()
+ .await?;
+
+ Ok(id)
+ }
+
+ fn exec_start(
+ &self,
+ id: String,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + 'a {
+ let bytes: &[u8] = b"{}";
+
+ let stream = Box::pin(self.docker.stream_post(
+ format!("/exec/{}/start", id),
+ Some((bytes.into(), mime::APPLICATION_JSON)),
+ None::<iter::Empty<_>>,
+ ));
+
+ tty::decode(stream)
+ }
+
+ pub fn exec(
+ &'a self,
+ opts: &'a ExecContainerOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
+ Box::pin(
+ async move {
+ let id = self.exec_create(opts).await?;
+ Ok(self.exec_start(id))
+ }
+ .try_flatten_stream(),
+ )
}
/// Copy a file/folder from the container. The resulting stream is a tarball of the extracted
@@ -544,24 +586,24 @@ impl<'a, 'b> Container<'a, 'b> {
pub fn copy_from(
&self,
path: &Path,
- ) -> impl Stream<Item = Vec<u8>, Error = Error> {
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'a {
let path_arg = form_urlencoded::Serializer::new(String::new())
.append_pair("path", &path.to_string_lossy())
.finish();
- self.docker
- .stream_get(&format!("/containers/{}/archive?{}", self.id, path_arg))
- .map(|c| c.to_vec())
+
+ let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg);
+ self.docker.stream_get(endpoint).map_ok(|c| c.to_vec())
}
/// Copy a byte slice as file into (see `bytes`) the container.
///
/// The file will be copied at the given location (see `path`) and will be owned by root
/// with access mask 644.
- pub fn copy_file_into<P: AsRef<Path>>(
+ pub async fn copy_file_into<P: AsRef<Path>>(
&self,
path: P,
bytes: &[u8],
- ) -> impl Future<Item = (), Error = Error> {
+ ) -> Result<()> {
let path = path.as_ref();
let mut ar = tar::Builder::new(Vec::new());
@@ -588,9 +630,10 @@ impl<'a, 'b> Container<'a, 'b> {
self.docker
.put(
&format!("/containers/{}/archive?{}", self.id, path_arg),
- body,
+ body.map(|(body, mime)| (body.into(), mime)),
)
- .map(|_| ())
+ .await?;
+ Ok(())
}
}
@@ -606,36 +649,33 @@ impl<'a> Containers<'a> {
}
/// Lists the container instances on the docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &ContainerListOptions,
- ) -> impl Future<Item = Vec<ContainerRep>, Error = Error> {
+ ) -> Result<Vec<ContainerRep>> {
let mut path = vec!["/containers/json".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query)
}
- self.docker.get_json::<Vec<ContainerRep>>(&path.join("?"))
+ self.docker
+ .get_json::<Vec<ContainerRep>>(&path.join("?"))
+ .await
}
/// Returns a reference to a set of operations available to a specific container instance
- pub fn get<'b>(
+ pub fn get(
&self,
- name: &'b str,
- ) -> Container<'a, 'b> {
+ name: &'a str,
+ ) -> Container<'a> {
Container::new(self.docker, name)
}
/// Returns a builder interface for creating a new container instance
- pub fn create(
+ pub async fn create(
&self,
opts: &ContainerOptions,
- ) -> impl Future<Item = ContainerCreateInfo, Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
-
- let bytes = data.into_bytes();
+ ) -> Result<ContainerCreateInfo> {
+ let body: Body = opts.serialize()?.into();
let mut path = vec!["/containers/create".to_owned()];
if let Some(ref name) = opts.name {
@@ -646,10 +686,9 @@ impl<'a> Containers<'a> {
);
}
- Either::B(
- self.docker
- .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
- )
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
}
}
@@ -665,15 +704,15 @@ impl<'a> Networks<'a> {
}
/// List the docker networks on the current docker host
- pub fn list(
+ pub async fn list(
&self,
opts: &NetworkListOptions,
- ) -> impl Future<Item = Vec<NetworkInfo>, Error = Error> {
+ ) -> Result<Vec<NetworkInfo>> {
let mut path = vec!["/networks".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
- self.docker.get_json(&path.join("?"))
+ self.docker.get_json(&path.join("?")).await
}
/// Returns a reference to a set of operations available to a specific network instance
@@ -685,21 +724,16 @@ impl<'a> Networks<'a> {
}
/// Create a new Network instance
- pub fn create(
+ pub async fn create(
&self,
opts: &NetworkCreateOptions,
- ) -> impl Future<Item = NetworkCreateInfo, Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
- let bytes = data.into_bytes();
+ ) -> Result<NetworkCreateInfo> {
+ let body: Body = opts.serialize()?.into();
let path = vec!["/networks/create".to_owned()];
- Either::B(
- self.docker
- .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
- )
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
}
}
@@ -730,52 +764,50 @@ impl<'a, 'b> Network<'a, 'b> {
}
/// Inspects the current docker network instance's details
- pub fn inspect(&self) -> impl Future<Item = NetworkInfo, Error = Error> {
- self.docker.get_json(&format!("/networks/{}", self.id)[..])
+ pub async fn inspect(&self) -> Result<NetworkInfo> {
+ self.docker
+ .get_json(&format!("/networks/{}", self.id)[..])
+ .await
}
/// Delete the network instance
- pub fn delete(&self) -> impl Future<Item = (), Error = Error> {
+ pub async fn delete(&self) -> Result<()> {
self.docker
.delete(&format!("/networks/{}", self.id)[..])
- .map(|_| ())
+ .await?;
+ Ok(())
}
/// Connect container to network
- pub fn connect(
+ pub async fn connect(
&self,
opts: &ContainerConnectionOptions,
- ) -> impl Future<Item = (), Error = Error> {
- self.do_connection("connect", opts)
+ ) -> Result<()> {
+ self.do_connection("connect", opts).await
}
/// Disconnect container to network
- pub fn disconnect(
+ pub async fn disconnect(
&self,
opts: &ContainerConnectionOptions,
- ) -> impl Future<Item = (), Error = Error> {
- self.do_connection("disconnect", opts)
+ ) -> Result<()> {
+ self.do_connection("disconnect", opts).await
}
- fn do_connection(
+ async fn do_connection(
&self,
segment: &str,
opts: &ContainerConnectionOptions,
- ) -> impl Future<Item = (), Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
- let bytes = data.into_bytes();
+ ) -> Result<()> {
+ let body: Body = opts.serialize()?.into();
- Either::B(
- self.docker
- .post(
- &format!("/networks/{}/{}", self.id, segment)[..],
- Some((bytes, mime::APPLICATION_JSON)),
- )
- .map(|_| ()),
- )
+ self.docker
+ .post(
+ &format!("/networks/{}/{}", self.id, segment)[..],
+ Some((body, mime::APPLICATION_JSON)),
+ )
+ .await?;
+ Ok(())
}
}
@@ -790,34 +822,27 @@ impl<'a> Volumes<'a> {
Volumes { docker }
}
- pub fn create(
+ pub async fn create(
&self,
opts: &VolumeCreateOptions,
- ) -> impl Future<Item = VolumeCreateInfo, Error = Error> {
- let data = match opts.serialize() {
- Ok(data) => data,
- Err(e) => return Either::A(futures::future::err(e)),
- };
-
- let bytes = data.into_bytes();
+ ) -> Result<VolumeCreateInfo> {
+ let body: Body = opts.serialize()?.into();
let path = vec!["/volumes/create".to_owned()];
- Either::B(
- self.docker
- .post_json(&path.join("?"), Some((bytes, mime::APPLICATION_JSON))),
- )
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
}
/// Lists the docker volumes on the current docker host
- pub fn list(&self) -> impl Future<Item = Vec<VolumeRep>, Error = Error> {
+ pub async fn list(&self) -> Result<Vec<VolumeRep>> {
let path = vec!["/volumes".to_owned()];
- self.docker
- .get_json::<VolumesRep>(&path.join("?"))
- .map(|volumes: VolumesRep| match volumes.volumes {
- Some(volumes) => volumes,
- None => vec![],
- })
+ let volumes_rep = self.docker.get_json::<VolumesRep>(&path.join("?")).await?;
+ Ok(match volumes_rep.volumes {
+ Some(volumes) => volumes,
+ None => vec![],
+ })
}
/// Returns a reference to a set of