diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-05 20:15:24 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-05 20:59:41 +0100 |
commit | 6a483a052bf39ea6834cec96acc6771e63d2bc94 (patch) | |
tree | d687a9f41086304391b21b75fdbf83e0b91ae9dc /src/endpoint/configured.rs | |
parent | 913fc8a8c59204b262b5692e8e6dc468363a2c5e (diff) |
Remove EndpointManager
This patch removes the `EndpointManager` type,
which did nothing except wrap the `ConfiguredEndpoint` and make things way
more complicated than they need to be.
Now, the `EndpointScheduler` holds the `ConfiguredEndpoint`s directly in
an `Arc<RwLock<_>>` and manages them without the additional layer of
complexity.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r-- | src/endpoint/configured.rs | 195 |
1 file changed, 193 insertions, 2 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 10d424a..4528ac9 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -1,10 +1,28 @@ use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::sync::RwLock; +use std::str::FromStr; +use std::path::PathBuf; -use shiplift::Docker; +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; use getset::{Getters, CopyGetters}; +use shiplift::ContainerOptions; +use shiplift::Docker; +use shiplift::ExecContainerOptions; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; use typed_builder::TypedBuilder; -#[derive(Getters, CopyGetters, TypedBuilder, Clone)] +use crate::util::docker::ImageName; +use crate::endpoint::EndpointConfiguration; +use crate::job::RunnableJob; +use crate::job::JobResource; +use crate::log::LogItem; +use crate::filestore::StagingStore; + +#[derive(Getters, CopyGetters, TypedBuilder)] pub struct ConfiguredEndpoint { #[getset(get = "pub")] name: String, @@ -32,3 +50,176 @@ impl Debug for ConfiguredEndpoint { } } +impl ConfiguredEndpoint { + pub(in super) async fn setup(epc: EndpointConfiguration) -> Result<Self> { + let ep = ConfiguredEndpoint::setup_endpoint(epc.endpoint())?; + + let versions_compat = ConfiguredEndpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); + let api_versions_compat = ConfiguredEndpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); + let imgs_avail = ConfiguredEndpoint::check_images_available(epc.required_images().as_ref(), &ep); + + let (versions_compat, api_versions_compat, imgs_avail) = + tokio::join!(versions_compat, api_versions_compat, imgs_avail); + + let _ = versions_compat?; + let _ = api_versions_compat?; + let _ = imgs_avail?; + + Ok(ep) + } + + fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<ConfiguredEndpoint> { + match ep.endpoint_type() { + crate::config::EndpointType::Http => { + 
shiplift::Uri::from_str(ep.uri()) + .map(|uri| shiplift::Docker::host(uri)) + .map_err(Error::from) + .map(|docker| { + ConfiguredEndpoint::builder() + .name(ep.name().clone()) + .docker(docker) + .speed(ep.speed()) + .num_current_jobs(0) + .num_max_jobs(ep.maxjobs()) + .build() + }) + } + + crate::config::EndpointType::Socket => { + Ok({ + ConfiguredEndpoint::builder() + .name(ep.name().clone()) + .speed(ep.speed()) + .num_current_jobs(0) + .num_max_jobs(ep.maxjobs()) + .docker(shiplift::Docker::unix(ep.uri())) + .build() + }) + } + } + } + + async fn check_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> { + match req { + None => Ok(()), + Some(v) => { + let avail = ep.docker().version().await?; + + if !v.contains(&avail.version) { + Err(anyhow!("Incompatible docker version on endpoint {}: {}", + ep.name(), avail.version)) + } else { + Ok(()) + } + } + } + } + + async fn check_api_version_compat(req: Option<&Vec<String>>, ep: &ConfiguredEndpoint) -> Result<()> { + match req { + None => Ok(()), + Some(v) => { + let avail = ep.docker().version().await?; + + if !v.contains(&avail.api_version) { + Err(anyhow!("Incompatible docker API version on endpoint {}: {}", + ep.name(), avail.api_version)) + } else { + Ok(()) + } + } + } + } + + async fn check_images_available(imgs: &Vec<ImageName>, ep: &ConfiguredEndpoint) -> Result<()> { + use shiplift::ImageListOptions; + + ep.docker() + .images() + .list(&ImageListOptions::builder().all().build()) + .await? 
+ .into_iter() + .map(|image_rep| { + if let Some(tags) = image_rep.repo_tags { + tags.into_iter().map(|name| { + let tag = ImageName::from(name.clone()); + + if imgs.iter().any(|img| *img == tag) && !imgs.is_empty() { + return Err(anyhow!("Image {} missing in endpoint {}", name, ep.name())) + } + + Ok(()) + }) + .collect::<Result<()>>()?; + } + // If no tags, ignore + + Ok(()) + }) + .collect::<Result<()>>() + } + + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<Vec<PathBuf>> { + use crate::log::buffer_stream_to_line_stream; + use tokio::stream::StreamExt; + + let (container_id, _warnings) = { + let envs: Vec<String> = job.resources() + .iter() + .filter_map(|r| match r { + JobResource::Environment(k, v) => Some(format!("{}={}", k, v)), + JobResource::Artifact(_) => None, + }) + .collect(); + + let builder_opts = shiplift::ContainerOptions::builder(job.image().as_ref()) + .env(envs.iter().map(AsRef::as_ref).collect()) + .build(); + + let create_info = self.docker + .containers() + .create(&builder_opts) + .await?; + + if create_info.warnings.is_some() { + // TODO: Handle warnings + } + + (create_info.id, create_info.warnings) + }; + + let script = job.script().as_ref().as_bytes(); + let script_path = PathBuf::from("/script"); + let exec_opts = ExecContainerOptions::builder() + .cmd(vec!["/script"]) + .build(); + + let container = self.docker.containers().get(&container_id); + container.copy_file_into(script_path, script).await?; + let stream = container.exec(&exec_opts); + let _ = buffer_stream_to_line_stream(stream) + .map(|line| { + line.map_err(Error::from) + .and_then(|l| { + crate::log::parser() + .parse(l.as_bytes()) + .map_err(Error::from) + .and_then(|item| logsink.send(item).map_err(Error::from)) + }) + }) + .collect::<Result<Vec<_>>>() + .await?; + + let tar_stream = container.copy_from(&PathBuf::from("/outputs/")) + .map(|item| item.map_err(Error::from)); + + staging + 
.write() + .map_err(|_| anyhow!("Lock poisoned"))? + .write_files_from_tar_stream(tar_stream) + .await + } + + +} + |