//! Shiplift is a multi-transport utility for maneuvering [docker](https://www.docker.com/) containers //! //! # examples //! //! ```no_run //! # async { //! let docker = shiplift::Docker::new(); //! //! match docker.images().list(&Default::default()).await { //! Ok(images) => { //! for image in images { //! println!("{:?}", image.repo_tags); //! } //! }, //! Err(e) => eprintln!("Something bad happened! {}", e), //! } //! # }; //! ``` pub mod builder; pub mod errors; pub mod rep; pub mod transport; pub mod tty; mod tarball; pub use crate::{ builder::{ BuildOptions, ContainerConnectionOptions, ContainerFilter, ContainerListOptions, ContainerOptions, EventsOptions, ExecContainerOptions, ExecResizeOptions, ImageFilter, ImageListOptions, LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions, RegistryAuth, RmContainerOptions, ServiceFilter, ServiceListOptions, ServiceOptions, TagOptions, VolumeCreateOptions, }, errors::Error, }; use crate::{ rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, ExecDetails, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo, NetworkDetails as NetworkInfo, SearchResult, ServiceCreateInfo, ServiceDetails, Services as ServicesRep, Stats, Status, Top, Version, Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep, }, transport::{tar, Headers, Payload, Transport}, tty::Multiplexer as TtyMultiPlexer, }; use futures_util::{ io::{AsyncRead, AsyncWrite}, stream::Stream, TryFutureExt, TryStreamExt, }; // use futures::{future::Either, Future, IntoFuture, Stream}; pub use hyper::Uri; use hyper::{client::HttpConnector, Body, Client, Method}; #[cfg(feature = "tls")] use hyper_openssl::HttpsConnector; #[cfg(feature = "unix-socket")] use hyperlocal::UnixConnector; use mime::Mime; #[cfg(feature = "tls")] use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; use serde_json::Value; use std::{env, io, io::Read, iter, path::Path, time::Duration}; use url::form_urlencoded; /// Represents the result of all docker operations pub type Result = std::result::Result; /// Entrypoint interface for communicating with docker daemon #[derive(Clone)] pub struct Docker { transport: Transport, } /// Interface for accessing and manipulating a named docker image pub struct Image<'docker> { docker: &'docker Docker, name: String, } impl<'docker> Image<'docker> { /// Exports an interface for operations that may be performed against a named image pub fn new( docker: &'docker Docker, name: S, ) -> Self where S: Into, { Image { docker, name: name.into(), } } /// Inspects a named image's details pub async fn inspect(&self) -> Result { self.docker .get_json(&format!("/images/{}/json", self.name)[..]) .await } /// Lists the history of the images set of changes pub async fn history(&self) -> Result> { self.docker .get_json(&format!("/images/{}/history", self.name)[..]) .await } /// Deletes an image pub async fn delete(&self) -> Result> { self.docker .delete_json::>(&format!("/images/{}", self.name)[..]) .await } /// Export this image to a tarball pub fn export(&self) -> impl Stream>> + Unpin + 'docker { Box::pin( self.docker .stream_get(format!("/images/{}/get", self.name)) .map_ok(|c| c.to_vec()), ) } /// Adds a tag to an image pub async fn tag( &self, opts: &TagOptions, ) -> Result<()> { let mut path = vec![format!("/images/{}/tag", self.name)]; if let Some(query) = opts.serialize() { path.push(query) } let _ = self.docker.post(&path.join("?"), None).await?; Ok(()) } } /// Interface for docker images pub struct Images<'docker> { docker: &'docker Docker, } impl<'docker> Images<'docker> { /// Exports an interface for interacting with docker images pub fn new(docker: &'docker Docker) -> Self { Images { docker } } /// Builds a new image build by reading a Dockerfile in a target directory pub fn build( &self, opts: &BuildOptions, ) -> impl Stream> + Unpin + 'docker { let mut endpoint = vec!["/build".to_owned()]; if let Some(query) = opts.serialize() { endpoint.push(query) } // To not tie the lifetime of `opts` to the 'stream, we do the tarring work outside of the // stream. But for backwards compatability, we have to return the error inside of the // stream. let mut bytes = Vec::default(); let tar_result = tarball::dir(&mut bytes, opts.path.as_str()); // We must take ownership of the Docker reference. If we don't then the lifetime of 'stream // is incorrectly tied to `self`. let docker = self.docker; Box::pin( async move { // Bubble up error inside the stream for backwards compatability tar_result?; let value_stream = docker.stream_post_into_values( endpoint.join("?"), Some((Body::from(bytes), tar())), None::>, ); Ok(value_stream) } .try_flatten_stream(), ) } /// Lists the docker images on the current docker host pub async fn list( &self, opts: &ImageListOptions, ) -> Result> { let mut path = vec!["/images/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } self.docker.get_json::>(&path.join("?")).await } /// Returns a reference to a set of operations available for a named image pub fn get( &self, name: S, ) -> Image<'docker> where S: Into, { Image::new(self.docker, name) } /// Search for docker images by term pub async fn search( &self, term: &str, ) -> Result> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("term", term) .finish(); self.docker .get_json::>(&format!("/images/search?{}", query)[..]) .await } /// Pull and create a new docker images from an existing image pub fn pull( &self, opts: &PullOptions, ) -> impl Stream> + Unpin + 'docker { let mut path = vec!["/images/create".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } let headers = opts .auth_header() .map(|a| iter::once(("X-Registry-Auth", a))); Box::pin( self.docker .stream_post_into_values(path.join("?"), None, headers), ) } /// exports a collection of named images, /// either by name, name:tag, or image id, into a tarball pub fn export( &self, names: Vec<&str>, ) -> impl Stream>> + 'docker { let params = names.iter().map(|n| ("names", *n)); let query = form_urlencoded::Serializer::new(String::new()) .extend_pairs(params) .finish(); self.docker .stream_get(format!("/images/get?{}", query)) .map_ok(|c| c.to_vec()) } /// imports an image or set of images from a given tarball source /// source can be uncompressed on compressed via gzip, bzip2 or xz pub fn import( self, mut tarball: R, ) -> impl Stream> + Unpin + 'docker where R: Read + Send + 'docker, { Box::pin( async move { let mut bytes = Vec::default(); tarball.read_to_end(&mut bytes)?; let value_stream = self.docker.stream_post_into_values( "/images/load", Some((Body::from(bytes), tar())), None::>, ); Ok(value_stream) } .try_flatten_stream(), ) } } /// Interface for accessing and manipulating a docker container pub struct Container<'docker> { docker: &'docker Docker, id: String, } impl<'docker> Container<'docker> { /// Exports an interface exposing operations against a container instance pub fn new( docker: &'docker Docker, id: S, ) -> Self where S: Into, { Container { docker, id: id.into(), } } /// a getter for the container id pub fn id(&self) -> &str { &self.id } /// Inspects the current docker container instance's details pub async fn inspect(&self) -> Result { self.docker .get_json::(&format!("/containers/{}/json", self.id)[..]) .await } /// Returns a `top` view of information about the container process pub async fn top( &self, psargs: Option<&str>, ) -> Result { let mut path = vec![format!("/containers/{}/top", self.id)]; if let Some(ref args) = psargs { let encoded = form_urlencoded::Serializer::new(String::new()) .append_pair("ps_args", args) .finish(); path.push(encoded) } self.docker.get_json(&path.join("?")).await } /// Returns a stream of logs emitted but the container instance pub fn logs( &self, opts: &LogsOptions, ) -> impl Stream> + Unpin + 'docker { let mut path = vec![format!("/containers/{}/logs", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } let stream = Box::pin(self.docker.stream_get(path.join("?"))); Box::pin(tty::decode(stream)) } /// Attaches a multiplexed TCP stream to the container that can be used to read Stdout, Stderr and write Stdin. async fn attach_raw(&self) -> Result { self.docker .stream_post_upgrade( format!( "/containers/{}/attach?stream=1&stdout=1&stderr=1&stdin=1", self.id ), None, ) .await } /// Attaches a `[TtyMultiplexer]` to the container. /// /// The `[TtyMultiplexer]` implements Stream for returning Stdout and Stderr chunks. It also implements `[AsyncWrite]` for writing to Stdin. /// /// The multiplexer can be split into its read and write halves with the `[split](TtyMultiplexer::split)` method pub async fn attach(&self) -> Result> { let tcp_stream = self.attach_raw().await?; Ok(TtyMultiPlexer::new(tcp_stream)) } /// Returns a set of changes made to the container instance pub async fn changes(&self) -> Result> { self.docker .get_json::>(&format!("/containers/{}/changes", self.id)[..]) .await } /// Exports the current docker container into a tarball pub fn export(&self) -> impl Stream>> + 'docker { self.docker .stream_get(format!("/containers/{}/export", self.id)) .map_ok(|c| c.to_vec()) } /// Returns a stream of stats specific to this container instance pub fn stats(&self) -> impl Stream> + Unpin + 'docker { let codec = futures_codec::LinesCodec {}; let reader = Box::pin( self.docker .stream_get(format!("/containers/{}/stats", self.id)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), ) .into_async_read(); Box::pin( futures_codec::FramedRead::new(reader, codec) .map_err(Error::IO) .and_then(|s: String| async move { serde_json::from_str(&s).map_err(Error::SerdeJsonError) }), ) } /// Start the container instance pub async fn start(&self) -> Result<()> { self.docker .post(&format!("/containers/{}/start", self.id)[..], None) .await?; Ok(()) } /// Stop the container instance pub async fn stop( &self, wait: Option, ) -> Result<()> { let mut path = vec![format!("/containers/{}/stop", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) .append_pair("t", &w.as_secs().to_string()) .finish(); path.push(encoded) } self.docker.post(&path.join("?"), None).await?; Ok(()) } /// Restart the container instance pub async fn restart( &self, wait: Option, ) -> Result<()> { let mut path = vec![format!("/containers/{}/restart", self.id)]; if let Some(w) = wait { let encoded = form_urlencoded::Serializer::new(String::new()) .append_pair("t", &w.as_secs().to_string()) .finish(); path.push(encoded) } self.docker.post(&path.join("?"), None).await?; Ok(()) } /// Kill the container instance pub async fn kill( &self, signal: Option<&str>, ) -> Result<()> { let mut path = vec![format!("/containers/{}/kill", self.id)]; if let Some(sig) = signal { let encoded = form_urlencoded::Serializer::new(String::new()) .append_pair("signal", &sig.to_owned()) .finish(); path.push(encoded) } self.docker.post(&path.join("?"), None).await?; Ok(()) } /// Rename the container instance pub async fn rename( &self, name: &str, ) -> Result<()> { let query = form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(); self.docker .post( &format!("/containers/{}/rename?{}", self.id, query)[..], None, ) .await?; Ok(()) } /// Pause the container instance pub async fn pause(&self) -> Result<()> { self.docker .post(&format!("/containers/{}/pause", self.id)[..], None) .await?; Ok(()) } /// Unpause the container instance pub async fn unpause(&self) -> Result<()> { self.docker .post(&format!("/containers/{}/unpause", self.id)[..], None) .await?; Ok(()) } /// Wait until the container stops pub async fn wait(&self) -> Result { self.docker .post_json( format!("/containers/{}/wait", self.id), Option::<(Body, Mime)>::None, ) .await } /// Delete the container instance /// /// Use remove instead to use the force/v options. pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/containers/{}", self.id)[..]) .await?; Ok(()) } /// Delete the container instance (todo: force/v) pub async fn remove( &self, opts: RmContainerOptions, ) -> Result<()> { let mut path = vec![format!("/containers/{}", self.id)]; if let Some(query) = opts.serialize() { path.push(query) } self.docker.delete(&path.join("?")).await?; Ok(()) } /// Execute a command in this container pub fn exec( &self, opts: &ExecContainerOptions, ) -> impl Stream> + Unpin + 'docker { Exec::create_and_start(self.docker, &self.id, opts) } /// Copy a file/folder from the container. The resulting stream is a tarball of the extracted /// files. /// /// If `path` is not an absolute path, it is relative to the container’s root directory. The /// resource specified by `path` must exist. To assert that the resource is expected to be a /// directory, `path` should end in `/` or `/`. (assuming a path separator of `/`). If `path` /// ends in `/.` then this indicates that only the contents of the path directory should be /// copied. A symlink is always resolved to its target. pub fn copy_from( &self, path: &Path, ) -> impl Stream>> + 'docker { let path_arg = form_urlencoded::Serializer::new(String::new()) .append_pair("path", &path.to_string_lossy()) .finish(); let endpoint = format!("/containers/{}/archive?{}", self.id, path_arg); self.docker.stream_get(endpoint).map_ok(|c| c.to_vec()) } /// Copy a byte slice as file into (see `bytes`) the container. /// /// The file will be copied at the given location (see `path`) and will be owned by root /// with access mask 644. pub async fn copy_file_into>( &self, path: P, bytes: &[u8], ) -> Result<()> { let path = path.as_ref(); let mut ar = tar::Builder::new(Vec::new()); let mut header = tar::Header::new_gnu(); header.set_size(bytes.len() as u64); header.set_mode(0o0644); ar.append_data( &mut header, path.to_path_buf() .iter() .skip(1) .collect::(), bytes, ) .unwrap(); let data = ar.into_inner().unwrap(); self.copy_to(Path::new("/"), data.into()).await?; Ok(()) } /// Copy a tarball (see `body`) to the container. /// /// The tarball will be copied to the container and extracted at the given location (see `path`). pub async fn copy_to( &self, path: &Path, body: Body, ) -> Result<()> { let path_arg = form_urlencoded::Serializer::new(String::new()) .append_pair("path", &path.to_string_lossy()) .finish(); let mime = "application/x-tar".parse::().unwrap(); self.docker .put( &format!("/containers/{}/archive?{}", self.id, path_arg), Some((body, mime)), ) .await?; Ok(()) } } /// Interface for docker containers pub struct Containers<'docker> { docker: &'docker Docker, } impl<'docker> Containers<'docker> { /// Exports an interface for interacting with docker containers pub fn new(docker: &'docker Docker) -> Self { Containers { docker } } /// Lists the container instances on the docker host pub async fn list( &self, opts: &ContainerListOptions, ) -> Result> { let mut path = vec!["/containers/json".to_owned()]; if let Some(query) = opts.serialize() { path.push(query) } self.docker .get_json::>(&path.join("?")) .await } /// Returns a reference to a set of operations available to a specific container instance pub fn get( &self, name: S, ) -> Container<'docker> where S: Into, { Container::new(self.docker, name) } /// Same as `Container::get()`, but checks whether the container exists and returns None if it /// doesn't pub async fn get_checked( &self, name: S, ) -> Result>> where S: AsRef, { let listopts = ContainerListOptions::builder().all().build(); self.list(&listopts) .await? .into_iter() .find(|rep| rep.id == name.as_ref()) .map(|rep| Container::new(self.docker, rep.id)) .map(Ok) .transpose() } /// Returns a builder interface for creating a new container instance pub async fn create( &self, opts: &ContainerOptions, ) -> Result { let body: Body = opts.serialize()?.into(); let mut path = vec!["/containers/create".to_owned()]; if let Some(ref name) = opts.name { path.push( form_urlencoded::Serializer::new(String::new()) .append_pair("name", name) .finish(), ); } self.docker .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) .await } } /// Interface for docker exec instance pub struct Exec<'docker> { docker: &'docker Docker, id: String, } impl<'docker> Exec<'docker> { fn new( docker: &'docker Docker, id: S, ) -> Self where S: Into, { Exec { docker, id: id.into(), } } /// Creates a new exec instance that will be executed in a container with id == container_id pub async fn create( docker: &'docker Docker, container_id: &str, opts: &ExecContainerOptions, ) -> Result> { #[derive(serde::Deserialize)] #[serde(rename_all = "PascalCase")] struct Response { id: String, } let body: Body = opts.serialize()?.into(); let id = docker .post_json( &format!("/containers/{}/exec", container_id), Some((body, mime::APPLICATION_JSON)), ) .await .map(|resp: Response| resp.id)?; Ok(Exec::new(docker, id)) } // This exists for Container::exec() // // We need to combine `Exec::create` and `Exec::start` into one method because otherwise you // needlessly tie the Stream to the lifetime of `container_id` and `opts`. This is because // `Exec::create` is async so it must occur inside of the `async move` block. However, this // means that `container_id` and `opts` are both expected to be alive in the returned stream // because we can't do the work of creating an endpoint from `container_id` or serializing // `opts`. By doing this work outside of the stream, we get owned values that we can then move // into the stream and have the lifetimes work out as you would expect. // // Yes, it is sad that we can't do the easy method and thus have some duplicated code. pub(crate) fn create_and_start( docker: &'docker Docker, container_id: &str, opts: &ExecContainerOptions, ) -> impl Stream> + Unpin + 'docker { #[derive(serde::Deserialize)] #[serde(rename_all = "PascalCase")] struct Response { id: String, } // To not tie the lifetime of `opts` to the stream, we do the serializing work outside of // the stream. But for backwards compatability, we have to return the error inside of the // stream. let body_result = opts.serialize(); // To not tie the lifetime of `container_id` to the stream, we convert it to an (owned) // endpoint outside of the stream. let container_endpoint = format!("/containers/{}/exec", container_id); Box::pin( async move { // Bubble up the error inside the stream for backwards compatability let body: Body = body_result?.into(); let exec_id = docker .post_json(&container_endpoint, Some((body, mime::APPLICATION_JSON))) .await .map(|resp: Response| resp.id)?; let stream = Box::pin(docker.stream_post( format!("/exec/{}/start", exec_id), Some(("{}".into(), mime::APPLICATION_JSON)), None::>, )); Ok(tty::decode(stream)) } .try_flatten_stream(), ) } /// Get a reference to a set of operations available to an already created exec instance. /// /// It's in callers responsibility to ensure that exec instance with specified id actually /// exists. Use [Exec::create](Exec::create) to ensure that the exec instance is created /// beforehand. pub async fn get( docker: &'docker Docker, id: S, ) -> Exec<'docker> where S: Into, { Exec::new(docker, id) } /// Starts this exec instance returning a multiplexed tty stream pub fn start(&self) -> impl Stream> + 'docker { // We must take ownership of the docker reference to not needlessly tie the stream to the // lifetime of `self`. let docker = self.docker; // We convert `self.id` into the (owned) endpoint outside of the stream to not needlessly // tie the stream to the lifetime of `self`. let endpoint = format!("/exec/{}/start", &self.id); Box::pin( async move { let stream = Box::pin(docker.stream_post( endpoint, Some(("{}".into(), mime::APPLICATION_JSON)), None::>, )); Ok(tty::decode(stream)) } .try_flatten_stream(), ) } /// Inspect this exec instance to aquire detailed information pub async fn inspect(&self) -> Result { self.docker .get_json(&format!("/exec/{}/json", &self.id)[..]) .await } pub async fn resize( &self, opts: &ExecResizeOptions, ) -> Result<()> { let body: Body = opts.serialize()?.into(); self.docker .post_json( &format!("/exec/{}/resize", &self.id)[..], Some((body, mime::APPLICATION_JSON)), ) .await } } /// Interface for docker network pub struct Networks<'docker> { docker: &'docker Docker, } impl<'docker> Networks<'docker> { /// Exports an interface for interacting with docker Networks pub fn new(docker: &'docker Docker) -> Self { Networks { docker } } /// List the docker networks on the current docker host pub async fn list( &self, opts: &NetworkListOptions, ) -> Result> { let mut path = vec!["/networks".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } self.docker.get_json(&path.join("?")).await } /// Returns a reference to a set of operations available to a specific network instance pub fn get( &self, id: S, ) -> Network<'docker> where S: Into, { Network::new(self.docker, id) } /// Create a new Network instance pub async fn create( &self, opts: &NetworkCreateOptions, ) -> Result { let body: Body = opts.serialize()?.into(); let path = vec!["/networks/create".to_owned()]; self.docker .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) .await } } /// Interface for accessing and manipulating a docker network pub struct Network<'docker> { docker: &'docker Docker, id: String, } impl<'docker> Network<'docker> { /// Exports an interface exposing operations against a network instance pub fn new( docker: &'docker Docker, id: S, ) -> Self where S: Into, { Network { docker, id: id.into(), } } /// a getter for the Network id pub fn id(&self) -> &str { &self.id } /// Inspects the current docker network instance's details pub async fn inspect(&self) -> Result { self.docker .get_json(&format!("/networks/{}", self.id)[..]) .await } /// Delete the network instance pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/networks/{}", self.id)[..]) .await?; Ok(()) } /// Connect container to network pub async fn connect( &self, opts: &ContainerConnectionOptions, ) -> Result<()> { self.do_connection("connect", opts).await } /// Disconnect container to network pub async fn disconnect( &self, opts: &ContainerConnectionOptions, ) -> Result<()> { self.do_connection("disconnect", opts).await } async fn do_connection( &self, segment: &str, opts: &ContainerConnectionOptions, ) -> Result<()> { let body: Body = opts.serialize()?.into(); self.docker .post( &format!("/networks/{}/{}", self.id, segment)[..], Some((body, mime::APPLICATION_JSON)), ) .await?; Ok(()) } } /// Interface for docker volumes pub struct Volumes<'docker> { docker: &'docker Docker, } impl<'docker> Volumes<'docker> { /// Exports an interface for interacting with docker volumes pub fn new(docker: &'docker Docker) -> Self { Volumes { docker } } pub async fn create( &self, opts: &VolumeCreateOptions, ) -> Result { let body: Body = opts.serialize()?.into(); let path = vec!["/volumes/create".to_owned()]; self.docker .post_json(&path.join("?"), Some((body, mime::APPLICATION_JSON))) .await } /// Lists the docker volumes on the current docker host pub async fn list(&self) -> Result> { let path = vec!["/volumes".to_owned()]; let volumes_rep = self.docker.get_json::(&path.join("?")).await?; Ok(match volumes_rep.volumes { Some(volumes) => volumes, None => vec![], }) } /// Returns a reference to a set of operations available for a named volume pub fn get( &self, name: &str, ) -> Volume<'docker> { Volume::new(self.docker, name) } } /// Interface for accessing and manipulating a named docker volume pub struct Volume<'docker> { docker: &'docker Docker, name: String, } impl<'docker> Volume<'docker> { /// Exports an interface for operations that may be performed against a named volume pub fn new( docker: &'docker Docker, name: S, ) -> Self where S: Into, { Volume { docker, name: name.into(), } } /// Deletes a volume pub async fn delete(&self) -> Result<()> { self.docker .delete(&format!("/volumes/{}", self.name)[..]) .await?; Ok(()) } } /// Interface for docker services pub struct Services<'docker> { docker: &'docker Docker, } impl<'docker> Services<'docker> { /// Exports an interface for interacting with docker services pub fn new(docker: &'docker Docker) -> Self { Services { docker } } /// Lists the docker services on the current docker host pub async fn list( &self, opts: &ServiceListOptions, ) -> Result { let mut path = vec!["/services".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } self.docker.get_json::(&path.join("?")).await } /// Returns a reference to a set of operations available for a named service pub fn get( &self, name: &str, ) -> Service<'docker> { Service::new(self.docker, name) } } /// Interface for accessing and manipulating a named docker volume pub struct Service<'docker> { docker: &'docker Docker, name: String, } impl<'docker> Service<'docker> { /// Exports an interface for operations that may be performed against a named service pub fn new( docker: &'docker Docker, name: S, ) -> Self where S: Into, { Service { docker, name: name.into(), } } /// Creates a new service from ServiceOptions pub async fn create( &self, opts: &ServiceOptions, ) -> Result { let body: Body = opts.serialize()?.into(); let path = vec!["/service/create".to_owned()]; let headers = opts .auth_header() .map(|a| iter::once(("X-Registry-Auth", a))); self.docker .post_json_headers( &path.join("?"), Some((body, mime::APPLICATION_JSON)), headers, ) .await } /// Inspects a named service's details pub async fn inspect(&self) -> Result { self.docker .get_json(&format!("/services/{}", self.name)[..]) .await } /// Deletes a service pub async fn delete(&self) -> Result<()> { self.docker .delete_json(&format!("/services/{}", self.name)[..]) .await } /// Returns a stream of logs from a service pub fn logs( &self, opts: &LogsOptions, ) -> impl Stream> + Unpin + 'docker { let mut path = vec![format!("/services/{}/logs", self.name)]; if let Some(query) = opts.serialize() { path.push(query) } let stream = Box::pin(self.docker.stream_get(path.join("?"))); Box::pin(tty::decode(stream)) } } fn get_http_connector() -> HttpConnector { let mut http = HttpConnector::new(); http.enforce_http(false); http } #[cfg(feature = "tls")] fn get_docker_for_tcp(tcp_host_str: String) -> Docker { let http = get_http_connector(); if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") { // fixme: don't unwrap before you know what's in the box // https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428 let mut connector = SslConnector::builder(SslMethod::tls()).unwrap(); connector.set_cipher_list("DEFAULT").unwrap(); let cert = &format!("{}/cert.pem", certs); let key = &format!("{}/key.pem", certs); connector .set_certificate_file(&Path::new(cert), SslFiletype::PEM) .unwrap(); connector .set_private_key_file(&Path::new(key), SslFiletype::PEM) .unwrap(); if env::var("DOCKER_TLS_VERIFY").is_ok() { let ca = &format!("{}/ca.pem", certs); connector.set_ca_file(&Path::new(ca)).unwrap(); } // If we are attempting to connec to the docker daemon via tcp // we need to convert the scheme to `https` to let hyper connect. // Otherwise, hyper will reject the connection since it does not // recongnize `tcp` as a valid `http` scheme. let tcp_host_str = if tcp_host_str.contains("tcp://") { tcp_host_str.replace("tcp://", "https://") } else { tcp_host_str }; Docker { transport: Transport::EncryptedTcp { client: Client::builder() .build(HttpsConnector::with_connector(http, connector).unwrap()), host: tcp_host_str, }, } } else { Docker { transport: Transport::Tcp { client: Client::builder().build(http), host: tcp_host_str, }, } } } #[cfg(not(feature = "tls"))] fn get_docker_for_tcp(tcp_host_str: String) -> Docker { let http = get_http_connector(); Docker { transport: Transport::Tcp { client: Client::builder().build(http), host: tcp_host_str, }, } } // https://docs.docker.com/reference/api/docker_remote_api_v1.17/ impl Docker { /// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`, /// falling back on unix:///var/run/docker.sock pub fn new() -> Docker { match env::var("DOCKER_HOST").ok() { Some(host) => { #[cfg(feature = "unix-socket")] if let Some(path) = host.strip_prefix("unix://") { return Docker::unix(path); } let host = host.parse().expect("invalid url"); Docker::host(host) } #[cfg(feature = "unix-socket")] None => Docker::unix("/var/run/docker.sock"), #[cfg(not(feature = "unix-socket"))] None => panic!("Unix socket support is disabled"), } } /// Creates a new docker instance for a docker host /// listening on a given Unix socket. #[cfg(feature = "unix-socket")] pub fn unix(socket_path: S) -> Docker where S: Into, { Docker { transport: Transport::Unix { client: Client::builder() .pool_max_idle_per_host(0) .build(UnixConnector), path: socket_path.into(), }, } } /// constructs a new Docker instance for docker host listening at the given host url pub fn host(host: Uri) -> Docker { let tcp_host_str = format!( "{}://{}:{}", host.scheme_str().unwrap(), host.host().unwrap().to_owned(), host.port_u16().unwrap_or(80) ); match host.scheme_str() { #[cfg(feature = "unix-socket")] Some("unix") => Docker { transport: Transport::Unix { client: Client::builder().build(UnixConnector), path: host.path().to_owned(), }, }, #[cfg(not(feature = "unix-socket"))] Some("unix") => panic!("Unix socket support is disabled"), _ => get_docker_for_tcp(tcp_host_str), } } /// Exports an interface for interacting with docker images pub fn images(&'_ self) -> Images<'_> { Images::new(self) } /// Exports an interface for interacting with docker containers pub fn containers(&'_ self) -> Containers<'_> { Containers::new(self) } /// Exports an interface for interacting with docker services pub fn services(&'_ self) -> Services<'_> { Services::new(self) } pub fn networks(&'_ self) -> Networks<'_> { Networks::new(self) } pub fn volumes(&'_ self) -> Volumes<'_> { Volumes::new(self) } /// Returns version information associated with the docker daemon pub async fn version(&self) -> Result { self.get_json("/version").await } /// Returns information associated with the docker daemon pub async fn info(&self) -> Result { self.get_json("/info").await } /// Returns a simple ping response indicating the docker daemon is accessible pub async fn ping(&self) -> Result { self.get("/_ping").await } /// Returns a stream of docker events pub fn events<'docker>( &'docker self, opts: &EventsOptions, ) -> impl Stream> + Unpin + 'docker { let mut path = vec!["/events".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } let reader = Box::pin( self.stream_get(path.join("?")) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), ) .into_async_read(); let codec = futures_codec::LinesCodec {}; Box::pin( futures_codec::FramedRead::new(reader, codec) .map_err(Error::IO) .and_then(|s: String| async move { serde_json::from_str(&s).map_err(Error::SerdeJsonError) }), ) } // // Utility functions to make requests // async fn get( &self, endpoint: &str, ) -> Result { self.transport .request(Method::GET, endpoint, Payload::None, Headers::None) .await } async fn get_json( &self, endpoint: &str, ) -> Result { let raw_string = self .transport .request(Method::GET, endpoint, Payload::None, Headers::None) .await?; Ok(serde_json::from_str::(&raw_string)?) } async fn post( &self, endpoint: &str, body: Option<(Body, Mime)>, ) -> Result { self.transport .request(Method::POST, endpoint, body, Headers::None) .await } async fn put( &self, endpoint: &str, body: Option<(Body, Mime)>, ) -> Result { self.transport .request(Method::PUT, endpoint, body, Headers::None) .await } async fn post_json( &self, endpoint: impl AsRef, body: Option<(B, Mime)>, ) -> Result where T: serde::de::DeserializeOwned, B: Into, { let string = self .transport .request(Method::POST, endpoint, body, Headers::None) .await?; Ok(serde_json::from_str::(&string)?) } async fn post_json_headers<'a, T, B, H>( &self, endpoint: impl AsRef, body: Option<(B, Mime)>, headers: Option, ) -> Result where T: serde::de::DeserializeOwned, B: Into, H: IntoIterator + 'a, { let string = self .transport .request(Method::POST, endpoint, body, headers) .await?; Ok(serde_json::from_str::(&string)?) } async fn delete( &self, endpoint: &str, ) -> Result { self.transport .request(Method::DELETE, endpoint, Payload::None, Headers::None) .await } async fn delete_json( &self, endpoint: &str, ) -> Result { let string = self .transport .request(Method::DELETE, endpoint, Payload::None, Headers::None) .await?; Ok(serde_json::from_str::(&string)?) } /// Send a streaming post request. /// /// Use stream_post_into_values if the endpoint returns JSON values fn stream_post<'a, H>( &'a self, endpoint: impl AsRef + 'a, body: Option<(Body, Mime)>, headers: Option, ) -> impl Stream> + 'a where H: IntoIterator + 'a, { self.transport .stream_chunks(Method::POST, endpoint, body, headers) } /// Send a streaming post request that returns a stream of JSON values /// /// Assumes that each received chunk contains one or more JSON values fn stream_post_into_values<'a, H>( &'a self, endpoint: impl AsRef + 'a, body: Option<(Body, Mime)>, headers: Option, ) -> impl Stream> + 'a where H: IntoIterator + 'a, { self.stream_post(endpoint, body, headers) .and_then(|chunk| async move { let stream = futures_util::stream::iter( serde_json::Deserializer::from_slice(&chunk) .into_iter() .collect::>(), ) .map_err(Error::from); Ok(stream) }) .try_flatten() } fn stream_get<'a>( &'a self, endpoint: impl AsRef + Unpin + 'a, ) -> impl Stream> + 'a { let headers = Some(Vec::default()); self.transport .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers) } async fn stream_post_upgrade<'a>( &'a self, endpoint: impl AsRef + 'a, body: Option<(Body, Mime)>, ) -> Result { self.transport .stream_upgrade(Method::POST, endpoint, body) .await } } impl Default for Docker { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { #[cfg(feature = "unix-socket")] #[test] fn unix_host_env() { use super::Docker; use std::env; env::set_var("DOCKER_HOST", "unix:///docker.sock"); let d = Docker::new(); match d.transport { crate::transport::Transport::Unix { path, .. } => { assert_eq!(path, "/docker.sock"); } _ => { panic!("Expected transport to be unix."); } } env::set_var("DOCKER_HOST", "http://localhost:8000"); let d = Docker::new(); match d.transport { crate::transport::Transport::Tcp { host, .. } => { assert_eq!(host, "http://localhost:8000"); } _ => { panic!("Expected transport to be http."); } } } }