summaryrefslogtreecommitdiffstats
path: root/src/docker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/docker.rs')
-rw-r--r--src/docker.rs651
1 files changed, 651 insertions, 0 deletions
diff --git a/src/docker.rs b/src/docker.rs
new file mode 100644
index 0000000..1b7f603
--- /dev/null
+++ b/src/docker.rs
@@ -0,0 +1,651 @@
+//! Main entrypoint for interacting with the Docker API.
+//!
+//! API Reference: <https://docs.docker.com/engine/api/v1.41/>
+
+use std::{collections::HashMap, env, io, path::Path};
+
+use futures_util::{stream::Stream, TryStreamExt};
+use hyper::{client::HttpConnector, Body, Client, Method};
+use mime::Mime;
+use serde::{de, Deserialize, Serialize};
+use url::form_urlencoded;
+
+use crate::{
+ container::Containers,
+ errors::{Error, Result},
+ image::Images,
+ network::Networks,
+ service::Services,
+ transport::{Headers, Payload, Transport},
+ volume::Volumes,
+ Uri,
+};
+
+#[cfg(feature = "chrono")]
+use crate::datetime::{datetime_from_nano_timestamp, datetime_from_unix_timestamp};
+#[cfg(feature = "chrono")]
+use chrono::{DateTime, Utc};
+
+#[cfg(feature = "tls")]
+use hyper_openssl::HttpsConnector;
+#[cfg(feature = "tls")]
+use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
+
+#[cfg(feature = "unix-socket")]
+use hyperlocal::UnixConnector;
+
+/// Entrypoint interface for communicating with docker daemon
+#[derive(Clone)]
+pub struct Docker {
+ transport: Transport,
+}
+
+fn get_http_connector() -> HttpConnector {
+ let mut http = HttpConnector::new();
+ http.enforce_http(false);
+
+ http
+}
+
+#[cfg(feature = "tls")]
+fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
+ let http = get_http_connector();
+ if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") {
+ // fixme: don't unwrap before you know what's in the box
+ // https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428
+ let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
+ connector.set_cipher_list("DEFAULT").unwrap();
+ let cert = &format!("{}/cert.pem", certs);
+ let key = &format!("{}/key.pem", certs);
+ connector
+ .set_certificate_file(&Path::new(cert), SslFiletype::PEM)
+ .unwrap();
+ connector
+ .set_private_key_file(&Path::new(key), SslFiletype::PEM)
+ .unwrap();
+ if env::var("DOCKER_TLS_VERIFY").is_ok() {
+ let ca = &format!("{}/ca.pem", certs);
+ connector.set_ca_file(&Path::new(ca)).unwrap();
+ }
+
+ // If we are attempting to connec to the docker daemon via tcp
+ // we need to convert the scheme to `https` to let hyper connect.
+ // Otherwise, hyper will reject the connection since it does not
+ // recongnize `tcp` as a valid `http` scheme.
+ let tcp_host_str = if tcp_host_str.contains("tcp://") {
+ tcp_host_str.replace("tcp://", "https://")
+ } else {
+ tcp_host_str
+ };
+
+ Docker {
+ transport: Transport::EncryptedTcp {
+ client: Client::builder()
+ .build(HttpsConnector::with_connector(http, connector).unwrap()),
+ host: tcp_host_str,
+ },
+ }
+ } else {
+ Docker {
+ transport: Transport::Tcp {
+ client: Client::builder().build(http),
+ host: tcp_host_str,
+ },
+ }
+ }
+}
+
+#[cfg(not(feature = "tls"))]
+fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
+ let http = get_http_connector();
+ Docker {
+ transport: Transport::Tcp {
+ client: Client::builder().build(http),
+ host: tcp_host_str,
+ },
+ }
+}
+
+// https://docs.docker.com/reference/api/docker_remote_api_v1.17/
+impl Docker {
+ /// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`,
+ /// falling back on unix:///var/run/docker.sock
+ pub fn new() -> Docker {
+ match env::var("DOCKER_HOST").ok() {
+ Some(host) => {
+ #[cfg(feature = "unix-socket")]
+ if let Some(path) = host.strip_prefix("unix://") {
+ return Docker::unix(path);
+ }
+ let host = host.parse().expect("invalid url");
+ Docker::host(host)
+ }
+ #[cfg(feature = "unix-socket")]
+ None => Docker::unix("/var/run/docker.sock"),
+ #[cfg(not(feature = "unix-socket"))]
+ None => panic!("Unix socket support is disabled"),
+ }
+ }
+
+ /// Creates a new docker instance for a docker host
+ /// listening on a given Unix socket.
+ #[cfg(feature = "unix-socket")]
+ pub fn unix<S>(socket_path: S) -> Docker
+ where
+ S: Into<String>,
+ {
+ Docker {
+ transport: Transport::Unix {
+ client: Client::builder()
+ .pool_max_idle_per_host(0)
+ .build(UnixConnector),
+ path: socket_path.into(),
+ },
+ }
+ }
+
+ /// constructs a new Docker instance for docker host listening at the given host url
+ pub fn host(host: Uri) -> Docker {
+ let tcp_host_str = format!(
+ "{}://{}:{}",
+ host.scheme_str().unwrap(),
+ host.host().unwrap().to_owned(),
+ host.port_u16().unwrap_or(80)
+ );
+
+ match host.scheme_str() {
+ #[cfg(feature = "unix-socket")]
+ Some("unix") => Docker {
+ transport: Transport::Unix {
+ client: Client::builder().build(UnixConnector),
+ path: host.path().to_owned(),
+ },
+ },
+
+ #[cfg(not(feature = "unix-socket"))]
+ Some("unix") => panic!("Unix socket support is disabled"),
+
+ _ => get_docker_for_tcp(tcp_host_str),
+ }
+ }
+
+ /// Exports an interface for interacting with docker images
+ pub fn images(&'_ self) -> Images<'_> {
+ Images::new(self)
+ }
+
+ /// Exports an interface for interacting with docker containers
+ pub fn containers(&'_ self) -> Containers<'_> {
+ Containers::new(self)
+ }
+
+ /// Exports an interface for interacting with docker services
+ pub fn services(&'_ self) -> Services<'_> {
+ Services::new(self)
+ }
+
+ pub fn networks(&'_ self) -> Networks<'_> {
+ Networks::new(self)
+ }
+
+ pub fn volumes(&'_ self) -> Volumes<'_> {
+ Volumes::new(self)
+ }
+
+ /// Returns version information associated with the docker daemon
+ pub async fn version(&self) -> Result<Version> {
+ self.get_json("/version").await
+ }
+
+ /// Returns information associated with the docker daemon
+ pub async fn info(&self) -> Result<Info> {
+ self.get_json("/info").await
+ }
+
+ /// Returns a simple ping response indicating the docker daemon is accessible
+ pub async fn ping(&self) -> Result<String> {
+ self.get("/_ping").await
+ }
+
+ /// Returns a stream of docker events
+ pub fn events<'docker>(
+ &'docker self,
+ opts: &EventsOptions,
+ ) -> impl Stream<Item = Result<Event>> + Unpin + 'docker {
+ let mut path = vec!["/events".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query);
+ }
+ let reader = Box::pin(
+ self.stream_get(path.join("?"))
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
+ )
+ .into_async_read();
+
+ let codec = futures_codec::LinesCodec {};
+
+ Box::pin(
+ futures_codec::FramedRead::new(reader, codec)
+ .map_err(Error::IO)
+ .and_then(|s: String| async move {
+ serde_json::from_str(&s).map_err(Error::SerdeJsonError)
+ }),
+ )
+ }
+
+ //
+ // Utility functions to make requests
+ //
+
+ pub(crate) async fn get(
+ &self,
+ endpoint: &str,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::GET, endpoint, Payload::None, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn get_json<T: serde::de::DeserializeOwned>(
+ &self,
+ endpoint: &str,
+ ) -> Result<T> {
+ let raw_string = self
+ .transport
+ .request(Method::GET, endpoint, Payload::None, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&raw_string)?)
+ }
+
+ pub(crate) async fn post(
+ &self,
+ endpoint: &str,
+ body: Option<(Body, Mime)>,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::POST, endpoint, body, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn put(
+ &self,
+ endpoint: &str,
+ body: Option<(Body, Mime)>,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::PUT, endpoint, body, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn post_json<T, B>(
+ &self,
+ endpoint: impl AsRef<str>,
+ body: Option<(B, Mime)>,
+ ) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned,
+ B: Into<Body>,
+ {
+ let string = self
+ .transport
+ .request(Method::POST, endpoint, body, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ pub(crate) async fn post_json_headers<'a, T, B, H>(
+ &self,
+ endpoint: impl AsRef<str>,
+ body: Option<(B, Mime)>,
+ headers: Option<H>,
+ ) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned,
+ B: Into<Body>,
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ let string = self
+ .transport
+ .request(Method::POST, endpoint, body, headers)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ pub(crate) async fn delete(
+ &self,
+ endpoint: &str,
+ ) -> Result<String> {
+ self.transport
+ .request(Method::DELETE, endpoint, Payload::None, Headers::None)
+ .await
+ }
+
+ pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>(
+ &self,
+ endpoint: &str,
+ ) -> Result<T> {
+ let string = self
+ .transport
+ .request(Method::DELETE, endpoint, Payload::None, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ /// Send a streaming post request.
+ ///
+ /// Use stream_post_into_values if the endpoint returns JSON values
+ pub(crate) fn stream_post<'a, H>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ headers: Option<H>,
+ ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a
+ where
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ self.transport
+ .stream_chunks(Method::POST, endpoint, body, headers)
+ }
+
+ /// Send a streaming post request that returns a stream of JSON values
+ ///
+ /// Assumes that each received chunk contains one or more JSON values
+ pub(crate) fn stream_post_into<'a, H, T>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ headers: Option<H>,
+ ) -> impl Stream<Item = Result<T>> + 'a
+ where
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ T: de::DeserializeOwned,
+ {
+ self.stream_post(endpoint, body, headers)
+ .and_then(|chunk| async move {
+ let stream = futures_util::stream::iter(
+ serde_json::Deserializer::from_slice(&chunk)
+ .into_iter()
+ .collect::<Vec<_>>(),
+ )
+ .map_err(Error::from);
+
+ Ok(stream)
+ })
+ .try_flatten()
+ }
+
+ pub(crate) fn stream_get<'a>(
+ &'a self,
+ endpoint: impl AsRef<str> + Unpin + 'a,
+ ) -> impl Stream<Item = Result<hyper::body::Bytes>> + 'a {
+ let headers = Some(Vec::default());
+ self.transport
+ .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
+ }
+
+ pub(crate) async fn stream_post_upgrade<'a>(
+ &'a self,
+ endpoint: impl AsRef<str> + 'a,
+ body: Option<(Body, Mime)>,
+ ) -> Result<impl futures_util::io::AsyncRead + futures_util::io::AsyncWrite + 'a> {
+ self.transport
+ .stream_upgrade(Method::POST, endpoint, body)
+ .await
+ }
+}
+
+impl Default for Docker {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Options for filtering streams of Docker events
+#[derive(Default, Debug)]
+pub struct EventsOptions {
+ params: HashMap<&'static str, String>,
+}
+
+impl EventsOptions {
+ pub fn builder() -> EventsOptionsBuilder {
+ EventsOptionsBuilder::default()
+ }
+
+ /// serialize options as a string. returns None if no options are defined
+ pub fn serialize(&self) -> Option<String> {
+ if self.params.is_empty() {
+ None
+ } else {
+ Some(
+ form_urlencoded::Serializer::new(String::new())
+ .extend_pairs(&self.params)
+ .finish(),
+ )
+ }
+ }
+}
+
+#[derive(Copy, Clone)]
+pub enum EventFilterType {
+ Container,
+ Image,
+ Volume,
+ Network,
+ Daemon,
+}
+
+fn event_filter_type_to_string(filter: EventFilterType) -> &'static str {
+ match filter {
+ EventFilterType::Container => "container",
+ EventFilterType::Image => "image",
+ EventFilterType::Volume => "volume",
+ EventFilterType::Network => "network",
+ EventFilterType::Daemon => "daemon",
+ }
+}
+
+/// Filter options for image listings
+pub enum EventFilter {
+ Container(String),
+ Event(String),
+ Image(String),
+ Label(String),
+ Type(EventFilterType),
+ Volume(String),
+ Network(String),
+ Daemon(String),
+}
+
+/// Builder interface for `EventOptions`
+#[derive(Default)]
+pub struct EventsOptionsBuilder {
+ params: HashMap<&'static str, String>,
+ events: Vec<String>,
+ containers: Vec<String>,
+ images: Vec<String>,
+ labels: Vec<String>,
+ volumes: Vec<String>,
+ networks: Vec<String>,
+ daemons: Vec<String>,
+ types: Vec<String>,
+}
+
+impl EventsOptionsBuilder {
+ /// Filter events since a given timestamp
+ pub fn since(
+ &mut self,
+ ts: &u64,
+ ) -> &mut Self {
+ self.params.insert("since", ts.to_string());
+ self
+ }
+
+ /// Filter events until a given timestamp
+ pub fn until(
+ &mut self,
+ ts: &u64,
+ ) -> &mut Self {
+ self.params.insert("until", ts.to_string());
+ self
+ }
+
+ pub fn filter(
+ &mut self,
+ filters: Vec<EventFilter>,
+ ) -> &mut Self {
+ let mut params = HashMap::new();
+ for f in filters {
+ match f {
+ EventFilter::Container(n) => {
+ self.containers.push(n);
+ params.insert("container", self.containers.clone())
+ }
+ EventFilter::Event(n) => {
+ self.events.push(n);
+ params.insert("event", self.events.clone())
+ }
+ EventFilter::Image(n) => {
+ self.images.push(n);
+ params.insert("image", self.images.clone())
+ }
+ EventFilter::Label(n) => {
+ self.labels.push(n);
+ params.insert("label", self.labels.clone())
+ }
+ EventFilter::Volume(n) => {
+ self.volumes.push(n);
+ params.insert("volume", self.volumes.clone())
+ }
+ EventFilter::Network(n) => {
+ self.networks.push(n);
+ params.insert("network", self.networks.clone())
+ }
+ EventFilter::Daemon(n) => {
+ self.daemons.push(n);
+ params.insert("daemon", self.daemons.clone())
+ }
+ EventFilter::Type(n) => {
+ let event_type = event_filter_type_to_string(n).to_string();
+ self.types.push(event_type);
+ params.insert("type", self.types.clone())
+ }
+ };
+ }
+ self.params
+ .insert("filters", serde_json::to_string(&params).unwrap());
+ self
+ }
+
+ pub fn build(&self) -> EventsOptions {
+ EventsOptions {
+ params: self.params.clone(),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Version {
+ pub version: String,
+ pub api_version: String,
+ pub git_commit: String,
+ pub go_version: String,
+ pub os: String,
+ pub arch: String,
+ pub kernel_version: String,
+ #[cfg(feature = "chrono")]
+ pub build_time: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub build_time: String,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Info {
+ pub containers: u64,
+ pub images: u64,
+ pub driver: String,
+ pub docker_root_dir: String,
+ pub driver_status: Vec<Vec<String>>,
+ #[serde(rename = "ID")]
+ pub id: String,
+ pub kernel_version: String,
+ // pub Labels: Option<???>,
+ pub mem_total: u64,
+ pub memory_limit: bool,
+ #[serde(rename = "NCPU")]
+ pub n_cpu: u64,
+ pub n_events_listener: u64,
+ pub n_goroutines: u64,
+ pub name: String,
+ pub operating_system: String,
+ // pub RegistryConfig:???
+ pub swap_limit: bool,
+ pub system_time: Option<String>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Event {
+ #[serde(rename = "Type")]
+ pub typ: String,
+ #[serde(rename = "Action")]
+ pub action: String,
+ #[serde(rename = "Actor")]
+ pub actor: Actor,
+ pub status: Option<String>,
+ pub id: Option<String>,
+ pub from: Option<String>,
+ #[cfg(feature = "chrono")]
+ #[serde(deserialize_with = "datetime_from_unix_timestamp")]
+ pub time: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub time: u64,
+ #[cfg(feature = "chrono")]
+ #[serde(deserialize_with = "datetime_from_nano_timestamp", rename = "timeNano")]
+ pub time_nano: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ #[serde(rename = "timeNano")]
+ pub time_nano: u64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct Actor {
+ #[serde(rename = "ID")]
+ pub id: String,
+ #[serde(rename = "Attributes")]
+ pub attributes: HashMap<String, String>,
+}
+
+#[cfg(test)]
+mod tests {
+ #[cfg(feature = "unix-socket")]
+ #[test]
+ fn unix_host_env() {
+ use super::Docker;
+ use std::env;
+ env::set_var("DOCKER_HOST", "unix:///docker.sock");
+ let d = Docker::new();
+ match d.transport {
+ crate::transport::Transport::Unix { path, .. } => {
+ assert_eq!(path, "/docker.sock");
+ }
+ _ => {
+ panic!("Expected transport to be unix.");
+ }
+ }
+ env::set_var("DOCKER_HOST", "http://localhost:8000");
+ let d = Docker::new();
+ match d.transport {
+ crate::transport::Transport::Tcp { host, .. } => {
+ assert_eq!(host, "http://localhost:8000");
+ }
+ _ => {
+ panic!("Expected transport to be http.");
+ }
+ }
+ }
+}