path: root/src/orchestrator
author     Matthias Beyer <mail@beyermatthias.de>  2020-11-15 11:35:32 +0100
committer  Matthias Beyer <mail@beyermatthias.de>  2020-11-15 11:39:34 +0100
commit     771f957d4202bfa15820dc004b9cb874a32b7746 (patch)
tree       302b918379b0a8289921d55ef431bef37279d6d9 /src/orchestrator
parent     183eac77bfd87eb5c2d8e2832d3fe35779ec5fef (diff)
Implement error reporting of failed jobs
This patch implements error reporting if a container job did not end successfully.

It does so by adding an error type `ContainerError`, which is either an error describing that a container did not exit with success, or an anyhow::Error (describing an error from the container management code).

The log-aggregation algorithm is now intercepted to catch any exit-state log items. If there is no exit-state from the container (no line with "#BUTIDO:STATE:..."), no error is assumed; a warning could be emitted here later on.

The aggregated state is then passed up to the orchestrator, which collects the errors and prints them. If the implementation is correct (which is not tested yet, because this is rather difficult to test), all other containers continue operating until they are done before the errors are handled. The code responsible for this (in the Orchestrator implementation) was adapted to not stop collecting at the first error, but to collect everything and only then check for errors.

Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
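For context, a rough sketch of what such a ContainerError type and its explain_container_error() helper (called in the orchestrator hunk below) could look like. The real definition lives in src/endpoint and is not part of this diff, so the variant names, fields, and exit-code handling here are assumptions for illustration only; the only grounded details are the two described cases and the Option-returning explanation method.

// Hypothetical sketch of the ContainerError described above; not the actual
// src/endpoint definition.
use std::fmt;

#[derive(Debug)]
pub enum ContainerError {
    // The container ran, but reported a non-success exit state
    // (assumed fields for illustration).
    Failed { container_id: String, exit_code: i32 },
    // Any other error coming from the container management code.
    Other(anyhow::Error),
}

impl ContainerError {
    // Only the "container failed" case has a user-facing explanation;
    // plain management errors return None here.
    pub fn explain_container_error(&self) -> Option<String> {
        match self {
            ContainerError::Failed { container_id, exit_code } => Some(format!(
                "Container {} did not exit successfully (exit code {})",
                container_id, exit_code
            )),
            ContainerError::Other(_) => None,
        }
    }
}

impl From<anyhow::Error> for ContainerError {
    fn from(e: anyhow::Error) -> Self {
        ContainerError::Other(e)
    }
}

impl fmt::Display for ContainerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ContainerError::Failed { container_id, exit_code } => write!(
                f,
                "Container {} did not exit successfully (exit code {})",
                container_id, exit_code
            ),
            ContainerError::Other(e) => write!(f, "{}", e),
        }
    }
}

With a type along these lines, the orchestrator change below can partition the collected results into successes and failures and print only the failures that carry an explanation.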
Diffstat (limited to 'src/orchestrator')
-rw-r--r--  src/orchestrator/orchestrator.rs  |  30
1 file changed, 23 insertions(+), 7 deletions(-)
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e42582e..e7b5ba6 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -1,18 +1,21 @@
+use std::io::Write;
use std::path::PathBuf;
+use std::result::Result as RResult;
use std::sync::Arc;
-use tokio::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use diesel::PgConnection;
+use indicatif::ProgressBar;
+use tokio::sync::RwLock;
+use tokio::sync::mpsc::UnboundedReceiver;
use typed_builder::TypedBuilder;
use uuid::Uuid;
-use tokio::sync::mpsc::UnboundedReceiver;
-use indicatif::ProgressBar;
use crate::db::models::Submit;
+use crate::endpoint::ContainerError;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::MergedStores;
@@ -96,17 +99,30 @@ impl Orchestrator {
});
}
- unordered_results.collect::<Result<Vec<_>>>()
+ unordered_results.collect::<Vec<RResult<_, ContainerError>>>()
};
let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
let (results, barres) = tokio::join!(results, multibar_block);
let _ = barres?;
- let results = results?
+ let (okays, errors): (Vec<_>, Vec<_>) = results
.into_iter()
- .flatten()
- .collect::<Vec<PathBuf>>();
+ .inspect(|e| trace!("Processing result from jobset run: {:?}", e))
+ .partition(|e| e.is_ok());
+
+ let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<PathBuf>>();
+
+ {
+ let mut out = std::io::stderr();
+ for error in errors {
+ if let Err(e) = error {
+ if let Some(expl) = e.explain_container_error() {
+ writeln!(out, "{}", expl)?;
+ }
+ }
+ }
+ }
{ // check if all paths that were written are actually there in the staging store
let staging_store_lock = self.staging_store.read().await;