//! Main entrypoint for interacting with the Docker API.
//!
//! API Reference:
use std::{collections::HashMap, env, io, path::Path};
use futures_util::{stream::Stream, TryStreamExt};
use hyper::{client::HttpConnector, Body, Client, Method};
use mime::Mime;
use serde::{de, Deserialize, Serialize};
use url::form_urlencoded;
use crate::{
container::Containers,
errors::{Error, Result},
image::Images,
network::Networks,
service::Services,
transport::{Headers, Payload, Transport},
volume::Volumes,
Uri,
};
#[cfg(feature = "chrono")]
use crate::datetime::{datetime_from_nano_timestamp, datetime_from_unix_timestamp};
#[cfg(feature = "chrono")]
use chrono::{DateTime, Utc};
#[cfg(feature = "tls")]
use hyper_openssl::HttpsConnector;
#[cfg(feature = "tls")]
use openssl::ssl::{SslConnector, SslFiletype, SslMethod};
#[cfg(feature = "unix-socket")]
use hyperlocal::UnixConnector;
/// Entrypoint interface for communicating with docker daemon
#[derive(Clone)]
pub struct Docker {
transport: Transport,
}
fn get_http_connector() -> HttpConnector {
let mut http = HttpConnector::new();
http.enforce_http(false);
http
}
#[cfg(feature = "tls")]
fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
let http = get_http_connector();
if let Ok(ref certs) = env::var("DOCKER_CERT_PATH") {
// fixme: don't unwrap before you know what's in the box
// https://github.com/hyperium/hyper/blob/master/src/net.rs#L427-L428
let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
connector.set_cipher_list("DEFAULT").unwrap();
let cert = &format!("{}/cert.pem", certs);
let key = &format!("{}/key.pem", certs);
connector
.set_certificate_file(&Path::new(cert), SslFiletype::PEM)
.unwrap();
connector
.set_private_key_file(&Path::new(key), SslFiletype::PEM)
.unwrap();
if env::var("DOCKER_TLS_VERIFY").is_ok() {
let ca = &format!("{}/ca.pem", certs);
connector.set_ca_file(&Path::new(ca)).unwrap();
}
// If we are attempting to connec to the docker daemon via tcp
// we need to convert the scheme to `https` to let hyper connect.
// Otherwise, hyper will reject the connection since it does not
// recongnize `tcp` as a valid `http` scheme.
let tcp_host_str = if tcp_host_str.contains("tcp://") {
tcp_host_str.replace("tcp://", "https://")
} else {
tcp_host_str
};
Docker {
transport: Transport::EncryptedTcp {
client: Client::builder()
.build(HttpsConnector::with_connector(http, connector).unwrap()),
host: tcp_host_str,
},
}
} else {
Docker {
transport: Transport::Tcp {
client: Client::builder().build(http),
host: tcp_host_str,
},
}
}
}
#[cfg(not(feature = "tls"))]
fn get_docker_for_tcp(tcp_host_str: String) -> Docker {
let http = get_http_connector();
Docker {
transport: Transport::Tcp {
client: Client::builder().build(http),
host: tcp_host_str,
},
}
}
// https://docs.docker.com/reference/api/docker_remote_api_v1.17/
impl Docker {
/// constructs a new Docker instance for a docker host listening at a url specified by an env var `DOCKER_HOST`,
/// falling back on unix:///var/run/docker.sock
pub fn new() -> Docker {
match env::var("DOCKER_HOST").ok() {
Some(host) => {
#[cfg(feature = "unix-socket")]
if let Some(path) = host.strip_prefix("unix://") {
return Docker::unix(path);
}
let host = host.parse().expect("invalid url");
Docker::host(host)
}
#[cfg(feature = "unix-socket")]
None => Docker::unix("/var/run/docker.sock"),
#[cfg(not(feature = "unix-socket"))]
None => panic!("Unix socket support is disabled"),
}
}
/// Creates a new docker instance for a docker host
/// listening on a given Unix socket.
#[cfg(feature = "unix-socket")]
pub fn unix(socket_path: S) -> Docker
where
S: Into,
{
Docker {
transport: Transport::Unix {
client: Client::builder()
.pool_max_idle_per_host(0)
.build(UnixConnector),
path: socket_path.into(),
},
}
}
/// constructs a new Docker instance for docker host listening at the given host url
pub fn host(host: Uri) -> Docker {
let tcp_host_str = format!(
"{}://{}:{}",
host.scheme_str().unwrap(),
host.host().unwrap().to_owned(),
host.port_u16().unwrap_or(80)
);
match host.scheme_str() {
#[cfg(feature = "unix-socket")]
Some("unix") => Docker {
transport: Transport::Unix {
client: Client::builder().build(UnixConnector),
path: host.path().to_owned(),
},
},
#[cfg(not(feature = "unix-socket"))]
Some("unix") => panic!("Unix socket support is disabled"),
_ => get_docker_for_tcp(tcp_host_str),
}
}
/// Exports an interface for interacting with docker images
pub fn images(&'_ self) -> Images<'_> {
Images::new(self)
}
/// Exports an interface for interacting with docker containers
pub fn containers(&'_ self) -> Containers<'_> {
Containers::new(self)
}
/// Exports an interface for interacting with docker services
pub fn services(&'_ self) -> Services<'_> {
Services::new(self)
}
pub fn networks(&'_ self) -> Networks<'_> {
Networks::new(self)
}
pub fn volumes(&'_ self) -> Volumes<'_> {
Volumes::new(self)
}
/// Returns version information associated with the docker daemon
pub async fn version(&self) -> Result {
self.get_json("/version").await
}
/// Returns information associated with the docker daemon
pub async fn info(&self) -> Result {
self.get_json("/info").await
}
/// Returns a simple ping response indicating the docker daemon is accessible
pub async fn ping(&self) -> Result {
self.get("/_ping").await
}
/// Returns a stream of docker events
pub fn events<'docker>(
&'docker self,
opts: &EventsOptions,
) -> impl Stream- > + Unpin + 'docker {
let mut path = vec!["/events".to_owned()];
if let Some(query) = opts.serialize() {
path.push(query);
}
let reader = Box::pin(
self.stream_get(path.join("?"))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
)
.into_async_read();
let codec = futures_codec::LinesCodec {};
Box::pin(
futures_codec::FramedRead::new(reader, codec)
.map_err(Error::IO)
.and_then(|s: String| async move {
serde_json::from_str(&s).map_err(Error::SerdeJsonError)
}),
)
}
//
// Utility functions to make requests
//
pub(crate) async fn get(
&self,
endpoint: &str,
) -> Result {
self.transport
.request(Method::GET, endpoint, Payload::None, Headers::None)
.await
}
pub(crate) async fn get_json(
&self,
endpoint: &str,
) -> Result {
let raw_string = self
.transport
.request(Method::GET, endpoint, Payload::None, Headers::None)
.await?;
Ok(serde_json::from_str::(&raw_string)?)
}
pub(crate) async fn post(
&self,
endpoint: &str,
body: Option<(Body, Mime)>,
) -> Result {
self.transport
.request(Method::POST, endpoint, body, Headers::None)
.await
}
pub(crate) async fn put(
&self,
endpoint: &str,
body: Option<(Body, Mime)>,
) -> Result {
self.transport
.request(Method::PUT, endpoint, body, Headers::None)
.await
}
pub(crate) async fn post_json(
&self,
endpoint: impl AsRef,
body: Option<(B, Mime)>,
) -> Result
where
T: serde::de::DeserializeOwned,
B: Into,
{
let string = self
.transport
.request(Method::POST, endpoint, body, Headers::None)
.await?;
Ok(serde_json::from_str::(&string)?)
}
pub(crate) async fn post_json_headers<'a, T, B, H>(
&self,
endpoint: impl AsRef,
body: Option<(B, Mime)>,
headers: Option,
) -> Result
where
T: serde::de::DeserializeOwned,
B: Into,
H: IntoIterator
- + 'a,
{
let string = self
.transport
.request(Method::POST, endpoint, body, headers)
.await?;
Ok(serde_json::from_str::(&string)?)
}
pub(crate) async fn delete(
&self,
endpoint: &str,
) -> Result {
self.transport
.request(Method::DELETE, endpoint, Payload::None, Headers::None)
.await
}
pub(crate) async fn delete_json(
&self,
endpoint: &str,
) -> Result {
let string = self
.transport
.request(Method::DELETE, endpoint, Payload::None, Headers::None)
.await?;
Ok(serde_json::from_str::(&string)?)
}
/// Send a streaming post request.
///
/// Use stream_post_into_values if the endpoint returns JSON values
pub(crate) fn stream_post<'a, H>(
&'a self,
endpoint: impl AsRef + 'a,
body: Option<(Body, Mime)>,
headers: Option,
) -> impl Stream
- > + 'a
where
H: IntoIterator
- + 'a,
{
self.transport
.stream_chunks(Method::POST, endpoint, body, headers)
}
/// Send a streaming post request that returns a stream of JSON values
///
/// Assumes that each received chunk contains one or more JSON values
pub(crate) fn stream_post_into<'a, H, T>(
&'a self,
endpoint: impl AsRef + 'a,
body: Option<(Body, Mime)>,
headers: Option,
) -> impl Stream
- > + 'a
where
H: IntoIterator
- + 'a,
T: de::DeserializeOwned,
{
self.stream_post(endpoint, body, headers)
.and_then(|chunk| async move {
let stream = futures_util::stream::iter(
serde_json::Deserializer::from_slice(&chunk)
.into_iter()
.collect::>(),
)
.map_err(Error::from);
Ok(stream)
})
.try_flatten()
}
pub(crate) fn stream_get<'a>(
&'a self,
endpoint: impl AsRef + Unpin + 'a,
) -> impl Stream
- > + 'a {
let headers = Some(Vec::default());
self.transport
.stream_chunks(Method::GET, endpoint, Option::<(Body, Mime)>::None, headers)
}
pub(crate) async fn stream_post_upgrade<'a>(
&'a self,
endpoint: impl AsRef + 'a,
body: Option<(Body, Mime)>,
) -> Result {
self.transport
.stream_upgrade(Method::POST, endpoint, body)
.await
}
}
impl Default for Docker {
fn default() -> Self {
Self::new()
}
}
/// Options for filtering streams of Docker events
#[derive(Default, Debug)]
pub struct EventsOptions {
params: HashMap<&'static str, String>,
}
impl EventsOptions {
pub fn builder() -> EventsOptionsBuilder {
EventsOptionsBuilder::default()
}
/// serialize options as a string. returns None if no options are defined
pub fn serialize(&self) -> Option {
if self.params.is_empty() {
None
} else {
Some(
form_urlencoded::Serializer::new(String::new())
.extend_pairs(&self.params)
.finish(),
)
}
}
}
#[derive(Copy, Clone)]
pub enum EventFilterType {
Container,
Image,
Volume,
Network,
Daemon,
}
fn event_filter_type_to_string(filter: EventFilterType) -> &'static str {
match filter {
EventFilterType::Container => "container",
EventFilterType::Image => "image",
EventFilterType::Volume => "volume",
EventFilterType::Network => "network",
EventFilterType::Daemon => "daemon",
}
}
/// Filter options for image listings
pub enum EventFilter {
Container(String),
Event(String),
Image(String),
Label(String),
Type(EventFilterType),
Volume(String),
Network(String),
Daemon(String),
}
/// Builder interface for `EventOptions`
#[derive(Default)]
pub struct EventsOptionsBuilder {
params: HashMap<&'static str, String>,
events: Vec,
containers: Vec,
images: Vec,
labels: Vec,
volumes: Vec,
networks: Vec,
daemons: Vec,
types: Vec,
}
impl EventsOptionsBuilder {
/// Filter events since a given timestamp
pub fn since(
&mut self,
ts: &u64,
) -> &mut Self {
self.params.insert("since", ts.to_string());
self
}
/// Filter events until a given timestamp
pub fn until(
&mut self,
ts: &u64,
) -> &mut Self {
self.params.insert("until", ts.to_string());
self
}
pub fn filter(
&mut self,
filters: Vec,
) -> &mut Self {
let mut params = HashMap::new();
for f in filters {
match f {
EventFilter::Container(n) => {
self.containers.push(n);
params.insert("container", self.containers.clone())
}
EventFilter::Event(n) => {
self.events.push(n);
params.insert("event", self.events.clone())
}
EventFilter::Image(n) => {
self.images.push(n);
params.insert("image", self.images.clone())
}
EventFilter::Label(n) => {
self.labels.push(n);
params.insert("label", self.labels.clone())
}
EventFilter::Volume(n) => {
self.volumes.push(n);
params.insert("volume", self.volumes.clone())
}
EventFilter::Network(n) => {
self.networks.push(n);
params.insert("network", self.networks.clone())
}
EventFilter::Daemon(n) => {
self.daemons.push(n);
params.insert("daemon", self.daemons.clone())
}
EventFilter::Type(n) => {
let event_type = event_filter_type_to_string(n).to_string();
self.types.push(event_type);
params.insert("type", self.types.clone())
}
};
}
self.params
.insert("filters", serde_json::to_string(¶ms).unwrap());
self
}
pub fn build(&self) -> EventsOptions {
EventsOptions {
params: self.params.clone(),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct Version {
pub version: String,
pub api_version: String,
pub git_commit: String,
pub go_version: String,
pub os: String,
pub arch: String,
pub kernel_version: String,
#[cfg(feature = "chrono")]
pub build_time: DateTime,
#[cfg(not(feature = "chrono"))]
pub build_time: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct Info {
pub containers: u64,
pub images: u64,
pub driver: String,
pub docker_root_dir: String,
pub driver_status: Vec>,
#[serde(rename = "ID")]
pub id: String,
pub kernel_version: String,
// pub Labels: Option??>,
pub mem_total: u64,
pub memory_limit: bool,
#[serde(rename = "NCPU")]
pub n_cpu: u64,
pub n_events_listener: u64,
pub n_goroutines: u64,
pub name: String,
pub operating_system: String,
// pub RegistryConfig:???
pub swap_limit: bool,
pub system_time: Option,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
#[serde(rename = "Type")]
pub typ: String,
#[serde(rename = "Action")]
pub action: String,
#[serde(rename = "Actor")]
pub actor: Actor,
pub status: Option,
pub id: Option,
pub from: Option,
#[cfg(feature = "chrono")]
#[serde(deserialize_with = "datetime_from_unix_timestamp")]
pub time: DateTime,
#[cfg(not(feature = "chrono"))]
pub time: u64,
#[cfg(feature = "chrono")]
#[serde(deserialize_with = "datetime_from_nano_timestamp", rename = "timeNano")]
pub time_nano: DateTime,
#[cfg(not(feature = "chrono"))]
#[serde(rename = "timeNano")]
pub time_nano: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Actor {
#[serde(rename = "ID")]
pub id: String,
#[serde(rename = "Attributes")]
pub attributes: HashMap,
}
#[cfg(test)]
mod tests {
#[cfg(feature = "unix-socket")]
#[test]
fn unix_host_env() {
use super::Docker;
use std::env;
env::set_var("DOCKER_HOST", "unix:///docker.sock");
let d = Docker::new();
match d.transport {
crate::transport::Transport::Unix { path, .. } => {
assert_eq!(path, "/docker.sock");
}
_ => {
panic!("Expected transport to be unix.");
}
}
env::set_var("DOCKER_HOST", "http://localhost:8000");
let d = Docker::new();
match d.transport {
crate::transport::Transport::Tcp { host, .. } => {
assert_eq!(host, "http://localhost:8000");
}
_ => {
panic!("Expected transport to be http.");
}
}
}
}