//! Main entrypoint for interacting with the Docker API. //! //! API Reference: use std::{collections::HashMap, env, io, path::Path}; use futures_util::{stream::Stream, TryStreamExt}; use hyper::{client::HttpConnector, Body, Client, Method}; use mime::Mime; use serde::{de, Deserialize, Serialize}; use url::form_urlencoded; use crate::{ container::Containers, errors::{Error, Result}, image::Images, network::Networks, service::Services, transport::{Headers, Payload, Transport}, volume::Volumes, Uri, }; #[cfg(feature = "chrono")] use crate::datetime::{datetime_from_nano_timestamp, datetime_from_unix_timestamp}; #[cfg(feature = "chrono")] use chrono::{DateTime, Utc}; #[cfg(feature = "tls")] use hyper_openssl::HttpsConnector; #[cfg(feature = "tls")] use openssl::ssl::{SslConnector, SslFiletype, SslMethod}; #[cfg(feature = "unix-socket")] use hyperlocal::UnixConnector; /// Entrypoint interface for communicating with docker daemon #[derive(Clone)] pub struct Docker { transport: Transport, } fn get_http_connector() -> HttpConnector { let mut http = HttpConnector::new(); http.enforce_http(false); http } #[cfg(feature = "tls")] fn get_docker_for_tcp(tcp_host_str: String) -> Docker { let http = get_http_connector(); if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") { // fixme: don't unwrap before you know what's in the box // https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428 let mut connector = SslConnector::builder(SslMethod::tls()).unwrap(); connector.set_cipher_list("DEFAULT").unwrap(); let cert = &format!("{}/cert.pem", certs); let key = &format!("{}/key.pem", certs); connector .set_certificate_file(&Path::new(cert), SslFiletype::PEM) .unwrap(); connector .set_private_key_file(&Path::new(key), SslFiletype::PEM) .unwrap(); if env::var("DOCKER_TLS_VERIFY").is_ok() { let ca = &format!("{}/ca.pem", certs); connector.set_ca_file(&Path::new(ca)).unwrap(); } // If we are attempting to connec to the docker daemon via tcp // we need to convert the scheme to `https` to let hyper connect. // Otherwise, hyper will reject the connection since it does not // recongnize `tcp` as a valid `http` scheme. let tcp_host_str = if tcp_host_str.contains("tcp://") { tcp_host_str.replace("tcp://", "https://") } else { tcp_host_str }; Docker { transport: Transport::EncryptedTcp { client: Client::builder() .build(HttpsConnector::with_connector(http, connector).unwrap()), host: tcp_host_str, }, } } else { Docker { transport: Transport::Tcp { client: Client::builder().build(http), host: tcp_host_str, }, } } } #[cfg(not(feature = "tls"))] fn get_docker_for_tcp(tcp_host_str: String) -> Docker { let http = get_http_connector(); Docker { transport: Transport::Tcp { client: Client::builder().build(http), host: tcp_host_str, }, } } // https://docs.docker.com/reference/api/docker_remote_api_v1.17/ impl Docker { /// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`, /// falling back on unix:///var/run/docker.sock pub fn new() -> Docker { match env::var("DOCKER_HOST").ok() { Some(host) => { #[cfg(feature = "unix-socket")] if let Some(path) = host.strip_prefix("unix://") { return Docker::unix(path); } let host = host.parse().expect("invalid url"); Docker::host(host) } #[cfg(feature = "unix-socket")] None => Docker::unix("/var/run/docker.sock"), #[cfg(not(feature = "unix-socket"))] None => panic!("Unix socket support is disabled"), } } /// Creates a new docker instance for a docker host /// listening on a given Unix socket. #[cfg(feature = "unix-socket")] pub fn unix(socket_path: S) -> Docker where S: Into, { Docker { transport: Transport::Unix { client: Client::builder() .pool_max_idle_per_host(0) .build(UnixConnector), path: socket_path.into(), }, } } /// constructs a new Docker instance for docker host listening at the given host url pub fn host(host: Uri) -> Docker { let tcp_host_str = format!( "{}://{}:{}", host.scheme_str().unwrap(), host.host().unwrap().to_owned(), host.port_u16().unwrap_or(80) ); match host.scheme_str() { #[cfg(feature = "unix-socket")] Some("unix") => Docker { transport: Transport::Unix { client: Client::builder().build(UnixConnector), path: host.path().to_owned(), }, }, #[cfg(not(feature = "unix-socket"))] Some("unix") => panic!("Unix socket support is disabled"), _ => get_docker_for_tcp(tcp_host_str), } } /// Exports an interface for interacting with docker images pub fn images(&'_ self) -> Images<'_> { Images::new(self) } /// Exports an interface for interacting with docker containers pub fn containers(&'_ self) -> Containers<'_> { Containers::new(self) } /// Exports an interface for interacting with docker services pub fn services(&'_ self) -> Services<'_> { Services::new(self) } pub fn networks(&'_ self) -> Networks<'_> { Networks::new(self) } pub fn volumes(&'_ self) -> Volumes<'_> { Volumes::new(self) } /// Returns version information associated with the docker daemon pub async fn version(&self) -> Result { self.get_json("/version").await } /// Returns information associated with the docker daemon pub async fn info(&self) -> Result { self.get_json("/info").await } /// Returns a simple ping response indicating the docker daemon is accessible pub async fn ping(&self) -> Result { self.get("/_ping").await } /// Returns a stream of docker events pub fn events<'docker>( &'docker self, opts: &EventsOptions, ) -> impl Stream> + Unpin + 'docker { let mut path = vec!["/events".to_owned()]; if let Some(query) = opts.serialize() { path.push(query); } let reader = Box::pin( self.stream_get(path.join("?")) .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), ) .into_async_read(); let codec = futures_codec::LinesCodec {}; Box::pin( futures_codec::FramedRead::new(reader, codec) .map_err(Error::IO) .and_then(|s: String| async move { serde_json::from_str(&s).map_err(Error::SerdeJsonError) }), ) } // // Utility functions to make requests // pub(crate) async fn get( &self, endpoint: &str, ) -> Result { self.transport .request(Method::GET, endpoint, Payload::None, Headers::None) .await } pub(crate) async fn get_json( &self, endpoint: &str, ) -> Result { let raw_string = self .transport .request(Method::GET, endpoint, Payload::None, Headers::None) .await?; Ok(serde_json::from_str::(&raw_string)?) } pub(crate) async fn post( &self, endpoint: &str, body: Option<(Body, Mime)>, ) -> Result { self.transport .request(Method::POST, endpoint, body, Headers::None) .await } pub(crate) async fn put( &self, endpoint: &str, body: Option<(Body, Mime)>, ) -> Result { self.transport .request(Method::PUT, endpoint, body, Headers::None) .await } pub(crate) async fn post_json( &self, endpoint: impl AsRef, body: Option<(B, Mime)>, ) -> Result where T: serde::de::DeserializeOwned, B: Into, { let string = self .transport .request(Method::POST, endpoint, body, Headers::None) .await?; Ok(serde_json::from_str::(&string)?) } pub(crate) async fn post_json_headers<'a, T, B, H>( &self, endpoint: impl AsRef, body: Option<(B, Mime)>, headers: Option, ) -> Result where T: serde::de::DeserializeOwned, B: Into, H: IntoIterator + 'a, { let string = self .transport .request(Method::POST, endpoint, body, headers) .await?; Ok(serde_json::from_str::(&string)?) } pub(crate) async fn delete( &self, endpoint: &str, ) -> Result { self.transport .request(Method::DELETE, endpoint, Payload::None, Headers::None) .await } pub(crate) async fn delete_json( &self, endpoint: &str, ) -> Result { let string = self .transport .request(Method::DELETE, endpoint, Payload::None, Headers::None) .await?; Ok(serde_json::from_str::(&string)?) } /// Send a streaming post request. /// /// Use stream_post_into_values if the endpoint returns JSON values pub(crate) fn stream_post<'a, H>( &'a self, endpoint: impl AsRef + 'a, body: Option<(Body, Mime)>, headers: Option, ) -> impl Stream> + 'a where H: IntoIterator + 'a, { self.transport .stream_chunks(Method::POST, endpoint, body, headers) } /// Send a streaming post request that returns a stream of JSON values /// /// Assumes that each received chunk contains one or more JSON values pub(crate) fn stream_post_into<'a, H, T>( &'a self, endpoint: impl AsRef + 'a, body: Option<(Body, Mime)>, headers: Option, ) -> impl Stream> + 'a where H: IntoIterator + 'a, T: de::DeserializeOwned, { self.stream_post(endpoint, body, headers) .and_then(|chunk| async move { let stream = futures_util::stream::iter( serde_json::Deserializer::from_slice(&chunk) .into_iter() .collect::>(), ) .map_err(Error::from); Ok(stream) }) .try_flatten() } pub(crate) fn stream_get<'a>( &'a self, endpoint: impl AsRef + Unpin + 'a, ) -> impl Stream> + 'a { let headers = Some(Vec::default()); self.transport .stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers) } pub(crate) async fn stream_post_upgrade<'a>( &'a self, endpoint: impl AsRef + 'a, body: Option<(Body, Mime)>, ) -> Result { self.transport .stream_upgrade(Method::POST, endpoint, body) .await } } impl Default for Docker { fn default() -> Self { Self::new() } } /// Options for filtering streams of Docker events #[derive(Default, Debug)] pub struct EventsOptions { params: HashMap<&'static str, String>, } impl EventsOptions { pub fn builder() -> EventsOptionsBuilder { EventsOptionsBuilder::default() } /// serialize options as a string. returns None if no options are defined pub fn serialize(&self) -> Option { if self.params.is_empty() { None } else { Some( form_urlencoded::Serializer::new(String::new()) .extend_pairs(&self.params) .finish(), ) } } } #[derive(Copy, Clone)] pub enum EventFilterType { Container, Image, Volume, Network, Daemon, } fn event_filter_type_to_string(filter: EventFilterType) -> &'static str { match filter { EventFilterType::Container => "container", EventFilterType::Image => "image", EventFilterType::Volume => "volume", EventFilterType::Network => "network", EventFilterType::Daemon => "daemon", } } /// Filter options for image listings pub enum EventFilter { Container(String), Event(String), Image(String), Label(String), Type(EventFilterType), Volume(String), Network(String), Daemon(String), } /// Builder interface for `EventOptions` #[derive(Default)] pub struct EventsOptionsBuilder { params: HashMap<&'static str, String>, events: Vec, containers: Vec, images: Vec, labels: Vec, volumes: Vec, networks: Vec, daemons: Vec, types: Vec, } impl EventsOptionsBuilder { /// Filter events since a given timestamp pub fn since( &mut self, ts: &u64, ) -> &mut Self { self.params.insert("since", ts.to_string()); self } /// Filter events until a given timestamp pub fn until( &mut self, ts: &u64, ) -> &mut Self { self.params.insert("until", ts.to_string()); self } pub fn filter( &mut self, filters: Vec, ) -> &mut Self { let mut params = HashMap::new(); for f in filters { match f { EventFilter::Container(n) => { self.containers.push(n); params.insert("container", self.containers.clone()) } EventFilter::Event(n) => { self.events.push(n); params.insert("event", self.events.clone()) } EventFilter::Image(n) => { self.images.push(n); params.insert("image", self.images.clone()) } EventFilter::Label(n) => { self.labels.push(n); params.insert("label", self.labels.clone()) } EventFilter::Volume(n) => { self.volumes.push(n); params.insert("volume", self.volumes.clone()) } EventFilter::Network(n) => { self.networks.push(n); params.insert("network", self.networks.clone()) } EventFilter::Daemon(n) => { self.daemons.push(n); params.insert("daemon", self.daemons.clone()) } EventFilter::Type(n) => { let event_type = event_filter_type_to_string(n).to_string(); self.types.push(event_type); params.insert("type", self.types.clone()) } }; } self.params .insert("filters", serde_json::to_string(¶ms).unwrap()); self } pub fn build(&self) -> EventsOptions { EventsOptions { params: self.params.clone(), } } } #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct Version { pub version: String, pub api_version: String, pub git_commit: String, pub go_version: String, pub os: String, pub arch: String, pub kernel_version: String, #[cfg(feature = "chrono")] pub build_time: DateTime, #[cfg(not(feature = "chrono"))] pub build_time: String, } #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] pub struct Info { pub containers: u64, pub images: u64, pub driver: String, pub docker_root_dir: String, pub driver_status: Vec>, #[serde(rename = "ID")] pub id: String, pub kernel_version: String, // pub Labels: Option, pub mem_total: u64, pub memory_limit: bool, #[serde(rename = "NCPU")] pub n_cpu: u64, pub n_events_listener: u64, pub n_goroutines: u64, pub name: String, pub operating_system: String, // pub RegistryConfig:??? pub swap_limit: bool, pub system_time: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Event { #[serde(rename = "Type")] pub typ: String, #[serde(rename = "Action")] pub action: String, #[serde(rename = "Actor")] pub actor: Actor, pub status: Option, pub id: Option, pub from: Option, #[cfg(feature = "chrono")] #[serde(deserialize_with = "datetime_from_unix_timestamp")] pub time: DateTime, #[cfg(not(feature = "chrono"))] pub time: u64, #[cfg(feature = "chrono")] #[serde(deserialize_with = "datetime_from_nano_timestamp", rename = "timeNano")] pub time_nano: DateTime, #[cfg(not(feature = "chrono"))] #[serde(rename = "timeNano")] pub time_nano: u64, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Actor { #[serde(rename = "ID")] pub id: String, #[serde(rename = "Attributes")] pub attributes: HashMap, } #[cfg(test)] mod tests { #[cfg(feature = "unix-socket")] #[test] fn unix_host_env() { use super::Docker; use std::env; env::set_var("DOCKER_HOST", "unix:///docker.sock"); let d = Docker::new(); match d.transport { crate::transport::Transport::Unix { path, .. } => { assert_eq!(path, "/docker.sock"); } _ => { panic!("Expected transport to be unix."); } } env::set_var("DOCKER_HOST", "http://localhost:8000"); let d = Docker::new(); match d.transport { crate::transport::Transport::Tcp { host, .. } => { assert_eq!(host, "http://localhost:8000"); } _ => { panic!("Expected transport to be http."); } } } }