summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-14 17:56:11 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-14 18:18:27 +0100
commitfcecf5fce755b2870cf0baae4a29104d146df948 (patch)
tree978997e6d0a5f1443befd28fdda35271e05f9c8f /src/orchestrator
parent056a2c66e3d3fc8c4b6803acb9d151673b78a23f (diff)
Rewrite to use tokio::sync::RwLock
This patch rewrites the codebase to not use std::sync::RwLock, but tokio::sync::RwLock. tokios RwLock is an async RwLock, which is what we want in an async-await context. The more I use tokio, the more I understand what you should do and what you shouldn't do. Some parts of this patch are a rewrite, for example, JobSet::into_runables() was completely rewritten. That was necessary because the function used inside is `Runnable::build_from_job()`, which uses an RwLock internally, thus, gets `async` in this patch. Because of this, `JobSet::into_runables()` needed a complete rewrite as well. Because it is way more difficult than transforming the function to return an iterator of futures, this patch simply rewrites it to return a `Result<Vec<RunnableJob>>` instead. Internally, tokio jobs are submitted via the `futures::stream::FuturesUnordered<_>` now. This is not the most performant implementation for the problem at hand, but it is a reasonable simple one. Optimization could happen here, of course. Also, the implementation of resource preparation inside `RunnableJob::build_from_job()` got a rewrite using the same technique. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs17
1 files changed, 10 insertions, 7 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 33dcd1f..c486296 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -1,6 +1,6 @@
use std::path::PathBuf;
use std::sync::Arc;
-use std::sync::RwLock;
+use tokio::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
@@ -78,10 +78,11 @@ impl Orchestrator {
for (i, jobset) in self.jobsets.into_iter().enumerate() {
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
+ let multibar = indicatif::MultiProgress::new();
+
let results = { // run the jobs in the set
let unordered_results = futures::stream::FuturesUnordered::new();
- for runnable in jobset.into_runables(&merged_store, &self.source_cache) {
- let runnable = runnable?;
+ for runnable in jobset.into_runables(&merged_store, &self.source_cache).await?.into_iter() {
let job_id = runnable.uuid().clone();
trace!("Runnable {} for package {}", job_id, runnable.package().name());
@@ -100,15 +101,17 @@ impl Orchestrator {
unordered_results.collect::<Result<Vec<_>>>()
};
- let results = results.await?
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+
+ let (results, barres) = tokio::join!(results, multibar_block);
+ let _ = barres?;
+ let results = results?
.into_iter()
.flatten()
.collect::<Vec<PathBuf>>();
{ // check if all paths that were written are actually there in the staging store
- let staging_store_lock = self.staging_store
- .read()
- .map_err(|_| anyhow!("Lock Poisoned"))?;
+ let staging_store_lock = self.staging_store.read().await;
trace!("Checking {} results...", results.len());
for path in results.iter() {