diff options
-rw-r--r-- | src/commands/build.rs | 9 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 5 | ||||
-rw-r--r-- | src/endpoint/error.rs | 30 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 6 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 59 |
5 files changed, 43 insertions, 66 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs index b83d790..0f7c77b 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -243,12 +243,15 @@ pub async fn build(matches: &ArgMatches, .await?; info!("Running orchestrator..."); - let res = orch.run().await?; + let mut artifacts = vec![]; + let res = orch.run(&mut artifacts).await; let out = std::io::stdout(); let mut outlock = out.lock(); writeln!(outlock, "Packages created:")?; - res.into_iter() + artifacts.into_iter() .map(|artifact| writeln!(outlock, "-> {}", staging_dir.join(artifact.path).display()).map_err(Error::from)) - .collect::<Result<_>>() + .collect::<Result<_>>()?; + + res } diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 191e565..a478857 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -1,6 +1,5 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; -use std::result::Result as RResult; use std::str::FromStr; use std::sync::Arc; @@ -176,7 +175,7 @@ impl Endpoint { .map(|_| ()) } - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<ArtifactPath>, ContainerHash, Script), ContainerError> { + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Vec<ArtifactPath>, ContainerHash, Script)> { let (container_id, _warnings) = { let envs = job.environment() .into_iter() @@ -365,7 +364,7 @@ impl Endpoint { let script: Script = job.script().clone(); match exited_successfully { - Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id), self.uri().clone())), + Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id), self.uri().clone())).map_err(Error::from), Some(true) | None => { container.stop(Some(std::time::Duration::new(1, 0))) .await diff --git a/src/endpoint/error.rs b/src/endpoint/error.rs index dc4468a..5638efe 100644 --- a/src/endpoint/error.rs +++ b/src/endpoint/error.rs @@ -4,44 +4,16 @@ use crate::util::docker::ContainerHash; #[derive(ThisError, Debug)] pub enum ContainerError { - - #[error("Error during container run: {container_id}")] + #[error("Error during container run, connect using `docker --host {uri} exec -it {container_id} /bin/bash`")] ContainerError { container_id: ContainerHash, uri: String, }, - - #[error("{0}")] - Err(anyhow::Error), } impl ContainerError { pub fn container_error(container_id: ContainerHash, uri: String) -> Self { ContainerError::ContainerError { container_id, uri } } - - pub fn explain_container_error(&self) -> Option<String> { - match self { - ContainerError::ContainerError { container_id, uri } => Some({ - indoc::formatdoc!(r#" - Container did not exit successfully: {container_id} - It was not stopped because of this. - - Use - - docker --host {uri} exec -it {container_id} /bin/bash - - to access and debug. - "#, uri = uri, container_id = container_id) - }), - _ => None, - } - } -} - -impl From<anyhow::Error> for ContainerError { - fn from(ae: anyhow::Error) -> Self { - ContainerError::Err(ae) - } } diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 6d0eb1e..1b15789 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::result::Result as RResult; use std::sync::Arc; use anyhow::Context; @@ -18,7 +17,6 @@ use tokio::sync::mpsc::UnboundedReceiver; use uuid::Uuid; use crate::db::models as dbmodels; -use crate::endpoint::ContainerError; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; use crate::filestore::StagingStore; @@ -131,7 +129,7 @@ impl std::fmt::Debug for JobHandle { } impl JobHandle { - pub async fn run(self) -> RResult<Vec<dbmodels::Artifact>, ContainerError> { + pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint.read().await; let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; @@ -156,7 +154,7 @@ impl JobHandle { trace!("Found result for job {}: {:?}", job_id, res); let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; - let (paths, container_hash, script) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?; + let (paths, container_hash, script) = res.with_context(|| anyhow!("Error during running job on '{}'", ep.name()))?; let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?; for env in envs { diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index ef12a53..2ce3f62 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -1,6 +1,5 @@ use std::io::Write; use std::path::PathBuf; -use std::result::Result as RResult; use std::sync::Arc; use anyhow::Context; @@ -15,7 +14,6 @@ use typed_builder::TypedBuilder; use crate::config::Configuration; use crate::db::models::Artifact; use crate::db::models::Submit; -use crate::endpoint::ContainerError; use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; use crate::filestore::MergedStores; @@ -67,21 +65,19 @@ impl<'a> OrchestratorSetup<'a> { impl<'a> Orchestrator<'a> { - pub async fn run(self) -> Result<Vec<Artifact>> { - let mut report_result = vec![]; + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<()> { for jobset in self.jobsets.into_iter() { - let mut results = Self::run_jobset(&self.scheduler, + let _ = Self::run_jobset(&self.scheduler, &self.merged_stores, &self.source_cache, &self.config, &self.progress_generator, - jobset) + jobset, + output) .await?; - - report_result.append(&mut results); } - Ok(report_result) + Ok(()) } async fn run_jobset( @@ -90,8 +86,9 @@ impl<'a> Orchestrator<'a> { source_cache: &SourceCache, config: &Configuration, progress_generator: &ProgressBars, - jobset: JobSet) - -> Result<Vec<Artifact>> + jobset: JobSet, + output: &mut Vec<Artifact>) + -> Result<()> { use tokio::stream::StreamExt; @@ -105,7 +102,7 @@ impl<'a> Orchestrator<'a> { Self::run_runnable(runnable, scheduler, bar) }) .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>(); + .collect::<Vec<Result<Vec<Artifact>>>>(); let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); @@ -118,17 +115,6 @@ impl<'a> Orchestrator<'a> { let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>(); - { - let mut out = std::io::stderr(); - for error in errors { - if let Err(e) = error { - if let Some(expl) = e.explain_container_error() { - writeln!(out, "{}", expl)?; - } - } - } - } - { // check if all paths that were written are actually there in the staging store let staging_store_lock = merged_store.staging().read().await; @@ -145,12 +131,31 @@ impl<'a> Orchestrator<'a> { } - Ok(results) + let mut have_error = false; + { + trace!("Logging {} errors...", errors.len()); + let out = std::io::stderr(); + let mut lock = out.lock(); + for error in errors { + have_error = true; + if let Err(e) = error.map_err(Error::from) { + for cause in e.chain() { + writeln!(lock, "{}", cause)?; + } + } + } + } + + let mut results = results; // rebind + output.append(&mut results); + if have_error { + Err(anyhow!("Error during build")) + } else { + Ok(()) + } } - async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar) - -> RResult<Vec<Artifact>, ContainerError> - { + async fn run_runnable(runnable: RunnableJob, scheduler: &EndpointScheduler, bar: indicatif::ProgressBar) -> Result<Vec<Artifact>> { let job_id = runnable.uuid().clone(); trace!("Runnable {} for package {}", job_id, runnable.package().name()); |