summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-12-08 13:05:02 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-12-08 13:29:20 +0100
commit4be403e9eb7fdcb37a6083757c0c460e1d33cf82 (patch)
tree009949d23841186d86e2413719f5bbe9761b30dd /src/orchestrator
parent92177554546f3ed428c6d22d9cc34026c3fd4f0c (diff)
Outsource running of JobSet to helper function
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs119
1 files changed, 68 insertions, 51 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 8384103..e9a392e 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -68,73 +68,90 @@ impl<'a> OrchestratorSetup<'a> {
impl<'a> Orchestrator<'a> {
pub async fn run(self) -> Result<Vec<Artifact>> {
- use tokio::stream::StreamExt;
-
let mut report_result = vec![];
let scheduler = self.scheduler; // moved here because of partial-move semantics
for jobset in self.jobsets.into_iter() {
- let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
+ let mut results = Self::run_jobset(&scheduler,
+ self.release_store.clone(),
+ self.staging_store.clone(),
+ &self.source_cache,
+ &self.config,
+ jobset)
+ .await?;
- let multibar = Arc::new(indicatif::MultiProgress::new());
+ report_result.append(&mut results);
+ }
- let results = jobset // run the jobs in the set
- .into_runables(&merged_store, &self.source_cache, &self.config)
- .await?
- .into_iter()
- .map(|runnable| {
- let multibar = multibar.clone();
+ Ok(report_result)
+ }
- async {
- Self::run_runnable(multibar, runnable, &scheduler).await
- }
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>();
-
- let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
-
- let (results, barres) = tokio::join!(results, multibar_block);
- let _ = barres?;
- let (okays, errors): (Vec<_>, Vec<_>) = results
- .into_iter()
- .inspect(|e| trace!("Processing result from jobset run: {:?}", e))
- .partition(|e| e.is_ok());
-
- let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>();
-
- {
- let mut out = std::io::stderr();
- for error in errors {
- if let Err(e) = error {
- if let Some(expl) = e.explain_container_error() {
- writeln!(out, "{}", expl)?;
- }
+ async fn run_jobset(
+ scheduler: &EndpointScheduler,
+ release_store: Arc<RwLock<ReleaseStore>>,
+ staging_store: Arc<RwLock<StagingStore>>,
+ source_cache: &SourceCache,
+ config: &Configuration,
+ jobset: JobSet)
+ -> Result<Vec<Artifact>>
+ {
+ use tokio::stream::StreamExt;
+
+ let merged_store = MergedStores::new(release_store.clone(), staging_store.clone());
+ let multibar = Arc::new(indicatif::MultiProgress::new());
+ let results = jobset // run the jobs in the set
+ .into_runables(&merged_store, source_cache, config)
+ .await?
+ .into_iter()
+ .map(|runnable| {
+ let multibar = multibar.clone();
+
+ async {
+ Self::run_runnable(multibar, runnable, scheduler).await
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>();
+
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+
+ let (results, barres) = tokio::join!(results, multibar_block);
+ let _ = barres?;
+ let (okays, errors): (Vec<_>, Vec<_>) = results
+ .into_iter()
+ .inspect(|e| trace!("Processing result from jobset run: {:?}", e))
+ .partition(|e| e.is_ok());
+
+ let results = okays.into_iter().filter_map(Result::ok).flatten().collect::<Vec<Artifact>>();
+
+ {
+ let mut out = std::io::stderr();
+ for error in errors {
+ if let Err(e) = error {
+ if let Some(expl) = e.explain_container_error() {
+ writeln!(out, "{}", expl)?;
}
}
}
+ }
- { // check if all paths that were written are actually there in the staging store
- let staging_store_lock = self.staging_store.read().await;
-
- trace!("Checking {} results...", results.len());
- for artifact in results.iter() {
- let a_path = artifact.path_buf();
- trace!("Checking path: {}", a_path.display());
- if !staging_store_lock.path_exists_in_store_root(&a_path) {
- return Err(anyhow!("Result path {} is missing from staging store", a_path.display()))
- .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), a_path.display()))
- .map_err(Error::from)
- }
+ { // check if all paths that were written are actually there in the staging store
+ let staging_store_lock = staging_store.read().await;
+
+ trace!("Checking {} results...", results.len());
+ for artifact in results.iter() {
+ let a_path = artifact.path_buf();
+ trace!("Checking path: {}", a_path.display());
+ if !staging_store_lock.path_exists_in_store_root(&a_path) {
+ return Err(anyhow!("Result path {} is missing from staging store", a_path.display()))
+ .with_context(|| anyhow!("Should be: {}/{}", staging_store_lock.root_path().display(), a_path.display()))
+ .map_err(Error::from)
}
-
}
- let mut results = results; // rebind!
- report_result.append(&mut results);
}
- Ok(report_result)
+ Ok(results)
}
async fn run_runnable(multibar: Arc<indicatif::MultiProgress>, runnable: RunnableJob, scheduler: &EndpointScheduler)