summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-10-22 18:03:47 +0200
committerMatthias Beyer <mail@beyermatthias.de>2020-11-03 15:36:49 +0100
commit9fbda0620e45a3eff03ac44bfab2941ad909d346 (patch)
treeaa3f1b07b2c959169d8d85a4b661b30d087739a1
parent7817211f97e33b2b3201c62aaa0d0fdf6bce04a9 (diff)
Add first setup for EndpointManager
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--Cargo.toml3
-rw-r--r--src/endpoint/configured.rs34
-rw-r--r--src/endpoint/manager.rs153
-rw-r--r--src/endpoint/managerconf.rs31
-rw-r--r--src/endpoint/mod.rs8
-rw-r--r--src/main.rs1
6 files changed, 229 insertions, 1 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 86b8290..5317b8c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,7 +27,8 @@ pom = "3"
futures = "0.3"
handlebars = "3"
filters = "0.4.0"
-indoc = "1"
+indoc = "1"
+typed-builder = "0.7"
url = { version = "2", features = ["serde"] }
tokio = { version = "0.2", features = ["full"] }
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
new file mode 100644
index 0000000..10d424a
--- /dev/null
+++ b/src/endpoint/configured.rs
@@ -0,0 +1,34 @@
+use std::fmt::{Debug, Formatter};
+
+use shiplift::Docker;
+use getset::{Getters, CopyGetters};
+use typed_builder::TypedBuilder;
+
+#[derive(Getters, CopyGetters, TypedBuilder, Clone)]
+pub struct ConfiguredEndpoint {
+ #[getset(get = "pub")]
+ name: String,
+
+ #[getset(get = "pub")]
+ docker: Docker,
+
+ #[getset(get_copy = "pub")]
+ speed: usize,
+
+ #[getset(get_copy = "pub")]
+ num_current_jobs: usize,
+
+ #[getset(get_copy = "pub")]
+ num_max_jobs: usize,
+}
+
+impl Debug for ConfiguredEndpoint {
+ fn fmt(&self, f: &mut Formatter) -> std::result::Result<(), std::fmt::Error> {
+ write!(f,
+ "ConfiguredEndpoint({}, {}/{})",
+ self.name,
+ self.num_current_jobs,
+ self.num_max_jobs)
+ }
+}
+
diff --git a/src/endpoint/manager.rs b/src/endpoint/manager.rs
new file mode 100644
index 0000000..63fbf38
--- /dev/null
+++ b/src/endpoint/manager.rs
@@ -0,0 +1,153 @@
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::str::FromStr;
+
+use anyhow::anyhow;
+use anyhow::Result;
+use anyhow::Error;
+use tokio::sync::mpsc::UnboundedSender;
+
+use crate::util::docker::ImageName;
+use crate::endpoint::configured::ConfiguredEndpoint;
+use crate::endpoint::managerconf::EndpointManagerConfiguration;
+
+/// The EndpointManager manages a _single_ endpoint
+#[derive(Clone, Debug)]
+pub struct EndpointManager {
+ inner: Arc<RwLock<Inner>>,
+}
+
+
+#[derive(Debug)]
+struct Inner {
+ endpoint: ConfiguredEndpoint,
+}
+
+impl EndpointManager {
+ pub(in super) async fn setup(epc: EndpointManagerConfiguration) -> Result<Self> {
+ let ep = EndpointManager::setup_endpoint(epc.endpoint())?;
+
+ let versions_compat = EndpointManager::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
+ let api_versions_compat = EndpointManager::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
+ let imgs_avail = EndpointManager::check_images_available(epc.required_images().as_ref(), &ep);
+
+ tokio::try_join!(versions_compat, api_versions_compat, imgs_avail)?;
+
+ Ok(EndpointManager {
+ inner: Arc::new(RwLock::new(Inner {
+ endpoint: ep
+ }))
+ })
+ }
+
+ fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<ConfiguredEndpoint> {
+ match ep.endpoint_type() {
+ crate::config::EndpointType::Http => {
+ shiplift::Uri::from_str(ep.uri())
+ .map(|uri| shiplift::Docker::host(uri))
+ .map_err(Error::from)
+ .map(|docker| {
+ ConfiguredEndpoint::builder()
+ .name(ep.name().clone())
+ .docker(docker)
+ .speed(ep.speed())
+ .num_current_jobs(0)
+ .num_max_jobs(ep.maxjobs())
+ .build()
+ })
+ }
+
+ crate::config::EndpointType::Socket => {
+ Ok({
+ ConfiguredEndpoint::builder()
+ .name(ep.name().clone())
+ .speed(ep.speed())
+ .num_current_jobs(0)
+ .num_max_jobs(ep.maxjobs())
+ .docker(shiplift::Docker::unix(ep.uri()))
+ .build()
+ })
+ }
+ }
+ }
+
+ async fn check_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> {
+ match req {
+ None => Ok(()),
+ Some(v) => {
+ let avail = ep.docker().version().await?;
+
+ if !v.contains(&avail.version) {
+ Err(anyhow!("Incompatible docker version on endpoint {}: {}",
+ ep.name(), avail.version))
+ } else {
+ Ok(())
+ }
+ }
+ }
+ }
+
+ async fn check_api_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> {
+ match req {
+ None => Ok(()),
+ Some(v) => {
+ let avail = ep.docker().version().await?;
+
+ if !v.contains(&avail.api_version) {
+ Err(anyhow!("Incompatible docker API version on endpoint {}: {}",
+ ep.name(), avail.api_version))
+ } else {
+ Ok(())
+ }
+ }
+ }
+ }
+
+ async fn check_images_available(imgs: &Vec<ImageName>, ep: &ConfiguredEndpoint) -> Result<()> {
+ use shiplift::ImageListOptions;
+
+ ep.docker()
+ .images()
+ .list(&ImageListOptions::builder().all().build())
+ .await?
+ .into_iter()
+ .map(|image_rep| {
+ if let Some(tags) = image_rep.repo_tags {
+ tags.into_iter().map(|name| {
+ let tag = ImageName::from(name.clone());
+
+ if imgs.iter().any(|img| *img == tag) && !imgs.is_empty() {
+ return Err(anyhow!("Image {} missing in endpoint {}", name, ep.name()))
+ }
+
+ Ok(())
+ })
+ .collect::<Result<()>>()?;
+ }
+ // If no tags, ignore
+
+ Ok(())
+ })
+ .collect::<Result<()>>()
+ }
+
+ /// Get the name of the endpoint
+ pub fn endpoint_name(&self) -> Result<String> {
+ self.inner.read().map(|rw| rw.endpoint.name().clone()).map_err(|_| lockpoisoned())
+ }
+
+ pub async fn run_job(self, job: RunnableJob, logsink: UnboundedSender<LogItem>) -> Result<()> {
+ unimplemented!()
+ }
+
+}
+
+type LogItem = (); // TODO Replace with actual implementation
+type RunnableJob = (); // TODO Replace with actual implementation
+
+
+/// Helper fn for std::sync::PoisonError wrapping
+/// because std::sync::PoisonError is not Send for <Inner>
+fn lockpoisoned() -> Error {
+ anyhow!("Lock poisoned")
+}
diff --git a/src/endpoint/managerconf.rs b/src/endpoint/managerconf.rs
new file mode 100644
index 0000000..aa9ef58
--- /dev/null
+++ b/src/endpoint/managerconf.rs
@@ -0,0 +1,31 @@
+use getset::Getters;
+use typed_builder::TypedBuilder;
+use anyhow::Result;
+
+use crate::util::docker::ImageName;
+use crate::endpoint::EndpointManager;
+
+#[derive(Getters, TypedBuilder)]
+pub struct EndpointManagerConfiguration {
+ #[getset(get = "pub")]
+ endpoint: crate::config::Endpoint,
+
+ #[getset(get = "pub")]
+ #[builder(default)]
+ required_images: Vec<ImageName>,
+
+ #[getset(get = "pub")]
+ #[builder(default)]
+ required_docker_versions: Option<Vec<String>>,
+
+ #[getset(get = "pub")]
+ #[builder(default)]
+ required_docker_api_versions: Option<Vec<String>>,
+}
+
+impl EndpointManagerConfiguration {
+ pub async fn connect(self) -> Result<EndpointManager> {
+ EndpointManager::setup(self).await
+ }
+}
+
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs
new file mode 100644
index 0000000..167817d
--- /dev/null
+++ b/src/endpoint/mod.rs
@@ -0,0 +1,8 @@
+mod manager;
+pub use manager::*;
+
+mod managerconf;
+pub use managerconf::*;
+
+mod configured;
+
diff --git a/src/main.rs b/src/main.rs
index 489e4c8..82cad0b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -15,6 +15,7 @@ use clap_v3::ArgMatches;
mod cli;
mod job;
+mod endpoint;
mod util;
mod log;
mod package;