diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-01-18 15:27:47 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-01-18 15:27:47 +0100 |
commit | 64c01b98b9bf2eea7a270fb5016a3be995af483c (patch) | |
tree | a17463a4aeaed6e5b87f91c57a8767e2f32952de /src/endpoint | |
parent | 9baa6ade3071dc0aad429b72b4568d77868f7409 (diff) | |
parent | d2fb625f60846629d271fc6ca3c38583c51e9e3c (diff) |
Merge branch 'rustfmt'
Merge formatting, but do not enforce it because that'd be a too high
hurdle for possible contributors.
Diffstat (limited to 'src/endpoint')
-rw-r--r-- | src/endpoint/configuration.rs | 1 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 372 | ||||
-rw-r--r-- | src/endpoint/mod.rs | 1 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 208 |
4 files changed, 406 insertions, 176 deletions
diff --git a/src/endpoint/configuration.rs b/src/endpoint/configuration.rs index e1f811f..a9b4688 100644 --- a/src/endpoint/configuration.rs +++ b/src/endpoint/configuration.rs @@ -30,4 +30,3 @@ pub struct EndpointConfiguration { #[builder(default)] required_docker_api_versions: Option<Vec<String>>, } - diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 92217e6..b89b0a5 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -13,19 +13,19 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; +use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use anyhow::anyhow; use futures::FutureExt; -use getset::{Getters, CopyGetters}; +use getset::{CopyGetters, Getters}; use log::trace; use shiplift::Container; use shiplift::Docker; use shiplift::ExecContainerOptions; use tokio::stream::StreamExt; -use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::RwLock; use typed_builder::TypedBuilder; use crate::endpoint::EndpointConfiguration; @@ -33,8 +33,8 @@ use crate::filestore::path::ArtifactPath; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; -use crate::log::LogItem; use crate::log::buffer_stream_to_line_stream; +use crate::log::LogItem; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; @@ -61,55 +61,72 @@ impl Debug for Endpoint { } impl Endpoint { - - pub(in super) async fn setup(epc: EndpointConfiguration) -> Result<Self> { - let ep = Endpoint::setup_endpoint(epc.endpoint()) - .with_context(|| anyhow!("Setting up endpoint: {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; - - let versions_compat = Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); - let api_versions_compat = Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); - let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep); + pub(super) async fn setup(epc: EndpointConfiguration) -> Result<Self> { + let ep = Endpoint::setup_endpoint(epc.endpoint()).with_context(|| { + anyhow!( + "Setting up endpoint: {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; + + let versions_compat = + Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep); + let api_versions_compat = + Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep); + let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep); let (versions_compat, api_versions_compat, imgs_avail) = tokio::join!(versions_compat, api_versions_compat, imgs_avail); - let _ = versions_compat - .with_context(|| anyhow!("Checking version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; - let _ = api_versions_compat - .with_context(|| anyhow!("Checking API version compatibility for {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; - let _ = imgs_avail - .with_context(|| anyhow!("Checking for available images on {} -> {}", epc.endpoint().name(), epc.endpoint().uri()))?; + let _ = versions_compat.with_context(|| { + anyhow!( + "Checking version compatibility for {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; + let _ = api_versions_compat.with_context(|| { + anyhow!( + "Checking API version compatibility for {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; + let _ = imgs_avail.with_context(|| { + anyhow!( + "Checking for available images on {} -> {}", + epc.endpoint().name(), + epc.endpoint().uri() + ) + })?; Ok(ep) } fn setup_endpoint(ep: &crate::config::Endpoint) -> Result<Endpoint> { match ep.endpoint_type() { - crate::config::EndpointType::Http => { - shiplift::Uri::from_str(ep.uri()) - .map(shiplift::Docker::host) - .with_context(|| anyhow!("Connecting to {}", ep.uri())) - .map_err(Error::from) - .map(|docker| { - Endpoint::builder() - .name(ep.name().clone()) - .uri(ep.uri().clone()) - .docker(docker) - .num_max_jobs(ep.maxjobs()) - .build() - }) - } - - crate::config::EndpointType::Socket => { - Ok({ + crate::config::EndpointType::Http => shiplift::Uri::from_str(ep.uri()) + .map(shiplift::Docker::host) + .with_context(|| anyhow!("Connecting to {}", ep.uri())) + .map_err(Error::from) + .map(|docker| { Endpoint::builder() .name(ep.name().clone()) .uri(ep.uri().clone()) + .docker(docker) .num_max_jobs(ep.maxjobs()) - .docker(shiplift::Docker::unix(ep.uri())) .build() - }) - } + }), + + crate::config::EndpointType::Socket => Ok({ + Endpoint::builder() + .name(ep.name().clone()) + .uri(ep.uri().clone()) + .num_max_jobs(ep.maxjobs()) + .docker(shiplift::Docker::unix(ep.uri())) + .build() + }), } } @@ -117,14 +134,19 @@ impl Endpoint { match req { None => Ok(()), Some(v) => { - let avail = ep.docker() + let avail = ep + .docker() .version() .await .with_context(|| anyhow!("Getting version of endpoint: {}", ep.name))?; if !v.contains(&avail.version) { - Err(anyhow!("Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]", - ep.name(), avail.version, v.join(", "))) + Err(anyhow!( + "Incompatible docker version on endpoint {}: Expected: {}, Available: [{}]", + ep.name(), + avail.version, + v.join(", ") + )) } else { Ok(()) } @@ -136,7 +158,8 @@ impl Endpoint { match req { None => Ok(()), Some(v) => { - let avail = ep.docker() + let avail = ep + .docker() .version() .await .with_context(|| anyhow!("Getting API version of endpoint: {}", ep.name))?; @@ -163,7 +186,8 @@ impl Endpoint { .with_context(|| anyhow!("Listing images on endpoint: {}", ep.name))? .into_iter() .map(|image_rep| { - image_rep.repo_tags + image_rep + .repo_tags .unwrap_or_default() .into_iter() .map(ImageName::from) @@ -176,7 +200,11 @@ impl Endpoint { imgs.iter() .map(|img| { if !available_names.contains(img) { - Err(anyhow!("Image '{}' missing from endpoint '{}'", img.as_ref(), ep.name)) + Err(anyhow!( + "Image '{}' missing from endpoint '{}'", + img.as_ref(), + ep.name + )) } else { Ok(()) } @@ -185,8 +213,11 @@ impl Endpoint { .map(|_| ()) } - - pub async fn prepare_container(&self, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'_>> { + pub async fn prepare_container( + &self, + job: RunnableJob, + staging: Arc<RwLock<StagingStore>>, + ) -> Result<PreparedContainer<'_>> { PreparedContainer::new(self, job, staging).await } @@ -199,7 +230,6 @@ impl Endpoint { .map_err(Error::from) .map(|list| list.len()) } - } #[derive(Getters)] @@ -212,10 +242,14 @@ pub struct PreparedContainer<'a> { } impl<'a> PreparedContainer<'a> { - async fn new(endpoint: &'a Endpoint, job: RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<PreparedContainer<'a>> { - let script = job.script().clone(); + async fn new( + endpoint: &'a Endpoint, + job: RunnableJob, + staging: Arc<RwLock<StagingStore>>, + ) -> Result<PreparedContainer<'a>> { + let script = job.script().clone(); let create_info = Self::build_container(endpoint, &job).await?; - let container = endpoint.docker.containers().get(&create_info.id); + let container = endpoint.docker.containers().get(&create_info.id); let (cpysrc, cpyart, cpyscr) = tokio::join!( Self::copy_source_to_container(&container, &job), @@ -223,26 +257,45 @@ impl<'a> PreparedContainer<'a> { Self::copy_script_to_container(&container, &script) ); - let _ = cpysrc - .with_context(|| anyhow!("Copying the sources to container {} on '{}'", create_info.id, endpoint.name))?; - - let _ = cpyart - .with_context(|| anyhow!("Copying the artifacts to container {} on '{}'", create_info.id, endpoint.name))?; - - let _ = cpyscr - .with_context(|| anyhow!("Copying the script to container {} on '{}'", create_info.id, endpoint.name))?; + let _ = cpysrc.with_context(|| { + anyhow!( + "Copying the sources to container {} on '{}'", + create_info.id, + endpoint.name + ) + })?; + + let _ = cpyart.with_context(|| { + anyhow!( + "Copying the artifacts to container {} on '{}'", + create_info.id, + endpoint.name + ) + })?; + + let _ = cpyscr.with_context(|| { + anyhow!( + "Copying the script to container {} on '{}'", + create_info.id, + endpoint.name + ) + })?; Ok({ PreparedContainer { endpoint, script, - create_info + create_info, } }) } - async fn build_container(endpoint: &Endpoint, job: &RunnableJob) -> Result<shiplift::rep::ContainerCreateInfo> { - let envs = job.environment() + async fn build_container( + endpoint: &Endpoint, + job: &RunnableJob, + ) -> Result<shiplift::rep::ContainerCreateInfo> { + let envs = job + .environment() .into_iter() .map(|(k, v)| format!("{}={}", k.as_ref(), v)) .chain({ @@ -270,7 +323,10 @@ impl<'a> PreparedContainer<'a> { Ok(create_info) } - async fn copy_source_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob) -> Result<()> { + async fn copy_source_to_container<'ca>( + container: &Container<'ca>, + job: &RunnableJob, + ) -> Result<()> { use tokio::io::AsyncReadExt; job.package_sources() @@ -278,9 +334,16 @@ impl<'a> PreparedContainer<'a> { .map(|entry| async { let source_path = entry.path(); let destination = PathBuf::from("/inputs").join({ - source_path.file_name() + source_path + .file_name() .ok_or_else(|| anyhow!("Not a file: {}", source_path.display())) - .with_context(|| anyhow!("Copying package source from {} to container {}", source_path.display(), container.id()))? + .with_context(|| { + anyhow!( + "Copying package source from {} to container {}", + source_path.display(), + container.id() + ) + })? }); trace!("Source path = {:?}", source_path); trace!("Source dest = {:?}", destination); @@ -309,17 +372,33 @@ impl<'a> PreparedContainer<'a> { .map_err(Error::from) } - async fn copy_artifacts_to_container<'ca>(container: &Container<'ca>, job: &RunnableJob, staging: Arc<RwLock<StagingStore>>) -> Result<()> { + async fn copy_artifacts_to_container<'ca>( + container: &Container<'ca>, + job: &RunnableJob, + staging: Arc<RwLock<StagingStore>>, + ) -> Result<()> { job.resources() .iter() .filter_map(JobResource::artifact) .cloned() .map(|art| async { - let artifact_file_name = art.path().file_name() + let artifact_file_name = art + .path() + .file_name() .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display())) - .with_context(|| anyhow!("Collecting artifacts for copying to container {}", container.id()))?; + .with_context(|| { + anyhow!( + "Collecting artifacts for copying to container {}", + container.id() + ) + })?; let destination = PathBuf::from("/inputs/").join(artifact_file_name); - trace!("Copying {} to container: {}:{}", art.path().display(), container.id(), destination.display()); + trace!( + "Copying {} to container: {}:{}", + art.path().display(), + container.id(), + destination.display() + ); let buf = staging .read() .await @@ -327,11 +406,24 @@ impl<'a> PreparedContainer<'a> { .join(art.path())? .read() .await - .with_context(|| anyhow!("Reading artifact {}, so it can be copied to container", art.path().display()))?; - - let r = container.copy_file_into(&destination, &buf) + .with_context(|| { + anyhow!( + "Reading artifact {}, so it can be copied to container", + art.path().display() + ) + })?; + + let r = container + .copy_file_into(&destination, &buf) .await - .with_context(|| anyhow!("Copying artifact {} to container {} at {}", art.path().display(), container.id(), destination.display())) + .with_context(|| { + anyhow!( + "Copying artifact {} to container {} at {}", + art.path().display(), + container.id(), + destination.display() + ) + }) .map_err(Error::from); drop(art); // ensure `art` is moved into closure r @@ -344,9 +436,13 @@ impl<'a> PreparedContainer<'a> { .map(|_| ()) } - async fn copy_script_to_container<'ca>(container: &Container<'ca>, script: &Script) -> Result<()> { + async fn copy_script_to_container<'ca>( + container: &Container<'ca>, + script: &Script, + ) -> Result<()> { let script_path = PathBuf::from("/script"); - container.copy_file_into(script_path, script.as_ref().as_bytes()) + container + .copy_file_into(script_path, script.as_ref().as_bytes()) .await .with_context(|| anyhow!("Copying the script into container {}", container.id())) .map_err(Error::from) @@ -354,9 +450,20 @@ impl<'a> PreparedContainer<'a> { pub async fn start(self) -> Result<StartedContainer<'a>> { let container = self.endpoint.docker.containers().get(&self.create_info.id); - let _ = container.start() - .inspect(|r| { trace!("Starting container {} -> {:?}", self.create_info.id, r); }) - .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", self.create_info.id, self.endpoint.name))) + let _ = container + .start() + .inspect(|r| { + trace!("Starting container {} -> {:?}", self.create_info.id, r); + }) + .map(|r| { + r.with_context(|| { + anyhow!( + "Starting the container {} on '{}'", + self.create_info.id, + self.endpoint.name + ) + }) + }) .await?; Ok({ @@ -376,7 +483,10 @@ pub struct StartedContainer<'a> { } impl<'a> StartedContainer<'a> { - pub async fn execute_script(self, logsink: UnboundedSender<LogItem>) -> Result<ExecutedContainer<'a>> { + pub async fn execute_script( + self, + logsink: UnboundedSender<LogItem>, + ) -> Result<ExecutedContainer<'a>> { let exec_opts = ExecContainerOptions::builder() .cmd(vec!["/bin/bash", "/script"]) .attach_stderr(true) @@ -386,26 +496,51 @@ impl<'a> StartedContainer<'a> { let container = self.endpoint.docker.containers().get(&self.create_info.id); - trace!("Moving logs to log sink for container {}", self.create_info.id); + trace!( + "Moving logs to log sink for container {}", + self.create_info.id + ); let stream = container.exec(&exec_opts); - let exited_successfully: Option<(bool, Option<String>)> = buffer_stream_to_line_stream(stream) - .map(|line| { - trace!("['{}':{}] Found log line: {:?}", self.endpoint.name, self.create_info.id, line); - line.with_context(|| anyhow!("Getting log from {}:{}", self.endpoint.name, self.create_info.id)) + let exited_successfully: Option<(bool, Option<String>)> = + buffer_stream_to_line_stream(stream) + .map(|line| { + trace!( + "['{}':{}] Found log line: {:?}", + self.endpoint.name, + self.create_info.id, + line + ); + line.with_context(|| { + anyhow!( + "Getting log from {}:{}", + self.endpoint.name, + self.create_info.id + ) + }) .map_err(Error::from) .and_then(|l| { crate::log::parser() .parse(l.as_bytes()) - .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.endpoint.name, self.create_info.id, l)) + .with_context(|| { + anyhow!( + "Parsing log from {}:{}: {:?}", + self.endpoint.name, + self.create_info.id, + l + ) + }) .map_err(Error::from) .and_then(|item| { - let mut exited_successfully = None; { match item { - LogItem::State(Ok(_)) => exited_successfully = Some((true, None)), - LogItem::State(Err(ref msg)) => exited_successfully = Some((false, Some(msg.clone()))), + LogItem::State(Ok(_)) => { + exited_successfully = Some((true, None)) + } + LogItem::State(Err(ref msg)) => { + exited_successfully = Some((false, Some(msg.clone()))) + } _ => { // Nothing } @@ -413,25 +548,39 @@ impl<'a> StartedContainer<'a> { } trace!("Log item: {}", item.display()?); - logsink.send(item) + logsink + .send(item) .with_context(|| anyhow!("Sending log to log sink")) .map_err(Error::from) .map(|_| exited_successfully) }) }) - }) - .collect::<Result<Vec<_>>>() - .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", self.create_info.id, self.endpoint.name))) - .await - .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", self.create_info.id))? - .into_iter() - .fold(None, |accu, elem| match (accu, elem) { - (None , b) => b, - (Some((false, msg)) , _) => Some((false, msg)), - (_ , Some((false, msg))) => Some((false, msg)), - (a , None) => a, - (Some((true, _)) , Some((true, _))) => Some((true, None)), - }); + }) + .collect::<Result<Vec<_>>>() + .map(|r| { + r.with_context(|| { + anyhow!( + "Fetching log from container {} on {}", + self.create_info.id, + self.endpoint.name + ) + }) + }) + .await + .with_context(|| { + anyhow!( + "Copying script to container, running container and getting logs: {}", + self.create_info.id + ) + })? + .into_iter() + .fold(None, |accu, elem| match (accu, elem) { + (None, b) => b, + (Some((false, msg)), _) => Some((false, msg)), + (_, Some((false, msg))) => Some((false, msg)), + (a, None) => a, + (Some((true, _)), Some((true, _))) => Some((true, None)), + }); Ok({ ExecutedContainer { @@ -442,7 +591,6 @@ impl<'a> StartedContainer<'a> { } }) } - } pub struct ExecutedContainer<'a> { @@ -467,8 +615,13 @@ impl<'a> ExecutedContainer<'a> { let tar_stream = container .copy_from(&PathBuf::from("/outputs/")) .map(|item| { - item.with_context(|| anyhow!("Copying item from container {} to host", self.create_info.id)) - .map_err(Error::from) + item.with_context(|| { + anyhow!( + "Copying item from container {} to host", + self.create_info.id + ) + }) + .map_err(Error::from) }); let mut writelock = staging.write().await; @@ -479,7 +632,7 @@ impl<'a> ExecutedContainer<'a> { .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?; let exit_info = match self.exit_info { - Some((false, msg)) => { + Some((false, msg)) => { let err = anyhow!("Error during container run:\n\tMessage: '{msg}'\n\tConnect using\n\n\t\t`docker --host {uri} exec -it {container_id} /bin/bash`\n\n\tto debug.", container_id = self.create_info.id, uri = self.endpoint.uri(), @@ -488,15 +641,16 @@ impl<'a> ExecutedContainer<'a> { // error because the container errored Err(err) - }, + } Some((true, _)) | None => { let container = self.endpoint.docker.containers().get(&self.create_info.id); - container.stop(Some(std::time::Duration::new(1, 0))) + container + .stop(Some(std::time::Duration::new(1, 0))) .await .with_context(|| anyhow!("Stopping container {}", self.create_info.id))?; Ok(()) - }, + } }; Ok({ @@ -519,5 +673,3 @@ impl FinalizedContainer { (self.artifacts, self.exit_info) } } - - diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 6b134e0..b87a303 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -16,4 +16,3 @@ pub use scheduler::*; mod configured; pub use configured::*; - diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 6f57d61..6404c6f 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -11,19 +11,19 @@ use std::path::PathBuf; use std::sync::Arc; +use anyhow::anyhow; use anyhow::Context; use anyhow::Error; use anyhow::Result; -use anyhow::anyhow; use diesel::PgConnection; use futures::FutureExt; use indicatif::ProgressBar; use itertools::Itertools; use log::trace; -use tokio::stream::StreamExt; use tokio::io::AsyncWriteExt; -use tokio::sync::RwLock; +use tokio::stream::StreamExt; use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::RwLock; use uuid::Uuid; use crate::db::models as dbmodels; @@ -44,8 +44,13 @@ pub struct EndpointScheduler { } impl EndpointScheduler { - - pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> { + pub async fn setup( + endpoints: Vec<EndpointConfiguration>, + staging_store: Arc<RwLock<StagingStore>>, + db: Arc<PgConnection>, + submit: crate::db::models::Submit, + log_dir: Option<PathBuf>, + ) -> Result<Self> { let endpoints = Self::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { @@ -57,17 +62,14 @@ impl EndpointScheduler { }) } - async fn setup_endpoints(endpoints: Vec<EndpointConfiguration>) -> Result<Vec<Arc<RwLock<Endpoint>>>> { + async fn setup_endpoints( + endpoints: Vec<EndpointConfiguration>, + ) -> Result<Vec<Arc<RwLock<Endpoint>>>> { let unordered = futures::stream::FuturesUnordered::new(); for cfg in endpoints.into_iter() { - unordered.push({ - Endpoint::setup(cfg) - .map(|r_ep| { - r_ep.map(RwLock::new) - .map(Arc::new) - }) - }); + unordered + .push(Endpoint::setup(cfg).map(|r_ep| r_ep.map(RwLock::new).map(Arc::new))); } unordered.collect().await @@ -78,7 +80,11 @@ impl EndpointScheduler { /// # Warning /// /// This function blocks as long as there is no free endpoint available! - pub async fn schedule_job(&self, job: RunnableJob, bar: indicatif::ProgressBar) -> Result<JobHandle> { + pub async fn schedule_job( + &self, + job: RunnableJob, + bar: indicatif::ProgressBar, + ) -> Result<JobHandle> { let endpoint = self.select_free_endpoint().await?; Ok(JobHandle { @@ -94,7 +100,8 @@ impl EndpointScheduler { async fn select_free_endpoint(&self) -> Result<Arc<RwLock<Endpoint>>> { loop { - let ep = self.endpoints + let ep = self + .endpoints .iter() .cloned() .map(|ep| async move { @@ -113,13 +120,12 @@ impl EndpointScheduler { .next(); if let Some(endpoint) = ep { - return Ok(endpoint) + return Ok(endpoint); } else { trace!("No free endpoint found, retry..."); } } } - } pub struct JobHandle { @@ -141,19 +147,29 @@ impl std::fmt::Debug for JobHandle { impl JobHandle { pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); - let ep = self.endpoint.read().await; + let ep = self.endpoint.read().await; let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; - let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; - let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; - let envs = self.create_env_in_db()?; - let job_id = *self.job.uuid(); + let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; + let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; + let envs = self.create_env_in_db()?; + let job_id = *self.job.uuid(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); - let prepared_container = ep.prepare_container(self.job, self.staging_store.clone()).await?; - let container_id = prepared_container.create_info().id.clone(); - let running_container = prepared_container + let prepared_container = ep + .prepare_container(self.job, self.staging_store.clone()) + .await?; + let container_id = prepared_container.create_info().id.clone(); + let running_container = prepared_container .start() .await - .with_context(|| Self::create_job_run_error(&job_id, &package.name, &package.version, ep.uri(), &container_id))? + .with_context(|| { + Self::create_job_run_error( + &job_id, + &package.name, + &package.version, + ep.uri(), + &container_id, + ) + })? .execute_script(log_sender); let logres = LogReceiver { @@ -163,28 +179,65 @@ impl JobHandle { job_id, log_receiver, bar: &self.bar, - }.join(); + } + .join(); let (run_container, logres) = tokio::join!(running_container, logres); let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; - let run_container = run_container.with_context(|| anyhow!("Running container {} failed")) - .with_context(|| Self::create_job_run_error(&job_id, &package.name, &package.version, ep.uri(), &container_id))?; - - let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &run_container.container_hash(), run_container.script(), &log)?; + let run_container = run_container + .with_context(|| anyhow!("Running container {} failed")) + .with_context(|| { + Self::create_job_run_error( + &job_id, + &package.name, + &package.version, + ep.uri(), + &container_id, + ) + })?; + + let job = dbmodels::Job::create( + &self.db, + &job_id, + &self.submit, + &endpoint, + &package, + &image, + &run_container.container_hash(), + run_container.script(), + &log, + )?; trace!("DB: Job entry for job {} created: {}", job.uuid, job.id); for env in envs { let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?; } - let res : crate::endpoint::FinalizedContainer = run_container + let res: crate::endpoint::FinalizedContainer = run_container .finalize(self.staging_store.clone()) .await - .with_context(|| Self::create_job_run_error(&job.uuid, &package.name, &package.version, ep.uri(), &container_id))?; + .with_context(|| { + Self::create_job_run_error( + &job.uuid, + &package.name, + &package.version, + ep.uri(), + &container_id, + ) + })?; trace!("Found result for job {}: {:?}", job_id, res); let (paths, res) = res.unpack(); - let _ = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name())) - .with_context(|| Self::create_job_run_error(&job.uuid, &package.name, &package.version, ep.uri(), &container_id))?; + let _ = res + .with_context(|| anyhow!("Error during running job on '{}'", ep.name())) + .with_context(|| { + Self::create_job_run_error( + &job.uuid, + &package.name, + &package.version, + ep.uri(), + &container_id, + ) + })?; // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; @@ -196,8 +249,15 @@ impl JobHandle { } /// Helper to create an error object with a nice message. - fn create_job_run_error(job_id: &Uuid, package_name: &str, package_version: &str, endpoint_uri: &str, container_id: &str) -> anyhow::Error { - anyhow!(indoc::formatdoc!(r#"Error while running job + fn create_job_run_error( + job_id: &Uuid, + package_name: &str, + package_version: &str, + endpoint_uri: &str, + container_id: &str, + ) -> anyhow::Error { + anyhow!(indoc::formatdoc!( + r#"Error while running job {job_id} for package {package_name} {package_version} @@ -208,11 +268,11 @@ impl JobHandle { to debug. "#, - job_id = job_id, - package_name = package_name, + job_id = job_id, + package_name = package_name, package_version = package_version, - endpoint_uri |