summaryrefslogtreecommitdiffstats
path: root/src/endpoint
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-01-18 15:27:47 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-01-18 15:27:47 +0100
commit64c01b98b9bf2eea7a270fb5016a3be995af483c (patch)
treea17463a4aeaed6e5b87f91c57a8767e2f32952de /src/endpoint
parent9baa6ade3071dc0aad429b72b4568d77868f7409 (diff)
parentd2fb625f60846629d271fc6ca3c38583c51e9e3c (diff)
Merge branch 'rustfmt'
Merge formatting, but do not enforce it, because that would be too high a hurdle for potential contributors.
Diffstat (limited to 'src/endpoint')
-rw-r--r--src/endpoint/configuration.rs1
-rw-r--r--src/endpoint/configured.rs372
-rw-r--r--src/endpoint/mod.rs1
-rw-r--r--src/endpoint/scheduler.rs208
4 files changed, 406 insertions, 176 deletions
diff --git a/src/endpoint/configuration.rs b/src/endpoint/configuration.rs
index e1f811f..a9b4688 100644
--- a/src/endpoint/configuration.rs
+++ b/src/endpoint/configuration.rs
@@ -30,4 +30,3 @@ pub struct EndpointConfiguration {
#[builder(default)]
required_docker_api_versions: Option<Vec<String>>,
}
-
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 92217e6..b89b0a5 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -13,19 +13,19 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
+use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
-use anyhow::anyhow;
use futures::FutureExt;
-use getset::{Getters, CopyGetters};
+use getset::{CopyGetters, Getters};
use log::trace;
use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::stream::StreamExt;
-use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::RwLock;
use typed_builder::TypedBuilder;
use crate::endpoint::EndpointConfiguration;
@@ -33,8 +33,8 @@ use crate::filestore::path::ArtifactPath;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
-use crate::log::LogItem;
use crate::log::buffer_stream_to_line_stream;
+use crate::log::LogItem;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
@@ -61,55 +61,72 @@ impl Debug for Endpoint {
}
impl Endpoint {
-
- pub(in super) async fn setup(epc: EndpointConfiguration) -> Result<Self> {
- let ep = Endpoint::setup_endpoint(epc.endpoint())
- .with_context(|| anyhow!("Setting up endpoint: {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
-
- let versions_compat = Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
- let api_versions_compat = Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
- let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep);
+ pub(super) async fn setup(epc: EndpointConfiguration) -> Result<Self> {
+ let ep = Endpoint::setup_endpoint(epc.endpoint()).with_context(|| {
+ anyhow!(
+ "Setting up endpoint: {} -> {}",
+ epc.endpoint().name(),
+ epc.endpoint().uri()
+ )
+ })?;
+
+ let versions_compat =
+ Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
+ let api_versions_compat =
+ Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
+ let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep);
let (versions_compat, api_versions_compat, imgs_avail) =
tokio::join!(versions_compat, api_versions_compat, imgs_avail);
- let _ = versions_compat
- .with_context(|| anyhow!("Checking version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
- let _ = api_versions_compat
- .with_context(|| anyhow!("Checking API version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
- let _ = imgs_avail
- .with_context(|| anyhow!("Checking for available images on {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?;
+ let _ = versions_compat.with_context(|| {
+ anyhow!(
+ "Checking version compatibility for {} -> {}",
+ epc.endpoint().name(),
+ epc.endpoint().uri()
+ )
+ })?;
+ let _ = api_versions_compat.with_context(|| {
+ anyhow!(
+ "Checking API version compatibility for {} -> {}",
+ epc.endpoint().name(),
+ epc.endpoint().uri()
+ )
+ })?;
+ let _ = imgs_avail.with_context(|| {
+ anyhow!(
+ "Checking for available images on {} -> {}",
+ epc.endpoint().name(),
+ epc.endpoint().uri()
+ )
+ })?;
Ok(ep)
}
fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<Endpoint> {
match ep.endpoint_type() {
- crate::config::EndpointType::Http => {
- shiplift::Uri::from_str(ep.uri())
- .map(shiplift::Docker::host)
- .with_context(|| anyhow!("Connecting to {}", ep.uri()))
- .map_err(Error::from)
- .map(|docker| {
- Endpoint::builder()
- .name(ep.name().clone())
- .uri(ep.uri().clone())
- .docker(docker)
- .num_max_jobs(ep.maxjobs())
- .build()
- })
- }
-
- crate::config::EndpointType::Socket => {
- Ok({
+ crate::config::EndpointType::Http => shiplift::Uri::from_str(ep.uri())
+ .map(shiplift::Docker::host)
+ .with_context(|| anyhow!("Connecting to {}", ep.uri()))
+ .map_err(Error::from)
+ .map(|docker| {
Endpoint::builder()
.name(ep.name().clone())
.uri(ep.uri().clone())
+ .docker(docker)
.num_max_jobs(ep.maxjobs())
- .docker(shiplift::Docker::unix(ep.uri()))
.build()
- })
- }
+ }),
+
+ crate::config::EndpointType::Socket => Ok({
+ Endpoint::builder()
+ .name(ep.name().clone())
+ .uri(ep.uri().clone())
+ .num_max_jobs(ep.maxjobs())
+ .docker(shiplift::Docker::unix(ep.uri()))
+ .build()
+ }),
}
}
@@ -117,14 +134,19 @@ impl Endpoint {
match req {
None => Ok(()),
Some(v) => {
- let avail = ep.docker()
+ let avail = ep
+ .docker()
.version()
.await
.with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?;
if !v.contains(&avail.version) {
- Err(anyhow!("Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]",
- ep.name(), avail.version, v.join(", ")))
+ Err(anyhow!(
+ "Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]",
+ ep.name(),
+ avail.version,
+ v.join(", ")
+ ))
} else {
Ok(())
}
@@ -136,7 +158,8 @@ impl Endpoint {
match req {
None => Ok(()),
Some(v) => {
- let avail = ep.docker()
+ let avail = ep
+ .docker()
.version()
.await
.with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?;
@@ -163,7 +186,8 @@ impl Endpoint {
.with_context(|| anyhow!("Listing images on endpoint: {}", ep.name))?
.into_iter()
.map(|image_rep| {
- image_rep.repo_tags
+ image_rep
+ .repo_tags
.unwrap_or_default()
.into_iter()
.map(ImageName::from)
@@ -176,7 +200,11 @@ impl Endpoint {
imgs.iter()
.map(|img| {
if !available_names.contains(img) {
- Err(anyhow!("Image '{}' missing from endpoint '{}'", img.as_ref(), ep.name))
+ Err(anyhow!(
+ "Image '{}' missing from endpoint '{}'",
+ img.as_ref(),
+ ep.name
+ ))
} else {
Ok(())
}
@@ -185,8 +213,11 @@ impl Endpoint {
.map(|_| ())
}
-
- pub async fn prepare_container(&self, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'_>> {
+ pub async fn prepare_container(
+ &self,
+ job: RunnableJob,
+ staging: Arc<RwLock<StagingStore>>,
+ ) -> Result<PreparedContainer<'_>> {
PreparedContainer::new(self, job, staging).await
}
@@ -199,7 +230,6 @@ impl Endpoint {
.map_err(Error::from)
.map(|list| list.len())
}
-
}
#[derive(Getters)]
@@ -212,10 +242,14 @@ pub struct PreparedContainer<'a> {
}
impl<'a> PreparedContainer<'a> {
- async fn new(endpoint: &'a Endpoint, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> {
- let script = job.script().clone();
+ async fn new(
+ endpoint: &'a Endpoint,
+ job: RunnableJob,
+ staging: Arc<RwLock<StagingStore>>,
+ ) -> Result<PreparedContainer<'a>> {
+ let script = job.script().clone();
let create_info = Self::build_container(endpoint, &job).await?;
- let container = endpoint.docker.containers().get(&create_info.id);
+ let container = endpoint.docker.containers().get(&create_info.id);
let (cpysrc, cpyart, cpyscr) = tokio::join!(
Self::copy_source_to_container(&container, &job),
@@ -223,26 +257,45 @@ impl<'a> PreparedContainer<'a> {
Self::copy_script_to_container(&container, &script)
);
- let _ = cpysrc
- .with_context(|| anyhow!("Copying the sources to container {} on '{}'", create_info.id, endpoint.name))?;
-
- let _ = cpyart
- .with_context(|| anyhow!("Copying the artifacts to container {} on '{}'", create_info.id, endpoint.name))?;
-
- let _ = cpyscr
- .with_context(|| anyhow!("Copying the script to container {} on '{}'", create_info.id, endpoint.name))?;
+ let _ = cpysrc.with_context(|| {
+ anyhow!(
+ "Copying the sources to container {} on '{}'",
+ create_info.id,
+ endpoint.name
+ )
+ })?;
+
+ let _ = cpyart.with_context(|| {
+ anyhow!(
+ "Copying the artifacts to container {} on '{}'",
+ create_info.id,
+ endpoint.name
+ )
+ })?;
+
+ let _ = cpyscr.with_context(|| {
+ anyhow!(
+ "Copying the script to container {} on '{}'",
+ create_info.id,
+ endpoint.name
+ )
+ })?;
Ok({
PreparedContainer {
endpoint,
script,
- create_info
+ create_info,
}
})
}
- async fn build_container(endpoint: &Endpoint, job: &RunnableJob) -> Result<shiplift::rep::ContainerCreateInfo> {
- let envs = job.environment()
+ async fn build_container(
+ endpoint: &Endpoint,
+ job: &RunnableJob,
+ ) -> Result<shiplift::rep::ContainerCreateInfo> {
+ let envs = job
+ .environment()
.into_iter()
.map(|(k, v)| format!("{}={}", k.as_ref(), v))
.chain({
@@ -270,7 +323,10 @@ impl<'a> PreparedContainer<'a> {
Ok(create_info)
}
- async fn copy_source_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob) -> Result<()> {
+ async fn copy_source_to_container<'ca>(
+ container: &Container<'ca>,
+ job: &RunnableJob,
+ ) -> Result<()> {
use tokio::io::AsyncReadExt;
job.package_sources()
@@ -278,9 +334,16 @@ impl<'a> PreparedContainer<'a> {
.map(|entry| async {
let source_path = entry.path();
let destination = PathBuf::from("/inputs").join({
- source_path.file_name()
+ source_path
+ .file_name()
.ok_or_else(|| anyhow!("Not a file: {}", source_path.display()))
- .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), container.id()))?
+ .with_context(|| {
+ anyhow!(
+ "Copying package source from {} to container {}",
+ source_path.display(),
+ container.id()
+ )
+ })?
});
trace!("Source path = {:?}", source_path);
trace!("Source dest = {:?}", destination);
@@ -309,17 +372,33 @@ impl<'a> PreparedContainer<'a> {
.map_err(Error::from)
}
- async fn copy_artifacts_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<()> {
+ async fn copy_artifacts_to_container<'ca>(
+ container: &Container<'ca>,
+ job: &RunnableJob,
+ staging: Arc<RwLock<StagingStore>>,
+ ) -> Result<()> {
job.resources()
.iter()
.filter_map(JobResource::artifact)
.cloned()
.map(|art| async {
- let artifact_file_name = art.path().file_name()
+ let artifact_file_name = art
+ .path()
+ .file_name()
.ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
- .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container.id()))?;
+ .with_context(|| {
+ anyhow!(
+ "Collecting artifacts for copying to container {}",
+ container.id()
+ )
+ })?;
let destination = PathBuf::from("/inputs/").join(artifact_file_name);
- trace!("Copying {} to container: {}:{}", art.path().display(), container.id(), destination.display());
+ trace!(
+ "Copying {} to container: {}:{}",
+ art.path().display(),
+ container.id(),
+ destination.display()
+ );
let buf = staging
.read()
.await
@@ -327,11 +406,24 @@ impl<'a> PreparedContainer<'a> {
.join(art.path())?
.read()
.await
- .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?;
-
- let r = container.copy_file_into(&destination, &buf)
+ .with_context(|| {
+ anyhow!(
+ "Reading artifact {}, so it can be copied to container",
+ art.path().display()
+ )
+ })?;
+
+ let r = container
+ .copy_file_into(&destination, &buf)
.await
- .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container.id(), destination.display()))
+ .with_context(|| {
+ anyhow!(
+ "Copying artifact {} to container {} at {}",
+ art.path().display(),
+ container.id(),
+ destination.display()
+ )
+ })
.map_err(Error::from);
drop(art); // ensure `art` is moved into closure
r
@@ -344,9 +436,13 @@ impl<'a> PreparedContainer<'a> {
.map(|_| ())
}
- async fn copy_script_to_container<'ca>(container: &Container<'ca>, script: &Script) -> Result<()> {
+ async fn copy_script_to_container<'ca>(
+ container: &Container<'ca>,
+ script: &Script,
+ ) -> Result<()> {
let script_path = PathBuf::from("/script");
- container.copy_file_into(script_path, script.as_ref().as_bytes())
+ container
+ .copy_file_into(script_path, script.as_ref().as_bytes())
.await
.with_context(|| anyhow!("Copying the script into container {}", container.id()))
.map_err(Error::from)
@@ -354,9 +450,20 @@ impl<'a> PreparedContainer<'a> {
pub async fn start(self) -> Result<StartedContainer<'a>> {
let container = self.endpoint.docker.containers().get(&self.create_info.id);
- let _ = container.start()
- .inspect(|r| { trace!("Starting container {} -> {:?}", self.create_info.id, r); })
- .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", self.create_info.id, self.endpoint.name)))
+ let _ = container
+ .start()
+ .inspect(|r| {
+ trace!("Starting container {} -> {:?}", self.create_info.id, r);
+ })
+ .map(|r| {
+ r.with_context(|| {
+ anyhow!(
+ "Starting the container {} on '{}'",
+ self.create_info.id,
+ self.endpoint.name
+ )
+ })
+ })
.await?;
Ok({
@@ -376,7 +483,10 @@ pub struct StartedContainer<'a> {
}
impl<'a> StartedContainer<'a> {
- pub async fn execute_script(self, logsink: UnboundedSender<LogItem>) -> Result<ExecutedContainer<'a>> {
+ pub async fn execute_script(
+ self,
+ logsink: UnboundedSender<LogItem>,
+ ) -> Result<ExecutedContainer<'a>> {
let exec_opts = ExecContainerOptions::builder()
.cmd(vec!["/bin/bash", "/script"])
.attach_stderr(true)
@@ -386,26 +496,51 @@ impl<'a> StartedContainer<'a> {
let container = self.endpoint.docker.containers().get(&self.create_info.id);
- trace!("Moving logs to log sink for container {}", self.create_info.id);
+ trace!(
+ "Moving logs to log sink for container {}",
+ self.create_info.id
+ );
let stream = container.exec(&exec_opts);
- let exited_successfully: Option<(bool, Option<String>)> = buffer_stream_to_line_stream(stream)
- .map(|line| {
- trace!("['{}':{}] Found log line: {:?}", self.endpoint.name, self.create_info.id, line);
- line.with_context(|| anyhow!("Getting log from {}:{}", self.endpoint.name, self.create_info.id))
+ let exited_successfully: Option<(bool, Option<String>)> =
+ buffer_stream_to_line_stream(stream)
+ .map(|line| {
+ trace!(
+ "['{}':{}] Found log line: {:?}",
+ self.endpoint.name,
+ self.create_info.id,
+ line
+ );
+ line.with_context(|| {
+ anyhow!(
+ "Getting log from {}:{}",
+ self.endpoint.name,
+ self.create_info.id
+ )
+ })
.map_err(Error::from)
.and_then(|l| {
crate::log::parser()
.parse(l.as_bytes())
- .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.endpoint.name, self.create_info.id, l))
+ .with_context(|| {
+ anyhow!(
+ "Parsing log from {}:{}: {:?}",
+ self.endpoint.name,
+ self.create_info.id,
+ l
+ )
+ })
.map_err(Error::from)
.and_then(|item| {
-
let mut exited_successfully = None;
{
match item {
- LogItem::State(Ok(_)) => exited_successfully = Some((true, None)),
- LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))),
+ LogItem::State(Ok(_)) => {
+ exited_successfully = Some((true, None))
+ }
+ LogItem::State(Err(ref msg)) => {
+ exited_successfully = Some((false, Some(msg.clone())))
+ }
_ => {
// Nothing
}
@@ -413,25 +548,39 @@ impl<'a> StartedContainer<'a> {
}
trace!("Log item: {}", item.display()?);
- logsink.send(item)
+ logsink
+ .send(item)
.with_context(|| anyhow!("Sending log to log sink"))
.map_err(Error::from)
.map(|_| exited_successfully)
})
})
- })
- .collect::<Result<Vec<_>>>()
- .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", self.create_info.id, self.endpoint.name)))
- .await
- .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", self.create_info.id))?
- .into_iter()
- .fold(None, |accu, elem| match (accu, elem) {
- (None , b) => b,
- (Some((false, msg)) , _) => Some((false, msg)),
- (_ , Some((false, msg))) => Some((false, msg)),
- (a , None) => a,
- (Some((true, _)) , Some((true, _))) => Some((true, None)),
- });
+ })
+ .collect::<Result<Vec<_>>>()
+ .map(|r| {
+ r.with_context(|| {
+ anyhow!(
+ "Fetching log from container {} on {}",
+ self.create_info.id,
+ self.endpoint.name
+ )
+ })
+ })
+ .await
+ .with_context(|| {
+ anyhow!(
+ "Copying script to container, running container and getting logs: {}",
+ self.create_info.id
+ )
+ })?
+ .into_iter()
+ .fold(None, |accu, elem| match (accu, elem) {
+ (None, b) => b,
+ (Some((false, msg)), _) => Some((false, msg)),
+ (_, Some((false, msg))) => Some((false, msg)),
+ (a, None) => a,
+ (Some((true, _)), Some((true, _))) => Some((true, None)),
+ });
Ok({
ExecutedContainer {
@@ -442,7 +591,6 @@ impl<'a> StartedContainer<'a> {
}
})
}
-
}
pub struct ExecutedContainer<'a> {
@@ -467,8 +615,13 @@ impl<'a> ExecutedContainer<'a> {
let tar_stream = container
.copy_from(&PathBuf::from("/outputs/"))
.map(|item| {
- item.with_context(|| anyhow!("Copying item from container {} to host", self.create_info.id))
- .map_err(Error::from)
+ item.with_context(|| {
+ anyhow!(
+ "Copying item from container {} to host",
+ self.create_info.id
+ )
+ })
+ .map_err(Error::from)
});
let mut writelock = staging.write().await;
@@ -479,7 +632,7 @@ impl<'a> ExecutedContainer<'a> {
.with_context(|| anyhow!("Copying the TAR stream to the staging store"))?;
let exit_info = match self.exit_info {
- Some((false, msg)) => {
+ Some((false, msg)) => {
let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.",
container_id = self.create_info.id,
uri = self.endpoint.uri(),
@@ -488,15 +641,16 @@ impl<'a> ExecutedContainer<'a> {
// error because the container errored
Err(err)
- },
+ }
Some((true, _)) | None => {
let container = self.endpoint.docker.containers().get(&self.create_info.id);
- container.stop(Some(std::time::Duration::new(1, 0)))
+ container
+ .stop(Some(std::time::Duration::new(1, 0)))
.await
.with_context(|| anyhow!("Stopping container {}", self.create_info.id))?;
Ok(())
- },
+ }
};
Ok({
@@ -519,5 +673,3 @@ impl FinalizedContainer {
(self.artifacts, self.exit_info)
}
}
-
-
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs
index 6b134e0..b87a303 100644
--- a/src/endpoint/mod.rs
+++ b/src/endpoint/mod.rs
@@ -16,4 +16,3 @@ pub use scheduler::*;
mod configured;
pub use configured::*;
-
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 6f57d61..6404c6f 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -11,19 +11,19 @@
use std::path::PathBuf;
use std::sync::Arc;
+use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
-use anyhow::anyhow;
use diesel::PgConnection;
use futures::FutureExt;
use indicatif::ProgressBar;
use itertools::Itertools;
use log::trace;
-use tokio::stream::StreamExt;
use tokio::io::AsyncWriteExt;
-use tokio::sync::RwLock;
+use tokio::stream::StreamExt;
use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::RwLock;
use uuid::Uuid;
use crate::db::models as dbmodels;
@@ -44,8 +44,13 @@ pub struct EndpointScheduler {
}
impl EndpointScheduler {
-
- pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> {
+ pub async fn setup(
+ endpoints: Vec<EndpointConfiguration>,
+ staging_store: Arc<RwLock<StagingStore>>,
+ db: Arc<PgConnection>,
+ submit: crate::db::models::Submit,
+ log_dir: Option<PathBuf>,
+ ) -> Result<Self> {
let endpoints = Self::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
@@ -57,17 +62,14 @@ impl EndpointScheduler {
})
}
- async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<RwLock<Endpoint>>>> {
+ async fn setup_endpoints(
+ endpoints: Vec<EndpointConfiguration>,
+ ) -> Result<Vec<Arc<RwLock<Endpoint>>>> {
let unordered = futures::stream::FuturesUnordered::new();
for cfg in endpoints.into_iter() {
- unordered.push({
- Endpoint::setup(cfg)
- .map(|r_ep| {
- r_ep.map(RwLock::new)
- .map(Arc::new)
- })
- });
+ unordered
+ .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(RwLock::new).map(Arc::new)));
}
unordered.collect().await
@@ -78,7 +80,11 @@ impl EndpointScheduler {
/// # Warning
///
/// This function blocks as long as there is no free endpoint available!
- pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> {
+ pub async fn schedule_job(
+ &self,
+ job: RunnableJob,
+ bar: indicatif::ProgressBar,
+ ) -> Result<JobHandle> {
let endpoint = self.select_free_endpoint().await?;
Ok(JobHandle {
@@ -94,7 +100,8 @@ impl EndpointScheduler {
async fn select_free_endpoint(&self) -> Result<Arc<RwLock<Endpoint>>> {
loop {
- let ep = self.endpoints
+ let ep = self
+ .endpoints
.iter()
.cloned()
.map(|ep| async move {
@@ -113,13 +120,12 @@ impl EndpointScheduler {
.next();
if let Some(endpoint) = ep {
- return Ok(endpoint)
+ return Ok(endpoint);
} else {
trace!("No free endpoint found, retry...");
}
}
}
-
}
pub struct JobHandle {
@@ -141,19 +147,29 @@ impl std::fmt::Debug for JobHandle {
impl JobHandle {
pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
- let ep = self.endpoint.read().await;
+ let ep = self.endpoint.read().await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
- let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
- let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?;
- let envs = self.create_env_in_db()?;
- let job_id = *self.job.uuid();
+ let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
+ let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?;
+ let envs = self.create_env_in_db()?;
+ let job_id = *self.job.uuid();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
- let prepared_container = ep.prepare_container(self.job, self.staging_store.clone()).await?;
- let container_id = prepared_container.create_info().id.clone();
- let running_container = prepared_container
+ let prepared_container = ep
+ .prepare_container(self.job, self.staging_store.clone())
+ .await?;
+ let container_id = prepared_container.create_info().id.clone();
+ let running_container = prepared_container
.start()
.await
- .with_context(|| Self::create_job_run_error(&job_id, &package.name, &package.version, ep.uri(), &container_id))?
+ .with_context(|| {
+ Self::create_job_run_error(
+ &job_id,
+ &package.name,
+ &package.version,
+ ep.uri(),
+ &container_id,
+ )
+ })?
.execute_script(log_sender);
let logres = LogReceiver {
@@ -163,28 +179,65 @@ impl JobHandle {
job_id,
log_receiver,
bar: &self.bar,
- }.join();
+ }
+ .join();
let (run_container, logres) = tokio::join!(running_container, logres);
let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
- let run_container = run_container.with_context(|| anyhow!("Running container {} failed"))
- .with_context(|| Self::create_job_run_error(&job_id, &package.name, &package.version, ep.uri(), &container_id))?;
-
- let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &run_container.container_hash(), run_container.script(), &log)?;
+ let run_container = run_container
+ .with_context(|| anyhow!("Running container {} failed"))
+ .with_context(|| {
+ Self::create_job_run_error(
+ &job_id,
+ &package.name,
+ &package.version,
+ ep.uri(),
+ &container_id,
+ )
+ })?;
+
+ let job = dbmodels::Job::create(
+ &self.db,
+ &job_id,
+ &self.submit,
+ &endpoint,
+ &package,
+ &image,
+ &run_container.container_hash(),
+ run_container.script(),
+ &log,
+ )?;
trace!("DB: Job entry for job {} created: {}", job.uuid, job.id);
for env in envs {
let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?;
}
- let res : crate::endpoint::FinalizedContainer = run_container
+ let res: crate::endpoint::FinalizedContainer = run_container
.finalize(self.staging_store.clone())
.await
- .with_context(|| Self::create_job_run_error(&job.uuid, &package.name, &package.version, ep.uri(), &container_id))?;
+ .with_context(|| {
+ Self::create_job_run_error(
+ &job.uuid,
+ &package.name,
+ &package.version,
+ ep.uri(),
+ &container_id,
+ )
+ })?;
trace!("Found result for job {}: {:?}", job_id, res);
let (paths, res) = res.unpack();
- let _ = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))
- .with_context(|| Self::create_job_run_error(&job.uuid, &package.name, &package.version, ep.uri(), &container_id))?;
+ let _ = res
+ .with_context(|| anyhow!("Error during running job on '{}'", ep.name()))
+ .with_context(|| {
+ Self::create_job_run_error(
+ &job.uuid,
+ &package.name,
+ &package.version,
+ ep.uri(),
+ &container_id,
+ )
+ })?;
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
@@ -196,8 +249,15 @@ impl JobHandle {
}
/// Helper to create an error object with a nice message.
- fn create_job_run_error(job_id: &Uuid, package_name: &str, package_version: &str, endpoint_uri: &str, container_id: &str) -> anyhow::Error {
- anyhow!(indoc::formatdoc!(r#"Error while running job
+ fn create_job_run_error(
+ job_id: &Uuid,
+ package_name: &str,
+ package_version: &str,
+ endpoint_uri: &str,
+ container_id: &str,
+ ) -> anyhow::Error {
+ anyhow!(indoc::formatdoc!(
+ r#"Error while running job
{job_id}
for package
{package_name} {package_version}
@@ -208,11 +268,11 @@ impl JobHandle {
to debug.
"#,
- job_id = job_id,
- package_name = package_name,
+ job_id = job_id,
+ package_name = package_name,
package_version = package_version,
- endpoint_uri