summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs20
1 files changed, 13 insertions, 7 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 21ac133..1c401c3 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -9,6 +9,7 @@ use diesel::PgConnection;
use log::trace;
use tokio::sync::RwLock;
use typed_builder::TypedBuilder;
+use uuid::Uuid;
use crate::config::Configuration;
use crate::db::models::Artifact;
@@ -40,7 +41,7 @@ pub struct OrchestratorSetup<'a> {
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
jobsets: Vec<JobSet>,
- database: PgConnection,
+ database: Arc<PgConnection>,
submit: Submit,
log_dir: Option<PathBuf>,
config: &'a Configuration,
@@ -48,8 +49,7 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
- let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.submit.clone(), self.log_dir).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), self.database, self.submit.clone(), self.log_dir).await?;
Ok(Orchestrator {
scheduler: scheduler,
@@ -64,7 +64,7 @@ impl<'a> OrchestratorSetup<'a> {
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<anyhow::Error>> {
+ pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
for jobset in self.jobsets.into_iter() {
let errs = Self::run_jobset(&self.scheduler,
&self.merged_stores,
@@ -91,7 +91,7 @@ impl<'a> Orchestrator<'a> {
progress_generator: &ProgressBars,
jobset: JobSet,
output: &mut Vec<Artifact>)
- -> Result<Vec<anyhow::Error>>
+ -> Result<Vec<(Uuid, anyhow::Error)>>
{
use tokio::stream::StreamExt;
@@ -102,10 +102,16 @@ impl<'a> Orchestrator<'a> {
.into_iter()
.map(|runnable| {
let bar = multibar.add(progress_generator.bar());
- Self::run_runnable(runnable, scheduler, bar)
+
+ async {
+ let uuid = runnable.uuid().clone();
+ Self::run_runnable(runnable, scheduler, bar)
+ .await
+ .map_err(|e| (uuid, e))
+ }
})
.collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<Result<Vec<Artifact>>>>();
+ .collect::<Vec<std::result::Result<Vec<Artifact>, (Uuid, Error)>>>();
let multibar_block = tokio::task::spawn_blocking(move || multibar.join());