From 5af822c5691a772cad36bd9a69e94419b03d4511 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20K=C4=99pka?= <46892771+wojciechkepka@users.noreply.github.com> Date: Tue, 9 Feb 2021 01:22:26 +0100 Subject: Add services api (#263) * Add initial Services models * Add initial Services controllers * Add ServicesListOptions * Rename ServicesList -> ServiceList * Fix some optional fields on ServiceRep * Add Service::inspect * Add Service::delete * Add Service::logs * Rename example logs -> containerlogs * Add ServiceOptions, ServiceCreateInfo, fix typo * Add a way to pass headers to request, add Payload and Headers for easier types * Add Service::create * Add examples * Fix fmt * Fix example --- src/builder.rs | 233 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 160 +++++++++++++++++++++++++++++++++++--- src/rep.rs | 183 +++++++++++++++++++++++++++++++++++++++++++ src/transport.rs | 11 ++- 4 files changed, 570 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/builder.rs b/src/builder.rs index 6971be9..982b3ba 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1,13 +1,17 @@ //! Interfaces for building various structures -use crate::{errors::Error, Result}; +use crate::{ + errors::Error, + rep::{EndpointSpec, Mode, NetworkAttachmentConfig, RollbackConfig, TaskSpec, UpdateConfig}, + Result, +}; use serde::Serialize; use serde_json::{self, json, map::Map, Value}; use std::{ cmp::Eq, collections::{BTreeMap, HashMap}, hash::Hash, - iter::{IntoIterator, Peekable}, + iter::{IntoIterator, Iterator, Peekable}, time::Duration, }; use url::form_urlencoded; @@ -1771,6 +1775,231 @@ impl ExecResizeOptionsBuilder { } } +//################################################################################ +// Services +//################################################################################ + +/// Options for filtering services list results +#[derive(Default, Debug)] +pub struct ServiceListOptions { + params: HashMap<&'static str, String>, +} + +impl ServiceListOptions { + /// return a new instance of a builder for options + pub fn builder() -> ServiceListOptionsBuilder { + ServiceListOptionsBuilder::default() + } + + /// serialize options as a string. returns None if no options are defined + pub fn serialize(&self) -> Option { + if self.params.is_empty() { + None + } else { + Some( + form_urlencoded::Serializer::new(String::new()) + .extend_pairs(&self.params) + .finish(), + ) + } + } +} + +/// Filter options for services listings +pub enum ServiceFilter { + Id(String), + Label(String), + ReplicatedMode, + GlobalMode, + Name(String), +} + +/// Builder interface for `ServicesListOptions` +#[derive(Default)] +pub struct ServiceListOptionsBuilder { + params: HashMap<&'static str, String>, +} + +impl ServiceListOptionsBuilder { + pub fn filter( + &mut self, + filters: Vec, + ) -> &mut Self { + let mut param = HashMap::new(); + for f in filters { + match f { + ServiceFilter::Id(i) => param.insert("id", vec![i]), + ServiceFilter::Label(l) => param.insert("label", vec![l]), + ServiceFilter::ReplicatedMode => { + param.insert("mode", vec!["replicated".to_string()]) + } + ServiceFilter::GlobalMode => param.insert("mode", vec!["global".to_string()]), + ServiceFilter::Name(n) => param.insert("name", vec![n.to_string()]), + }; + } + // structure is a a json encoded object mapping string keys to a list + // of string values + self.params + .insert("filters", serde_json::to_string(¶m).unwrap()); + self + } + + pub fn enable_status(&mut self) -> &mut Self { + self.params.insert("status", "true".to_owned()); + self + } + + pub fn build(&self) -> ServiceListOptions { + ServiceListOptions { + params: self.params.clone(), + } + } +} + +#[derive(Default, Debug)] +pub struct ServiceOptions { + auth: Option, + params: HashMap<&'static str, Value>, +} + +impl ServiceOptions { + /// return a new instance of a builder for options + pub fn builder() -> ServiceOptionsBuilder { + ServiceOptionsBuilder::default() + } + + /// serialize options as a string. returns None if no options are defined + pub fn serialize(&self) -> Result { + serde_json::to_string(&self.params).map_err(Error::from) + } + + pub(crate) fn auth_header(&self) -> Option { + self.auth.clone().map(|a| a.serialize()) + } +} + +#[derive(Default)] +pub struct ServiceOptionsBuilder { + auth: Option, + params: HashMap<&'static str, Result>, +} + +impl ServiceOptionsBuilder { + pub fn name( + &mut self, + name: S, + ) -> &mut Self + where + S: AsRef, + { + self.params.insert("Name", Ok(json!(name.as_ref()))); + self + } + + pub fn labels( + &mut self, + labels: I, + ) -> &mut Self + where + I: IntoIterator, + { + self.params.insert( + "Labels", + Ok(json!(labels + .into_iter() + .collect::>())), + ); + self + } + + pub fn task_template( + &mut self, + spec: &TaskSpec, + ) -> &mut Self { + self.params.insert("TaskTemplate", to_json_value(spec)); + self + } + + pub fn mode( + &mut self, + mode: &Mode, + ) -> &mut Self { + self.params.insert("Mode", to_json_value(mode)); + self + } + + pub fn update_config( + &mut self, + conf: &UpdateConfig, + ) -> &mut Self { + self.params.insert("UpdateConfig", to_json_value(conf)); + self + } + + pub fn rollback_config( + &mut self, + conf: &RollbackConfig, + ) -> &mut Self { + self.params.insert("RollbackConfig", to_json_value(conf)); + self + } + + pub fn networks( + &mut self, + networks: I, + ) -> &mut Self + where + I: IntoIterator, + { + self.params.insert( + "Networks", + to_json_value( + networks + .into_iter() + .collect::>(), + ), + ); + self + } + + pub fn endpoint_spec( + &mut self, + spec: &EndpointSpec, + ) -> &mut Self { + self.params.insert("EndpointSpec", to_json_value(spec)); + self + } + + pub fn auth( + &mut self, + auth: RegistryAuth, + ) -> &mut Self { + self.auth = Some(auth); + self + } + + pub fn build(&mut self) -> Result { + let params = std::mem::take(&mut self.params); + let mut new_params = HashMap::new(); + for (k, v) in params.into_iter() { + new_params.insert(k, v?); + } + Ok(ServiceOptions { + auth: self.auth.take(), + params: new_params, + }) + } +} + +fn to_json_value(value: T) -> Result +where + T: Serialize, +{ + Ok(serde_json::to_value(value)?) +} + +//################################################################################ + #[cfg(test)] mod tests { use super::{ContainerOptionsBuilder, LogsOptionsBuilder, RegistryAuth}; diff --git a/src/lib.rs b/src/lib.rs index 5d77b83..ac09a41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,7 +30,8 @@ pub use crate::{ BuildOptions, ContainerConnectionOptions, ContainerFilter, ContainerListOptions, ContainerOptions, EventsOptions, ExecContainerOptions, ExecResizeOptions, ImageFilter, ImageListOptions, LogsOptions, NetworkCreateOptions, NetworkListOptions, PullOptions, - RegistryAuth, RmContainerOptions, TagOptions, VolumeCreateOptions, + RegistryAuth, RmContainerOptions, ServiceFilter, ServiceListOptions, ServiceOptions, + TagOptions, VolumeCreateOptions, }, errors::Error, }; @@ -38,10 +39,11 @@ use crate::{ rep::{ Change, Container as ContainerRep, ContainerCreateInfo, ContainerDetails, Event, ExecDetails, Exit, History, Image as ImageRep, ImageDetails, Info, NetworkCreateInfo, - NetworkDetails as NetworkInfo, SearchResult, Stats, Status, Top, Version, - Volume as VolumeRep, VolumeCreateInfo, Volumes as VolumesRep, + NetworkDetails as NetworkInfo, SearchResult, ServiceCreateInfo, ServiceDetails, + Services as ServicesRep, Stats, Status, Top, Version, Volume as VolumeRep, + VolumeCreateInfo, Volumes as VolumesRep, }, - transport::{tar, Transport}, + transport::{tar, Headers, Payload, Transport}, tty::Multiplexer as TtyMultiPlexer, }; use futures_util::{ @@ -983,6 +985,111 @@ impl<'a> Volume<'a> { } } +/// Interface for docker services +pub struct Services<'a> { + docker: &'a Docker, +} + +impl<'a> Services<'a> { + /// Exports an interface for interacting with docker services + pub fn new(docker: &Docker) -> Services { + Services { docker } + } + + /// Lists the docker services on the current docker host + pub async fn list( + &self, + opts: &ServiceListOptions, + ) -> Result { + let mut path = vec!["/services".to_owned()]; + if let Some(query) = opts.serialize() { + path.push(query); + } + + self.docker.get_json::(&path.join("?")).await + } + + /// Returns a reference to a set of operations available for a named service + pub fn get( + &self, + name: &str, + ) -> Service { + Service::new(self.docker, name) + } +} + +/// Interface for accessing and manipulating a named docker volume +pub struct Service<'a> { + docker: &'a Docker, + name: String, +} + +impl<'a> Service<'a> { + /// Exports an interface for operations that may be performed against a named service + pub fn new( + docker: &Docker, + name: S, + ) -> Service + where + S: Into, + { + Service { + docker, + name: name.into(), + } + } + + /// Creates a new service from ServiceOptions + pub async fn create( + &self, + opts: &ServiceOptions, + ) -> Result { + let body: Body = opts.serialize()?.into(); + let path = vec!["/service/create".to_owned()]; + + let headers = opts + .auth_header() + .map(|a| iter::once(("X-Registry-Auth", a))); + + self.docker + .post_json_headers( + &path.join("?"), + Some((body, mime::APPLICATION_JSON)), + headers, + ) + .await + } + + /// Inspects a named service's details + pub async fn inspect(&self) -> Result { + self.docker + .get_json(&format!("/services/{}", self.name)[..]) + .await + } + + /// Deletes a service + pub async fn delete(&self) -> Result<()> { + self.docker + .delete_json(&format!("/services/{}", self.name)[..]) + .await + } + + /// Returns a stream of logs from a service + pub fn logs( + &self, + opts: &LogsOptions, + ) -> impl Stream> + Unpin + 'a { + let mut path = vec![format!("/services/{}/logs", self.name)]; + if let Some(query) = opts.serialize() { + path.push(query) + } + + let stream = Box::pin(self.docker.stream_get(path.join("?"))); + + Box::pin(tty::decode(stream)) + } +} + fn get_http_connector() -> HttpConnector { let mut http = HttpConnector::new(); http.enforce_http(false); @@ -1122,6 +1229,11 @@ impl Docker { Containers::new(self) } + /// Exports an interface for interacting with docker services + pub fn services(&self) -> Services { + Services::new(self) + } + pub fn networks(&self) -> Networks { Networks::new(self) } @@ -1180,7 +1292,7 @@ impl Docker { endpoint: &str, ) -> Result { self.transport - .request(Method::GET, endpoint, Option::<(Body, Mime)>::None) + .request(Method::GET, endpoint, Payload::None, Headers::None) .await } @@ -1190,7 +1302,7 @@ impl Docker { ) -> Result { let raw_string = self .transport - .request(Method::GET, endpoint, Option::<(Body, Mime)>::None) + .request(Method::GET, endpoint, Payload::None, Headers::None) .await?; Ok(serde_json::from_str::(&raw_string)?) @@ -1201,7 +1313,9 @@ impl Docker { endpoint: &str, body: Option<(Body, Mime)>, ) -> Result { - self.transport.request(Method::POST, endpoint, body).await + self.transport + .request(Method::POST, endpoint, body, Headers::None) + .await } async fn put( @@ -1209,7 +1323,9 @@ impl Docker { endpoint: &str, body: Option<(Body, Mime)>, ) -> Result { - self.transport.request(Method::PUT, endpoint, body).await + self.transport + .request(Method::PUT, endpoint, body, Headers::None) + .await } async fn post_json( @@ -1221,7 +1337,29 @@ impl Docker { T: serde::de::DeserializeOwned, B: Into, { - let string = self.transport.request(Method::POST, endpoint, body).await?; + let string = self + .transport + .request(Method::POST, endpoint, body, Headers::None) + .await?; + + Ok(serde_json::from_str::(&string)?) + } + + async fn post_json_headers<'a, T, B, H>( + &self, + endpoint: impl AsRef, + body: Option<(B, Mime)>, + headers: Option, + ) -> Result + where + T: serde::de::DeserializeOwned, + B: Into, + H: IntoIterator + 'a, + { + let string = self + .transport + .request(Method::POST, endpoint, body, headers) + .await?; Ok(serde_json::from_str::(&string)?) } @@ -1231,7 +1369,7 @@ impl Docker { endpoint: &str, ) -> Result { self.transport - .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None) + .request(Method::DELETE, endpoint, Payload::None, Headers::None) .await } @@ -1241,7 +1379,7 @@ impl Docker { ) -> Result { let string = self .transport - .request(Method::DELETE, endpoint, Option::<(Body, Mime)>::None) + .request(Method::DELETE, endpoint, Payload::None, Headers::None) .await?; Ok(serde_json::from_str::(&string)?) diff --git a/src/rep.rs b/src/rep.rs index b5619f6..62b248a 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -563,6 +563,189 @@ pub struct Volume { pub scope: String, } +//################################################################################ +// SERVICES +//################################################################################ + +pub type Services = Vec; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct Service { + #[serde(rename = "ID")] + pub id: String, + pub version: ObjectVersion, + #[cfg(feature = "chrono")] + pub created_at: DateTime, + #[cfg(not(feature = "chrono"))] + pub created_at: String, + #[cfg(feature = "chrono")] + pub updated_at: DateTime, + #[cfg(not(feature = "chrono"))] + pub updated_at: String, + pub endpoint: Endpoint, + pub update_status: Option, + pub service_status: Option, + pub job_status: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ObjectVersion { + pub index: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct Endpoint { + pub spec: EndpointSpec, + pub ports: Option>, + #[serde(rename = "VirtualIPs")] + pub virtual_ips: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct EndpointSpec { + pub mode: Option, + pub ports: Option>, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct EndpointPortConfig { + pub name: Option, + pub protocol: String, + pub publish_mode: String, + pub published_port: Option, + pub target_port: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct UpdateStatus { + pub state: String, + #[cfg(feature = "chrono")] + pub started_at: DateTime, + #[cfg(not(feature = "chrono"))] + pub started_at: String, + #[cfg(feature = "chrono")] + pub completed_at: DateTime, + #[cfg(not(feature = "chrono"))] + pub completed_at: String, + pub message: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ServiceStatus { + pub running_tasks: u64, + pub desired_tasks: u64, + pub completed_tasks: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct JobStatus { + pub job_iteration: ObjectVersion, + #[cfg(feature = "chrono")] + pub last_execution: DateTime, + #[cfg(not(feature = "chrono"))] + pub last_execution: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ServiceDetails { + #[serde(rename = "ID")] + pub id: String, + pub version: ObjectVersion, + #[cfg(feature = "chrono")] + pub created_at: DateTime, + #[cfg(not(feature = "chrono"))] + pub created_at: String, + #[cfg(feature = "chrono")] + pub updated_at: DateTime, + #[cfg(not(feature = "chrono"))] + pub updated_at: String, + pub spec: ServiceSpec, + pub endpoint: Endpoint, + pub update_status: Option, + pub service_status: Option, + pub job_status: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ServiceSpec { + pub name: String, + pub labels: Option, + pub task_template: TaskSpec, + pub mode: Mode, + pub update_config: Option, + pub rollback_config: Option, + pub networks: Option>, + pub endpoint_spec: EndpointSpec, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +// #TODO: Add missing fields... +pub struct TaskSpec {} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct Mode { + pub replicated: Option, + pub global: Option, + pub replicated_job: Option, + pub global_job: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct Replicated { + pub replicas: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ReplicatedJob { + pub max_concurrent: u64, + pub total_completions: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct UpdateConfig { + pub parallelism: u64, + pub delay: u64, + pub failure_action: String, + pub monitor: u64, + pub max_failure_ratio: usize, + pub order: String, +} + +pub type RollbackConfig = UpdateConfig; + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct NetworkAttachmentConfig { + pub target: String, + pub aliases: Vec, + pub driver_opts: Option, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ServiceCreateInfo { + #[serde(rename = "ID")] + pub id: String, + #[serde(rename = "Warning")] + pub warning: Option, +} + +//################################################################################ + #[cfg(feature = "chrono")] fn datetime_from_unix_timestamp<'de, D>(deserializer: D) -> Result, D::Error> where diff --git a/src/transport.rs b/src/transport.rs index b496e87..0fbbcc1 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -30,6 +30,9 @@ pub fn tar() -> Mime { "application/tar".parse().unwrap() } +pub(crate) type Headers = Option>; +pub(crate) type Payload = Option<(Body, Mime)>; + /// Transports are types which define the means of communication /// with the docker daemon #[derive(Clone)] @@ -70,18 +73,18 @@ impl fmt::Debug for Transport { impl Transport { /// Make a request and return the whole response in a `String` - pub async fn request( + pub async fn request<'a, B, H>( &self, method: Method, endpoint: impl AsRef, body: Option<(B, Mime)>, + headers: Option, ) -> Result where B: Into, + H: IntoIterator + 'a, { - let body = self - .get_body(method, endpoint, body, None::>) - .await?; + let body = self.get_body(method, endpoint, body, headers).await?; let bytes = hyper::body::to_bytes(body).await?; let string = String::from_utf8(bytes.to_vec())?; -- cgit v1.2.3