summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorWojciech Kępka <46892771+wojciechkepka@users.noreply.github.com>2021-02-09 01:22:26 +0100
committerGitHub <noreply@github.com>2021-02-08 19:22:26 -0500
commit5af822c5691a772cad36bd9a69e94419b03d4511 (patch)
tree761ebc2ac10608a8772d8f7085596ee2ee838b37 /src
parent4e3f69c34a177697af940849fced1a284fc09449 (diff)
Add services api (#263)
* Add initial Services models * Add initial Services controllers * Add ServicesListOptions * Rename ServicesList -> ServiceList * Fix some optional fields on ServiceRep * Add Service::inspect * Add Service::delete * Add Service::logs * Rename example logs -> containerlogs * Add ServiceOptions, ServiceCreateInfo, fix typo * Add a way to pass headers to request, add Payload and Headers for easier types * Add Service::create * Add examples * Fix fmt * Fix example
Diffstat (limited to 'src')
-rw-r--r--src/builder.rs233
-rw-r--r--src/lib.rs160
-rw-r--r--src/rep.rs183
-rw-r--r--src/transport.rs11
4 files changed, 570 insertions, 17 deletions
diff --git a/src/builder.rs b/src/builder.rs
index 6971be9..982b3ba 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -1,13 +1,17 @@
//! Interfaces for building various structures
-use crate::{errors::Error, Result};
+use crate::{
+ errors::Error,
+ rep::{EndpointSpec, Mode, NetworkAttachmentConfig, RollbackConfig, TaskSpec, UpdateConfig},
+ Result,
+};
use serde::Serialize;
use serde_json::{self, json, map::Map, Value};
use std::{
cmp::Eq,
collections::{BTreeMap, HashMap},
hash::Hash,
- iter::{IntoIterator, Peekable},
+ iter::{IntoIterator, Iterator, Peekable},
time::Duration,
};
use url::form_urlencoded;
@@ -1771,6 +1775,231 @@ impl ExecResizeOptionsBuilder {
}
}
+//################################################################################
+// Services
+//################################################################################
+
+/// Options for filtering services list results
+#[derive(Default, Debug)]
+pub struct ServiceListOptions {
+ params: HashMap<&'static str, String>,
+}
+
+impl ServiceListOptions {
+ /// return a new instance of a builder for options
+ pub fn builder() -> ServiceListOptionsBuilder {
+ ServiceListOptionsBuilder::default()
+ }
+
+ /// serialize options as a string. returns None if no options are defined
+ pub fn serialize(&self) -> Option<String> {
+ if self.params.is_empty() {
+ None
+ } else {
+ Some(
+ form_urlencoded::Serializer::new(String::new())
+ .extend_pairs(&self.params)
+ .finish(),
+ )
+ }
+ }
+}
+
+/// Filter options for services listings
+pub enum ServiceFilter {
+ Id(String),
+ Label(String),
+ ReplicatedMode,
+ GlobalMode,
+ Name(String),
+}
+
+/// Builder interface for `ServicesListOptions`
+#[derive(Default)]
+pub struct ServiceListOptionsBuilder {
+ params: HashMap<&'static str, String>,
+}
+
+impl ServiceListOptionsBuilder {
+ pub fn filter(
+ &mut self,
+ filters: Vec<ServiceFilter>,
+ ) -> &mut Self {
+ let mut param = HashMap::new();
+ for f in filters {
+ match f {
+ ServiceFilter::Id(i) => param.insert("id", vec![i]),
+ ServiceFilter::Label(l) => param.insert("label", vec![l]),
+ ServiceFilter::ReplicatedMode => {
+ param.insert("mode", vec!["replicated".to_string()])
+ }
+ ServiceFilter::GlobalMode => param.insert("mode", vec!["global".to_string()]),
+ ServiceFilter::Name(n) => param.insert("name", vec![n.to_string()]),
+ };
+ }
+ // structure is a a json encoded object mapping string keys to a list
+ // of string values
+ self.params
+ .insert("filters", serde_json::to_string(&param).unwrap());
+ self
+ }
+
+ pub fn enable_status(&mut self) -> &mut Self {
+ self.params.insert("status", "true".to_owned());
+ self
+ }
+
+ pub fn build(&self) -> ServiceListOptions {
+ ServiceListOptions {
+ params: self.params.clone(),
+ }
+ }
+}
+
+#[derive(Default, Debug)]
+pub struct ServiceOptions {
+ auth: Option<RegistryAuth>,
+ params: HashMap<&'static str, Value>,
+}
+
+impl ServiceOptions {
+ /// return a new instance of a builder for options
+ pub fn builder() -> ServiceOptionsBuilder {
+ ServiceOptionsBuilder::default()
+ }
+
+ /// serialize options as a string. returns None if no options are defined
+ pub fn serialize(&self) -> Result<String> {
+ serde_json::to_string(&self.params).map_err(Error::from)
+ }
+
+ pub(crate) fn auth_header(&self) -> Option<String> {
+ self.auth.clone().map(|a| a.serialize())
+ }
+}
+
+#[derive(Default)]
+pub struct ServiceOptionsBuilder {
+ auth: Option<RegistryAuth>,
+ params: HashMap<&'static str, Result<Value>>,
+}
+
+impl ServiceOptionsBuilder {
+ pub fn name<S>(
+ &mut self,
+ name: S,
+ ) -> &mut Self
+ where
+ S: AsRef<str>,
+ {
+ self.params.insert("Name", Ok(json!(name.as_ref())));
+ self
+ }
+
+ pub fn labels<I>(
+ &mut self,
+ labels: I,
+ ) -> &mut Self
+ where
+ I: IntoIterator<Item = (String, String)>,
+ {
+ self.params.insert(
+ "Labels",
+ Ok(json!(labels
+ .into_iter()
+ .collect::<HashMap<String, String>>())),
+ );
+ self
+ }
+
+ pub fn task_template(
+ &mut self,
+ spec: &TaskSpec,
+ ) -> &mut Self {
+ self.params.insert("TaskTemplate", to_json_value(spec));
+ self
+ }
+
+ pub fn mode(
+ &mut self,
+ mode: &Mode,
+ ) -> &mut Self {
+ self.params.insert("Mode", to_json_value(mode));
+ self
+ }
+
+ pub fn update_config(
+ &mut self,
+ conf: &UpdateConfig,
+ ) -> &mut Self {
+ self.params.insert("UpdateConfig", to_json_value(conf));
+ self
+ }
+
+ pub fn rollback_config(
+ &mut self,
+ conf: &RollbackConfig,
+ ) -> &mut Self {
+ self.params.insert("RollbackConfig", to_json_value(conf));
+ self
+ }
+
+ pub fn networks<I>(
+ &mut self,
+ networks: I,
+ ) -> &mut Self
+ where
+ I: IntoIterator<Item = NetworkAttachmentConfig>,
+ {
+ self.params.insert(
+ "Networks",
+ to_json_value(
+ networks
+ .into_iter()
+ .collect::<Vec<NetworkAttachmentConfig>>(),
+ ),
+ );
+ self
+ }
+
+ pub fn endpoint_spec(
+ &mut self,
+ spec: &EndpointSpec,
+ ) -> &mut Self {
+ self.params.insert("EndpointSpec", to_json_value(spec));
+ self
+ }
+
+ pub fn auth(
+ &mut self,
+ auth: RegistryAuth,
+ ) -> &mut Self {
+ self.auth = Some(auth);
+ self
+ }
+
+ pub fn build(&mut self) -> Result<ServiceOptions> {
+ let params = std::mem::take(&mut self.params);
+ let mut new_params = HashMap::new();
+ for (k, v) in params.into_iter() {
+ new_params.insert(k, v?);
+ }
+ Ok(ServiceOptions {
+ auth: self.auth.take(),
+ params: new_params,
+ })
+ }
+}
+
+fn to_json_value<T>(value: T) -> Result<Value>
+where
+ T: Serialize,
+{
+ Ok(serde_json::to_value(value)?)
+}
+
+//################################################################################
+
#[cfg(test)]
mod tests {
use super::{ContainerOptionsBuilder, LogsOptionsBuilder, RegistryAuth};
diff --git a/src/lib.rs b/src/lib.rs
index 5d77b83..ac09a41 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,7 +30,8 @@ pub use crate::{
BuildOptions, ContainerConnectionOptions, ContainerFilter, ContainerListOptions,
ContainerOptions, EventsOptions, ExecContainerOptions, ExecResizeOptions, ImageFilter,
ImageListOptions, LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions,
- RegistryAuth, RmContainerOptions, TagOptions, VolumeCreateOptions,
+ RegistryAuth, RmContainerOptions, ServiceFilter, ServiceListOptions, ServiceOptions,
+ TagOptions, VolumeCreateOptions,
},
errors::Error,
};
@@ -38,10 +39,11 @@ use crate::{
rep::{
Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event,
ExecDetails, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo,
- NetworkDetails as NetworkInfo, SearchResult, Stats, Status, Top, Version,
- Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep,
+ NetworkDetails as NetworkInfo, SearchResult, ServiceCreateInfo, ServiceDetails,
+ Services as ServicesRep, Stats, Status, Top, Version, Volume as VolumeRep,
+ VolumeCreateInfo, Volumes as VolumesRep,
},
- transport::{tar, Transport},
+ transport::{tar, Headers, Payload, Transport},
tty::Multiplexer as TtyMultiPlexer,
};
use futures_util::{
@@ -983,6 +985,111 @@ impl<'a> Volume<'a> {
}
}
+/// Interface for docker services
+pub struct Services<'a> {
+ docker: &'a Docker,
+}
+
+impl<'a> Services<'a> {
+ /// Exports an interface for interacting with docker services
+ pub fn new(docker: &Docker) -> Services {
+ Services { docker }
+ }
+
+ /// Lists the docker services on the current docker host
+ pub async fn list(
+ &self,
+ opts: &ServiceListOptions,
+ ) -> Result<ServicesRep> {
+ let mut path = vec!["/services".to_owned()];
+ if let Some(query) = opts.serialize() {
+ path.push(query);
+ }
+
+ self.docker.get_json::<ServicesRep>(&path.join("?")).await
+ }
+
+ /// Returns a reference to a set of operations available for a named service
+ pub fn get(
+ &self,
+ name: &str,
+ ) -> Service {
+ Service::new(self.docker, name)
+ }
+}
+
+/// Interface for accessing and manipulating a named docker volume
+pub struct Service<'a> {
+ docker: &'a Docker,
+ name: String,
+}
+
+impl<'a> Service<'a> {
+ /// Exports an interface for operations that may be performed against a named service
+ pub fn new<S>(
+ docker: &Docker,
+ name: S,
+ ) -> Service
+ where
+ S: Into<String>,
+ {
+ Service {
+ docker,
+ name: name.into(),
+ }
+ }
+
+ /// Creates a new service from ServiceOptions
+ pub async fn create(
+ &self,
+ opts: &ServiceOptions,
+ ) -> Result<ServiceCreateInfo> {
+ let body: Body = opts.serialize()?.into();
+ let path = vec!["/service/create".to_owned()];
+
+ let headers = opts
+ .auth_header()
+ .map(|a| iter::once(("X-Registry-Auth", a)));
+
+ self.docker
+ .post_json_headers(
+ &path.join("?"),
+ Some((body, mime::APPLICATION_JSON)),
+ headers,
+ )
+ .await
+ }
+
+ /// Inspects a named service's details
+ pub async fn inspect(&self) -> Result<ServiceDetails> {
+ self.docker
+ .get_json(&format!("/services/{}", self.name)[..])
+ .await
+ }
+
+ /// Deletes a service
+ pub async fn delete(&self) -> Result<()> {
+ self.docker
+ .delete_json(&format!("/services/{}", self.name)[..])
+ .await
+ }
+
+ /// Returns a stream of logs from a service
+ pub fn logs(
+ &self,
+ opts: &LogsOptions,
+ ) -> impl Stream<Item = Result<tty::TtyChunk>> + Unpin + 'a {
+ let mut path = vec![format!("/services/{}/logs", self.name)];
+ if let Some(query) = opts.serialize() {
+ path.push(query)
+ }
+
+ let stream = Box::pin(self.docker.stream_get(path.join("?")));
+
+ Box::pin(tty::decode(stream))
+ }
+}
+
fn get_http_connector() -> HttpConnector {
let mut http = HttpConnector::new();
http.enforce_http(false);
@@ -1122,6 +1229,11 @@ impl Docker {
Containers::new(self)
}
+ /// Exports an interface for interacting with docker services
+ pub fn services(&self) -> Services {
+ Services::new(self)
+ }
+
pub fn networks(&self) -> Networks {
Networks::new(self)
}
@@ -1180,7 +1292,7 @@ impl Docker {
endpoint: &str,
) -> Result<String> {
self.transport
- .request(Method::GET, endpoint, Option::<(Body, Mime)>::None)
+ .request(Method::GET, endpoint, Payload::None, Headers::None)
.await
}
@@ -1190,7 +1302,7 @@ impl Docker {
) -> Result<T> {
let raw_string = self
.transport
- .request(Method::GET, endpoint, Option::<(Body, Mime)>::None)
+ .request(Method::GET, endpoint, Payload::None, Headers::None)
.await?;
Ok(serde_json::from_str::<T>(&raw_string)?)
@@ -1201,7 +1313,9 @@ impl Docker {
endpoint: &str,
body: Option<(Body, Mime)>,
) -> Result<String> {
- self.transport.request(Method::POST, endpoint, body).await
+ self.transport
+ .request(Method::POST, endpoint, body, Headers::None)
+ .await
}
async fn put(
@@ -1209,7 +1323,9 @@ impl Docker {
endpoint: &str,
body: Option<(Body, Mime)>,
) -> Result<String> {
- self.transport.request(Method::PUT, endpoint, body).await
+ self.transport
+ .request(Method::PUT, endpoint, body, Headers::None)
+ .await
}
async fn post_json<T, B>(
@@ -1221,7 +1337,29 @@ impl Docker {
T: serde::de::DeserializeOwned,
B: Into<Body>,
{
- let string = self.transport.request(Method::POST, endpoint, body).await?;
+ let string = self
+ .transport
+ .request(Method::POST, endpoint, body, Headers::None)
+ .await?;
+
+ Ok(serde_json::from_str::<T>(&string)?)
+ }
+
+ async fn post_json_headers<'a, T, B, H>(
+ &self,
+ endpoint: impl AsRef<str>,
+ body: Option<(B, Mime)>,
+ headers: Option<H>,
+ ) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned,
+ B: Into<Body>,
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
+ {
+ let string = self
+ .transport
+ .request(Method::POST, endpoint, body, headers)
+ .await?;
Ok(serde_json::from_str::<T>(&string)?)
}
@@ -1231,7 +1369,7 @@ impl Docker {
endpoint: &str,
) -> Result<String> {
self.transport
- .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None)
+ .request(Method::DELETE, endpoint, Payload::None, Headers::None)
.await
}
@@ -1241,7 +1379,7 @@ impl Docker {
) -> Result<T> {
let string = self
.transport
- .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None)
+ .request(Method::DELETE, endpoint, Payload::None, Headers::None)
.await?;
Ok(serde_json::from_str::<T>(&string)?)
diff --git a/src/rep.rs b/src/rep.rs
index b5619f6..62b248a 100644
--- a/src/rep.rs
+++ b/src/rep.rs
@@ -563,6 +563,189 @@ pub struct Volume {
pub scope: String,
}
+//################################################################################
+// SERVICES
+//################################################################################
+
+pub type Services = Vec<Service>;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Service {
+ #[serde(rename = "ID")]
+ pub id: String,
+ pub version: ObjectVersion,
+ #[cfg(feature = "chrono")]
+ pub created_at: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub created_at: String,
+ #[cfg(feature = "chrono")]
+ pub updated_at: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub updated_at: String,
+ pub endpoint: Endpoint,
+ pub update_status: Option<UpdateStatus>,
+ pub service_status: Option<ServiceStatus>,
+ pub job_status: Option<JobStatus>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ObjectVersion {
+ pub index: u64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Endpoint {
+ pub spec: EndpointSpec,
+ pub ports: Option<Vec<EndpointPortConfig>>,
+ #[serde(rename = "VirtualIPs")]
+ pub virtual_ips: Option<serde_json::Value>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct EndpointSpec {
+ pub mode: Option<String>,
+ pub ports: Option<Vec<EndpointPortConfig>>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct EndpointPortConfig {
+ pub name: Option<String>,
+ pub protocol: String,
+ pub publish_mode: String,
+ pub published_port: Option<u64>,
+ pub target_port: u64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct UpdateStatus {
+ pub state: String,
+ #[cfg(feature = "chrono")]
+ pub started_at: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub started_at: String,
+ #[cfg(feature = "chrono")]
+ pub completed_at: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub completed_at: String,
+ pub message: String,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ServiceStatus {
+ pub running_tasks: u64,
+ pub desired_tasks: u64,
+ pub completed_tasks: u64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct JobStatus {
+ pub job_iteration: ObjectVersion,
+ #[cfg(feature = "chrono")]
+ pub last_execution: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub last_execution: String,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ServiceDetails {
+ #[serde(rename = "ID")]
+ pub id: String,
+ pub version: ObjectVersion,
+ #[cfg(feature = "chrono")]
+ pub created_at: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub created_at: String,
+ #[cfg(feature = "chrono")]
+ pub updated_at: DateTime<Utc>,
+ #[cfg(not(feature = "chrono"))]
+ pub updated_at: String,
+ pub spec: ServiceSpec,
+ pub endpoint: Endpoint,
+ pub update_status: Option<UpdateStatus>,
+ pub service_status: Option<ServiceStatus>,
+ pub job_status: Option<JobStatus>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ServiceSpec {
+ pub name: String,
+ pub labels: Option<serde_json::Value>,
+ pub task_template: TaskSpec,
+ pub mode: Mode,
+ pub update_config: Option<UpdateConfig>,
+ pub rollback_config: Option<RollbackConfig>,
+ pub networks: Option<Vec<NetworkAttachmentConfig>>,
+ pub endpoint_spec: EndpointSpec,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+// #TODO: Add missing fields...
+pub struct TaskSpec {}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Mode {
+ pub replicated: Option<Replicated>,
+ pub global: Option<serde_json::Value>,
+ pub replicated_job: Option<ReplicatedJob>,
+ pub global_job: Option<serde_json::Value>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct Replicated {
+ pub replicas: u64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ReplicatedJob {
+ pub max_concurrent: u64,
+ pub total_completions: u64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct UpdateConfig {
+ pub parallelism: u64,
+ pub delay: u64,
+ pub failure_action: String,
+ pub monitor: u64,
+ pub max_failure_ratio: usize,
+ pub order: String,
+}
+
+pub type RollbackConfig = UpdateConfig;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct NetworkAttachmentConfig {
+ pub target: String,
+ pub aliases: Vec<String>,
+ pub driver_opts: Option<serde_json::Value>,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ServiceCreateInfo {
+ #[serde(rename = "ID")]
+ pub id: String,
+ #[serde(rename = "Warning")]
+ pub warning: Option<String>,
+}
+
+//################################################################################
+
#[cfg(feature = "chrono")]
fn datetime_from_unix_timestamp<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
where
diff --git a/src/transport.rs b/src/transport.rs
index b496e87..0fbbcc1 100644
--- a/src/transport.rs
+++ b/src/transport.rs
@@ -30,6 +30,9 @@ pub fn tar() -> Mime {
"application/tar".parse().unwrap()
}
+pub(crate) type Headers = Option<Vec<(&'static str, String)>>;
+pub(crate) type Payload = Option<(Body, Mime)>;
+
/// Transports are types which define the means of communication
/// with the docker daemon
#[derive(Clone)]
@@ -70,18 +73,18 @@ impl fmt::Debug for Transport {
impl Transport {
/// Make a request and return the whole response in a `String`
- pub async fn request<B>(
+ pub async fn request<'a, B, H>(
&self,
method: Method,
endpoint: impl AsRef<str>,
body: Option<(B, Mime)>,
+ headers: Option<H>,
) -> Result<String>
where
B: Into<Body>,
+ H: IntoIterator<Item = (&'static str, String)> + 'a,
{
- let body = self
- .get_body(method, endpoint, body, None::<iter::Empty<_>>)
- .await?;
+ let body = self.get_body(method, endpoint, body, headers).await?;
let bytes = hyper::body::to_bytes(body).await?;
let string = String::from_utf8(bytes.to_vec())?;