path: root/src/endpoint/configured.rs
author     Matthias Beyer <mail@beyermatthias.de>    2020-11-15 11:35:32 +0100
committer  Matthias Beyer <mail@beyermatthias.de>    2020-11-15 11:39:34 +0100
commit     771f957d4202bfa15820dc004b9cb874a32b7746 (patch)
tree       302b918379b0a8289921d55ef431bef37279d6d9 /src/endpoint/configured.rs
parent     183eac77bfd87eb5c2d8e2832d3fe35779ec5fef (diff)
Implement error reporting of failed jobs
This patch implements error reporting if a container job did not end successfully.

It does so by adding an error type `ContainerError`, which is either an error describing that a container did not exit with success, or an anyhow::Error (describing an error from the container management code).

The log-aggregation code is now intercepted to catch any exit-state log items. If there is no exit-state from the container (no line with "#BUTIDO:STATE:..."), no error is assumed. A warning could be emitted here later on.

The aggregated state is then passed up to the orchestrator, which collects the errors and prints them.

If the implementation is correct (which is not tested yet, because this is rather difficult to test), all other containers should continue operating until they are finished before the errors are handled. The code responsible for this (in the Orchestrator implementation) was adapted to not stop collecting at the first error, but to collect everything and then check for errors.

Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--   src/endpoint/configured.rs   47
1 file changed, 37 insertions(+), 10 deletions(-)
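The `ContainerError` type referenced in this diff is defined outside this file. Based on the commit message (either a "container did not exit successfully" error or a wrapped anyhow::Error) and the two ways it is used below (the `container_error(...)` constructor and `?` on anyhow-context results, which needs a `From<anyhow::Error>` conversion), a minimal sketch could look like the following. The thiserror derive, field names, and the String field are assumptions; the real constructor in the diff is passed a ContainerHash, the sketch uses String only to stay self-contained.

// Sketch only -- not the actual butido definition.
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ContainerError {
    // The container ran, but did not report a successful exit state.
    #[error("container {container_id} did not exit successfully")]
    ContainerError { container_id: String },

    // Any other error from the container management code. The #[from]
    // conversion is what lets `?` turn anyhow-context errors into this type.
    #[error(transparent)]
    Err(#[from] anyhow::Error),
}

impl ContainerError {
    pub fn container_error(container_id: String) -> Self {
        ContainerError::ContainerError { container_id }
    }
}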
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 7949437..cb57ea8 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -1,5 +1,6 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
+use std::result::Result as RResult;
use std::str::FromStr;
use std::sync::Arc;
@@ -22,6 +23,7 @@ use crate::log::LogItem;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
+use crate::endpoint::ContainerError;
#[derive(Getters, CopyGetters, TypedBuilder)]
pub struct Endpoint {
@@ -169,7 +171,7 @@ impl Endpoint {
.map(|_| ())
}
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Vec<PathBuf>, ContainerHash, Script)> {
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
use crate::log::buffer_stream_to_line_stream;
use tokio::stream::StreamExt;
use futures::FutureExt;
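In this crate `Result` is the anyhow alias, so `run_job` pulls in `std::result::Result as RResult` to express a typed error while the many `.with_context(..)?` calls inside keep working through the `From<anyhow::Error>` conversion on `ContainerError`. A reduced, standalone illustration of that pattern (all names here are hypothetical, not butido code):

use anyhow::{Context, Result};      // Result<T> = Result<T, anyhow::Error>
use std::result::Result as RResult; // plain std Result for typed errors

#[derive(Debug, thiserror::Error)]
enum TypedError {
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

fn read_config() -> Result<String> {
    std::fs::read_to_string("config.toml").context("reading config")
}

fn run() -> RResult<String, TypedError> {
    // `?` converts the anyhow::Error into TypedError via the From impl.
    Ok(read_config()?)
}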
@@ -276,7 +278,7 @@ impl Endpoint {
.with_context(|| anyhow!("Copying artifacts to container {}", container_id))?;
}
- container
+ let exited_successfully: Option<bool> = container
.copy_file_into(script_path, job.script().as_ref().as_bytes())
.inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name)))
@@ -284,6 +286,7 @@ impl Endpoint {
.inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name)))
.then(|_| {
+ use futures::FutureExt;
trace!("Moving logs to log sink for container {}", container_id);
buffer_stream_to_line_stream(container.exec(&exec_opts))
.map(|line| {
@@ -296,19 +299,39 @@ impl Endpoint {
.with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.name, container_id, l))
.map_err(Error::from)
.and_then(|item| {
+
+ let mut exited_successfully = None;
+ {
+ match item {
+ LogItem::State(Ok(_)) => exited_successfully = Some(true),
+ LogItem::State(Err(_)) => exited_successfully = Some(false),
+ _ => {
+ // Nothing
+ }
+ }
+ }
+
trace!("Log item: {}", item.display()?);
logsink.send(item)
.with_context(|| anyhow!("Sending log to log sink"))
.map_err(Error::from)
+ .map(|_| exited_successfully)
})
})
})
.collect::<Result<Vec<_>>>()
})
- .inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
.await
- .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?;
+ .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?
+ .into_iter()
+ .fold(None, |accu, elem| match (accu, elem) {
+ (None , b) => b,
+ (Some(false) , _) => Some(false),
+ (_ , Some(false)) => Some(false),
+ (a , None) => a,
+ (Some(true) , Some(true)) => Some(true),
+ });
trace!("Fetching /outputs from container {}", container_id);
let tar_stream = container
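The hunk above turns every log line into an `Option<bool>` (Some(true) or Some(false) when the line is a "#BUTIDO:STATE:..." exit-state item, None otherwise) and then folds the collected vector into a single verdict: any failure wins, and the verdict stays None if no state line was seen at all. The reduction in isolation, with log items simplified to the Option<bool> they produce (a standalone sketch, not butido code):

// Standalone sketch of the reduction used above.
fn overall_success(per_line: Vec<Option<bool>>) -> Option<bool> {
    per_line.into_iter().fold(None, |acc, elem| match (acc, elem) {
        (None, b) => b,                  // first state line sets the verdict
        (Some(false), _) => Some(false), // a failure is sticky
        (_, Some(false)) => Some(false), // any later failure flips the verdict
        (a, None) => a,                  // non-state lines change nothing
        (Some(true), Some(true)) => Some(true),
    })
}

fn main() {
    assert_eq!(overall_success(vec![None, None]), None);                     // no state reported
    assert_eq!(overall_success(vec![None, Some(true)]), Some(true));         // clean exit
    assert_eq!(overall_success(vec![Some(true), Some(false)]), Some(false)); // failure wins
}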
@@ -327,13 +350,17 @@ impl Endpoint {
.with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
};
- container.stop(Some(std::time::Duration::new(1, 0)))
- .await
- .with_context(|| anyhow!("Stopping container {}", container_id))?;
-
- trace!("Returning job {} result = {:?}, container hash = {}", job.uuid(), r, container_id);
let script: Script = job.script().clone();
- Ok((r, ContainerHash::from(container_id), script))
+ match exited_successfully {
+ Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id))),
+ Some(true) | None => {
+ container.stop(Some(std::time::Duration::new(1, 0)))
+ .await
+ .with_context(|| anyhow!("Stopping container {}", container_id))?;
+
+ Ok((r, ContainerHash::from(container_id), script))
+ },
+ }
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
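For context, the commit message states that the Orchestrator now drives all containers to completion and only afterwards checks the collected results for errors, instead of aborting on the first failure. A hedged sketch of that "collect everything, then check" shape; the function name, signature, and the use of futures::future::join_all are illustrative assumptions, not the actual orchestrator code:

use futures::future::join_all;

// Drive every job future to completion, then split the results into
// successes and failures so all errors can be reported together.
async fn run_all<T, E, F>(jobs: Vec<F>) -> Result<Vec<T>, Vec<E>>
where
    F: std::future::Future<Output = Result<T, E>>,
{
    let mut oks = Vec::new();
    let mut errs = Vec::new();

    // join_all awaits all futures; it does not bail out on the first error.
    for res in join_all(jobs).await {
        match res {
            Ok(v) => oks.push(v),
            Err(e) => errs.push(e),
        }
    }

    if errs.is_empty() { Ok(oks) } else { Err(errs) }
}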