author     Matthias Beyer <mail@beyermatthias.de>    2020-11-14 17:56:11 +0100
committer  Matthias Beyer <mail@beyermatthias.de>    2020-11-14 18:18:27 +0100
commit     fcecf5fce755b2870cf0baae4a29104d146df948 (patch)
tree       978997e6d0a5f1443befd28fdda35271e05f9c8f
parent     056a2c66e3d3fc8c4b6803acb9d151673b78a23f (diff)
Rewrite to use tokio::sync::RwLock
This patch rewrites the codebase to use tokio::sync::RwLock instead of
std::sync::RwLock. tokio's RwLock is an async RwLock, which is what we want
in an async-await context. The more I use tokio, the more I understand what
you should and shouldn't do.

Some parts of this patch are a complete rewrite. For example,
JobSet::into_runables() was rewritten from scratch. That was necessary
because the function it calls, `RunnableJob::build_from_job()`, uses an
RwLock internally and therefore becomes `async` in this patch. Since
transforming `JobSet::into_runables()` to return an iterator of futures is
considerably more difficult, this patch simply rewrites it to return a
`Result<Vec<RunnableJob>>` instead. Internally, the futures are now driven
via `futures::stream::FuturesUnordered<_>`. This is not the most performant
implementation for the problem at hand, but it is a reasonably simple one.
Optimization could happen here, of course.

The resource-preparation code inside `RunnableJob::build_from_job()` was
also rewritten using the same technique.

Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
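To illustrate the core change: with tokio's RwLock, acquiring the lock is an
`.await` point rather than a blocking call, and the lock cannot be poisoned,
so the error handling wrapped around every lock acquisition goes away. A
minimal sketch, not taken from this codebase, assuming tokio with the `sync`
and `macros` features enabled:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[tokio::main]
    async fn main() {
        // Shared state behind tokio's async RwLock.
        let store = Arc::new(RwLock::new(Vec::<String>::new()));

        {
            // Locking awaits instead of blocking, and the guard is not a
            // Result: there is no poisoning, so the repeated
            // `.map_err(|_| anyhow!("Lock poisoned"))?` step disappears.
            let mut writelock = store.write().await;
            writelock.push("artifact".to_string());
        } // drop the guard before further awaits to keep the lock short-lived

        let readlock = store.read().await;
        assert_eq!(readlock.len(), 1);
    }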
-rw-r--r--  src/commands/build.rs             |  2
-rw-r--r--  src/endpoint/configured.rs        | 16
-rw-r--r--  src/endpoint/scheduler.rs         |  6
-rw-r--r--  src/filestore/merged.rs           | 14
-rw-r--r--  src/job/runnable.rs               | 46
-rw-r--r--  src/job/set.rs                    | 11
-rw-r--r--  src/orchestrator/orchestrator.rs  | 17
7 files changed, 66 insertions(+), 46 deletions(-)
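The `into_runables()` rewrite and the dependency preparation in
`build_from_job()` both follow the same shape, visible in the hunks below:
map each element to a future, gather the futures into a
`futures::stream::FuturesUnordered`, and collect that stream into a single
`Result<Vec<_>>`. A minimal sketch of the pattern, with a hypothetical
`build()` standing in for `RunnableJob::build_from_job()`:

    use anyhow::Result;
    use futures::stream::FuturesUnordered;
    use tokio::stream::StreamExt; // tokio 0.2/0.3; later split out into `tokio-stream`

    // Hypothetical stand-in for `RunnableJob::build_from_job()`.
    async fn build(n: u32) -> Result<u32> {
        Ok(n * 2)
    }

    // Turn the iterator of futures into a `FuturesUnordered` stream, which
    // polls all of them concurrently and yields results in completion order,
    // then collect into one `Result<Vec<_>>` that fails with the first error.
    async fn run_all(inputs: Vec<u32>) -> Result<Vec<u32>> {
        inputs
            .into_iter()
            .map(build)
            .collect::<FuturesUnordered<_>>()
            .collect::<Result<Vec<u32>>>()
            .await
    }

`FuturesUnordered` drives all futures from a single task rather than spawning
them, which is the reasonably-simple-but-not-maximally-performant trade-off
the commit message refers to.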
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 09af33d..08d7804 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -3,7 +3,6 @@ use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
-use std::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
@@ -12,6 +11,7 @@ use anyhow::anyhow;
use clap_v3::ArgMatches;
use diesel::PgConnection;
use logcrate::debug;
+use tokio::sync::RwLock;
use crate::config::*;
use crate::filestore::ReleaseStore;
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 23b3069..a9d4982 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -2,7 +2,6 @@ use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
-use std::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
@@ -11,6 +10,7 @@ use anyhow::anyhow;
use getset::{Getters, CopyGetters};
use shiplift::Docker;
use shiplift::ExecContainerOptions;
+use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
use typed_builder::TypedBuilder;
@@ -300,12 +300,14 @@ impl Endpoint {
.copy_from(&PathBuf::from("/outputs/"))
.map(|item| item.map_err(Error::from));
- let r = staging
- .write()
- .map_err(|_| anyhow!("Lock poisoned"))?
- .write_files_from_tar_stream(tar_stream)
- .await
- .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?;
+ let r = {
+ let mut writelock = staging.write().await;
+
+ writelock
+ .write_files_from_tar_stream(tar_stream)
+ .await
+ .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
+ };
container.stop(Some(std::time::Duration::new(1, 0))).await?;
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 3c2ebee..8b99a1a 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -1,6 +1,5 @@
use std::path::PathBuf;
use std::sync::Arc;
-use std::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
@@ -11,6 +10,7 @@ use futures::FutureExt;
use indicatif::ProgressBar;
use itertools::Itertools;
use tokio::stream::StreamExt;
+use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use uuid::Uuid;
@@ -84,7 +84,7 @@ impl EndpointScheduler {
let unordered = futures::stream::FuturesUnordered::new();
for ep in self.endpoints.iter().cloned() {
unordered.push(async move {
- let wl = ep.write().map_err(|_| anyhow!("Lock poisoned"))?;
+ let wl = ep.write().await;
wl.number_of_running_containers().await.map(|u| (u, ep.clone()))
});
}
@@ -125,7 +125,7 @@ impl JobHandle {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint
.read()
- .map_err(|_| anyhow!("Lock poisoned"))?;
+ .await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 7cdeb78..38bd302 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -1,5 +1,5 @@
use std::sync::Arc;
-use std::sync::RwLock;
+use tokio::sync::RwLock;
use anyhow::Result;
use anyhow::anyhow;
@@ -25,10 +25,10 @@ impl MergedStores {
MergedStores { release, staging }
}
- pub fn get_artifact_by_name(&self, name: &PackageName) -> Result<Vec<Artifact>> {
+ pub async fn get_artifact_by_name(&self, name: &PackageName) -> Result<Vec<Artifact>> {
let v = self.staging
.read()
- .map_err(|_| anyhow!("Lock poisoned"))?
+ .await
.0
.values()
.filter(|a| a.name() == name)
@@ -39,7 +39,7 @@ impl MergedStores {
Ok({
self.release
.read()
- .map_err(|_| anyhow!("Lock poisoned"))?
+ .await
.0
.values()
.filter(|a| a.name() == name)
@@ -51,10 +51,10 @@ impl MergedStores {
}
}
- pub fn get_artifact_by_name_and_version(&self, name: &PackageName, version: &PackageVersionConstraint) -> Result<Vec<Artifact>> {
+ pub async fn get_artifact_by_name_and_version(&self, name: &PackageName, version: &PackageVersionConstraint) -> Result<Vec<Artifact>> {
let v = self.staging
.read()
- .map_err(|_| anyhow!("Lock poisoned"))?
+ .await
.0
.values()
.filter(|a| {
@@ -68,7 +68,7 @@ impl MergedStores {
Ok({
self.release
.read()
- .map_err(|_| anyhow!("Lock poisoned"))?
+ .await
.0
.values()
.filter(|a| a.name() == name && version.matches(a.version()))
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 066d434..6de5622 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -3,6 +3,7 @@ use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use getset::Getters;
+use tokio::stream::StreamExt;
use uuid::Uuid;
use crate::filestore::MergedStores;
@@ -39,32 +40,39 @@ pub struct RunnableJob {
}
impl RunnableJob {
- pub fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> {
+ pub async fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> {
let script = ScriptBuilder::new(&job.script_shebang)
.build(&job.package, &job.script_phases)?;
trace!("Preparing build dependencies");
- let build_resources = job.package()
- .dependencies()
- .build()
- .into_iter()
- .map(|dep| Self::build_resource(dep, merged_stores));
-
- trace!("Preparing runtime dependencies");
- let runtime_resources = job.package()
- .dependencies()
- .runtime()
- .into_iter()
- .map(|dep| Self::build_resource(dep, merged_stores));
-
- let resources = build_resources.chain(runtime_resources)
- .collect::<Result<Vec<JobResource>>>()?;
+ let resources = {
+ let deps = job.package().dependencies();
+ let build = deps
+ .build()
+ .into_iter()
+ .map(|dep| Self::build_resource(dep, merged_stores))
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<JobResource>>>();
+
+ let rt = deps
+ .runtime()
+ .into_iter()
+ .map(|dep| Self::build_resource(dep, merged_stores))
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<JobResource>>>();
+
+ let (build, rt) = tokio::join!(build, rt);
+ let (mut build, mut rt) = (build?, rt?);
+
+ build.append(&mut rt);
+ build
+ };
Ok(RunnableJob {
uuid: job.uuid,
package: job.package,
image: job.image,
- resources: job.resources.into_iter().chain(resources.into_iter()).collect(),
+ resources,
source_cache: source_cache.clone(),
script,
@@ -76,10 +84,10 @@ impl RunnableJob {
self.source_cache.source_for(&self.package())
}
- fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> {
+ async fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> {
let (name, vers) = dep.parse_as_name_and_version()?;
trace!("Copying dep: {:?} {:?}", name, vers);
- let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers)?;
+ let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers).await?;
if a.is_empty() {
Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers))
diff --git a/src/job/set.rs b/src/job/set.rs
index 58bf7ce..4410e20 100644
--- a/src/job/set.rs
+++ b/src/job/set.rs
@@ -1,4 +1,6 @@
use anyhow::Result;
+use futures::future::Future;
+use tokio::stream::StreamExt;
use crate::filestore::MergedStores;
use crate::job::Job;
@@ -23,8 +25,13 @@ impl JobSet {
self.set.is_empty()
}
- pub fn into_runables<'a>(self, merged_stores: &'a MergedStores, source_cache: &'a SourceCache) -> impl Iterator<Item = Result<RunnableJob>> + 'a {
- self.set.into_iter().map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache))
+ pub async fn into_runables<'a>(self, merged_stores: &'a MergedStores, source_cache: &'a SourceCache) -> Result<Vec<RunnableJob>> {
+ self.set
+ .into_iter()
+ .map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache))
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<RunnableJob>>>()
+ .await
}
pub fn len(&self) -> usize {
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 33dcd1f..c486296 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -1,6 +1,6 @@
use std::path::PathBuf;
use std::sync::Arc;
-use std::sync::RwLock;
+use tokio::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
@@ -78,10 +78,11 @@ impl Orchestrator {
for (i, jobset) in self.jobsets.into_iter().enumerate() {
let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone());
+ let multibar = indicatif::MultiProgress::new();
+
let results = { // run the jobs in the set
let unordered_results = futures::stream::FuturesUnordered::new();
- for runnable in jobset.into_runables(&merged_store, &self.source_cache) {
- let runnable = runnable?;
+ for runnable in jobset.into_runables(&merged_store, &self.source_cache).await?.into_iter() {
let job_id = runnable.uuid().clone();
trace!("Runnable {} for package {}", job_id, runnable.package().name());
@@ -100,15 +101,17 @@ impl Orchestrator {
unordered_results.collect::<Result<Vec<_>>>()
};
- let results = results.await?
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+
+ let (results, barres) = tokio::join!(results, multibar_block);
+ let _ = barres?;
+ let results = results?
.into_iter()
.flatten()
.collect::<Vec<PathBuf>>();
{ // check if all paths that were written are actually there in the staging store
- let staging_store_lock = self.staging_store
- .read()
- .map_err(|_| anyhow!("Lock Poisoned"))?;
+ let staging_store_lock = self.staging_store.read().await;
trace!("Checking {} results...", results.len());
for path in results.iter() {