diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-15 11:35:32 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-15 11:39:34 +0100 |
commit | 771f957d4202bfa15820dc004b9cb874a32b7746 (patch) | |
tree | 302b918379b0a8289921d55ef431bef37279d6d9 /src/endpoint/configured.rs | |
parent | 183eac77bfd87eb5c2d8e2832d3fe35779ec5fef (diff) |
Implement error reporting of failed jobs
This patch implements error reporting if a container job did not end
successfully.
It does so by adding an error type `ContainerError`, which is either an
error that describes that a container did not exit with success, or an
anyhow::Error (that describes an error from the container management
code).
The log-aggregation algorithm now inspects each log item to catch any
exit-state log items.
If there is no exit-state from the container (no line with
"#BUTIDO:STATE:..."), no error is assumed.
A warning could be emitted here later on.
The state aggregated this way is then passed up to the orchestrator, which
then collects the errors and prints them.
If the implementation is correct (which is not tested yet, because this
is rather difficult to test), all other containers should continue
operation until they have finished, before the errors are handled.
The code responsible for this (in the Orchestrator implementation) was
adapted to not collect until the first error, but collect everything and
then check for errors.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r-- | src/endpoint/configured.rs | 47 |
1 files changed, 37 insertions, 10 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 7949437..cb57ea8 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -1,5 +1,6 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; +use std::result::Result as RResult; use std::str::FromStr; use std::sync::Arc; @@ -22,6 +23,7 @@ use crate::log::LogItem; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; +use crate::endpoint::ContainerError; #[derive(Getters, CopyGetters, TypedBuilder)] pub struct Endpoint { @@ -169,7 +171,7 @@ impl Endpoint { .map(|_| ()) } - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Vec<PathBuf>, ContainerHash, Script)> { + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { use crate::log::buffer_stream_to_line_stream; use tokio::stream::StreamExt; use futures::FutureExt; @@ -276,7 +278,7 @@ impl Endpoint { .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?; } - container + let exited_successfully: Option<bool> = container .copy_file_into(script_path, job.script().as_ref().as_bytes()) .inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name))) @@ -284,6 +286,7 @@ impl Endpoint { .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name))) .then(|_| { + use futures::FutureExt; trace!("Moving logs to log sink for container {}", container_id); buffer_stream_to_line_stream(container.exec(&exec_opts)) .map(|line| { @@ -296,19 +299,39 @@ impl Endpoint { .with_context(|| anyhow!("Parsing log from {}:{}: 
{:?}", self.name, container_id, l)) .map_err(Error::from) .and_then(|item| { + + let mut exited_successfully = None; + { + match item { + LogItem::State(Ok(_)) => exited_successfully = Some(true), + LogItem::State(Err(_)) => exited_successfully = Some(false), + _ => { + // Nothing + } + } + } + trace!("Log item: {}", item.display()?); logsink.send(item) .with_context(|| anyhow!("Sending log to log sink")) .map_err(Error::from) + .map(|_| exited_successfully) }) }) }) .collect::<Result<Vec<_>>>() }) - .inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name))) .await - .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?; + .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))? + .into_iter() + .fold(None, |accu, elem| match (accu, elem) { + (None , b) => b, + (Some(false) , _) => Some(false), + (_ , Some(false)) => Some(false), + (a , None) => a, + (Some(true) , Some(true)) => Some(true), + }); trace!("Fetching /outputs from container {}", container_id); let tar_stream = container @@ -327,13 +350,17 @@ impl Endpoint { .with_context(|| anyhow!("Copying the TAR stream to the staging store"))? 
}; - container.stop(Some(std::time::Duration::new(1, 0))) - .await - .with_context(|| anyhow!("Stopping container {}", container_id))?; - - trace!("Returning job {} result = {:?}, container hash = {}", job.uuid(), r, container_id); let script: Script = job.script().clone(); - Ok((r, ContainerHash::from(container_id), script)) + match exited_successfully { + Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id))), + Some(true) | None => { + container.stop(Some(std::time::Duration::new(1, 0))) + .await + .with_context(|| anyhow!("Stopping container {}", container_id))?; + + Ok((r, ContainerHash::from(container_id), script)) + }, + } } pub async fn number_of_running_containers(&self) -> Result<usize> { |