diff options
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 47 | ||||
-rw-r--r-- | src/endpoint/error.rs | 45 | ||||
-rw-r--r-- | src/endpoint/mod.rs | 3 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 4 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 30 |
6 files changed, 112 insertions, 18 deletions
@@ -45,6 +45,7 @@ sha2 = "0.9" reqwest = { version = "0.10", features = [ "stream" ] } colored = "2" syntect = "4.4" +thiserror = "1" url = { version = "2", features = ["serde"] } tokio = { version = "0.2", features = ["full"] } diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 7949437..cb57ea8 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -1,5 +1,6 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; +use std::result::Result as RResult; use std::str::FromStr; use std::sync::Arc; @@ -22,6 +23,7 @@ use crate::log::LogItem; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; +use crate::endpoint::ContainerError; #[derive(Getters, CopyGetters, TypedBuilder)] pub struct Endpoint { @@ -169,7 +171,7 @@ impl Endpoint { .map(|_| ()) } - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Vec<PathBuf>, ContainerHash, Script)> { + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { use crate::log::buffer_stream_to_line_stream; use tokio::stream::StreamExt; use futures::FutureExt; @@ -276,7 +278,7 @@ impl Endpoint { .with_context(|| anyhow!("Copying artifacts to container {}", container_id))?; } - container + let exited_successfully: Option<bool> = container .copy_file_into(script_path, job.script().as_ref().as_bytes()) .inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name))) @@ -284,6 +286,7 @@ impl Endpoint { .inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name))) .then(|_| { + use futures::FutureExt; trace!("Moving logs to log sink for container {}", container_id); buffer_stream_to_line_stream(container.exec(&exec_opts)) .map(|line| { @@ -296,19 +299,39 @@ impl Endpoint { .with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.name, container_id, l)) .map_err(Error::from) .and_then(|item| { + + let mut exited_successfully = None; + { + match item { + LogItem::State(Ok(_)) => exited_successfully = Some(true), + LogItem::State(Err(_)) => exited_successfully = Some(false), + _ => { + // Nothing + } + } + } + trace!("Log item: {}", item.display()?); logsink.send(item) .with_context(|| anyhow!("Sending log to log sink")) .map_err(Error::from) + .map(|_| exited_successfully) }) }) }) .collect::<Result<Vec<_>>>() }) - .inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); }) .map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name))) .await - .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?; + .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))? + .into_iter() + .fold(None, |accu, elem| match (accu, elem) { + (None , b) => b, + (Some(false) , _) => Some(false), + (_ , Some(false)) => Some(false), + (a , None) => a, + (Some(true) , Some(true)) => Some(true), + }); trace!("Fetching /outputs from container {}", container_id); let tar_stream = container @@ -327,13 +350,17 @@ impl Endpoint { .with_context(|| anyhow!("Copying the TAR stream to the staging store"))? }; - container.stop(Some(std::time::Duration::new(1, 0))) - .await - .with_context(|| anyhow!("Stopping container {}", container_id))?; - - trace!("Returning job {} result = {:?}, container hash = {}", job.uuid(), r, container_id); let script: Script = job.script().clone(); - Ok((r, ContainerHash::from(container_id), script)) + match exited_successfully { + Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id))), + Some(true) | None => { + container.stop(Some(std::time::Duration::new(1, 0))) + .await + .with_context(|| anyhow!("Stopping container {}", container_id))?; + + Ok((r, ContainerHash::from(container_id), script)) + }, + } } pub async fn number_of_running_containers(&self) -> Result<usize> { diff --git a/src/endpoint/error.rs b/src/endpoint/error.rs new file mode 100644 index 0000000..a6a6d7b --- /dev/null +++ b/src/endpoint/error.rs @@ -0,0 +1,45 @@ +use thiserror::Error as ThisError; + +use crate::util::docker::ContainerHash; +use crate::package::Script; + +#[derive(ThisError, Debug)] +pub enum ContainerError { + + #[error("Error during container run: {container_id}")] + ContainerError { + container_id: ContainerHash, + }, + + #[error("{0}")] + Err(anyhow::Error), +} + +impl ContainerError { + pub fn container_error(container_id: ContainerHash) -> Self { + ContainerError::ContainerError { container_id } + } + + pub fn explain_container_error(&self) -> Option<String> { + match self { + ContainerError::ContainerError { container_id } => Some({ + indoc::formatdoc!(r#" + Container did not exit successfully: {container_id} + Use + + docker exec -it {container_id} /bin/bash + + to access and debug. + "#, container_id = container_id) + }), + _ => None, + } + } +} + +impl From<anyhow::Error> for ContainerError { + fn from(ae: anyhow::Error) -> Self { + ContainerError::Err(ae) + } +} + diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index 316a8f3..c66e52f 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -1,6 +1,9 @@ mod configuration; pub use configuration::*; +mod error; +pub use error::*; + mod scheduler; pub use scheduler::*; diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index cc20444..3b4b7c6 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -1,4 +1,5 @@ use std::path::PathBuf; +use std::result::Result as RResult; use std::sync::Arc; use anyhow::Context; @@ -22,6 +23,7 @@ use crate::filestore::StagingStore; use crate::job::RunnableJob; use crate::log::LogItem; use crate::util::progress::ProgressBars; +use crate::endpoint::ContainerError; pub struct EndpointScheduler { log_dir: Option<PathBuf>, @@ -125,7 +127,7 @@ impl std::fmt::Debug for JobHandle { } impl JobHandle { - pub async fn run(self) -> Result<Vec<PathBuf>> { + pub async fn run(self) -> RResult<Vec<PathBuf>, ContainerError> { use crate::db::models as dbmodels; let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index e42582e..e7b5ba6 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -1,18 +1,21 @@ +use std::io::Write; use std::path::PathBuf; +use std::result::Result as RResult; use std::sync::Arc; -use tokio::sync::RwLock; use anyhow::Context; use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use diesel::PgConnection; +use indicatif::ProgressBar; +use tokio::sync::RwLock; +use tokio::sync::mpsc::UnboundedReceiver; use typed_builder::TypedBuilder; use uuid::Uuid; -use tokio::sync::mpsc::UnboundedReceiver; -use indicatif::ProgressBar; use crate::db::models::Submit; +use crate::endpoint::ContainerError; use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; use crate::filestore::MergedStores; @@ -96,17 +99,30 @@ impl Orchestrator { }); } - unordered_results.collect::<Result<Vec<_>>>() + unordered_results.collect::<Vec<RResult<_, ContainerError>>>() }; let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); let (results, barres) = tokio::join!(results, multibar_block); let _ = barres?; - let results = results? + let (okays, errors): (Vec<_>, Vec<_>) = results .into_iter() - .flatten() - .collect::<Vec<PathBuf>>(); + .inspect(|e| trace!("Processing result from jobset run: {:?}", e)) + .partition(|e| e.is_ok()); + + let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<PathBuf>>(); + + { + let mut out = std::io::stderr(); + for error in errors { + if let Err(e) = error { + if let Some(expl) = e.explain_container_error() { + writeln!(out, "{}", expl)?; + } + } + } + } { // check if all paths that were written are actually there in the staging store let staging_store_lock = self.staging_store.read().await; |