From ad1f9e4e4daeb5e04871349e7a9eafd8cf64e748 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 8 Dec 2020 12:39:32 +0100 Subject: Refactor to iterator chaining Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) (limited to 'src/orchestrator') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index b5bac4c..ea451a0 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -70,32 +70,33 @@ impl<'a> Orchestrator<'a> { use tokio::stream::StreamExt; let mut report_result = vec![]; + let scheduler = self.scheduler; // moved here because of partial-move semantics for jobset in self.jobsets.into_iter() { let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); let multibar = Arc::new(indicatif::MultiProgress::new()); - let results = { // run the jobs in the set - let unordered_results = futures::stream::FuturesUnordered::new(); - for runnable in jobset.into_runables(&merged_store, &self.source_cache, &self.config).await?.into_iter() { - let job_id = runnable.uuid().clone(); - trace!("Runnable {} for package {}", job_id, runnable.package().name()); + let results = jobset // run the jobs in the set + .into_runables(&merged_store, &self.source_cache, &self.config) + .await? + .into_iter() + .map(|runnable| { + let multibar = multibar.clone(); + async { + let job_id = runnable.uuid().clone(); + trace!("Runnable {} for package {}", job_id, runnable.package().name()); - let jobhandle = self.scheduler.schedule_job(runnable, multibar.clone()).await?; - trace!("Jobhandle -> {:?}", jobhandle); + let jobhandle = scheduler.schedule_job(runnable, multibar).await?; + trace!("Jobhandle -> {:?}", jobhandle); - // clone the bar here, so we can give a handle to the async result fetcher closure - // where we tick() it as soon as the job returns the result (= is finished) - unordered_results.push(async move { let r = jobhandle.run().await; trace!("Found result in job {}: {:?}", job_id, r); r - }); - } - - unordered_results.collect::, ContainerError>>>() - }; + } + }) + .collect::>() + .collect::, ContainerError>>>(); let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); -- cgit v1.2.3