summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs26
1 files changed, 18 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index a2eb7e5..2005f6b 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -71,8 +71,9 @@ impl Orchestrator {
for jobset in self.jobsets.into_iter() {
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
- let results = { // run the jobs in the set
- let unordered = futures::stream::FuturesUnordered::new();
+ let (results, logs) = { // run the jobs in the set
+ let unordered_results = futures::stream::FuturesUnordered::new();
+ let unordered_receivers = futures::stream::FuturesUnordered::new();
for runnable in jobset.into_runables(&merged_store) {
let runnable = runnable?;
trace!("Runnable {} for package {}", runnable.uuid(), runnable.package().name());
@@ -80,17 +81,26 @@ impl Orchestrator {
let jobhandle = self.scheduler.schedule_job(runnable, sender).await?;
trace!("Jobhandle -> {:?}", jobhandle);
- unordered.push(async move {
- jobhandle.get_result().await
+ unordered_results.push(async move {
+ jobhandle.get_result()
+ .await
+ });
+ unordered_receivers.push(async move {
+ receiver
});
}
- unordered.collect::<Result<Vec<_>>>().await?
- .into_iter()
- .flatten()
- .collect::<Vec<PathBuf>>()
+ (unordered_results.collect::<Result<Vec<_>>>(), unordered_receivers.collect::<Vec<_>>())
};
+ let (results, _logs) = tokio::join!(results, logs);
+ // TODO: Use logs.
+
+ let results = results?
+ .into_iter()
+ .flatten()
+ .collect::<Vec<PathBuf>>();
+
{ // check if all paths that were written are actually there in the staging store
let staging_store_lock = self.staging_store
.read()