diff options
-rw-r--r-- | src/commands/build.rs | 2 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 16 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 6 | ||||
-rw-r--r-- | src/filestore/merged.rs | 14 | ||||
-rw-r--r-- | src/job/runnable.rs | 46 | ||||
-rw-r--r-- | src/job/set.rs | 11 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 17 |
7 files changed, 66 insertions, 46 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs index 09af33d..08d7804 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -3,7 +3,6 @@ use std::io::Write; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; -use std::sync::RwLock; use anyhow::Context; use anyhow::Error; @@ -12,6 +11,7 @@ use anyhow::anyhow; use clap_v3::ArgMatches; use diesel::PgConnection; use logcrate::debug; +use tokio::sync::RwLock; use crate::config::*; use crate::filestore::ReleaseStore; diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 23b3069..a9d4982 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -2,7 +2,6 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use std::sync::RwLock; use anyhow::Context; use anyhow::Error; @@ -11,6 +10,7 @@ use anyhow::anyhow; use getset::{Getters, CopyGetters}; use shiplift::Docker; use shiplift::ExecContainerOptions; +use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; use typed_builder::TypedBuilder; @@ -300,12 +300,14 @@ impl Endpoint { .copy_from(&PathBuf::from("/outputs/")) .map(|item| item.map_err(Error::from)); - let r = staging - .write() - .map_err(|_| anyhow!("Lock poisoned"))? - .write_files_from_tar_stream(tar_stream) - .await - .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?; + let r = { + let mut writelock = staging.write().await; + + writelock + .write_files_from_tar_stream(tar_stream) + .await + .with_context(|| anyhow!("Copying the TAR stream to the staging store"))? + }; container.stop(Some(std::time::Duration::new(1, 0))).await?; diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 3c2ebee..8b99a1a 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -1,6 +1,5 @@ use std::path::PathBuf; use std::sync::Arc; -use std::sync::RwLock; use anyhow::Context; use anyhow::Error; @@ -11,6 +10,7 @@ use futures::FutureExt; use indicatif::ProgressBar; use itertools::Itertools; use tokio::stream::StreamExt; +use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; use uuid::Uuid; @@ -84,7 +84,7 @@ impl EndpointScheduler { let unordered = futures::stream::FuturesUnordered::new(); for ep in self.endpoints.iter().cloned() { unordered.push(async move { - let wl = ep.write().map_err(|_| anyhow!("Lock poisoned"))?; + let wl = ep.write().await; wl.number_of_running_containers().await.map(|u| (u, ep.clone())) }); } @@ -125,7 +125,7 @@ impl JobHandle { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint .read() - .map_err(|_| anyhow!("Lock poisoned"))?; + .await; let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs index 7cdeb78..38bd302 100644 --- a/src/filestore/merged.rs +++ b/src/filestore/merged.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use std::sync::RwLock; +use tokio::sync::RwLock; use anyhow::Result; use anyhow::anyhow; @@ -25,10 +25,10 @@ impl MergedStores { MergedStores { release, staging } } - pub fn get_artifact_by_name(&self, name: &PackageName) -> Result<Vec<Artifact>> { + pub async fn get_artifact_by_name(&self, name: &PackageName) -> Result<Vec<Artifact>> { let v = self.staging .read() - .map_err(|_| anyhow!("Lock poisoned"))? + .await .0 .values() .filter(|a| a.name() == name) @@ -39,7 +39,7 @@ impl MergedStores { Ok({ self.release .read() - .map_err(|_| anyhow!("Lock poisoned"))? + .await .0 .values() .filter(|a| a.name() == name) @@ -51,10 +51,10 @@ impl MergedStores { } } - pub fn get_artifact_by_name_and_version(&self, name: &PackageName, version: &PackageVersionConstraint) -> Result<Vec<Artifact>> { + pub async fn get_artifact_by_name_and_version(&self, name: &PackageName, version: &PackageVersionConstraint) -> Result<Vec<Artifact>> { let v = self.staging .read() - .map_err(|_| anyhow!("Lock poisoned"))? + .await .0 .values() .filter(|a| { @@ -68,7 +68,7 @@ impl MergedStores { Ok({ self.release .read() - .map_err(|_| anyhow!("Lock poisoned"))? + .await .0 .values() .filter(|a| a.name() == name && version.matches(a.version())) diff --git a/src/job/runnable.rs b/src/job/runnable.rs index 066d434..6de5622 100644 --- a/src/job/runnable.rs +++ b/src/job/runnable.rs @@ -3,6 +3,7 @@ use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use getset::Getters; +use tokio::stream::StreamExt; use uuid::Uuid; use crate::filestore::MergedStores; @@ -39,32 +40,39 @@ pub struct RunnableJob { } impl RunnableJob { - pub fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> { + pub async fn build_from_job(job: Job, merged_stores: &MergedStores, source_cache: &SourceCache) -> Result<Self> { let script = ScriptBuilder::new(&job.script_shebang) .build(&job.package, &job.script_phases)?; trace!("Preparing build dependencies"); - let build_resources = job.package() - .dependencies() - .build() - .into_iter() - .map(|dep| Self::build_resource(dep, merged_stores)); - - trace!("Preparing runtime dependencies"); - let runtime_resources = job.package() - .dependencies() - .runtime() - .into_iter() - .map(|dep| Self::build_resource(dep, merged_stores)); - - let resources = build_resources.chain(runtime_resources) - .collect::<Result<Vec<JobResource>>>()?; + let resources = { + let deps = job.package().dependencies(); + let build = deps + .build() + .into_iter() + .map(|dep| Self::build_resource(dep, merged_stores)) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<JobResource>>>(); + + let rt = deps + .runtime() + .into_iter() + .map(|dep| Self::build_resource(dep, merged_stores)) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<JobResource>>>(); + + let (build, rt) = tokio::join!(build, rt); + let (mut build, mut rt) = (build?, rt?); + + build.append(&mut rt); + build + }; Ok(RunnableJob { uuid: job.uuid, package: job.package, image: job.image, - resources: job.resources.into_iter().chain(resources.into_iter()).collect(), + resources, source_cache: source_cache.clone(), script, @@ -76,10 +84,10 @@ impl RunnableJob { self.source_cache.source_for(&self.package()) } - fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> { + async fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> { let (name, vers) = dep.parse_as_name_and_version()?; trace!("Copying dep: {:?} {:?}", name, vers); - let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers)?; + let mut a = merged_stores.get_artifact_by_name_and_version(&name, &vers).await?; if a.is_empty() { Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers)) diff --git a/src/job/set.rs b/src/job/set.rs index 58bf7ce..4410e20 100644 --- a/src/job/set.rs +++ b/src/job/set.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use futures::future::Future; +use tokio::stream::StreamExt; use crate::filestore::MergedStores; use crate::job::Job; @@ -23,8 +25,13 @@ impl JobSet { self.set.is_empty() } - pub fn into_runables<'a>(self, merged_stores: &'a MergedStores, source_cache: &'a SourceCache) -> impl Iterator<Item = Result<RunnableJob>> + 'a { - self.set.into_iter().map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache)) + pub async fn into_runables<'a>(self, merged_stores: &'a MergedStores, source_cache: &'a SourceCache) -> Result<Vec<RunnableJob>> { + self.set + .into_iter() + .map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache)) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<RunnableJob>>>() + .await } pub fn len(&self) -> usize { diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 33dcd1f..c486296 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; use std::sync::Arc; -use std::sync::RwLock; +use tokio::sync::RwLock; use anyhow::Context; use anyhow::Error; @@ -78,10 +78,11 @@ impl Orchestrator { for (i, jobset) in self.jobsets.into_iter().enumerate() { let merged_store = MergedStores::new(self.release_store.clone(), self.staging_store.clone()); + let multibar = indicatif::MultiProgress::new(); + let results = { // run the jobs in the set let unordered_results = futures::stream::FuturesUnordered::new(); - for runnable in jobset.into_runables(&merged_store, &self.source_cache) { - let runnable = runnable?; + for runnable in jobset.into_runables(&merged_store, &self.source_cache).await?.into_iter() { let job_id = runnable.uuid().clone(); trace!("Runnable {} for package {}", job_id, runnable.package().name()); @@ -100,15 +101,17 @@ impl Orchestrator { unordered_results.collect::<Result<Vec<_>>>() }; - let results = results.await? + let multibar_block = tokio::task::spawn_blocking(move || multibar.join()); + + let (results, barres) = tokio::join!(results, multibar_block); + let _ = barres?; + let results = results? .into_iter() .flatten() .collect::<Vec<PathBuf>>(); { // check if all paths that were written are actually there in the staging store - let staging_store_lock = self.staging_store - .read() - .map_err(|_| anyhow!("Lock Poisoned"))?; + let staging_store_lock = self.staging_store.read().await; trace!("Checking {} results...", results.len()); for path in results.iter() { |