diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-14 17:56:11 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-14 18:18:27 +0100 |
commit | fcecf5fce755b2870cf0baae4a29104d146df948 (patch) | |
tree | 978997e6d0a5f1443befd28fdda35271e05f9c8f /src/job/runnable.rs | |
parent | 056a2c66e3d3fc8c4b6803acb9d151673b78a23f (diff) |
Rewrite to use tokio::sync::RwLock
This patch rewrites the codebase to not use std::sync::RwLock, but
tokio::sync::RwLock.
tokios RwLock is an async RwLock, which is what we want in an
async-await context. The more I use tokio, the more I understand what
you should do and what you shouldn't do.
Some parts of this patch are a rewrite, for example,
JobSet::into_runables()
was completely rewritten.
That was necessary because the function used inside is
`Runnable::build_from_job()`, which uses an RwLock internally, thus,
gets `async` in this patch.
Because of this, `JobSet::into_runables()` needed a complete rewrite as
well.
Because it is way more difficult than transforming the function to
return an iterator of futures, this patch simply rewrites it to return a
`Result<Vec<RunnableJob>>` instead.
Internally, tokio jobs are submitted via the
`futures::stream::FuturesUnordered<_>` now.
This is not the most performant implementation for the problem at hand,
but it is a reasonable simple one. Optimization could happen here, of
course.
Also, the implementation of resource preparation inside
`RunnableJob::build_from_job()` got a rewrite using the same technique.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/job/runnable.rs')
-rw-r--r-- | src/job/runnable.rs | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/src/job/runnable.rs b/src/job/runnable.rs index 066d434..6de5622 100644 --- a/src/job/runnable.rs +++ b/src/job/runnable.rs @@ -3,6 +3,7 @@ use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use getset::Getters; +use tokio::stream::StreamExt; use uuid::Uuid; use crate::filestore::MergedStores; @@ -39,32 +40,39 @@ pub struct RunnableJob { } impl RunnableJob { - pub fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> { + pub async fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> { let script = ScriptBuilder::new(&job.script_shebang) .build(&job.package, &job.script_phases)?; trace!("Preparing build dependencies"); - let build_resources = job.package() - .dependencies() - .build() - .into_iter() - .map(|dep| Self::build_resource(dep, merged_stores)); - - trace!("Preparing runtime dependencies"); - let runtime_resources = job.package() - .dependencies() - .runtime() - .into_iter() - .map(|dep| Self::build_resource(dep, merged_stores)); - - let resources = build_resources.chain(runtime_resources) - .collect::<Result<Vec<JobResource>>>()?; + let resources = { + let deps = job.package().dependencies(); + let build = deps + .build() + .into_iter() + .map(|dep| Self::build_resource(dep, merged_stores)) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<JobResource>>>(); + + let rt = deps + .runtime() + .into_iter() + .map(|dep| Self::build_resource(dep, merged_stores)) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<JobResource>>>(); + + let (build, rt) = tokio::join!(build, rt); + let (mut build, mut rt) = (build?, rt?); + + build.append(&mut rt); + build + }; Ok(RunnableJob { uuid: job.uuid, package: job.package, image: job.image, - resources: job.resources.into_iter().chain(resources.into_iter()).collect(), + resources, source_cache: source_cache.clone(), script, @@ -76,10 +84,10 @@ impl RunnableJob { self.source_cache.source_for(&self.package()) } - fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> { + async fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> { let (name, vers) = dep.parse_as_name_and_version()?; trace!("Copying dep: {:?} {:?}", name, vers); - let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers)?; + let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers).await?; if a.is_empty() { Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers)) |