diff options
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 21ac133..1c401c3 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -9,6 +9,7 @@ use diesel::PgConnection; use log::trace; use tokio::sync::RwLock; use typed_builder::TypedBuilder; +use uuid::Uuid; use crate::config::Configuration; use crate::db::models::Artifact; @@ -40,7 +41,7 @@ pub struct OrchestratorSetup<'a> { release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, jobsets: Vec<JobSet>, - database: PgConnection, + database: Arc<PgConnection>, submit: Submit, log_dir: Option<PathBuf>, config: &'a Configuration, @@ -48,8 +49,7 @@ pub struct OrchestratorSetup<'a> { impl<'a> OrchestratorSetup<'a> { pub async fn setup(self) -> Result<Orchestrator<'a>> { - let db = Arc::new(self.database); - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.submit.clone(), self.log_dir).await?; + let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), self.database, self.submit.clone(), self.log_dir).await?; Ok(Orchestrator { scheduler: scheduler, @@ -64,7 +64,7 @@ impl<'a> OrchestratorSetup<'a> { impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<anyhow::Error>> { + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> { for jobset in self.jobsets.into_iter() { let errs = Self::run_jobset(&self.scheduler, &self.merged_stores, @@ -91,7 +91,7 @@ impl<'a> Orchestrator<'a> { progress_generator: &ProgressBars, jobset: JobSet, output: &mut Vec<Artifact>) - -> Result<Vec<anyhow::Error>> + -> Result<Vec<(Uuid, anyhow::Error)>> { use tokio::stream::StreamExt; @@ -102,10 +102,16 @@ impl<'a> Orchestrator<'a> { .into_iter() .map(|runnable| { let bar = multibar.add(progress_generator.bar()); - Self::run_runnable(runnable, scheduler, bar) + + async { + let uuid = runnable.uuid().clone(); + Self::run_runnable(runnable, scheduler, bar) + .await + .map_err(|e| (uuid, e)) + } }) .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Vec<Result<Vec<Artifact>>>>(); + .collect::<Vec<std::result::Result<Vec<Artifact>, (Uuid, Error)>>>(); let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); |