summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-12-08 12:39:32 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-12-08 12:39:32 +0100
commitad1f9e4e4daeb5e04871349e7a9eafd8cf64e748 (patch)
treef026d34c0178bdd9fce3b370d71f51015bfe26ed /src/orchestrator
parent24fff4c70f07fd51dee40aea52f136bc69f75d65 (diff)
Refactor to iterator chaining
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs31
1 files changed, 16 insertions, 15 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index b5bac4c..ea451a0 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -70,32 +70,33 @@ impl<'a> Orchestrator<'a> {
use tokio::stream::StreamExt;
let mut report_result = vec![];
+ let scheduler = self.scheduler; // moved here because of partial-move semantics
for jobset in self.jobsets.into_iter() {
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
let multibar = Arc::new(indicatif::MultiProgress::new());
- let results = { // run the jobs in the set
- let unordered_results = futures::stream::FuturesUnordered::new();
- for runnable in jobset.into_runables(&merged_store, &self.source_cache, &self.config).await?.into_iter() {
- let job_id = runnable.uuid().clone();
- trace!("Runnable {} for package {}", job_id, runnable.package().name());
+ let results = jobset // run the jobs in the set
+ .into_runables(&merged_store, &self.source_cache, &self.config)
+ .await?
+ .into_iter()
+ .map(|runnable| {
+ let multibar = multibar.clone();
+ async {
+ let job_id = runnable.uuid().clone();
+ trace!("Runnable {} for package {}", job_id, runnable.package().name());
- let jobhandle = self.scheduler.schedule_job(runnable, multibar.clone()).await?;
- trace!("Jobhandle -> {:?}", jobhandle);
+ let jobhandle = scheduler.schedule_job(runnable, multibar).await?;
+ trace!("Jobhandle -> {:?}", jobhandle);
- // clone the bar here, so we can give a handle to the async result fetcher closure
- // where we tick() it as soon as the job returns the result (= is finished)
- unordered_results.push(async move {
let r = jobhandle.run().await;
trace!("Found result in job {}: {:?}", job_id, r);
r
- });
- }
-
- unordered_results.collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>()
- };
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<RResult<Vec<Artifact>, ContainerError>>>();
let multibar_block = tokio::task::spawn_blocking(move || multibar.join());