diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-05 20:15:24 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-05 20:59:41 +0100 |
commit | 6a483a052bf39ea6834cec96acc6771e63d2bc94 (patch) | |
tree | d687a9f41086304391b21b75fdbf83e0b91ae9dc | |
parent | 913fc8a8c59204b262b5692e8e6dc468363a2c5e (diff) |
Remove EndpointManager
This patch removes the `EndpointManager` type,
which did nothing except wrap the `ConfiguredEndpoint` and make things way
more complicated than they need to be.
Now, the `EndpointScheduler` holds the `ConfiguredEndpoint`s directly in
a `Arc<RwLock<_>>` and manages them without the additional layer of
complexity.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | src/endpoint/configured.rs | 195 | ||||
-rw-r--r-- | src/endpoint/manager.rs | 237 | ||||
-rw-r--r-- | src/endpoint/managerconf.rs | 10 | ||||
-rw-r--r-- | src/endpoint/mod.rs | 4 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 58 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 4 |
6 files changed, 216 insertions, 292 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 10d424a..4528ac9 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -1,10 +1,28 @@ use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::sync::RwLock; +use std::str::FromStr; +use std::path::PathBuf; -use shiplift::Docker; +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; use getset::{Getters, CopyGetters}; +use shiplift::ContainerOptions; +use shiplift::Docker; +use shiplift::ExecContainerOptions; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; use typed_builder::TypedBuilder; -#[derive(Getters, CopyGetters, TypedBuilder, Clone)] +use crate::util::docker::ImageName; +use crate::endpoint::EndpointConfiguration; +use crate::job::RunnableJob; +use crate::job::JobResource; +use crate::log::LogItem; +use crate::filestore::StagingStore; + +#[derive(Getters, CopyGetters, TypedBuilder)] pub struct ConfiguredEndpoint { #[getset(get = "pub")] name: String, @@ -32,3 +50,176 @@ impl Debug for ConfiguredEndpoint { } } +impl ConfiguredEndpoint { + pub(in super) async fn setup(epc: EndpointConfiguration) -> Result<Self> { + let ep = ConfiguredEndpoint::setup_endpoint(epc.endpoint())?; + + let versions_compat = ConfiguredEndpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); + let api_versions_compat = ConfiguredEndpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); + let imgs_avail = ConfiguredEndpoint::check_images_available(epc.required_images().as_ref(), &ep); + + let (versions_compat, api_versions_compat, imgs_avail) = + tokio::join!(versions_compat, api_versions_compat, imgs_avail); + + let _ = versions_compat?; + let _ = api_versions_compat?; + let _ = imgs_avail?; + + Ok(ep) + } + + fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<ConfiguredEndpoint> { + match ep.endpoint_type() { + crate::config::EndpointType::Http => { + shiplift::Uri::from_str(ep.uri()) + .map(|uri| shiplift::Docker::host(uri)) + .map_err(Error::from) + .map(|docker| { + ConfiguredEndpoint::builder() + .name(ep.name().clone()) + .docker(docker) + .speed(ep.speed()) + .num_current_jobs(0) + .num_max_jobs(ep.maxjobs()) + .build() + }) + } + + crate::config::EndpointType::Socket => { + Ok({ + ConfiguredEndpoint::builder() + .name(ep.name().clone()) + .speed(ep.speed()) + .num_current_jobs(0) + .num_max_jobs(ep.maxjobs()) + .docker(shiplift::Docker::unix(ep.uri())) + .build() + }) + } + } + } + + async fn check_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> { + match req { + None => Ok(()), + Some(v) => { + let avail = ep.docker().version().await?; + + if !v.contains(&avail.version) { + Err(anyhow!("Incompatible docker version on endpoint {}: {}", + ep.name(), avail.version)) + } else { + Ok(()) + } + } + } + } + + async fn check_api_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> { + match req { + None => Ok(()), + Some(v) => { + let avail = ep.docker().version().await?; + + if !v.contains(&avail.api_version) { + Err(anyhow!("Incompatible docker API version on endpoint {}: {}", + ep.name(), avail.api_version)) + } else { + Ok(()) + } + } + } + } + + async fn check_images_available(imgs: &Vec<ImageName>, ep: &ConfiguredEndpoint) -> Result<()> { + use shiplift::ImageListOptions; + + ep.docker() + .images() + .list(&ImageListOptions::builder().all().build()) + .await? + .into_iter() + .map(|image_rep| { + if let Some(tags) = image_rep.repo_tags { + tags.into_iter().map(|name| { + let tag = ImageName::from(name.clone()); + + if imgs.iter().any(|img| *img == tag) && !imgs.is_empty() { + return Err(anyhow!("Image {} missing in endpoint {}", name, ep.name())) + } + + Ok(()) + }) + .collect::<Result<()>>()?; + } + // If no tags, ignore + + Ok(()) + }) + .collect::<Result<()>>() + } + + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> { + use crate::log::buffer_stream_to_line_stream; + use tokio::stream::StreamExt; + + let (container_id, _warnings) = { + let envs: Vec<String> = job.resources() + .iter() + .filter_map(|r| match r { + JobResource::Environment(k, v) => Some(format!("{}={}", k, v)), + JobResource::Artifact(_) => None, + }) + .collect(); + + let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref()) + .env(envs.iter().map(AsRef::as_ref).collect()) + .build(); + + let create_info = self.docker + .containers() + .create(&builder_opts) + .await?; + + if create_info.warnings.is_some() { + // TODO: Handle warnings + } + + (create_info.id, create_info.warnings) + }; + + let script = job.script().as_ref().as_bytes(); + let script_path = PathBuf::from("/script"); + let exec_opts = ExecContainerOptions::builder() + .cmd(vec!["/script"]) + .build(); + + let container = self.docker.containers().get(&container_id); + container.copy_file_into(script_path, script).await?; + let stream = container.exec(&exec_opts); + let _ = buffer_stream_to_line_stream(stream) + .map(|line| { + line.map_err(Error::from) + .and_then(|l| { + crate::log::parser() + .parse(l.as_bytes()) + .map_err(Error::from) + .and_then(|item| logsink.send(item).map_err(Error::from)) + }) + }) + .collect::<Result<Vec<_>>>() + .await?; + + let tar_stream = container.copy_from(&PathBuf::from("/outputs/")) + .map(|item| item.map_err(Error::from)); + + staging + .write() + .map_err(|_| anyhow!("Lock poisoned"))? + .write_files_from_tar_stream(tar_stream) + .await + } + + +} + diff --git a/src/endpoint/manager.rs b/src/endpoint/manager.rs deleted file mode 100644 index 5e891b7..0000000 --- a/src/endpoint/manager.rs +++ /dev/null @@ -1,237 +0,0 @@ -use std::sync::Arc; -use std::sync::RwLock; -use std::str::FromStr; -use std::path::PathBuf; - -use anyhow::anyhow; -use anyhow::Result; -use anyhow::Error; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::mpsc::UnboundedReceiver; -use shiplift::ExecContainerOptions; -use shiplift::ContainerOptions; - -use crate::util::docker::ImageName; -use crate::endpoint::configured::ConfiguredEndpoint; -use crate::endpoint::managerconf::EndpointManagerConfiguration; -use crate::job::RunnableJob; -use crate::job::JobResource; -use crate::log::LogItem; -use crate::filestore::StagingStore; - -/// The EndpointManager manages a _single_ endpoint -#[derive(Clone, Debug)] -pub struct EndpointManager { - inner: Arc<RwLock<Inner>>, -} - - -#[derive(Debug)] -struct Inner { - endpoint: ConfiguredEndpoint, -} - -impl EndpointManager { - pub(in super) async fn setup(epc: EndpointManagerConfiguration) -> Result<Self> { - let ep = EndpointManager::setup_endpoint(epc.endpoint())?; - - let versions_compat = EndpointManager::check_version_compat(epc.required_docker_versions().as_ref(), &ep); - let api_versions_compat = EndpointManager::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); - let imgs_avail = EndpointManager::check_images_available(epc.required_images().as_ref(), &ep); - - tokio::try_join!(versions_compat, api_versions_compat, imgs_avail)?; - - Ok(EndpointManager { - inner: Arc::new(RwLock::new(Inner { - endpoint: ep - })) - }) - } - - fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<ConfiguredEndpoint> { - match ep.endpoint_type() { - crate::config::EndpointType::Http => { - shiplift::Uri::from_str(ep.uri()) - .map(|uri| shiplift::Docker::host(uri)) - .map_err(Error::from) - .map(|docker| { - ConfiguredEndpoint::builder() - .name(ep.name().clone()) - .docker(docker) - .speed(ep.speed()) - .num_current_jobs(0) - .num_max_jobs(ep.maxjobs()) - .build() - }) - } - - crate::config::EndpointType::Socket => { - Ok({ - ConfiguredEndpoint::builder() - .name(ep.name().clone()) - .speed(ep.speed()) - .num_current_jobs(0) - .num_max_jobs(ep.maxjobs()) - .docker(shiplift::Docker::unix(ep.uri())) - .build() - }) - } - } - } - - async fn check_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> { - match req { - None => Ok(()), - Some(v) => { - let avail = ep.docker().version().await?; - - if !v.contains(&avail.version) { - Err(anyhow!("Incompatible docker version on endpoint {}: {}", - ep.name(), avail.version)) - } else { - Ok(()) - } - } - } - } - - async fn check_api_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> { - match req { - None => Ok(()), - Some(v) => { - let avail = ep.docker().version().await?; - - if !v.contains(&avail.api_version) { - Err(anyhow!("Incompatible docker API version on endpoint {}: {}", - ep.name(), avail.api_version)) - } else { - Ok(()) - } - } - } - } - - async fn check_images_available(imgs: &Vec<ImageName>, ep: &ConfiguredEndpoint) -> Result<()> { - use shiplift::ImageListOptions; - - ep.docker() - .images() - .list(&ImageListOptions::builder().all().build()) - .await? - .into_iter() - .map(|image_rep| { - if let Some(tags) = image_rep.repo_tags { - tags.into_iter().map(|name| { - let tag = ImageName::from(name.clone()); - - if imgs.iter().any(|img| *img == tag) && !imgs.is_empty() { - return Err(anyhow!("Image {} missing in endpoint {}", name, ep.name())) - } - - Ok(()) - }) - .collect::<Result<()>>()?; - } - // If no tags, ignore - - Ok(()) - }) - .collect::<Result<()>>() - } - - /// Get the name of the endpoint - pub fn endpoint_name(&self) -> Result<String> { - self.inner.read().map(|rw| rw.endpoint.name().clone()).map_err(|_| lockpoisoned()) - } - - pub fn get_num_current_jobs(&self) -> Result<usize> { - self.inner - .read() - .map(|lock| lock.endpoint.num_current_jobs()) - .map_err(|_| anyhow!("Lock poisoned")) - } - - pub fn get_num_max_jobs(&self) -> Result<usize> { - self.inner - .read() - .map(|lock| lock.endpoint.num_max_jobs()) - .map_err(|_| anyhow!("Lock poisoned")) - } - - pub async fn run_job(self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> { - use crate::log::buffer_stream_to_line_stream; - use tokio::stream::StreamExt; - - let lock = self.inner - // Taking write lock, because we alter interior here, which shouldn't happen in - // parallel eventhough technically possible. - .write() - .map_err(|_| anyhow!("Lock poisoned"))?; - - let (container_id, _warnings) = { - let envs: Vec<String> = job.resources() - .iter() - .filter_map(|r| match r { - JobResource::Environment(k, v) => Some(format!("{}={}", k, v)), - JobResource::Artifact(_) => None, - }) - .collect(); - - let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref()) - .env(envs.iter().map(AsRef::as_ref).collect()) - .build(); - - let create_info = lock.endpoint - .docker() - .containers() - .create(&builder_opts) - .await?; - - if create_info.warnings.is_some() { - // TODO: Handle warnings - } - - (create_info.id, create_info.warnings) - }; - - let script = job.script().as_ref().as_bytes(); - let script_path = PathBuf::from("/script"); - let exec_opts = ExecContainerOptions::builder() - .cmd(vec!["/script"]) - .build(); - - let container = lock.endpoint.docker().containers().get(&container_id); - container.copy_file_into(script_path, script).await?; - - let stream = container.exec(&exec_opts); - let _ = buffer_stream_to_line_stream(stream) - .map(|line| { - line.map_err(Error::from) - .and_then(|l| { - crate::log::parser() - .parse(l.as_bytes()) - .map_err(Error::from) - .and_then(|item| logsink.send(item).map_err(Error::from)) - }) - }) - .collect::<Result<Vec<_>>>() - .await?; - - let tar_stream = container.copy_from(&PathBuf::from("/outputs/")) - .map(|item| item.map_err(Error::from)); - - staging - .write() - .map_err(|_| lockpoisoned())? - .write_files_from_tar_stream(tar_stream) - .await - } - -} - - -/// Helper fn for std::sync::PoisonError wrapping -/// because std::sync::PoisonError is not Send for <Inner> -fn lockpoisoned() -> Error { - anyhow!("Lock poisoned") -} diff --git a/src/endpoint/managerconf.rs b/src/endpoint/managerconf.rs index aa9ef58..c7dd70a 100644 --- a/src/endpoint/managerconf.rs +++ b/src/endpoint/managerconf.rs @@ -3,10 +3,10 @@ use typed_builder::TypedBuilder; use anyhow::Result; use crate::util::docker::ImageName; -use crate::endpoint::EndpointManager; +use crate::endpoint::ConfiguredEndpoint; #[derive(Getters, TypedBuilder)] -pub struct EndpointManagerConfiguration { +pub struct EndpointConfiguration { #[getset(get = "pub")] endpoint: crate::config::Endpoint, @@ -23,9 +23,9 @@ pub struct EndpointManagerConfiguration { required_docker_api_versions: Option<Vec<String>>, } -impl EndpointManagerConfiguration { - pub async fn connect(self) -> Result<EndpointManager> { - EndpointManager::setup(self).await +impl EndpointConfiguration { + pub async fn connect(self) -> Result<ConfiguredEndpoint> { + ConfiguredEndpoint::setup(self).await } } diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 4a57f67..d75fc7e 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -1,6 +1,3 @@ -mod manager; -pub use manager::*; - mod managerconf; pub use managerconf::*; @@ -8,4 +5,5 @@ mod scheduler; pub use scheduler::*; mod configured; +pub use configured::*; diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index c714818..266e664 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -6,22 +6,22 @@ use anyhow::Result; use tokio::stream::StreamExt; use tokio::sync::mpsc::UnboundedSender; -use crate::endpoint::EndpointManager; -use crate::endpoint::EndpointManagerConfiguration; +use crate::endpoint::ConfiguredEndpoint; +use crate::endpoint::EndpointConfiguration; use crate::job::JobSet; use crate::job::RunnableJob; use crate::log::LogItem; use crate::filestore::StagingStore; pub struct EndpointScheduler { - endpoints: Vec<EndpointManager>, + endpoints: Vec<Arc<RwLock<ConfiguredEndpoint>>>, staging_store: Arc<RwLock<StagingStore>>, } impl EndpointScheduler { - pub async fn setup(endpoints: Vec<EndpointManagerConfiguration>, staging_store: Arc<RwLock<StagingStore>>) -> Result<Self> { + pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>) -> Result<Self> { let endpoints = Self::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { @@ -30,51 +30,23 @@ impl EndpointScheduler { }) } - async fn setup_endpoints(endpoints: Vec<EndpointManagerConfiguration>) -> Result<Vec<EndpointManager>> { + async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<RwLock<ConfiguredEndpoint>>>> { + use futures::FutureExt; + let unordered = futures::stream::FuturesUnordered::new(); for cfg in endpoints.into_iter() { - unordered.push(EndpointManager::setup(cfg)); + unordered.push({ + ConfiguredEndpoint::setup(cfg) + .map(|r_ep| { + r_ep.map(RwLock::new) + .map(Arc::new) + }) + }); } unordered.collect().await } - /// Run a jobset on all endpoints - /// - /// TODO: This is a naive implementation that simple pushes the complete jobset to the - /// available endpoints. - /// - /// It does not yet schedule like it is supposed to do. - pub async fn run_jobset(&self, js: Vec<(RunnableJob, UnboundedSender<LogItem>)>) -> Result<Vec<PathBuf>> { - let unordered = futures::stream::FuturesUnordered::new(); - let mut i: usize = 0; - let mut iter = js.into_iter(); - - while let Some(next_job) = iter.next() { - match self.endpoints.get(i) { - None => { - i = 0; - }, - - Some(ep) => { - let ep = ep.clone(); - unordered.push(async { - ep.run_job(next_job.0, next_job.1, self.staging_store.clone()).await - }); - } - } - - i += 1; - } - - let res = unordered.collect::<Result<Vec<_>>>() - .await? - .into_iter() - .flatten() // We get a Vec<Vec<PathBuf>> here, but we only care about all pathes in one Vec<_> - .collect(); - - Ok(res) - } - } + diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index b1e5fa4..7bf5376 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -6,7 +6,7 @@ use anyhow::Result; use typed_builder::TypedBuilder; use diesel::PgConnection; -use crate::endpoint::EndpointManagerConfiguration; +use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; use crate::job::JobSet; use crate::job::RunnableJob; @@ -27,7 +27,7 @@ pub struct Orchestrator { #[derive(TypedBuilder)] pub struct OrchestratorSetup { - ep_cfg: Vec<EndpointManagerConfiguration>, + ep_cfg: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, jobsets: Vec<JobSet>, |