summaryrefslogtreecommitdiffstats
path: root/src/job/runnable.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-14 17:56:11 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-14 18:18:27 +0100
commitfcecf5fce755b2870cf0baae4a29104d146df948 (patch)
tree978997e6d0a5f1443befd28fdda35271e05f9c8f /src/job/runnable.rs
parent056a2c66e3d3fc8c4b6803acb9d151673b78a23f (diff)
Rewrite to use tokio::sync::RwLock
This patch rewrites the codebase to not use std::sync::RwLock, but tokio::sync::RwLock. tokios RwLock is an async RwLock, which is what we want in an async-await context. The more I use tokio, the more I understand what you should do and what you shouldn't do. Some parts of this patch are a rewrite, for example, JobSet::into_runables() was completely rewritten. That was necessary because the function used inside is `Runnable::build_from_job()`, which uses an RwLock internally, thus, gets `async` in this patch. Because of this, `JobSet::into_runables()` needed a complete rewrite as well. Because it is way more difficult than transforming the function to return an iterator of futures, this patch simply rewrites it to return a `Result<Vec<RunnableJob>>` instead. Internally, tokio jobs are submitted via the `futures::stream::FuturesUnordered<_>` now. This is not the most performant implementation for the problem at hand, but it is a reasonable simple one. Optimization could happen here, of course. Also, the implementation of resource preparation inside `RunnableJob::build_from_job()` got a rewrite using the same technique. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/job/runnable.rs')
-rw-r--r--src/job/runnable.rs46
1 files changed, 27 insertions, 19 deletions
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 066d434..6de5622 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -3,6 +3,7 @@ use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use getset::Getters;
+use tokio::stream::StreamExt;
use uuid::Uuid;
use crate::filestore::MergedStores;
@@ -39,32 +40,39 @@ pub struct RunnableJob {
}
impl RunnableJob {
- pub fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> {
+ pub async fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> {
let script = ScriptBuilder::new(&job.script_shebang)
.build(&job.package, &job.script_phases)?;
trace!("Preparing build dependencies");
- let build_resources = job.package()
- .dependencies()
- .build()
- .into_iter()
- .map(|dep| Self::build_resource(dep, merged_stores));
-
- trace!("Preparing runtime dependencies");
- let runtime_resources = job.package()
- .dependencies()
- .runtime()
- .into_iter()
- .map(|dep| Self::build_resource(dep, merged_stores));
-
- let resources = build_resources.chain(runtime_resources)
- .collect::<Result<Vec<JobResource>>>()?;
+ let resources = {
+ let deps = job.package().dependencies();
+ let build = deps
+ .build()
+ .into_iter()
+ .map(|dep| Self::build_resource(dep, merged_stores))
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<JobResource>>>();
+
+ let rt = deps
+ .runtime()
+ .into_iter()
+ .map(|dep| Self::build_resource(dep, merged_stores))
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<JobResource>>>();
+
+ let (build, rt) = tokio::join!(build, rt);
+ let (mut build, mut rt) = (build?, rt?);
+
+ build.append(&mut rt);
+ build
+ };
Ok(RunnableJob {
uuid: job.uuid,
package: job.package,
image: job.image,
- resources: job.resources.into_iter().chain(resources.into_iter()).collect(),
+ resources,
source_cache: source_cache.clone(),
script,
@@ -76,10 +84,10 @@ impl RunnableJob {
self.source_cache.source_for(&self.package())
}
- fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> {
+ async fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> {
let (name, vers) = dep.parse_as_name_and_version()?;
trace!("Copying dep: {:?} {:?}", name, vers);
- let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers)?;
+ let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers).await?;
if a.is_empty() {
Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers))