author     Matthias Beyer <mail@beyermatthias.de>    2020-11-15 11:35:32 +0100
committer  Matthias Beyer <mail@beyermatthias.de>    2020-11-15 11:39:34 +0100
commit     771f957d4202bfa15820dc004b9cb874a32b7746 (patch)
tree       302b918379b0a8289921d55ef431bef37279d6d9 /src/orchestrator
parent     183eac77bfd87eb5c2d8e2832d3fe35779ec5fef (diff)
Implement error reporting of failed jobs
This patch implements error reporting for container jobs that do not end
successfully.
It does so by adding an error type `ContainerError`, which represents
either a container that did not exit with success, or an anyhow::Error
propagated from the container management code.
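A minimal sketch of what such a type could look like. The real definition lives in `src/endpoint` and is not shown in this diff; the variant names and payloads here are assumptions, only the method name `explain_container_error` is taken from the diff below:

```rust
// Hypothetical sketch of the ContainerError type described above.
// Variant names and payloads are assumptions; the real type wraps an
// anyhow::Error, for which a String stands in here.
#[derive(Debug)]
pub enum ContainerError {
    // The job inside the container did not exit with success.
    ContainerError { container_id: String },
    // An error from the container management code.
    Err(String),
}

impl ContainerError {
    // Returns a human-readable explanation only for the
    // "job failed inside the container" case.
    pub fn explain_container_error(&self) -> Option<String> {
        match self {
            ContainerError::ContainerError { container_id } => {
                Some(format!("Container did not exit successfully: {}", container_id))
            }
            ContainerError::Err(_) => None,
        }
    }
}

fn main() {
    let e = ContainerError::ContainerError { container_id: "deadbeef".to_string() };
    println!("{}", e.explain_container_error().unwrap());
}
```

The `Option` return lets the caller print an explanation only for genuine container failures, while management errors are handled elsewhere.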
The log-aggregation algorithm is now intercepted to catch any exit-state
log items.
If the container produces no exit state (no line with
"#BUTIDO:STATE:..."), no error is assumed.
A warning could be emitted here later on.
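The interception described above can be sketched as follows. The "#BUTIDO:STATE:" marker comes from the commit message itself, but the payload format after the marker and the function names are assumptions:

```rust
// Hypothetical sketch of catching exit-state items during log aggregation.
// The exact payload after "#BUTIDO:STATE:" is an assumption here.

/// Returns Some(true) for a success state line, Some(false) for a failure
/// state line, and None for ordinary log output.
fn exit_state(line: &str) -> Option<bool> {
    let payload = line.strip_prefix("#BUTIDO:STATE:")?;
    Some(payload.starts_with("OK"))
}

/// Scans aggregated log lines; the absence of any state line is treated
/// as "no error assumed", exactly as the commit message describes.
fn job_failed(log: &[&str]) -> bool {
    log.iter().filter_map(|l| exit_state(l)).any(|ok| !ok)
}

fn main() {
    assert!(job_failed(&["compiling...", "#BUTIDO:STATE:ERR:build failed"]));
    // No state line at all: no error is assumed.
    assert!(!job_failed(&["just ordinary output"]));
    println!("ok");
}
```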
The aggregated state is then passed up to the orchestrator, which
collects the errors and prints them.
If the implementation is correct (not yet verified, because this is
rather difficult to test), all other containers should continue running
to completion before the errors are handled.
The code responsible for this (in the Orchestrator implementation) was
adapted to collect all results and only then check for errors, rather
than stopping at the first error.
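The "collect everything, then check for errors" pattern can be sketched in isolation like this, with `String` standing in for the real `ContainerError` and `&str` for the artifact paths:

```rust
// Minimal sketch of collecting all job results before error handling,
// instead of aborting at the first Err as collect::<Result<_, _>>() would.
fn main() {
    let results: Vec<Result<Vec<&str>, String>> = vec![
        Ok(vec!["pkg-a.pkg"]),
        Err("container 1 failed".to_string()),
        Ok(vec!["pkg-b.pkg"]),
    ];

    // Split the fully collected results into successes and failures.
    let (okays, errors): (Vec<_>, Vec<_>) =
        results.into_iter().partition(|r| r.is_ok());

    // Flatten the successful jobs into one artifact list.
    let artifacts: Vec<&str> = okays
        .into_iter()
        .filter_map(Result::ok)
        .flatten()
        .collect();
    assert_eq!(artifacts, vec!["pkg-a.pkg", "pkg-b.pkg"]);

    // Every error is reported only after all jobs have finished.
    for e in errors {
        if let Err(msg) = e {
            eprintln!("{}", msg);
        }
    }
}
```

This mirrors the `partition`/`filter_map` chain visible in the diff below: successful paths survive, while every failure is reported after all jobs have run.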
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--   src/orchestrator/orchestrator.rs   30
1 file changed, 23 insertions(+), 7 deletions(-)
```diff
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e42582e..e7b5ba6 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -1,18 +1,21 @@
+use std::io::Write;
 use std::path::PathBuf;
+use std::result::Result as RResult;
 use std::sync::Arc;
-use tokio::sync::RwLock;
 
 use anyhow::Context;
 use anyhow::Error;
 use anyhow::Result;
 use anyhow::anyhow;
 use diesel::PgConnection;
+use indicatif::ProgressBar;
+use tokio::sync::RwLock;
+use tokio::sync::mpsc::UnboundedReceiver;
 use typed_builder::TypedBuilder;
 use uuid::Uuid;
-use tokio::sync::mpsc::UnboundedReceiver;
-use indicatif::ProgressBar;
 
 use crate::db::models::Submit;
+use crate::endpoint::ContainerError;
 use crate::endpoint::EndpointConfiguration;
 use crate::endpoint::EndpointScheduler;
 use crate::filestore::MergedStores;
@@ -96,17 +99,30 @@ impl Orchestrator {
             });
         }
 
-        unordered_results.collect::<Result<Vec<_>>>()
+        unordered_results.collect::<Vec<RResult<_, ContainerError>>>()
     };
 
     let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
     let (results, barres) = tokio::join!(results, multibar_block);
     let _ = barres?;
-    let results = results?
+    let (okays, errors): (Vec<_>, Vec<_>) = results
         .into_iter()
-        .flatten()
-        .collect::<Vec<PathBuf>>();
+        .inspect(|e| trace!("Processing result from jobset run: {:?}", e))
+        .partition(|e| e.is_ok());
+
+    let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<PathBuf>>();
+
+    {
+        let mut out = std::io::stderr();
+        for error in errors {
+            if let Err(e) = error {
+                if let Some(expl) = e.explain_container_error() {
+                    writeln!(out, "{}", expl)?;
+                }
+            }
+        }
+    }
 
     {
         // check if all paths that were written are actually there in the staging store
         let staging_store_lock = self.staging_store.read().await;
```