diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-12-08 13:05:02 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-12-08 13:29:20 +0100 |
commit | 4be403e9eb7fdcb37a6083757c0c460e1d33cf82 (patch) | |
tree | 009949d23841186d86e2413719f5bbe9761b30dd /src/orchestrator/orchestrator.rs | |
parent | 92177554546f3ed428c6d22d9cc34026c3fd4f0c (diff) |
Outsource running of JobSet to helper function
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 119 |
1 file changed, 68 insertions, 51 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 8384103..e9a392e 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -68,73 +68,90 @@ impl<'a> OrchestratorSetup<'a> { impl<'a> Orchestrator<'a> { pub async fn run(self) -> Result<Vec<Artifact>> { - use tokio::stream::StreamExt; - let mut report_result = vec![]; let scheduler = self.scheduler; // moved here because of partial-move semantics for jobset in self.jobsets.into_iter() { - let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); + let mut results = Self::run_jobset(&scheduler, + self.release_store.clone(), + self.staging_store.clone(), + &self.source_cache, + &self.config, + jobset) + .await?; - let multibar = Arc::new(indicatif::MultiProgress::new()); + report_result.append(&mut results); + } - let results = jobset // run the jobs in the set - .into_runables(&merged_store, &self.source_cache, &self.config) - .await? - .into_iter() - .map(|runnable| { - let multibar = multibar.clone(); + Ok(report_result) + } - async { - Self::run_runnable(multibar, runnable, &scheduler).await - } - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>(); - - let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); - - let (results, barres) = tokio::join!(results, multibar_block); - let _ = barres?; - let (okays, errors): (Vec<_>, Vec<_>) = results - .into_iter() - .inspect(|e| trace!("Processing result from jobset run: {:?}", e)) - .partition(|e| e.is_ok()); - - let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>(); - - { - let mut out = std::io::stderr(); - for error in errors { - if let Err(e) = error { - if let Some(expl) = e.explain_container_error() { - writeln!(out, "{}", expl)?; - } + async fn run_jobset( + scheduler: &EndpointScheduler, + release_store: Arc<RwLock<ReleaseStore>>, + 
staging_store: Arc<RwLock<StagingStore>>, + source_cache: &SourceCache, + config: &Configuration, + jobset: JobSet) + -> Result<Vec<Artifact>> + { + use tokio::stream::StreamExt; + + let merged_store = MergedStores::new(release_store.clone(), staging_store.clone()); + let multibar = Arc::new(indicatif::MultiProgress::new()); + let results = jobset // run the jobs in the set + .into_runables(&merged_store, source_cache, config) + .await? + .into_iter() + .map(|runnable| { + let multibar = multibar.clone(); + + async { + Self::run_runnable(multibar, runnable, scheduler).await + } + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>(); + + let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); + + let (results, barres) = tokio::join!(results, multibar_block); + let _ = barres?; + let (okays, errors): (Vec<_>, Vec<_>) = results + .into_iter() + .inspect(|e| trace!("Processing result from jobset run: {:?}", e)) + .partition(|e| e.is_ok()); + + let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>(); + + { + let mut out = std::io::stderr(); + for error in errors { + if let Err(e) = error { + if let Some(expl) = e.explain_container_error() { + writeln!(out, "{}", expl)?; } } } + } - { // check if all paths that were written are actually there in the staging store - let staging_store_lock = self.staging_store.read().await; - - trace!("Checking {} results...", results.len()); - for artifact in results.iter() { - let a_path = artifact.path_buf(); - trace!("Checking path: {}", a_path.display()); - if !staging_store_lock.path_exists_in_store_root(&a_path) { - return Err(anyhow!("Result path {} is missing from staging store", a_path.display())) - .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), a_path.display())) - .map_err(Error::from) - } + { // check if all paths that were written are actually there in the staging store 
+ let staging_store_lock = staging_store.read().await; + + trace!("Checking {} results...", results.len()); + for artifact in results.iter() { + let a_path = artifact.path_buf(); + trace!("Checking path: {}", a_path.display()); + if !staging_store_lock.path_exists_in_store_root(&a_path) { + return Err(anyhow!("Result path {} is missing from staging store", a_path.display())) + .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), a_path.display())) + .map_err(Error::from) } - } - let mut results = results; // rebind! - report_result.append(&mut results); } - Ok(report_result) + Ok(results) } async fn run_runnable(multibar: Arc<indicatif::MultiProgress>, runnable: RunnableJob, scheduler: &EndpointScheduler) |