author    Matthias Beyer <mail@beyermatthias.de>   2020-11-05 20:15:24 +0100
committer Matthias Beyer <mail@beyermatthias.de>   2020-11-05 20:59:41 +0100
commit    6a483a052bf39ea6834cec96acc6771e63d2bc94 (patch)
tree      d687a9f41086304391b21b75fdbf83e0b91ae9dc
parent    913fc8a8c59204b262b5692e8e6dc468363a2c5e (diff)
Remove EndpointManager
This patch removes the `EndpointManager` type, which did nothing except wrap the
`ConfiguredEndpoint` and make things far more complicated than they need to be. Now the
`EndpointScheduler` holds the `ConfiguredEndpoint`s directly in an `Arc<RwLock<_>>` and
manages them without the additional layer of indirection.

Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
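For orientation, a minimal, self-contained sketch of the ownership model this patch moves to: the scheduler owns the endpoints directly behind `Arc<RwLock<_>>`, with no manager type in between. The `ConfiguredEndpoint` here is a simplified stand-in (the real one also carries a `shiplift::Docker` handle, see the diff below), and `pick_free` is purely illustrative — this commit removes the old naive `run_jobset` and does not yet implement a scheduling policy.

    // Simplified stand-in for the real ConfiguredEndpoint.
    use std::sync::{Arc, RwLock};

    struct ConfiguredEndpoint {
        name: String,
        num_current_jobs: usize,
        num_max_jobs: usize,
    }

    struct EndpointScheduler {
        // After this patch the scheduler holds the endpoints directly,
        // shared and interior-mutable, with no EndpointManager wrapper.
        endpoints: Vec<Arc<RwLock<ConfiguredEndpoint>>>,
    }

    impl EndpointScheduler {
        fn new(endpoints: Vec<ConfiguredEndpoint>) -> Self {
            EndpointScheduler {
                endpoints: endpoints
                    .into_iter()
                    .map(RwLock::new)
                    .map(Arc::new)
                    .collect(),
            }
        }

        // Illustrative only: pick the first endpoint with free capacity.
        // The actual scheduling policy is not part of this commit.
        fn pick_free(&self) -> Option<Arc<RwLock<ConfiguredEndpoint>>> {
            self.endpoints
                .iter()
                .find(|ep| {
                    ep.read()
                        .map(|ep| ep.num_current_jobs < ep.num_max_jobs)
                        .unwrap_or(false)
                })
                .cloned()
        }
    }

    fn main() {
        let scheduler = EndpointScheduler::new(vec![ConfiguredEndpoint {
            name: "local-docker".into(),
            num_current_jobs: 0,
            num_max_jobs: 4,
        }]);

        if let Some(ep) = scheduler.pick_free() {
            println!("would dispatch to {}", ep.read().unwrap().name);
        }
    }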
-rw-r--r--  src/endpoint/configured.rs        | 195
-rw-r--r--  src/endpoint/manager.rs           | 237
-rw-r--r--  src/endpoint/managerconf.rs       |  10
-rw-r--r--  src/endpoint/mod.rs               |   4
-rw-r--r--  src/endpoint/scheduler.rs         |  58
-rw-r--r--  src/orchestrator/orchestrator.rs  |   4
6 files changed, 216 insertions(+), 292 deletions(-)
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 10d424a..4528ac9 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -1,10 +1,28 @@
use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+use std::sync::RwLock;
+use std::str::FromStr;
+use std::path::PathBuf;
-use shiplift::Docker;
+use anyhow::Error;
+use anyhow::Result;
+use anyhow::anyhow;
use getset::{Getters, CopyGetters};
+use shiplift::ContainerOptions;
+use shiplift::Docker;
+use shiplift::ExecContainerOptions;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::mpsc::UnboundedSender;
use typed_builder::TypedBuilder;
-#[derive(Getters, CopyGetters, TypedBuilder, Clone)]
+use crate::util::docker::ImageName;
+use crate::endpoint::EndpointConfiguration;
+use crate::job::RunnableJob;
+use crate::job::JobResource;
+use crate::log::LogItem;
+use crate::filestore::StagingStore;
+
+#[derive(Getters, CopyGetters, TypedBuilder)]
pub struct ConfiguredEndpoint {
#[getset(get = "pub")]
name: String,
@@ -32,3 +50,176 @@ impl Debug for ConfiguredEndpoint {
}
}
+impl ConfiguredEndpoint {
+ pub(in super) async fn setup(epc: EndpointConfiguration) -> Result<Self> {
+ let ep = ConfiguredEndpoint::setup_endpoint(epc.endpoint())?;
+
+ let versions_compat = ConfiguredEndpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
+ let api_versions_compat = ConfiguredEndpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
+ let imgs_avail = ConfiguredEndpoint::check_images_available(epc.required_images().as_ref(), &ep);
+
+ let (versions_compat, api_versions_compat, imgs_avail) =
+ tokio::join!(versions_compat, api_versions_compat, imgs_avail);
+
+ let _ = versions_compat?;
+ let _ = api_versions_compat?;
+ let _ = imgs_avail?;
+
+ Ok(ep)
+ }
+
+ fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<ConfiguredEndpoint> {
+ match ep.endpoint_type() {
+ crate::config::EndpointType::Http => {
+ shiplift::Uri::from_str(ep.uri())
+ .map(|uri| shiplift::Docker::host(uri))
+ .map_err(Error::from)
+ .map(|docker| {
+ ConfiguredEndpoint::builder()
+ .name(ep.name().clone())
+ .docker(docker)
+ .speed(ep.speed())
+ .num_current_jobs(0)
+ .num_max_jobs(ep.maxjobs())
+ .build()
+ })
+ }
+
+ crate::config::EndpointType::Socket => {
+ Ok({
+ ConfiguredEndpoint::builder()
+ .name(ep.name().clone())
+ .speed(ep.speed())
+ .num_current_jobs(0)
+ .num_max_jobs(ep.maxjobs())
+ .docker(shiplift::Docker::unix(ep.uri()))
+ .build()
+ })
+ }
+ }
+ }
+
+ async fn check_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> {
+ match req {
+ None => Ok(()),
+ Some(v) => {
+ let avail = ep.docker().version().await?;
+
+ if !v.contains(&avail.version) {
+ Err(anyhow!("Incompatible docker version on endpoint {}: {}",
+ ep.name(), avail.version))
+ } else {
+ Ok(())
+ }
+ }
+ }
+ }
+
+ async fn check_api_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> {
+ match req {
+ None => Ok(()),
+ Some(v) => {
+ let avail = ep.docker().version().await?;
+
+ if !v.contains(&avail.api_version) {
+ Err(anyhow!("Incompatible docker API version on endpoint {}: {}",
+ ep.name(), avail.api_version))
+ } else {
+ Ok(())
+ }
+ }
+ }
+ }
+
+ async fn check_images_available(imgs: &Vec<ImageName>, ep: &ConfiguredEndpoint) -> Result<()> {
+ use shiplift::ImageListOptions;
+
+ ep.docker()
+ .images()
+ .list(&ImageListOptions::builder().all().build())
+ .await?
+ .into_iter()
+ .map(|image_rep| {
+ if let Some(tags) = image_rep.repo_tags {
+ tags.into_iter().map(|name| {
+ let tag = ImageName::from(name.clone());
+
+ if imgs.iter().any(|img| *img == tag) && !imgs.is_empty() {
+ return Err(anyhow!("Image {} missing in endpoint {}", name, ep.name()))
+ }
+
+ Ok(())
+ })
+ .collect::<Result<()>>()?;
+ }
+ // If no tags, ignore
+
+ Ok(())
+ })
+ .collect::<Result<()>>()
+ }
+
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> {
+ use crate::log::buffer_stream_to_line_stream;
+ use tokio::stream::StreamExt;
+
+ let (container_id, _warnings) = {
+ let envs: Vec<String> = job.resources()
+ .iter()
+ .filter_map(|r| match r {
+ JobResource::Environment(k, v) => Some(format!("{}={}", k, v)),
+ JobResource::Artifact(_) => None,
+ })
+ .collect();
+
+ let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref())
+ .env(envs.iter().map(AsRef::as_ref).collect())
+ .build();
+
+ let create_info = self.docker
+ .containers()
+ .create(&builder_opts)
+ .await?;
+
+ if create_info.warnings.is_some() {
+ // TODO: Handle warnings
+ }
+
+ (create_info.id, create_info.warnings)
+ };
+
+ let script = job.script().as_ref().as_bytes();
+ let script_path = PathBuf::from("/script");
+ let exec_opts = ExecContainerOptions::builder()
+ .cmd(vec!["/script"])
+ .build();
+
+ let container = self.docker.containers().get(&container_id);
+ container.copy_file_into(script_path, script).await?;
+ let stream = container.exec(&exec_opts);
+ let _ = buffer_stream_to_line_stream(stream)
+ .map(|line| {
+ line.map_err(Error::from)
+ .and_then(|l| {
+ crate::log::parser()
+ .parse(l.as_bytes())
+ .map_err(Error::from)
+ .and_then(|item| logsink.send(item).map_err(Error::from))
+ })
+ })
+ .collect::<Result<Vec<_>>>()
+ .await?;
+
+ let tar_stream = container.copy_from(&PathBuf::from("/outputs/"))
+ .map(|item| item.map_err(Error::from));
+
+ staging
+ .write()
+ .map_err(|_| anyhow!("Lock poisoned"))?
+ .write_files_from_tar_stream(tar_stream)
+ .await
+ }
+
+
+}
+
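A side note on the error handling in `ConfiguredEndpoint::setup` above: it awaits all three compatibility checks with `tokio::join!` and only propagates failures afterwards, instead of short-circuiting with `try_join!` as the removed `EndpointManager::setup` below did. A standalone sketch of that pattern, with placeholder checks instead of the real Docker version and image checks, assuming `tokio` (with the `macros` runtime feature) and `anyhow`, both already dependencies of this crate:

    // Start all checks, await them together, then propagate the first error.
    use anyhow::{anyhow, Result};

    async fn check_version_compat() -> Result<()> { Ok(()) }
    async fn check_api_version_compat() -> Result<()> { Ok(()) }
    async fn check_images_available() -> Result<()> { Err(anyhow!("image missing")) }

    #[tokio::main]
    async fn main() -> Result<()> {
        // tokio::join! drives all three futures to completion (unlike try_join!,
        // which returns on the first error and drops the remaining futures) ...
        let (versions, api_versions, images) = tokio::join!(
            check_version_compat(),
            check_api_version_compat(),
            check_images_available()
        );

        // ... and only then are the results inspected and errors propagated.
        versions?;
        api_versions?;
        images?;
        Ok(())
    }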
diff --git a/src/endpoint/manager.rs b/src/endpoint/manager.rs
deleted file mode 100644
index 5e891b7..0000000
--- a/src/endpoint/manager.rs
+++ /dev/null
@@ -1,237 +0,0 @@
-use std::sync::Arc;
-use std::sync::RwLock;
-use std::str::FromStr;
-use std::path::PathBuf;
-
-use anyhow::anyhow;
-use anyhow::Result;
-use anyhow::Error;
-use tokio::sync::mpsc::UnboundedSender;
-use tokio::sync::mpsc::UnboundedReceiver;
-use shiplift::ExecContainerOptions;
-use shiplift::ContainerOptions;
-
-use crate::util::docker::ImageName;
-use crate::endpoint::configured::ConfiguredEndpoint;
-use crate::endpoint::managerconf::EndpointManagerConfiguration;
-use crate::job::RunnableJob;
-use crate::job::JobResource;
-use crate::log::LogItem;
-use crate::filestore::StagingStore;
-
-/// The EndpointManager manages a _single_ endpoint
-#[derive(Clone, Debug)]
-pub struct EndpointManager {
- inner: Arc<RwLock<Inner>>,
-}
-
-
-#[derive(Debug)]
-struct Inner {
- endpoint: ConfiguredEndpoint,
-}
-
-impl EndpointManager {
- pub(in super) async fn setup(epc: EndpointManagerConfiguration) -> Result<Self> {
- let ep = EndpointManager::setup_endpoint(epc.endpoint())?;
-
- let versions_compat = EndpointManager::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
- let api_versions_compat = EndpointManager::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
- let imgs_avail = EndpointManager::check_images_available(epc.required_images().as_ref(), &ep);
-
- tokio::try_join!(versions_compat, api_versions_compat, imgs_avail)?;
-
- Ok(EndpointManager {
- inner: Arc::new(RwLock::new(Inner {
- endpoint: ep
- }))
- })
- }
-
- fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<ConfiguredEndpoint> {
- match ep.endpoint_type() {
- crate::config::EndpointType::Http => {
- shiplift::Uri::from_str(ep.uri())
- .map(|uri| shiplift::Docker::host(uri))
- .map_err(Error::from)
- .map(|docker| {
- ConfiguredEndpoint::builder()
- .name(ep.name().clone())
- .docker(docker)
- .speed(ep.speed())
- .num_current_jobs(0)
- .num_max_jobs(ep.maxjobs())
- .build()
- })
- }
-
- crate::config::EndpointType::Socket => {
- Ok({
- ConfiguredEndpoint::builder()
- .name(ep.name().clone())
- .speed(ep.speed())
- .num_current_jobs(0)
- .num_max_jobs(ep.maxjobs())
- .docker(shiplift::Docker::unix(ep.uri()))
- .build()
- })
- }
- }
- }
-
- async fn check_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> {
- match req {
- None => Ok(()),
- Some(v) => {
- let avail = ep.docker().version().await?;
-
- if !v.contains(&avail.version) {
- Err(anyhow!("Incompatible docker version on endpoint {}: {}",
- ep.name(), avail.version))
- } else {
- Ok(())
- }
- }
- }
- }
-
- async fn check_api_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> {
- match req {
- None => Ok(()),
- Some(v) => {
- let avail = ep.docker().version().await?;
-
- if !v.contains(&avail.api_version) {
- Err(anyhow!("Incompatible docker API version on endpoint {}: {}",
- ep.name(), avail.api_version))
- } else {
- Ok(())
- }
- }
- }
- }
-
- async fn check_images_available(imgs: &Vec<ImageName>, ep: &ConfiguredEndpoint) -> Result<()> {
- use shiplift::ImageListOptions;
-
- ep.docker()
- .images()
- .list(&ImageListOptions::builder().all().build())
- .await?
- .into_iter()
- .map(|image_rep| {
- if let Some(tags) = image_rep.repo_tags {
- tags.into_iter().map(|name| {
- let tag = ImageName::from(name.clone());
-
- if imgs.iter().any(|img| *img == tag) && !imgs.is_empty() {
- return Err(anyhow!("Image {} missing in endpoint {}", name, ep.name()))
- }
-
- Ok(())
- })
- .collect::<Result<()>>()?;
- }
- // If no tags, ignore
-
- Ok(())
- })
- .collect::<Result<()>>()
- }
-
- /// Get the name of the endpoint
- pub fn endpoint_name(&self) -> Result<String> {
- self.inner.read().map(|rw| rw.endpoint.name().clone()).map_err(|_| lockpoisoned())
- }
-
- pub fn get_num_current_jobs(&self) -> Result<usize> {
- self.inner
- .read()
- .map(|lock| lock.endpoint.num_current_jobs())
- .map_err(|_| anyhow!("Lock poisoned"))
- }
-
- pub fn get_num_max_jobs(&self) -> Result<usize> {
- self.inner
- .read()
- .map(|lock| lock.endpoint.num_max_jobs())
- .map_err(|_| anyhow!("Lock poisoned"))
- }
-
- pub async fn run_job(self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> {
- use crate::log::buffer_stream_to_line_stream;
- use tokio::stream::StreamExt;
-
- let lock = self.inner
- // Taking write lock, because we alter interior here, which shouldn't happen in
- // parallel eventhough technically possible.
- .write()
- .map_err(|_| anyhow!("Lock poisoned"))?;
-
- let (container_id, _warnings) = {
- let envs: Vec<String> = job.resources()
- .iter()
- .filter_map(|r| match r {
- JobResource::Environment(k, v) => Some(format!("{}={}", k, v)),
- JobResource::Artifact(_) => None,
- })
- .collect();
-
- let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref())
- .env(envs.iter().map(AsRef::as_ref).collect())
- .build();
-
- let create_info = lock.endpoint
- .docker()
- .containers()
- .create(&builder_opts)
- .await?;
-
- if create_info.warnings.is_some() {
- // TODO: Handle warnings
- }
-
- (create_info.id, create_info.warnings)
- };
-
- let script = job.script().as_ref().as_bytes();
- let script_path = PathBuf::from("/script");
- let exec_opts = ExecContainerOptions::builder()
- .cmd(vec!["/script"])
- .build();
-
- let container = lock.endpoint.docker().containers().get(&container_id);
- container.copy_file_into(script_path, script).await?;
-
- let stream = container.exec(&exec_opts);
- let _ = buffer_stream_to_line_stream(stream)
- .map(|line| {
- line.map_err(Error::from)
- .and_then(|l| {
- crate::log::parser()
- .parse(l.as_bytes())
- .map_err(Error::from)
- .and_then(|item| logsink.send(item).map_err(Error::from))
- })
- })
- .collect::<Result<Vec<_>>>()
- .await?;
-
- let tar_stream = container.copy_from(&PathBuf::from("/outputs/"))
- .map(|item| item.map_err(Error::from));
-
- staging
- .write()
- .map_err(|_| lockpoisoned())?
- .write_files_from_tar_stream(tar_stream)
- .await
- }
-
-}
-
-
-/// Helper fn for std::sync::PoisonError wrapping
-/// because std::sync::PoisonError is not Send for <Inner>
-fn lockpoisoned() -> Error {
- anyhow!("Lock poisoned")
-}
diff --git a/src/endpoint/managerconf.rs b/src/endpoint/managerconf.rs
index aa9ef58..c7dd70a 100644
--- a/src/endpoint/managerconf.rs
+++ b/src/endpoint/managerconf.rs
@@ -3,10 +3,10 @@ use typed_builder::TypedBuilder;
use anyhow::Result;
use crate::util::docker::ImageName;
-use crate::endpoint::EndpointManager;
+use crate::endpoint::ConfiguredEndpoint;
#[derive(Getters, TypedBuilder)]
-pub struct EndpointManagerConfiguration {
+pub struct EndpointConfiguration {
#[getset(get = "pub")]
endpoint: crate::config::Endpoint,
@@ -23,9 +23,9 @@ pub struct EndpointManagerConfiguration {
required_docker_api_versions: Option<Vec<String>>,
}
-impl EndpointManagerConfiguration {
- pub async fn connect(self) -> Result<EndpointManager> {
- EndpointManager::setup(self).await
+impl EndpointConfiguration {
+ pub async fn connect(self) -> Result<ConfiguredEndpoint> {
+ ConfiguredEndpoint::setup(self).await
}
}
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs
index 4a57f67..d75fc7e 100644
--- a/src/endpoint/mod.rs
+++ b/src/endpoint/mod.rs
@@ -1,6 +1,3 @@
-mod manager;
-pub use manager::*;
-
mod managerconf;
pub use managerconf::*;
@@ -8,4 +5,5 @@ mod scheduler;
pub use scheduler::*;
mod configured;
+pub use configured::*;
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index c714818..266e664 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -6,22 +6,22 @@ use anyhow::Result;
use tokio::stream::StreamExt;
use tokio::sync::mpsc::UnboundedSender;
-use crate::endpoint::EndpointManager;
-use crate::endpoint::EndpointManagerConfiguration;
+use crate::endpoint::ConfiguredEndpoint;
+use crate::endpoint::EndpointConfiguration;
use crate::job::JobSet;
use crate::job::RunnableJob;
use crate::log::LogItem;
use crate::filestore::StagingStore;
pub struct EndpointScheduler {
- endpoints: Vec<EndpointManager>,
+ endpoints: Vec<Arc<RwLock<ConfiguredEndpoint>>>,
staging_store: Arc<RwLock<StagingStore>>,
}
impl EndpointScheduler {
- pub async fn setup(endpoints: Vec<EndpointManagerConfiguration>, staging_store: Arc<RwLock<StagingStore>>) -> Result<Self> {
+ pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>) -> Result<Self> {
let endpoints = Self::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
@@ -30,51 +30,23 @@ impl EndpointScheduler {
})
}
- async fn setup_endpoints(endpoints: Vec<EndpointManagerConfiguration>) -> Result<Vec<EndpointManager>> {
+ async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<RwLock<ConfiguredEndpoint>>>> {
+ use futures::FutureExt;
+
let unordered = futures::stream::FuturesUnordered::new();
for cfg in endpoints.into_iter() {
- unordered.push(EndpointManager::setup(cfg));
+ unordered.push({
+ ConfiguredEndpoint::setup(cfg)
+ .map(|r_ep| {
+ r_ep.map(RwLock::new)
+ .map(Arc::new)
+ })
+ });
}
unordered.collect().await
}
- /// Run a jobset on all endpoints
- ///
- /// TODO: This is a naive implementation that simple pushes the complete jobset to the
- /// available endpoints.
- ///
- /// It does not yet schedule like it is supposed to do.
- pub async fn run_jobset(&self, js: Vec<(RunnableJob, UnboundedSender<LogItem>)>) -> Result<Vec<PathBuf>> {
- let unordered = futures::stream::FuturesUnordered::new();
- let mut i: usize = 0;
- let mut iter = js.into_iter();
-
- while let Some(next_job) = iter.next() {
- match self.endpoints.get(i) {
- None => {
- i = 0;
- },
-
- Some(ep) => {
- let ep = ep.clone();
- unordered.push(async {
- ep.run_job(next_job.0, next_job.1, self.staging_store.clone()).await
- });
- }
- }
-
- i += 1;
- }
-
- let res = unordered.collect::<Result<Vec<_>>>()
- .await?
- .into_iter()
- .flatten() // We get a Vec<Vec<PathBuf>> here, but we only care about all pathes in one Vec<_>
- .collect();
-
- Ok(res)
- }
-
}
+
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index b1e5fa4..7bf5376 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -6,7 +6,7 @@ use anyhow::Result;
use typed_builder::TypedBuilder;
use diesel::PgConnection;
-use crate::endpoint::EndpointManagerConfiguration;
+use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::job::JobSet;
use crate::job::RunnableJob;
@@ -27,7 +27,7 @@ pub struct Orchestrator {
#[derive(TypedBuilder)]
pub struct OrchestratorSetup {
- ep_cfg: Vec<EndpointManagerConfiguration>,
+ ep_cfg: Vec<EndpointConfiguration>,
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
jobsets: Vec<JobSet>,