From fcecf5fce755b2870cf0baae4a29104d146df948 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Sat, 14 Nov 2020 17:56:11 +0100 Subject: Rewrite to use tokio::sync::RwLock This patch rewrites the codebase to not use std::sync::RwLock, but tokio::sync::RwLock. tokios RwLock is an async RwLock, which is what we want in an async-await context. The more I use tokio, the more I understand what you should do and what you shouldn't do. Some parts of this patch are a rewrite, for example, JobSet::into_runables() was completely rewritten. That was necessary because the function used inside is `Runnable::build_from_job()`, which uses an RwLock internally, thus, gets `async` in this patch. Because of this, `JobSet::into_runables()` needed a complete rewrite as well. Because it is way more difficult than transforming the function to return an iterator of futures, this patch simply rewrites it to return a `Result>` instead. Internally, tokio jobs are submitted via the `futures::stream::FuturesUnordered<_>` now. This is not the most performant implementation for the problem at hand, but it is a reasonable simple one. Optimization could happen here, of course. Also, the implementation of resource preparation inside `RunnableJob::build_from_job()` got a rewrite using the same technique. Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'src/orchestrator') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 33dcd1f..c486296 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; use std::sync::Arc; -use std::sync::RwLock; +use tokio::sync::RwLock; use anyhow::Context; use anyhow::Error; @@ -78,10 +78,11 @@ impl Orchestrator { for (i, jobset) in self.jobsets.into_iter().enumerate() { let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); + let multibar = indicatif::MultiProgress::new(); + let results = { // run the jobs in the set let unordered_results = futures::stream::FuturesUnordered::new(); - for runnable in jobset.into_runables(&merged_store, &self.source_cache) { - let runnable = runnable?; + for runnable in jobset.into_runables(&merged_store, &self.source_cache).await?.into_iter() { let job_id = runnable.uuid().clone(); trace!("Runnable {} for package {}", job_id, runnable.package().name()); @@ -100,15 +101,17 @@ impl Orchestrator { unordered_results.collect::>>() }; - let results = results.await? + let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); + + let (results, barres) = tokio::join!(results, multibar_block); + let _ = barres?; + let results = results? .into_iter() .flatten() .collect::>(); { // check if all paths that were written are actually there in the staging store - let staging_store_lock = self.staging_store - .read() - .map_err(|_| anyhow!("Lock Poisoned"))?; + let staging_store_lock = self.staging_store.read().await; trace!("Checking {} results...", results.len()); for path in results.iter() { -- cgit v1.2.3