summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEli W. Hunter <elihunter173@gmail.com>2021-02-07 19:17:32 -0500
committerEli W. Hunter <elihunter173@gmail.com>2021-03-08 18:48:30 -0500
commit349d8f2f7343506f986a5773e72f260c78fe9289 (patch)
tree7569c25d73c516b762df3ffcabfbc56feb6a316f
parent007b05eaf5127bbaea27cfc816f307946bd7aa5f (diff)
Split up lib.rs
-rw-r--r--src/container.rs410
-rw-r--r--src/docker.rs430
-rw-r--r--src/errors.rs3
-rw-r--r--src/exec.rs172
-rw-r--r--src/image.rs229
-rw-r--r--src/lib.rs1523
-rw-r--r--src/network.rs134
-rw-r--r--src/rep.rs2
-rw-r--r--src/service.rs122
-rw-r--r--src/volume.rs85
10 files changed, 1605 insertions, 1505 deletions
diff --git a/src/container.rs b/src/container.rs
new file mode 100644
index 0000000..ea4d0a7
--- /dev/null
+++ b/src/container.rs
@@ -0,0 +1,410 @@
+//! Create and manage containers.
+//!
+//! API Reference: <https://docs.docker.com/engine/api/v1.41/#tag/Container>
+
+use std::{io, path::Path, time::Duration};
+
+use futures_util::{
+ io::{AsyncRead, AsyncWrite},
+ stream::Stream,
+ TryStreamExt,
+};
+use hyper::Body;
+use mime::Mime;
+use url::form_urlencoded;
+
+use crate::{
+ builder::{
+ ContainerListOptions, ContainerOptions, ExecContainerOptions, LogsOptions,
+ RmContainerOptions,
+ },
+ docker::Docker,
+ errors::{Error, Result},
+ exec::Exec,
+ rep::{
+ Change, Container as ContainerInfo, ContainerCreateInfo, ContainerDetails, Exit, Stats, Top,
+ },
+ transport::Payload,
+ tty::{self, Multiplexer as TtyMultiPlexer},
+};
+
+/// Interface for accessing and manipulating a docker container
+pub struct Container<'docker> {
+ docker: &'docker Docker,
+ id: String,
+}
+
+impl<'docker> Container<'docker> {
+ /// Exports an interface exposing operations against a container instance
+ pub fn new<S>(
+ docker: &'docker Docker,
+ id: S,
+ ) -> Self
+ where
+ S: Into<String>,
+ {
+ Container {
+ docker,
+ id: id.into(),
+ }
+ }
+
+ /// a getter for the container id
+ pub fn id(&self) -> &str {
+ &self.id
+ }
+
+ /// Inspects the current docker container instance's details
+ pub async fn inspect(&self) -> Result<ContainerDetails> {
+ self.docker
+ .get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..])
+ .await
+ }
+
+ /// Returns a `top` view of information about the container process
+ pub async fn top(
+ &self,
+ psargs: Option<&str>,
+ ) -> Result<Top> {
+ let mut path = vec![format!("/containers/{}/top", self.id)];
+ if let Some(ref args) = psargs {
+ let encoded = form_urlencoded::Serializer::new(String::new())
+ .append_pair("ps_args", args)
+ .finish();
+ path.push(encoded)
+ }
+ self.docker.get_json(&path.join("?")).await
+ }
+
+ /// Returns a stream of logs emitted by the container instance
+ pub fn logs(
+ &self,
+ opts: &LogsOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
+ let mut path = vec![format!("/containers/{}/logs", self.id)];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+
+ let stream = Box::pin(self.docker.stream_get(path.join("?")));
+
+ Box::pin(tty::decode(stream))
+ }
+
+ /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin.
+ async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'docker> {
+ self.docker
+ .stream_post_upgrade(
+ format!(
+ "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1",
+ self.id
+ ),
+ None,
+ )
+ .await
+ }
+
+ /// Attaches a [`TtyMultiplexer`] to the container.
+ ///
+ /// The [`TtyMultiplexer`] implements Stream for returning Stdout and Stderr chunks. It also implements [`AsyncWrite`] for writing to Stdin.
+ ///
+ /// The multiplexer can be split into its read and write halves with the [`split`](TtyMultiplexer::split) method
+ pub async fn attach(&self) -> Result<TtyMultiPlexer<'docker>> {
+ let tcp_stream = self.attach_raw().await?;
+
+ Ok(TtyMultiPlexer::new(tcp_stream))
+ }
+
+ /// Returns a set of changes made to the container instance
+ pub async fn changes(&self) -> Result<Vec<Change>> {
+ self.docker
+ .get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..])
+ .await
+ }
+
+ /// Exports the current docker container into a tarball
+ pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
+ self.docker
+ .stream_get(format!("/containers/{}/export", self.id))
+ .map_ok(|c| c.to_vec())
+ }
+
+ /// Returns a stream of stats specific to this container instance
+ pub fn stats(&self) -> impl Stream<Item = Result<Stats>> + Unpin + 'docker {
+ let codec = futures_codec::LinesCodec {};
+
+ let reader = Box::pin(
+ self.docker
+ .stream_get(format!("/containers/{}/stats", self.id))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
+ }
+
+ /// Start the container instance
+ pub async fn start(&self) -> Result<()> {
+ self.docker
+ .post(&format!("/containers/{}/start", self.id)[..], None)
+ .await?;
+ Ok(())
+ }
+
+ /// Stop the container instance
+ pub async fn stop(
+ &self,
+ wait: Option<Duration>,
+ ) -> Result<()> {
+ let mut path = vec![format!("/containers/{}/stop", self.id)];
+ if let Some(w) = wait {
+ let encoded = form_urlencoded::Serializer::new(String::new())
+ .append_pair("t", &w.as_secs().to_string())
+ .finish();
+
+ path.push(encoded)
+ }
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
+ }
+
+ /// Restart the container instance
+ pub async fn restart(
+ &self,
+ wait: Option<Duration>,
+ ) -> Result<()> {
+ let mut path = vec![format!("/containers/{}/restart", self.id)];
+ if let Some(w) = wait {
+ let encoded = form_urlencoded::Serializer::new(String::new())
+ .append_pair("t", &w.as_secs().to_string())
+ .finish();
+ path.push(encoded)
+ }
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
+ }
+
+ /// Kill the container instance
+ pub async fn kill(
+ &self,
+ signal: Option<&str>,
+ ) -> Result<()> {
+ let mut path = vec![format!("/containers/{}/kill", self.id)];
+ if let Some(sig) = signal {
+ let encoded = form_urlencoded::Serializer::new(String::new())
+ .append_pair("signal", &sig.to_owned())
+ .finish();
+ path.push(encoded)
+ }
+ self.docker.post(&path.join("?"), None).await?;
+ Ok(())
+ }
+
+ /// Rename the container instance
+ pub async fn rename(
+ &self,
+ name: &str,
+ ) -> Result<()> {
+ let query = form_urlencoded::Serializer::new(String::new())
+ .append_pair("name", name)
+ .finish();
+ self.docker
+ .post(
+ &format!("/containers/{}/rename?{}", self.id, query)[..],
+ None,
+ )
+ .await?;
+ Ok(())
+ }
+
+ /// Pause the container instance
+ pub async fn pause(&self) -> Result<()> {
+ self.docker
+ .post(&format!("/containers/{}/pause", self.id)[..], None)
+ .await?;
+ Ok(())
+ }
+
+ /// Unpause the container instance
+ pub async fn unpause(&self) -> Result<()> {
+ self.docker
+ .post(&format!("/containers/{}/unpause", self.id)[..], None)
+ .await?;
+ Ok(())
+ }
+
+ /// Wait until the container stops
+ pub async fn wait(&self) -> Result<Exit> {
+ self.docker
+ .post_json(format!("/containers/{}/wait", self.id), Payload::None)
+ .await
+ }
+
+ /// Delete the container instance
+ ///
+ /// Use remove instead to use the force/v options.
+ pub async fn delete(&self) -> Result<()> {
+ self.docker
+ .delete(&format!("/containers/{}", self.id)[..])
+ .await?;
+ Ok(())
+ }
+
+ /// Delete the container instance (todo: force/v)
+ pub async fn remove(
+ &self,
+ opts: RmContainerOptions,
+ ) -> Result<()> {
+ let mut path = vec![format!("/containers/{}", self.id)];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+ self.docker.delete(&path.join("?")).await?;
+ Ok(())
+ }
+
+ /// Execute a command in this container
+ pub fn exec(
+ &self,
+ opts: &ExecContainerOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
+ Exec::create_and_start(self.docker, &self.id, opts)
+ }
+
+ /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted
+ /// files.
+ ///
+ /// If `path` is not an absolute path, it is relative to the container’s root directory. The
+ /// resource specified by `path` must exist. To assert that the resource is expected to be a
+ /// directory, `path` should end in `/` or `/.` (assuming a path separator of `/`). If `path`
+ /// ends in `/.` then this indicates that only the contents of the path directory should be
+ /// copied. A symlink is always resolved to its target.
+ pub fn copy_from(
+ &self,
+ path: &Path,
+ ) -> impl Stream<Item = Result<Vec<u8>>> + 'docker {
+ let path_arg = form_urlencoded::Serializer::new(String::new())
+ .append_pair("path", &path.to_string_lossy())
+ .finish();
+
+ let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg);
+ self.docker.stream_get(endpoint).map_ok(|c| c.to_vec())
+ }
+
+ /// Copy a byte slice as file into (see `bytes`) the container.
+ ///
+ /// The file will be copied at the given location (see `path`) and will be owned by root
+ /// with access mask 644.
+ pub async fn copy_file_into<P: AsRef<Path>>(
+ &self,
+ path: P,
+ bytes: &[u8],
+ ) -> Result<()> {
+ let path = path.as_ref();
+
+ let mut ar = tar::Builder::new(Vec::new());
+ let mut header = tar::Header::new_gnu();
+ header.set_size(bytes.len() as u64);
+ header.set_mode(0o0644);
+ ar.append_data(
+ &mut header,
+ path.to_path_buf()
+ .iter()
+ .skip(1)
+ .collect::<std::path::PathBuf>(),
+ bytes,
+ )
+ .unwrap();
+ let data = ar.into_inner().unwrap();
+
+ self.copy_to(Path::new("/"), data.into()).await?;
+ Ok(())
+ }
+
+ /// Copy a tarball (see `body`) to the container.
+ ///
+ /// The tarball will be copied to the container and extracted at the given location (see `path`).
+ pub async fn copy_to(
+ &self,
+ path: &Path,
+ body: Body,
+ ) -> Result<()> {
+ let path_arg = form_urlencoded::Serializer::new(String::new())
+ .append_pair("path", &path.to_string_lossy())
+ .finish();
+
+ let mime = "application/x-tar".parse::<Mime>().unwrap();
+
+ self.docker
+ .put(
+ &format!("/containers/{}/archive?{}", self.id, path_arg),
+ Some((body, mime)),
+ )
+ .await?;
+ Ok(())
+ }
+}
+
+/// Interface for docker containers
+pub struct Containers<'docker> {
+ docker: &'docker Docker,
+}
+
+impl<'docker> Containers<'docker> {
+ /// Exports an interface for interacting with docker containers
+ pub fn new(docker: &'docker Docker) -> Self {
+ Containers { docker }
+ }
+
+ /// Lists the container instances on the docker host
+ pub async fn list(
+ &self,
+ opts: &ContainerListOptions,
+ ) -> Result<Vec<ContainerInfo>> {
+ let mut path = vec!["/containers/json".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+ self.docker
+ .get_json::<Vec<ContainerInfo>>(&path.join("?"))
+ .await
+ }
+
+ /// Returns a reference to a set of operations available to a specific container instance
+ pub fn get<S>(
+ &self,
+ name: S,
+ ) -> Container<'docker>
+ where
+ S: Into<String>,
+ {
+ Container::new(self.docker, name)
+ }
+
+ /// Returns a builder interface for creating a new container instance
+ pub async fn create(
+ &self,
+ opts: &ContainerOptions,
+ ) -> Result<ContainerCreateInfo> {
+ let body: Body = opts.serialize()?.into();
+ let mut path = vec!["/containers/create".to_owned()];
+
+ if let Some(ref name) = opts.name {
+ path.push(
+ form_urlencoded::Serializer::new(String::new())
+ .append_pair("name", name)
+ .finish(),
+ );
+ }
+
+ self.docker
+ .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON)))
+ .await
+ }
+}
diff --git a/src/docker.rs b/src/docker.rs
new file mode 100644
index 0000000..d3faa82
--- /dev/null
+++ b/src/docker.rs
@@ -0,0 +1,430 @@
+//! Main entrypoint for interacting with the Docker API.
+//!
+//! API Reference: <https://docs.docker.com/engine/api/v1.41/>
+
+use std::{env, io, path::Path};
+
+use futures_util::{stream::Stream, TryStreamExt};
+use hyper::{client::HttpConnector, Body, Client, Method};
+use mime::Mime;
+use serde_json::Value;
+
+use crate::{
+ builder::EventsOptions,
+ container::Containers,
+ errors::{Error, Result},
+ image::Images,
+ network::Networks,
+ rep::{Event, Info, Version},
+ service::Services,
+ transport::{Headers, Payload, Transport},
+ volume::Volumes,
+ Uri,
+};
+
+#[cfg(feature = "tls")]
+use hyper_openssl::HttpsConnector;
+#[cfg(feature = "tls")]
+use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
+
+#[cfg(feature = "unix-socket")]
+use hyperlocal::UnixConnector;
+
+/// Entrypoint interface for communicating with docker daemon
+#[derive(Clone)]
+pub struct Docker {
+ transport: Transport,
+}
+
+fn get_http_connector() -> HttpConnector {
+ let mut http = HttpConnector::new();
+ http.enforce_http(false);
+
+ http
+}
+
+#[cfg(feature = "tls")]
+fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
+ let http = get_http_connector();
+ if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") {
+ // fixme: don't unwrap before you know what's in the box
+ // https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428
+ let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
+ connector.set_cipher_list("DEFAULT").unwrap();
+ let cert = &format!("{}/cert.pem", certs);
+ let key = &format!("{}/key.pem", certs);
+ connector
+ .set_certificate_file(&Path::new(cert), SslFiletype::PEM)
+ .unwrap();
+ connector
+ .set_private_key_file(&Path::new(key), SslFiletype::PEM)
+ .unwrap();
+ if env::var("DOCKER_TLS_VERIFY").is_ok() {
+ let ca = &format!("{}/ca.pem", certs);
+ connector.set_ca_file(&Path::new(ca)).unwrap();
+ }
+
+ // If we are attempting to connect to the docker daemon via tcp
+ // we need to convert the scheme to `https` to let hyper connect.
+ // Otherwise, hyper will reject the connection since it does not
+ // recognize `tcp` as a valid `http` scheme.
+ let tcp_host_str = if tcp_host_str.contains("tcp://") {
+ tcp_host_str.replace("tcp://", "https://")
+ } else {
+ tcp_host_str
+ };
+
+ Docker {
+ transport: Transport::EncryptedTcp {
+ client: Client::builder()
+ .build(HttpsConnector::with_connector(http, connector).unwrap()),
+ host: tcp_host_str,
+ },
+ }
+ } else {
+ Docker {
+ transport: Transport::Tcp {
+ client: Client::builder().build(http),
+ host: tcp_host_str,
+ },
+ }
+ }
+}
+
+#[cfg(not(feature = "tls"))]
+fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
+ let http = get_http_connector();
+ Docker {
+ transport: Transport::Tcp {
+ client: Client::builder().build(http),
+ host: tcp_host_str,
+ },
+ }
+}
+
+// https://docs.docker.com/reference/api/docker_remote_api_v1.17/
+impl Docker {
+ /// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`,
+ /// falling back on unix:///var/run/docker.sock
+ pub fn new() -> Docker {
+ match env::var("DOCKER_HOST").ok() {
+ Some(host) => {
+ #[cfg(feature = "unix-socket")]
+ if let Some(path) = host.strip_prefix("unix://") {
+ return Docker::unix(path);
+ }
+ let host = host.parse().expect("invalid url");
+ Docker::host(host)
+ }
+ #[cfg(feature = "unix-socket")]
+ None => Docker::unix("/var/run/docker.sock"),
+ #[cfg(not(feature = "unix-socket"))]
+ None => panic!("Unix socket support is disabled"),
+ }
+ }
+
+ /// Creates a new docker instance for a docker host
+ /// listening on a given Unix socket.
+ #[cfg(feature = "unix-socket")]
+ pub fn unix<S>(socket_path: S) -> Docker
+ where
+ S: Into<String>,
+ {
+ Docker {
+ transport: Transport::Unix {
+ client: Client::builder()
+ .pool_max_idle_per_host(0)
+ .build(UnixConnector),
+ path: socket_path.into(),
+ },
+ }
+ }
+
+ /// constructs a new Docker instance for docker host listening at the given host url
+ pub fn host(host: Uri) -> Docker {
+ let tcp_host_str = format!(
+ "{}://{}:{}",
+ host.scheme_str().unwrap(),
+ host.host().unwrap().to_owned(),
+ host.port_u16().unwrap_or(80)
+ );
+
+ match host.scheme_str() {
+ #[cfg(feature = "unix-socket")]
+ Some("unix") => Docker {
+ transport: Transport::Unix {
+ client: Client::builder().build(UnixConnector),
+ path: host.path().to_owned(),
+ },
+ },
+
+ #[cfg(not(feature = "unix-socket"))]
+ Some("unix") => panic!("Unix socket support is disabled"),
+
+ _ => get_docker_for_tcp(tcp_host_str),
+ }
+ }
+
+ /// Exports an interface for interacting with docker images
+ pub fn images(&'_ self) -> Images<'_> {
+ Images::new(self)
+ }
+
+ /// Exports an interface for interacting with docker containers
+ pub fn containers(&'_ self) -> Containers<'_> {
+ Containers::new(self)
+ }
+
+ /// Exports an interface for interacting with docker services
+ pub fn services(&'_ self) -> Services<'_> {
+ Services::new(self)
+ }
+
+ pub fn networks(&'_ self) -> Networks<'_> {
+ Networks::new(self)
+ }
+
+ pub fn volumes(&'_ self) -> Volumes<'_> {
+ Volumes::new(self)
+ }
+
+ /// Returns version information associated with the docker daemon
+ pub async fn version(&self) -> Result<Version> {
+ self.get_json("/version").await
+ }
+
+ /// Returns information associated with the docker daemon
+ pub async fn info(&self) -> Result<Info> {
+ self.get_json("/info").await
+ }
+
+ /// Returns a simple ping response indicating the docker daemon is accessible
+ pub async fn ping(&self) -> Result<String> {
+ self.get("/_ping").await
+ }
+
+ /// Returns a stream of docker events
+ pub fn events<'docker>(
+ &'docker self,
+ opts: &EventsOptions,
+ ) -> impl Stream<Item = Result<Event>> + Unpin + 'docker {
+ let mut path = vec!["/events".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query);
+ }
+ let reader = Box::pin(
+ self.stream_get(path.join("?"))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ let codec = futures_codec::LinesCodec {};
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
+ }
+
+ //
+ // Utility functions to make requests
+ //
+
+ pub(crate) async fn get(
+ &self,
+ endpoint: &str,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::GET, endpoint, Payload::None, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn get_json<T: serde::de::DeserializeOwned>(
+ &self,
+ endpoint: &str,
+ ) -> Result<T> {
+ let raw_string = self
+ .transport
+ .request(Method::GET, endpoint, Payload::None, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&raw_string)?)
+ }
+
+ pub(crate) async fn post(
+ &self,
+ endpoint: &str,
+ body: Option<(Body, Mime)>,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::POST, endpoint, body, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn put(
+ &self,
+ endpoint: &str,
+ body: Option<(Body, Mime)>,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::PUT, endpoint, body, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn post_json<T, B>(
+ &self,
+ endpoint: impl AsRef<str>,
+ body: Option<(B, Mime)>,
+ ) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned,
+ B: Into<Body>,
+ {
+ let string = self
+ .transport
+ .request(Method::POST, endpoint, body, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ pub(crate) async fn post_json_headers<'a, T, B, H>(
+ &self,
+ endpoint: impl AsRef<str>,
+ body: Option<(B, Mime)>,
+ headers: Option<H>,
+ ) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned,
+ B: Into<Body>,
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ let string = self
+ .transport
+ .request(Method::POST, endpoint, body, headers)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ pub(crate) async fn delete(
+ &self,
+ endpoint: &str,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::DELETE, endpoint, Payload::None, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>(
+ &self,
+ endpoint: &str,
+ ) -> Result<T> {
+ let string = self
+ .transport
+ .request(Method::DELETE, endpoint, Payload::None, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ /// Send a streaming post request.
+ ///
+ /// Use stream_post_into_values if the endpoint returns JSON values
+ pub(crate) fn stream_post<'a, H>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ headers: Option<H>,
+ ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
+ where
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ self.transport
+ .stream_chunks(Method::POST, endpoint, body, headers)
+ }
+
+ /// Send a streaming post request that returns a stream of JSON values
+ ///
+ /// Assumes that each received chunk contains one or more JSON values
+ pub(crate) fn stream_post_into_values<'a, H>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ headers: Option<H>,
+ ) -> impl Stream<Item = Result<Value>> + 'a
+ where
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ self.stream_post(endpoint, body, headers)
+ .and_then(|chunk| async move {
+ let stream = futures_util::stream::iter(
+ serde_json::Deserializer::from_slice(&chunk)
+ .into_iter()
+ .collect::<Vec<_>>(),
+ )
+ .map_err(Error::from);
+
+ Ok(stream)
+ })
+ .try_flatten()
+ }
+
+ pub(crate) fn stream_get<'a>(
+ &'a self,
+ endpoint: impl AsRef<str> + Unpin + 'a,
+ ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
+ let headers = Some(Vec::default());
+ self.transport
+ .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
+ }
+
+ pub(crate) async fn stream_post_upgrade<'a>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ ) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> {
+ self.transport
+ .stream_upgrade(Method::POST, endpoint, body)
+ .await
+ }
+}
+
+impl Default for Docker {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ #[cfg(feature = "unix-socket")]
+ #[test]
+ fn unix_host_env() {
+ use super::Docker;
+ use std::env;
+ env::set_var("DOCKER_HOST", "unix:///docker.sock");
+ let d = Docker::new();
+ match d.transport {
+ crate::transport::Transport::Unix { path, .. } => {
+ assert_eq!(path, "/docker.sock");
+ }
+ _ => {
+ panic!("Expected transport to be unix.");
+ }
+ }
+ env::set_var("DOCKER_HOST", "http://localhost:8000");
+ let d = Docker::new();
+ match d.transport {
+ crate::transport::Transport::Tcp { host, .. } => {
+ assert_eq!(host, "http://localhost:8000");
+ }
+ _ => {
+ panic!("Expected transport to be http.");
+ }
+ }
+ }
+}
diff --git a/src/errors.rs b/src/errors.rs
index 9d01f91..a182ff6 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -6,6 +6,9 @@ use std::{error::Error as StdError, fmt, string::FromUtf8Error};
use futures_util::io::Error as IoError;
+/// Represents the result of all docker operations
+pub type Result<T> = std::result::Result<T, Error>;
+
#[derive(Debug)]
pub enum Error {
SerdeJsonError(SerdeError),
diff --git a/src/exec.rs b/src/exec.rs
new file mode 100644
index 0000000..179e175
--- /dev/null
+++ b/src/exec.rs
@@ -0,0 +1,172 @@
+//! Run new commands inside running containers.
+//!
+//! API Reference: <https://docs.docker.com/engine/api/v1.41/#tag/Exec>
+
+use std::iter;
+
+use futures_util::{stream::Stream, TryFutureExt};
+use hyper::Body;
+
+use crate::{
+ builder::{ExecContainerOptions, ExecResizeOptions},
+ errors::Result,
+ rep::ExecDetails,
+ tty, Docker,
+};
+
+/// Interface for docker exec instance
+pub struct Exec<'docker> {
+ docker: &'docker Docker,
+ id: String,
+}
+
+impl<'docker> Exec<'docker> {
+ fn new<S>(
+ docker: &'docker Docker,
+ id: S,
+ ) -> Self
+ where
+ S: Into<String>,
+ {
+ Exec {
+ docker,
+ id: id.into(),
+ }
+ }
+
+ /// Creates a new exec instance that will be executed in a container with id == container_id
+ pub async fn create(
+ docker: &'docker Docker,
+ container_id: &str,
+ opts: &ExecContainerOptions,
+ ) -> Result<Exec<'docker>> {
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "PascalCase")]
+ struct Response {
+ id: String,
+ }
+
+ let body: Body = opts.serialize()?.into();
+
+ let id = docker
+ .post_json(
+ &format!("/containers/{}/exec", container_id),
+ Some((body, mime::APPLICATION_JSON)),
+ )
+ .await
+ .map(|resp: Response| resp.id)?;
+
+ Ok(Exec::new(docker, id))
+ }
+
+ // This exists for Container::exec()
+ //
+ // We need to combine `Exec::create` and `Exec::start` into one method because otherwise you
+ // needlessly tie the Stream to the lifetime of `container_id` and `opts`. This is because
+ // `Exec::create` is async so it must occur inside of the `async move` block. However, this
+ // means that `container_id` and `opts` are both expected to be alive in the returned stream
+ // because we can't do the work of creating an endpoint from `container_id` or serializing
+ // `opts`. By doing this work outside of the stream, we get owned values that we can then move
+ // into the stream and have the lifetimes work out as you would expect.
+ //
+ // Yes, it is sad that we can't do the easy method and thus have some duplicated code.
+ pub(crate) fn create_and_start(
+ docker: &'docker Docker,
+ container_id: &str,
+ opts: &ExecContainerOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker {
+ #[derive(serde::Deserialize)]
+ #[serde(rename_all = "PascalCase")]
+ struct Response {
+ id: String,
+ }
+
+ // To not tie the lifetime of `opts` to the stream, we do the serializing work outside of
+ // the stream. But for backwards compatibility, we have to return the error inside of the
+ // stream.
+ let body_result = opts.serialize();
+
+ // To not tie the lifetime of `container_id` to the stream, we convert it to an (owned)
+ // endpoint outside of the stream.
+ let container_endpoint = format!("/containers/{}/exec", container_id);
+
+ Box::pin(
+ async move {
+ // Bubble up the error inside the stream for backwards compatibility
+ let body: Body = body_result?.into();
+
+ let exec_id = docker
+ .post_json(&container_endpoint, Some((body, mime::APPLICATION_JSON)))
+ .await
+ .map(|resp: Response| resp.id)?;
+
+ let stream = Box::pin(docker.stream_post(
+ format!("/exec/{}/start", exec_id),
+ Some(("{}".into(), mime::APPLICATION_JSON)),
+ None::<iter::Empty<_>>,
+ ));
+
+ Ok(tty::decode(stream))
+ }
+ .try_flatten_stream(),
+ )
+ }
+
+ /// Get a reference to a set of operations available to an already created exec instance.
+ ///
+ /// It is the caller's responsibility to ensure that an exec instance with the specified id
+ /// actually exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created
+ /// beforehand.
+ pub async fn get<S>(
+ docker: &'docker Docker,
+ id: S,
+ ) -> Exec<'docker>
+ where
+ S: Into<String>,
+ {
+ Exec::new(docker, id)
+ }
+
+ /// Starts this exec instance returning a multiplexed tty stream
+ pub fn start(&self) -> impl Stream<Item = Result<tty::TtyChunk>> + 'docker {
+ // We must take ownership of the docker reference to not needlessly tie the stream to the
+ // lifetime of `self`.
+ let docker = self.docker;
+ // We convert `self.id` into the (owned) endpoint outside of the stream to not needlessly
+ // tie the stream to the lifetime of `self`.
+ let endpoint = format!("/exec/{}/start", &self.id);
+ Box::pin(
+ async move {
+ let stream = Box::pin(docker.stream_post(
+