summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r--  Cargo.toml                        1
-rw-r--r--  src/endpoint/configured.rs       47
-rw-r--r--  src/endpoint/error.rs            45
-rw-r--r--  src/endpoint/mod.rs               3
-rw-r--r--  src/endpoint/scheduler.rs         4
-rw-r--r--  src/orchestrator/orchestrator.rs 30
6 files changed, 112 insertions, 18 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 41637b6..7beacb5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -45,6 +45,7 @@ sha2 = "0.9"
reqwest = { version = "0.10", features = [ "stream" ] }
colored = "2"
syntect = "4.4"
+thiserror = "1"
url = { version = "2", features = ["serde"] }
tokio = { version = "0.2", features = ["full"] }
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 7949437..cb57ea8 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -1,5 +1,6 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
+use std::result::Result as RResult;
use std::str::FromStr;
use std::sync::Arc;
@@ -22,6 +23,7 @@ use crate::log::LogItem;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
+use crate::endpoint::ContainerError;
#[derive(Getters, CopyGetters, TypedBuilder)]
pub struct Endpoint {
@@ -169,7 +171,7 @@ impl Endpoint {
.map(|_| ())
}
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> Result<(Vec<PathBuf>, ContainerHash, Script)> {
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
use crate::log::buffer_stream_to_line_stream;
use tokio::stream::StreamExt;
use futures::FutureExt;
@@ -276,7 +278,7 @@ impl Endpoint {
.with_context(|| anyhow!("Copying artifacts to container {}", container_id))?;
}
- container
+ let exited_successfully: Option<bool> = container
.copy_file_into(script_path, job.script().as_ref().as_bytes())
.inspect(|r| { trace!("Copying script to container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Copying the script into the container {} on '{}'", container_id, self.name)))
@@ -284,6 +286,7 @@ impl Endpoint {
.inspect(|r| { trace!("Starting container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Starting the container {} on '{}'", container_id, self.name)))
.then(|_| {
+ use futures::FutureExt;
trace!("Moving logs to log sink for container {}", container_id);
buffer_stream_to_line_stream(container.exec(&exec_opts))
.map(|line| {
@@ -296,19 +299,39 @@ impl Endpoint {
.with_context(|| anyhow!("Parsing log from {}:{}: {:?}", self.name, container_id, l))
.map_err(Error::from)
.and_then(|item| {
+
+ let mut exited_successfully = None;
+ {
+ match item {
+ LogItem::State(Ok(_)) => exited_successfully = Some(true),
+ LogItem::State(Err(_)) => exited_successfully = Some(false),
+ _ => {
+ // Nothing
+ }
+ }
+ }
+
trace!("Log item: {}", item.display()?);
logsink.send(item)
.with_context(|| anyhow!("Sending log to log sink"))
.map_err(Error::from)
+ .map(|_| exited_successfully)
})
})
})
.collect::<Result<Vec<_>>>()
})
- .inspect(|r| { trace!("Fetching log from container {} -> {:?}", container_id, r); })
.map(|r| r.with_context(|| anyhow!("Fetching log from container {} on {}", container_id, self.name)))
.await
- .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?;
+ .with_context(|| anyhow!("Copying script to container, running container and getting logs: {}", container_id))?
+ .into_iter()
+ .fold(None, |accu, elem| match (accu, elem) {
+ (None , b) => b,
+ (Some(false) , _) => Some(false),
+ (_ , Some(false)) => Some(false),
+ (a , None) => a,
+ (Some(true) , Some(true)) => Some(true),
+ });
trace!("Fetching /outputs from container {}", container_id);
let tar_stream = container
@@ -327,13 +350,17 @@ impl Endpoint {
.with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
};
- container.stop(Some(std::time::Duration::new(1, 0)))
- .await
- .with_context(|| anyhow!("Stopping container {}", container_id))?;
-
- trace!("Returning job {} result = {:?}, container hash = {}", job.uuid(), r, container_id);
let script: Script = job.script().clone();
- Ok((r, ContainerHash::from(container_id), script))
+ match exited_successfully {
+ Some(false) => Err(ContainerError::container_error(ContainerHash::from(container_id))),
+ Some(true) | None => {
+ container.stop(Some(std::time::Duration::new(1, 0)))
+ .await
+ .with_context(|| anyhow!("Stopping container {}", container_id))?;
+
+ Ok((r, ContainerHash::from(container_id), script))
+ },
+ }
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
diff --git a/src/endpoint/error.rs b/src/endpoint/error.rs
new file mode 100644
index 0000000..a6a6d7b
--- /dev/null
+++ b/src/endpoint/error.rs
@@ -0,0 +1,45 @@
+use thiserror::Error as ThisError;
+
+use crate::util::docker::ContainerHash;
+use crate::package::Script;
+
+#[derive(ThisError, Debug)]
+pub enum ContainerError {
+
+ #[error("Error during container run: {container_id}")]
+ ContainerError {
+ container_id: ContainerHash,
+ },
+
+ #[error("{0}")]
+ Err(anyhow::Error),
+}
+
+impl ContainerError {
+ pub fn container_error(container_id: ContainerHash) -> Self {
+ ContainerError::ContainerError { container_id }
+ }
+
+ pub fn explain_container_error(&self) -> Option<String> {
+ match self {
+ ContainerError::ContainerError { container_id } => Some({
+ indoc::formatdoc!(r#"
+ Container did not exit successfully: {container_id}
+ Use
+
+ docker exec -it {container_id} /bin/bash
+
+ to access and debug.
+ "#, container_id = container_id)
+ }),
+ _ => None,
+ }
+ }
+}
+
+impl From<anyhow::Error> for ContainerError {
+ fn from(ae: anyhow::Error) -> Self {
+ ContainerError::Err(ae)
+ }
+}
+
diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs
index 316a8f3..c66e52f 100644
--- a/src/endpoint/mod.rs
+++ b/src/endpoint/mod.rs
@@ -1,6 +1,9 @@
mod configuration;
pub use configuration::*;
+mod error;
+pub use error::*;
+
mod scheduler;
pub use scheduler::*;
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index cc20444..3b4b7c6 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -1,4 +1,5 @@
use std::path::PathBuf;
+use std::result::Result as RResult;
use std::sync::Arc;
use anyhow::Context;
@@ -22,6 +23,7 @@ use crate::filestore::StagingStore;
use crate::job::RunnableJob;
use crate::log::LogItem;
use crate::util::progress::ProgressBars;
+use crate::endpoint::ContainerError;
pub struct EndpointScheduler {
log_dir: Option<PathBuf>,
@@ -125,7 +127,7 @@ impl std::fmt::Debug for JobHandle {
}
impl JobHandle {
- pub async fn run(self) -> Result<Vec<PathBuf>> {
+ pub async fn run(self) -> RResult<Vec<PathBuf>, ContainerError> {
use crate::db::models as dbmodels;
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e42582e..e7b5ba6 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -1,18 +1,21 @@
+use std::io::Write;
use std::path::PathBuf;
+use std::result::Result as RResult;
use std::sync::Arc;
-use tokio::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use diesel::PgConnection;
+use indicatif::ProgressBar;
+use tokio::sync::RwLock;
+use tokio::sync::mpsc::UnboundedReceiver;
use typed_builder::TypedBuilder;
use uuid::Uuid;
-use tokio::sync::mpsc::UnboundedReceiver;
-use indicatif::ProgressBar;
use crate::db::models::Submit;
+use crate::endpoint::ContainerError;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::MergedStores;
@@ -96,17 +99,30 @@ impl Orchestrator {
});
}
- unordered_results.collect::<Result<Vec<_>>>()
+ unordered_results.collect::<Vec<RResult<_, ContainerError>>>()
};
let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
let (results, barres) = tokio::join!(results, multibar_block);
let _ = barres?;
- let results = results?
+ let (okays, errors): (Vec<_>, Vec<_>) = results
.into_iter()
- .flatten()
- .collect::<Vec<PathBuf>>();
+ .inspect(|e| trace!("Processing result from jobset run: {:?}", e))
+ .partition(|e| e.is_ok());
+
+ let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<PathBuf>>();
+
+ {
+ let mut out = std::io::stderr();
+ for error in errors {
+ if let Err(e) = error {
+ if let Some(expl) = e.explain_container_error() {
+ writeln!(out, "{}", expl)?;
+ }
+ }
+ }
+ }
{ // check if all paths that were written are actually there in the staging store
let staging_store_lock = self.staging_store.read().await;