diff options
-rw-r--r-- | src/orchestrator/orchestrator.rs | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index a2eb7e5..2005f6b 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -71,8 +71,9 @@ impl Orchestrator { for jobset in self.jobsets.into_iter() { let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); - let results = { // run the jobs in the set - let unordered = futures::stream::FuturesUnordered::new(); + let (results, logs) = { // run the jobs in the set + let unordered_results = futures::stream::FuturesUnordered::new(); + let unordered_receivers = futures::stream::FuturesUnordered::new(); for runnable in jobset.into_runables(&merged_store) { let runnable = runnable?; trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name()); @@ -80,17 +81,26 @@ impl Orchestrator { let jobhandle = self.scheduler.schedule_job(runnable, sender).await?; trace!("Jobhandle -> {:?}", jobhandle); - unordered.push(async move { - jobhandle.get_result().await + unordered_results.push(async move { + jobhandle.get_result() + .await + }); + unordered_receivers.push(async move { + receiver }); } - unordered.collect::<Result<Vec<_>>>().await? - .into_iter() - .flatten() - .collect::<Vec<PathBuf>>() + (unordered_results.collect::<Result<Vec<_>>>(), unordered_receivers.collect::<Vec<_>>()) }; + let (results, _logs) = tokio::join!(results, logs); + // TODO: Use logs. + + let results = results? + .into_iter() + .flatten() + .collect::<Vec<PathBuf>>(); + { // check if all paths that were written are actually there in the staging store let staging_store_lock = self.staging_store .read() |