diff options
author | Eli W. Hunter <elihunter173@gmail.com> | 2021-02-07 19:17:32 -0500 |
---|---|---|
committer | Eli W. Hunter <elihunter173@gmail.com> | 2021-03-08 18:48:30 -0500 |
commit | 349d8f2f7343506f986a5773e72f260c78fe9289 (patch) | |
tree | 7569c25d73c516b762df3ffcabfbc56feb6a316f | |
parent | 007b05eaf5127bbaea27cfc816f307946bd7aa5f (diff) |
Split up lib.rs
-rw-r--r-- | src/container.rs | 410 | ||||
-rw-r--r-- | src/docker.rs | 430 | ||||
-rw-r--r-- | src/errors.rs | 3 | ||||
-rw-r--r-- | src/exec.rs | 172 | ||||
-rw-r--r-- | src/image.rs | 229 | ||||
-rw-r--r-- | src/lib.rs | 1523 | ||||
-rw-r--r-- | src/network.rs | 134 | ||||
-rw-r--r-- | src/rep.rs | 2 | ||||
-rw-r--r-- | src/service.rs | 122 | ||||
-rw-r--r-- | src/volume.rs | 85 |
10 files changed, 1605 insertions, 1505 deletions
diff --git a/src/container.rs b/src/container.rs new file mode 100644 index 0000000..ea4d0a7 --- /dev/null +++ b/src/container.rs @@ -0,0 +1,410 @@ +//! Create and manage containers. +//! +//! API Reference: <https://docs.docker.com/engine/api/v1.41/#tag/Container> + +use std::{io, path::Path, time::Duration}; + +use futures_util::{ + io::{AsyncRead, AsyncWrite}, + stream::Stream, + TryStreamExt, +}; +use hyper::Body; +use mime::Mime; +use url::form_urlencoded; + +use crate::{ + builder::{ + ContainerListOptions, ContainerOptions, ExecContainerOptions, LogsOptions, + RmContainerOptions, + }, + docker::Docker, + errors::{Error, Result}, + exec::Exec, + rep::{ + Change, Container as ContainerInfo, ContainerCreateInfo, ContainerDetails, Exit, Stats, Top, + }, + transport::Payload, + tty::{self, Multiplexer as TtyMultiPlexer}, +}; + +/// Interface for accessing and manipulating a docker container +pub struct Container<'docker> { + docker: &'docker Docker, + id: String, +} + +impl<'docker> Container<'docker> { + /// Exports an interface exposing operations against a container instance + pub fn new<S>( + docker: &'docker Docker, + id: S, + ) -> Self + where + S: Into<String>, + { + Container { + docker, + id: id.into(), + } + } + + /// a getter for the container id + pub fn id(&self) -> &str { + &self.id + } + + /// Inspects the current docker container instance's details + pub async fn inspect(&self) -> Result<ContainerDetails> { + self.docker + .get_json::<ContainerDetails>(&format!("/containers/{}/json", self.id)[..]) + .await + } + + /// Returns a `top` view of information about the container process + pub async fn top( + &self, + psargs: Option<&str>, + ) -> Result<Top> { + let mut path = vec![format!("/containers/{}/top", self.id)]; + if let Some(ref args) = psargs { + let encoded = form_urlencoded::Serializer::new(String::new()) + .append_pair("ps_args", args) + .finish(); + path.push(encoded) + } + self.docker.get_json(&path.join("?")).await + } + + /// Returns a stream of logs emitted but the container instance + pub fn logs( + &self, + opts: &LogsOptions, + ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker { + let mut path = vec![format!("/containers/{}/logs", self.id)]; + if let Some(query) = opts.serialize() { + path.push(query) + } + + let stream = Box::pin(self.docker.stream_get(path.join("?"))); + + Box::pin(tty::decode(stream)) + } + + /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin. + async fn attach_raw(&self) -> Result<impl AsyncRead + AsyncWrite + Send + 'docker> { + self.docker + .stream_post_upgrade( + format!( + "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", + self.id + ), + None, + ) + .await + } + + /// Attaches a `[TtyMultiplexer]` to the container. + /// + /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin. + /// + /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method + pub async fn attach(&self) -> Result<TtyMultiPlexer<'docker>> { + let tcp_stream = self.attach_raw().await?; + + Ok(TtyMultiPlexer::new(tcp_stream)) + } + + /// Returns a set of changes made to the container instance + pub async fn changes(&self) -> Result<Vec<Change>> { + self.docker + .get_json::<Vec<Change>>(&format!("/containers/{}/changes", self.id)[..]) + .await + } + + /// Exports the current docker container into a tarball + pub fn export(&self) -> impl Stream<Item = Result<Vec<u8>>> + 'docker { + self.docker + .stream_get(format!("/containers/{}/export", self.id)) + .map_ok(|c| c.to_vec()) + } + + /// Returns a stream of stats specific to this container instance + pub fn stats(&self) -> impl Stream<Item = Result<Stats>> + Unpin + 'docker { + let codec = futures_codec::LinesCodec {}; + + let reader = Box::pin( + self.docker + .stream_get(format!("/containers/{}/stats", self.id)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ) + .into_async_read(); + + Box::pin( + futures_codec::FramedRead::new(reader, codec) + .map_err(Error::IO) + .and_then(|s: String| async move { + serde_json::from_str(&s).map_err(Error::SerdeJsonError) + }), + ) + } + + /// Start the container instance + pub async fn start(&self) -> Result<()> { + self.docker + .post(&format!("/containers/{}/start", self.id)[..], None) + .await?; + Ok(()) + } + + /// Stop the container instance + pub async fn stop( + &self, + wait: Option<Duration>, + ) -> Result<()> { + let mut path = vec![format!("/containers/{}/stop", self.id)]; + if let Some(w) = wait { + let encoded = form_urlencoded::Serializer::new(String::new()) + .append_pair("t", &w.as_secs().to_string()) + .finish(); + + path.push(encoded) + } + self.docker.post(&path.join("?"), None).await?; + Ok(()) + } + + /// Restart the container instance + pub async fn restart( + &self, + wait: Option<Duration>, + ) -> Result<()> { + let mut path = vec![format!("/containers/{}/restart", self.id)]; + if let Some(w) = wait { + let encoded = form_urlencoded::Serializer::new(String::new()) + .append_pair("t", &w.as_secs().to_string()) + .finish(); + path.push(encoded) + } + self.docker.post(&path.join("?"), None).await?; + Ok(()) + } + + /// Kill the container instance + pub async fn kill( + &self, + signal: Option<&str>, + ) -> Result<()> { + let mut path = vec![format!("/containers/{}/kill", self.id)]; + if let Some(sig) = signal { + let encoded = form_urlencoded::Serializer::new(String::new()) + .append_pair("signal", &sig.to_owned()) + .finish(); + path.push(encoded) + } + self.docker.post(&path.join("?"), None).await?; + Ok(()) + } + + /// Rename the container instance + pub async fn rename( + &self, + name: &str, + ) -> Result<()> { + let query = form_urlencoded::Serializer::new(String::new()) + .append_pair("name", name) + .finish(); + self.docker + .post( + &format!("/containers/{}/rename?{}", self.id, query)[..], + None, + ) + .await?; + Ok(()) + } + + /// Pause the container instance + pub async fn pause(&self) -> Result<()> { + self.docker + .post(&format!("/containers/{}/pause", self.id)[..], None) + .await?; + Ok(()) + } + + /// Unpause the container instance + pub async fn unpause(&self) -> Result<()> { + self.docker + .post(&format!("/containers/{}/unpause", self.id)[..], None) + .await?; + Ok(()) + } + + /// Wait until the container stops + pub async fn wait(&self) -> Result<Exit> { + self.docker + .post_json(format!("/containers/{}/wait", self.id), Payload::None) + .await + } + + /// Delete the container instance + /// + /// Use remove instead to use the force/v options. + pub async fn delete(&self) -> Result<()> { + self.docker + .delete(&format!("/containers/{}", self.id)[..]) + .await?; + Ok(()) + } + + /// Delete the container instance (todo: force/v) + pub async fn remove( + &self, + opts: RmContainerOptions, + ) -> Result<()> { + let mut path = vec![format!("/containers/{}", self.id)]; + if let Some(query) = opts.serialize() { + path.push(query) + } + self.docker.delete(&path.join("?")).await?; + Ok(()) + } + + /// Execute a command in this container + pub fn exec( + &self, + opts: &ExecContainerOptions, + ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker { + Exec::create_and_start(self.docker, &self.id, opts) + } + + /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted + /// files. + /// + /// If `path` is not an absolute path, it is relative to the container’s root directory. The + /// resource specified by `path` must exist. To assert that the resource is expected to be a + /// directory, `path` should end in `/` or `/`. (assuming a path separator of `/`). If `path` + /// ends in `/.` then this indicates that only the contents of the path directory should be + /// copied. A symlink is always resolved to its target. + pub fn copy_from( + &self, + path: &Path, + ) -> impl Stream<Item = Result<Vec<u8>>> + 'docker { + let path_arg = form_urlencoded::Serializer::new(String::new()) + .append_pair("path", &path.to_string_lossy()) + .finish(); + + let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg); + self.docker.stream_get(endpoint).map_ok(|c| c.to_vec()) + } + + /// Copy a byte slice as file into (see `bytes`) the container. + /// + /// The file will be copied at the given location (see `path`) and will be owned by root + /// with access mask 644. + pub async fn copy_file_into<P: AsRef<Path>>( + &self, + path: P, + bytes: &[u8], + ) -> Result<()> { + let path = path.as_ref(); + + let mut ar = tar::Builder::new(Vec::new()); + let mut header = tar::Header::new_gnu(); + header.set_size(bytes.len() as u64); + header.set_mode(0o0644); + ar.append_data( + &mut header, + path.to_path_buf() + .iter() + .skip(1) + .collect::<std::path::PathBuf>(), + bytes, + ) + .unwrap(); + let data = ar.into_inner().unwrap(); + + self.copy_to(Path::new("/"), data.into()).await?; + Ok(()) + } + + /// Copy a tarball (see `body`) to the container. + /// + /// The tarball will be copied to the container and extracted at the given location (see `path`). + pub async fn copy_to( + &self, + path: &Path, + body: Body, + ) -> Result<()> { + let path_arg = form_urlencoded::Serializer::new(String::new()) + .append_pair("path", &path.to_string_lossy()) + .finish(); + + let mime = "application/x-tar".parse::<Mime>().unwrap(); + + self.docker + .put( + &format!("/containers/{}/archive?{}", self.id, path_arg), + Some((body, mime)), + ) + .await?; + Ok(()) + } +} + +/// Interface for docker containers +pub struct Containers<'docker> { + docker: &'docker Docker, +} + +impl<'docker> Containers<'docker> { + /// Exports an interface for interacting with docker containers + pub fn new(docker: &'docker Docker) -> Self { + Containers { docker } + } + + /// Lists the container instances on the docker host + pub async fn list( + &self, + opts: &ContainerListOptions, + ) -> Result<Vec<ContainerInfo>> { + let mut path = vec!["/containers/json".to_owned()]; + if let Some(query) = opts.serialize() { + path.push(query) + } + self.docker + .get_json::<Vec<ContainerInfo>>(&path.join("?")) + .await + } + + /// Returns a reference to a set of operations available to a specific container instance + pub fn get<S>( + &self, + name: S, + ) -> Container<'docker> + where + S: Into<String>, + { + Container::new(self.docker, name) + } + + /// Returns a builder interface for creating a new container instance + pub async fn create( + &self, + opts: &ContainerOptions, + ) -> Result<ContainerCreateInfo> { + let body: Body = opts.serialize()?.into(); + let mut path = vec!["/containers/create".to_owned()]; + + if let Some(ref name) = opts.name { + path.push( + form_urlencoded::Serializer::new(String::new()) + .append_pair("name", name) + .finish(), + ); + } + + self.docker + .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) + .await + } +} diff --git a/src/docker.rs b/src/docker.rs new file mode 100644 index 0000000..d3faa82 --- /dev/null +++ b/src/docker.rs @@ -0,0 +1,430 @@ +//! Main entrypoint for interacting with the Docker API. +//! +//! API Reference: <https://docs.docker.com/engine/api/v1.41/> + +use std::{env, io, path::Path}; + +use futures_util::{stream::Stream, TryStreamExt}; +use hyper::{client::HttpConnector, Body, Client, Method}; +use mime::Mime; +use serde_json::Value; + +use crate::{ + builder::EventsOptions, + container::Containers, + errors::{Error, Result}, + image::Images, + network::Networks, + rep::{Event, Info, Version}, + service::Services, + transport::{Headers, Payload, Transport}, + volume::Volumes, + Uri, +}; + +#[cfg(feature = "tls")] +use hyper_openssl::HttpsConnector; +#[cfg(feature = "tls")] +use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; + +#[cfg(feature = "unix-socket")] +use hyperlocal::UnixConnector; + +/// Entrypoint interface for communicating with docker daemon +#[derive(Clone)] +pub struct Docker { + transport: Transport, +} + +fn get_http_connector() -> HttpConnector { + let mut http = HttpConnector::new(); + http.enforce_http(false); + + http +} + +#[cfg(feature = "tls")] +fn get_docker_for_tcp(tcp_host_str: String) -> Docker { + let http = get_http_connector(); + if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") { + // fixme: don't unwrap before you know what's in the box + // https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428 + let mut connector = SslConnector::builder(SslMethod::tls()).unwrap(); + connector.set_cipher_list("DEFAULT").unwrap(); + let cert = &format!("{}/cert.pem", certs); + let key = &format!("{}/key.pem", certs); + connector + .set_certificate_file(&Path::new(cert), SslFiletype::PEM) + .unwrap(); + connector + .set_private_key_file(&Path::new(key), SslFiletype::PEM) + .unwrap(); + if env::var("DOCKER_TLS_VERIFY").is_ok() { + let ca = &format!("{}/ca.pem", certs); + connector.set_ca_file(&Path::new(ca)).unwrap(); + } + + // If we are attempting to connec to the docker daemon via tcp + // we need to convert the scheme to `https` to let hyper connect. + // Otherwise, hyper will reject the connection since it does not + // recongnize `tcp` as a valid `http` scheme. + let tcp_host_str = if tcp_host_str.contains("tcp://") { + tcp_host_str.replace("tcp://", "https://") + } else { + tcp_host_str + }; + + Docker { + transport: Transport::EncryptedTcp { + client: Client::builder() + .build(HttpsConnector::with_connector(http, connector).unwrap()), + host: tcp_host_str, + }, + } + } else { + Docker { + transport: Transport::Tcp { + client: Client::builder().build(http), + host: tcp_host_str, + }, + } + } +} + +#[cfg(not(feature = "tls"))] +fn get_docker_for_tcp(tcp_host_str: String) -> Docker { + let http = get_http_connector(); + Docker { + transport: Transport::Tcp { + client: Client::builder().build(http), + host: tcp_host_str, + }, + } +} + +// https://docs.docker.com/reference/api/docker_remote_api_v1.17/ +impl Docker { + /// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`, + /// falling back on unix:///var/run/docker.sock + pub fn new() -> Docker { + match env::var("DOCKER_HOST").ok() { + Some(host) => { + #[cfg(feature = "unix-socket")] + if let Some(path) = host.strip_prefix("unix://") { + return Docker::unix(path); + } + let host = host.parse().expect("invalid url"); + Docker::host(host) + } + #[cfg(feature = "unix-socket")] + None => Docker::unix("/var/run/docker.sock"), + #[cfg(not(feature = "unix-socket"))] + None => panic!("Unix socket support is disabled"), + } + } + + /// Creates a new docker instance for a docker host + /// listening on a given Unix socket. + #[cfg(feature = "unix-socket")] + pub fn unix<S>(socket_path: S) -> Docker + where + S: Into<String>, + { + Docker { + transport: Transport::Unix { + client: Client::builder() + .pool_max_idle_per_host(0) + .build(UnixConnector), + path: socket_path.into(), + }, + } + } + + /// constructs a new Docker instance for docker host listening at the given host url + pub fn host(host: Uri) -> Docker { + let tcp_host_str = format!( + "{}://{}:{}", + host.scheme_str().unwrap(), + host.host().unwrap().to_owned(), + host.port_u16().unwrap_or(80) + ); + + match host.scheme_str() { + #[cfg(feature = "unix-socket")] + Some("unix") => Docker { + transport: Transport::Unix { + client: Client::builder().build(UnixConnector), + path: host.path().to_owned(), + }, + }, + + #[cfg(not(feature = "unix-socket"))] + Some("unix") => panic!("Unix socket support is disabled"), + + _ => get_docker_for_tcp(tcp_host_str), + } + } + + /// Exports an interface for interacting with docker images + pub fn images(&'_ self) -> Images<'_> { + Images::new(self) + } + + /// Exports an interface for interacting with docker containers + pub fn containers(&'_ self) -> Containers<'_> { + Containers::new(self) + } + + /// Exports an interface for interacting with docker services + pub fn services(&'_ self) -> Services<'_> { + Services::new(self) + } + + pub fn networks(&'_ self) -> Networks<'_> { + Networks::new(self) + } + + pub fn volumes(&'_ self) -> Volumes<'_> { + Volumes::new(self) + } + + /// Returns version information associated with the docker daemon + pub async fn version(&self) -> Result<Version> { + self.get_json("/version").await + } + + /// Returns information associated with the docker daemon + pub async fn info(&self) -> Result<Info> { + self.get_json("/info").await + } + + /// Returns a simple ping response indicating the docker daemon is accessible + pub async fn ping(&self) -> Result<String> { + self.get("/_ping").await + } + + /// Returns a stream of docker events + pub fn events<'docker>( + &'docker self, + opts: &EventsOptions, + ) -> impl Stream<Item = Result<Event>> + Unpin + 'docker { + let mut path = vec!["/events".to_owned()]; + if let Some(query) = opts.serialize() { + path.push(query); + } + let reader = Box::pin( + self.stream_get(path.join("?")) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + ) + .into_async_read(); + + let codec = futures_codec::LinesCodec {}; + + Box::pin( + futures_codec::FramedRead::new(reader, codec) + .map_err(Error::IO) + .and_then(|s: String| async move { + serde_json::from_str(&s).map_err(Error::SerdeJsonError) + }), + ) + } + + // + // Utility functions to make requests + // + + pub(crate) async fn get( + &self, + endpoint: &str, + ) -> Result<String> { + self.transport + .request(Method::GET, endpoint, Payload::None, Headers::None) + .await + } + + pub(crate) async fn get_json<T: serde::de::DeserializeOwned>( + &self, + endpoint: &str, + ) -> Result<T> { + let raw_string = self + .transport + .request(Method::GET, endpoint, Payload::None, Headers::None) + .await?; + + Ok(serde_json::from_str::<T>(&raw_string)?) + } + + pub(crate) async fn post( + &self, + endpoint: &str, + body: Option<(Body, Mime)>, + ) -> Result<String> { + self.transport + .request(Method::POST, endpoint, body, Headers::None) + .await + } + + pub(crate) async fn put( + &self, + endpoint: &str, + body: Option<(Body, Mime)>, + ) -> Result<String> { + self.transport + .request(Method::PUT, endpoint, body, Headers::None) + .await + } + + pub(crate) async fn post_json<T, B>( + &self, + endpoint: impl AsRef<str>, + body: Option<(B, Mime)>, + ) -> Result<T> + where + T: serde::de::DeserializeOwned, + B: Into<Body>, + { + let string = self + .transport + .request(Method::POST, endpoint, body, Headers::None) + .await?; + + Ok(serde_json::from_str::<T>(&string)?) + } + + pub(crate) async fn post_json_headers<'a, T, B, H>( + &self, + endpoint: impl AsRef<str>, + body: Option<(B, Mime)>, + headers: Option<H>, + ) -> Result<T> + where + T: serde::de::DeserializeOwned, + B: Into<Body>, + H: IntoIterator<Item = (&'static str, String)> + 'a, + { + let string = self + .transport + .request(Method::POST, endpoint, body, headers) + .await?; + + Ok(serde_json::from_str::<T>(&string)?) + } + + pub(crate) async fn delete( + &self, + endpoint: &str, + ) -> Result<String> { + self.transport + .request(Method::DELETE, endpoint, Payload::None, Headers::None) + .await + } + + pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>( + &self, + endpoint: &str, + ) -> Result<T> { + let string = self + .transport + .request(Method::DELETE, endpoint, Payload::None, Headers::None) + .await?; + + Ok(serde_json::from_str::<T>(&string)?) + } + + /// Send a streaming post request. + /// + /// Use stream_post_into_values if the endpoint returns JSON values + pub(crate) fn stream_post<'a, H>( + &'a self, + endpoint: impl AsRef<str> + 'a, + body: Option<(Body, Mime)>, + headers: Option<H>, + ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a + where + H: IntoIterator<Item = (&'static str, String)> + 'a, + { + self.transport + .stream_chunks(Method::POST, endpoint, body, headers) + } + + /// Send a streaming post request that returns a stream of JSON values + /// + /// Assumes that each received chunk contains one or more JSON values + pub(crate) fn stream_post_into_values<'a, H>( + &'a self, + endpoint: impl AsRef<str> + 'a, + body: Option<(Body, Mime)>, + headers: Option<H>, + ) -> impl Stream<Item = Result<Value>> + 'a + where + H: IntoIterator<Item = (&'static str, String)> + 'a, + { + self.stream_post(endpoint, body, headers) + .and_then(|chunk| async move { + let stream = futures_util::stream::iter( + serde_json::Deserializer::from_slice(&chunk) + .into_iter() + .collect::<Vec<_>>(), + ) + .map_err(Error::from); + + Ok(stream) + }) + .try_flatten() + } + + pub(crate) fn stream_get<'a>( + &'a self, + endpoint: impl AsRef<str> + Unpin + 'a, + ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a { + let headers = Some(Vec::default()); + self.transport + .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers) + } + + pub(crate) async fn stream_post_upgrade<'a>( + &'a self, + endpoint: impl AsRef<str> + 'a, + body: Option<(Body, Mime)>, + ) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> { + self.transport + .stream_upgrade(Method::POST, endpoint, body) + .await + } +} + +impl Default for Docker { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "unix-socket")] + #[test] + fn unix_host_env() { + use super::Docker; + use std::env; + env::set_var("DOCKER_HOST", "unix:///docker.sock"); + let d = Docker::new(); + match d.transport { + crate::transport::Transport::Unix { path, .. } => { + assert_eq!(path, "/docker.sock"); + } + _ => { + panic!("Expected transport to be unix."); + } + } + env::set_var("DOCKER_HOST", "http://localhost:8000"); + let d = Docker::new(); + match d.transport { + crate::transport::Transport::Tcp { host, .. } => { + assert_eq!(host, "http://localhost:8000"); + } + _ => { + panic!("Expected transport to be http."); + } + } + } +} diff --git a/src/errors.rs b/src/errors.rs index 9d01f91..a182ff6 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -6,6 +6,9 @@ use std::{error::Error as StdError, fmt, string::FromUtf8Error}; use futures_util::io::Error as IoError; +/// Represents the result of all docker operations +pub type Result<T> = std::result::Result<T, Error>; + #[derive(Debug)] pub enum Error { SerdeJsonError(SerdeError), diff --git a/src/exec.rs b/src/exec.rs new file mode 100644 index 0000000..179e175 --- /dev/null +++ b/src/exec.rs @@ -0,0 +1,172 @@ +//! Run new commands inside running containers. +//! +//! API Reference: <https://docs.docker.com/engine/api/v1.41/#tag/Exec> + +use std::iter; + +use futures_util::{stream::Stream, TryFutureExt}; +use hyper::Body; + +use crate::{ + builder::{ExecContainerOptions, ExecResizeOptions}, + errors::Result, + rep::ExecDetails, + tty, Docker, +}; + +/// Interface for docker exec instance +pub struct Exec<'docker> { + docker: &'docker Docker, + id: String, +} + +impl<'docker> Exec<'docker> { + fn new<S>( + docker: &'docker Docker, + id: S, + ) -> Self + where + S: Into<String>, + { + Exec { + docker, + id: id.into(), + } + } + + /// Creates a new exec instance that will be executed in a container with id == container_id + pub async fn create( + docker: &'docker Docker, + container_id: &str, + opts: &ExecContainerOptions, + ) -> Result<Exec<'docker>> { + #[derive(serde::Deserialize)] + #[serde(rename_all = "PascalCase")] + struct Response { + id: String, + } + + let body: Body = opts.serialize()?.into(); + + let id = docker + .post_json( + &format!("/containers/{}/exec", container_id), + Some((body, mime::APPLICATION_JSON)), + ) + .await + .map(|resp: Response| resp.id)?; + + Ok(Exec::new(docker, id)) + } + + // This exists for Container::exec() + // + // We need to combine `Exec::create` and `Exec::start` into one method because otherwise you + // needlessly tie the Stream to the lifetime of `container_id` and `opts`. This is because + // `Exec::create` is async so it must occur inside of the `async move` block. However, this + // means that `container_id` and `opts` are both expected to be alive in the returned stream + // because we can't do the work of creating an endpoint from `container_id` or serializing + // `opts`. By doing this work outside of the stream, we get owned values that we can then move + // into the stream and have the lifetimes work out as you would expect. + // + // Yes, it is sad that we can't do the easy method and thus have some duplicated code. + pub(crate) fn create_and_start( + docker: &'docker Docker, + container_id: &str, + opts: &ExecContainerOptions, + ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'docker { + #[derive(serde::Deserialize)] + #[serde(rename_all = "PascalCase")] + struct Response { + id: String, + } + + // To not tie the lifetime of `opts` to the stream, we do the serializing work outside of + // the stream. But for backwards compatability, we have to return the error inside of the + // stream. + let body_result = opts.serialize(); + + // To not tie the lifetime of `container_id` to the stream, we convert it to an (owned) + // endpoint outside of the stream. + let container_endpoint = format!("/containers/{}/exec", container_id); + + Box::pin( + async move { + // Bubble up the error inside the stream for backwards compatability + let body: Body = body_result?.into(); + + let exec_id = docker + .post_json(&container_endpoint, Some((body, mime::APPLICATION_JSON))) + .await + .map(|resp: Response| resp.id)?; + + let stream = Box::pin(docker.stream_post( + format!("/exec/{}/start", exec_id), + Some(("{}".into(), mime::APPLICATION_JSON)), + None::<iter::Empty<_>>, + )); + + Ok(tty::decode(stream)) + } + .try_flatten_stream(), + ) + } + + /// Get a reference to a set of operations available to an already created exec instance. + /// + /// It's in callers responsibility to ensure that exec instance with specified id actually + /// exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created + /// beforehand. + pub async fn get<S>( + docker: &'docker Docker, + id: S, + ) -> Exec<'docker> + where + S: Into<String>, + { + Exec::new(docker, id) + } + + /// Starts this exec instance returning a multiplexed tty stream + pub fn start(&self) -> impl Stream<Item = Result<tty::TtyChunk>> + 'docker { + // We must take ownership of the docker reference to not needlessly tie the stream to the + // lifetime of `self`. + let docker = self.docker; + // We convert `self.id` into the (owned) endpoint outside of the stream to not needlessly + // tie the stream to the lifetime of `self`. + let endpoint = format!("/exec/{}/start", &self.id); + Box::pin( + async move { + let stream = Box::pin(docker.stream_post( + |