summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/commands/build.rs9
-rw-r--r--src/endpoint/configured.rs5
-rw-r--r--src/endpoint/error.rs30
-rw-r--r--src/endpoint/scheduler.rs6
-rw-r--r--src/orchestrator/orchestrator.rs59
5 files changed, 43 insertions, 66 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index b83d790..0f7c77b 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -243,12 +243,15 @@ pub async fn build(matches: &ArgMatches,
.await?;
info!("Running orchestrator...");
- let res = orch.run().await?;
+ let mut artifacts = vec![];
+ let res = orch.run(&mut artifacts).await;
let out = std::io::stdout();
let mut outlock = out.lock();
writeln!(outlock, "Packages created:")?;
- res.into_iter()
+ artifacts.into_iter()
.map(|artifact| writeln!(outlock, "-> {}", staging_dir.join(artifact.path).display()).map_err(Error::from))
- .collect::<Result<_>>()
+ .collect::<Result<_>>()?;
+
+ res
}
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 191e565..a478857 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -1,6 +1,5 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
-use std::result::Result as RResult;
use std::str::FromStr;
use std::sync::Arc;
@@ -176,7 +175,7 @@ impl Endpoint {
.map(|_| ())
}
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<ArtifactPath>, ContainerHash, Script), ContainerError> {
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Vec<ArtifactPath>, ContainerHash, Script)> {
let (container_id, _warnings) = {
let envs = job.environment()
.into_iter()
@@ -365,7 +364,7 @@ impl Endpoint {
let script: Script = job.script().clone();
match exited_successfully {
- Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id), self.uri().clone())),
+ Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id), self.uri().clone())).map_err(Error::from),
Some(true) | None => {
container.stop(Some(std::time::Duration::new(1, 0)))
.await
diff --git a/src/endpoint/error.rs b/src/endpoint/error.rs
index dc4468a..5638efe 100644
--- a/src/endpoint/error.rs
+++ b/src/endpoint/error.rs
@@ -4,44 +4,16 @@ use crate::util::docker::ContainerHash;
#[derive(ThisError, Debug)]
pub enum ContainerError {
-
- #[error("Error during container run: {container_id}")]
+ #[error("Error during container run, connect using `docker --host {uri} exec -it {container_id} /bin/bash`")]
ContainerError {
container_id: ContainerHash,
uri: String,
},
-
- #[error("{0}")]
- Err(anyhow::Error),
}
impl ContainerError {
pub fn container_error(container_id: ContainerHash, uri: String) -> Self {
ContainerError::ContainerError { container_id, uri }
}
-
- pub fn explain_container_error(&self) -> Option<String> {
- match self {
- ContainerError::ContainerError { container_id, uri } => Some({
- indoc::formatdoc!(r#"
- Container did not exit successfully: {container_id}
- It was not stopped because of this.
-
- Use
-
- docker --host {uri} exec -it {container_id} /bin/bash
-
- to access and debug.
- "#, uri = uri, container_id = container_id)
- }),
- _ => None,
- }
- }
-}
-
-impl From<anyhow::Error> for ContainerError {
- fn from(ae: anyhow::Error) -> Self {
- ContainerError::Err(ae)
- }
}
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 6d0eb1e..1b15789 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -1,5 +1,4 @@
use std::path::PathBuf;
-use std::result::Result as RResult;
use std::sync::Arc;
use anyhow::Context;
@@ -18,7 +17,6 @@ use tokio::sync::mpsc::UnboundedReceiver;
use uuid::Uuid;
use crate::db::models as dbmodels;
-use crate::endpoint::ContainerError;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::StagingStore;
@@ -131,7 +129,7 @@ impl std::fmt::Debug for JobHandle {
}
impl JobHandle {
- pub async fn run(self) -> RResult<Vec<dbmodels::Artifact>, ContainerError> {
+ pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint.read().await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
@@ -156,7 +154,7 @@ impl JobHandle {
trace!("Found result for job {}: {:?}", job_id, res);
let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
- let (paths, container_hash, script) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?;
+ let (paths, container_hash, script) = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))?;
let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?;
for env in envs {
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index ef12a53..2ce3f62 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -1,6 +1,5 @@
use std::io::Write;
use std::path::PathBuf;
-use std::result::Result as RResult;
use std::sync::Arc;
use anyhow::Context;
@@ -15,7 +14,6 @@ use typed_builder::TypedBuilder;
use crate::config::Configuration;
use crate::db::models::Artifact;
use crate::db::models::Submit;
-use crate::endpoint::ContainerError;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::MergedStores;
@@ -67,21 +65,19 @@ impl<'a> OrchestratorSetup<'a> {
impl<'a> Orchestrator<'a> {
- pub async fn run(self) -> Result<Vec<Artifact>> {
- let mut report_result = vec![];
+ pub async fn run(self, output: &mut Vec<Artifact>) -> Result<()> {
for jobset in self.jobsets.into_iter() {
- let mut results = Self::run_jobset(&self.scheduler,
+ let _ = Self::run_jobset(&self.scheduler,
&self.merged_stores,
&self.source_cache,
&self.config,
&self.progress_generator,
- jobset)
+ jobset,
+ output)
.await?;
-
- report_result.append(&mut results);
}
- Ok(report_result)
+ Ok(())
}
async fn run_jobset(
@@ -90,8 +86,9 @@ impl<'a> Orchestrator<'a> {
source_cache: &SourceCache,
config: &Configuration,
progress_generator: &ProgressBars,
- jobset: JobSet)
- -> Result<Vec<Artifact>>
+ jobset: JobSet,
+ output: &mut Vec<Artifact>)
+ -> Result<()>
{
use tokio::stream::StreamExt;
@@ -105,7 +102,7 @@ impl<'a> Orchestrator<'a> {
Self::run_runnable(runnable, scheduler, bar)
})
.collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>();
+ .collect::<Vec<Result<Vec<Artifact>>>>();
let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
@@ -118,17 +115,6 @@ impl<'a> Orchestrator<'a> {
let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>();
- {
- let mut out = std::io::stderr();
- for error in errors {
- if let Err(e) = error {
- if let Some(expl) = e.explain_container_error() {
- writeln!(out, "{}", expl)?;
- }
- }
- }
- }
-
{ // check if all paths that were written are actually there in the staging store
let staging_store_lock = merged_store.staging().read().await;
@@ -145,12 +131,31 @@ impl<'a> Orchestrator<'a> {
}
- Ok(results)
+ let mut have_error = false;
+ {
+ trace!("Logging {} errors...", errors.len());
+ let out = std::io::stderr();
+ let mut lock = out.lock();
+ for error in errors {
+ have_error = true;
+ if let Err(e) = error.map_err(Error::from) {
+ for cause in e.chain() {
+ writeln!(lock, "{}", cause)?;
+ }
+ }
+ }
+ }
+
+ let mut results = results; // rebind
+ output.append(&mut results);
+ if have_error {
+ Err(anyhow!("Error during build"))
+ } else {
+ Ok(())
+ }
}
- async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar)
- -> RResult<Vec<Artifact>, ContainerError>
- {
+ async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar) -> Result<Vec<Artifact>> {
let job_id = runnable.uuid().clone();
trace!("Runnable {} for package {}", job_id, runnable.package().name());