Diffstat (limited to 'src')
-rw-r--r--   src/commands/build.rs            |   5
-rw-r--r--   src/filestore/merged.rs          |  68
-rw-r--r--   src/filestore/path.rs            |   4
-rw-r--r--   src/filestore/staging.rs         |   5
-rw-r--r--   src/filestore/util.rs            |   9
-rw-r--r--   src/job/mod.rs                   |   4
-rw-r--r--   src/job/runnable.rs              |  77
-rw-r--r--   src/job/set.rs                   | 351
-rw-r--r--   src/job/tree.rs                  |  80
-rw-r--r--   src/orchestrator/orchestrator.rs | 262
10 files changed, 272 insertions, 593 deletions
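
What the change amounts to: instead of flattening the package tree into a Vec<JobSet> of parallelizable levels, the jobs now live in a BTreeMap<Uuid, JobDefinition> in which every job records the UUIDs of the jobs it depends on, and the orchestrator repeatedly picks every job whose dependencies have all been built already. A minimal sketch of that selection loop in plain std Rust follows; the names and the integer Uuid stand-in are invented for illustration, this is not butido's actual API:

use std::collections::BTreeMap;

// Illustrative stand-ins: butido uses uuid::Uuid and a full Job type.
type Uuid = u64;

struct JobDefinition {
    name: &'static str,
    dependencies: Vec<Uuid>,
}

// Select all jobs whose dependencies are all built and which have not been
// built themselves, i.e. the filter the new Orchestrator::run_tree() applies
// on every iteration of its loop.
fn ready_jobs<'a>(
    tree: &'a BTreeMap<Uuid, JobDefinition>,
    already_built: &[Uuid],
) -> Vec<(Uuid, &'a JobDefinition)> {
    tree.iter()
        .filter(|&(uuid, jobdef)| {
            jobdef.dependencies.iter().all(|d| already_built.contains(d))
                && !already_built.contains(uuid)
        })
        .map(|(uuid, jobdef)| (*uuid, jobdef))
        .collect()
}

fn main() {
    let mut tree = BTreeMap::new();
    tree.insert(1, JobDefinition { name: "b", dependencies: vec![] });
    tree.insert(2, JobDefinition { name: "a", dependencies: vec![1] }); // "a" depends on "b"

    let mut already_built = Vec::new();
    while already_built.len() < tree.len() {
        for (uuid, jobdef) in ready_jobs(&tree, &already_built) {
            println!("building {}", jobdef.name); // the real code runs the job in a container
            already_built.push(uuid);
        }
    }
    // prints "building b", then "building a"
}
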
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 5cc0dc0..c9a564b 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -33,7 +33,6 @@ use crate::filestore::path::StoreRoot;
 use crate::filestore::ReleaseStore;
 use crate::filestore::StagingStore;
 use crate::job::JobResource;
-use crate::job::JobSet;
 use crate::log::LogItem;
 use crate::orchestrator::OrchestratorSetup;
 use crate::package::PackageName;
@@ -306,7 +305,7 @@ pub async fn build(
     trace!("Setting up job sets");
     let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
-    let jobsets = JobSet::sets_from_tree(tree, shebang, image_name, phases.clone(), resources)?;
+    let jobtree = crate::job::Tree::from_package_tree(tree, shebang, image_name, phases.clone(), resources);
     trace!("Setting up job sets finished successfully");
 
     trace!("Setting up Orchestrator");
@@ -324,7 +323,7 @@ pub async fn build(
         } else {
             None
         })
-        .jobsets(jobsets)
+        .jobtree(jobtree)
         .config(config)
         .build()
         .setup()
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index d1bcfe7..c8ed90d 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -9,18 +9,17 @@
 //
 
 use std::sync::Arc;
-
-use log::trace;
-use tokio::sync::RwLock;
+use std::path::Path;
 
 use anyhow::Result;
 use getset::Getters;
+use log::trace;
+use tokio::sync::RwLock;
 
 use crate::filestore::Artifact;
+use crate::filestore::path::ArtifactPath;
 use crate::filestore::ReleaseStore;
 use crate::filestore::StagingStore;
-use crate::package::PackageName;
-use crate::package::PackageVersionConstraint;
 
 /// A type that merges the release store and the staging store
 ///
@@ -40,55 +39,22 @@ impl MergedStores {
         MergedStores { release, staging }
     }
 
-    pub async fn get_artifact_by_name_and_version(
-        &self,
-        name: &PackageName,
-        version: &PackageVersionConstraint,
-    ) -> Result<Vec<Artifact>> {
-        let v = self
-            .staging
-            .read()
-            .await
-            .0
-            .values()
-            .filter(|a| {
-                trace!(
-                    "Checking {:?} == {:?} && {:?} == {:?}",
-                    a.name(),
-                    name,
-                    version,
-                    a.version()
-                );
-                a.name() == name && version.matches(a.version())
-            })
-            .cloned()
-            .collect::<Vec<_>>();
-
-        if v.is_empty() {
-            Ok({
-                self.release
-                    .read()
-                    .await
-                    .0
-                    .values()
-                    .filter(|a| a.name() == name && version.matches(a.version()))
-                    .cloned()
-                    .collect()
-            })
-        } else {
-            Ok(v)
-        }
-    }
-
     pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<Artifact>> {
+        trace!("Fetching artifact from path: {:?}", p.display());
         let artifact_path = ArtifactPath::new(p.to_path_buf())?;
 
         let staging = &mut self.staging.write().await.0;
         let staging_path = staging.root_path().join(&artifact_path)?;
+        trace!("staging_path = {:?}", staging_path.display());
 
         if staging_path.exists() {
-            let art_path = ArtifactPath::new(p.to_path_buf())?;
-            let art = staging.load_from_path(&artifact_path)?;
+            let art = if let Some(art) = staging.get(&artifact_path) {
+                art
+            } else {
+                trace!("Loading path from staging store: {:?}", artifact_path.display());
+                staging.load_from_path(&artifact_path)?
+            };
+
             return Ok(Some(art.clone()))
         }
 
@@ -96,9 +62,15 @@ impl MergedStores {
         let release = &mut self.release.write().await.0;
         let release_path = release.root_path().join(&artifact_path)?;
+        trace!("release_path = {:?}", release_path);
 
         if release_path.exists() {
-            let art = release.load_from_path(&artifact_path)?;
+            let art = if let Some(art) = release.get(&artifact_path) {
+                art
+            } else {
+                trace!("Loading path from release store: {:?}", artifact_path.display());
+                release.load_from_path(&artifact_path)?
+            };
 
             return Ok(Some(art.clone()))
         }
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index 9262d1d..9ddc989 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -73,10 +73,6 @@ impl StoreRoot {
         FullArtifactPath(self, ap)
     }
 
-    pub(in crate::filestore) fn is_file(&self, subpath: &Path) -> bool {
-        self.0.join(subpath).is_file()
-    }
-
     pub(in crate::filestore) fn is_dir(&self, subpath: &Path) -> bool {
         self.0.join(subpath).is_dir()
     }
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index 995768c..b944d84 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -9,7 +9,6 @@
 //
 
 use std::fmt::Debug;
-use std::path::Path;
 
 use anyhow::anyhow;
 use anyhow::Context;
@@ -101,8 +100,4 @@ impl StagingStore {
     pub fn root_path(&self) -> &StoreRoot {
         self.0.root_path()
     }
-
-    pub fn path_exists_in_store_root(&self, path: &Path) -> bool {
-        self.0.path_exists_in_store_root(path)
-    }
 }
diff --git a/src/filestore/util.rs b/src/filestore/util.rs
index c931545..2fc1483 100644
--- a/src/filestore/util.rs
+++ b/src/filestore/util.rs
@@ -12,7 +12,6 @@
 //!
 
 use std::collections::BTreeMap;
-use std::path::Path;
 
 use anyhow::anyhow;
 use anyhow::Result;
@@ -51,12 +50,8 @@ impl FileStoreImpl {
         &self.root
     }
 
-    pub fn path_exists_in_store_root(&self, p: &Path) -> bool {
-        self.root.is_file(p)
-    }
-
-    pub(in crate::filestore) fn values(&self) -> impl Iterator<Item = &Artifact> {
-        self.store.values()
+    pub fn get(&self, artifact_path: &ArtifactPath) -> Option<&Artifact> {
+        self.store.get(artifact_path)
     }
 
     pub(in crate::filestore) fn load_from_path(
diff --git a/src/job/mod.rs b/src/job/mod.rs
index 719d2da..e684935 100644
--- a/src/job/mod.rs
+++ b/src/job/mod.rs
@@ -12,8 +12,8 @@
 mod job;
 pub use job::*;
 
-mod set;
-pub use set::*;
+mod tree;
+pub use tree::*;
 
 mod resource;
 pub use resource::*;
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 7a0fc19..620df73 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -12,25 +12,22 @@
 use std::collections::HashMap;
 
 use anyhow::anyhow;
 use anyhow::Context;
-use anyhow::Error;
 use anyhow::Result;
 use getset::Getters;
-use log::{debug, trace, warn};
-use tokio::stream::StreamExt;
+use log::debug;
 use uuid::Uuid;
 
 use crate::config::Configuration;
-use crate::filestore::MergedStores;
+use crate::filestore::Artifact;
 use crate::job::Job;
 use crate::job::JobResource;
 use crate::package::Package;
-use crate::package::ParseDependency;
 use crate::package::Script;
 use crate::package::ScriptBuilder;
 use crate::source::SourceCache;
 use crate::source::SourceEntry;
-use crate::util::docker::ImageName;
 use crate::util::EnvironmentVariableName;
+use crate::util::docker::ImageName;
 
 /// A job configuration that can be run. All inputs are clear here.
 #[derive(Debug, Getters)]
@@ -56,39 +53,22 @@ pub struct RunnableJob {
 
 impl RunnableJob {
     pub async fn build_from_job(
-        job: Job,
-        merged_stores: &MergedStores,
+        job: &Job,
         source_cache: &SourceCache,
         config: &Configuration,
+        dependencies: Vec<Artifact>,
     ) -> Result<Self> {
-        trace!("Preparing build dependencies");
-        let resources = {
-            let mut resources = job
-                .package()
-                .dependencies()
-                .build()
-                .iter()
-                .map(|dep| Self::build_resource(dep, merged_stores))
-                .chain({
-                    job.package()
-                        .dependencies()
-                        .runtime()
-                        .iter()
-                        .map(|dep| Self::build_resource(dep, merged_stores))
-                })
-                .collect::<futures::stream::FuturesUnordered<_>>()
-                .collect::<Result<Vec<JobResource>>>()
-                .await?;
-
-            resources.extend({
+        // Add the environment from the original Job object to the resources
+        let resources = dependencies
+            .into_iter()
+            .map(JobResource::from)
+            .chain({
                 job.resources()
                     .iter()
                     .filter(|jr| jr.env().is_some())
                     .cloned()
-            });
-
-            resources
-        };
+            })
+            .collect();
 
         if config.containers().check_env_names() {
             debug!("Checking environment if all variables are allowed!");
@@ -121,9 +101,9 @@ impl RunnableJob {
         )?;
 
         Ok(RunnableJob {
-            uuid: job.uuid,
-            package: job.package,
-            image: job.image,
+            uuid: job.uuid.clone(),
+            package: job.package.clone(),
+            image: job.image.clone(),
             resources,
 
             source_cache: source_cache.clone(),
@@ -179,31 +159,4 @@ impl RunnableJob {
         ]
     }
 
-    async fn build_resource(
-        dep: &dyn ParseDependency,
-        merged_stores: &MergedStores,
-    ) -> Result<JobResource> {
-        let (name, vers) = dep.parse_as_name_and_version()?;
-        trace!("Copying dep: {:?} {:?}", name, vers);
-        let mut a = merged_stores
-            .get_artifact_by_name_and_version(&name, &vers)
-            .await?;
-
-        if a.is_empty() {
-            Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers))
-                .context("Building a runnable job")
-                .map_err(Error::from)
-        } else {
-            a.sort();
-            let a_len = a.len();
-            let found_dependency = a.into_iter().next().unwrap(); // save by above check
-            if a_len > 1 {
-                warn!("Found more than one dependency for {:?} {:?}", name, vers);
-                warn!("Using: {:?}", found_dependency);
-                warn!("Please investigate, this might be a BUG");
-            }
-
-            Ok(JobResource::Artifact(found_dependency))
-        }
-    }
 }
diff --git a/src/job/set.rs b/src/job/set.rs
deleted file mode 100644
index b9e3881..0000000
--- a/src/job/set.rs
+++ /dev/null
@@ -1,351 +0,0 @@
-//
-// Copyright (c) 2020-2021 science+computing ag and other contributors
-//
-// This program and the accompanying materials are made
-// available under the terms of the Eclipse Public License 2.0
-// which is available at https://www.eclipse.org/legal/epl-2.0/
-//
-// SPDX-License-Identifier: EPL-2.0
-//
-
-use anyhow::Result;
-use log::{debug, trace};
-use tokio::stream::StreamExt;
-
-use crate::config::Configuration;
-use crate::filestore::MergedStores;
-use crate::job::Job;
-use crate::job::JobResource;
-use crate::job::RunnableJob;
-use crate::package::PhaseName;
-use crate::package::Shebang;
-use crate::package::Tree;
-use crate::source::SourceCache;
-use crate::util::docker::ImageName;
-
-/// A set of jobs that could theoretically be run in parallel
-#[derive(Debug)]
-pub struct JobSet {
-    set: Vec<Job>,
-}
-
-impl JobSet {
-    pub fn sets_from_tree(
-        t: Tree,
-        shebang: Shebang,
-        image: ImageName,
-        phases: Vec<PhaseName>,
-        resources: Vec<JobResource>,
-    ) -> Result<Vec<JobSet>> {
-        tree_into_jobsets(t, shebang, image, phases, resources)
-    }
-
-    fn is_empty(&self) -> bool {
-        self.set.is_empty()
-    }
-
-    pub async fn into_runables<'a>(
-        self,
-        merged_stores: &'a MergedStores,
-        source_cache: &'a SourceCache,
-        config: &Configuration,
-    ) -> Result<Vec<RunnableJob>> {
-        self.set
-            .into_iter()
-            .map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache, config))
-            .collect::<futures::stream::FuturesUnordered<_>>()
-            .collect::<Result<Vec<RunnableJob>>>()
-            .await
-    }
-}
-
-/// Get the tree as sets of jobs, the deepest level of the tree first
-fn tree_into_jobsets(
-    tree: Tree,
-    shebang: Shebang,
-    image: ImageName,
-    phases: Vec<PhaseName>,
-    resources: Vec<JobResource>,
-) -> Result<Vec<JobSet>> {
-    fn inner(
-        tree: Tree,
-        shebang: &Shebang,
-        image: &ImageName,
-        phases: &[PhaseName],
-        resources: &[JobResource],
-    ) -> Result<Vec<JobSet>> {
-        trace!("Creating jobsets for tree: {:?}", tree);
-
-        let mut sets = vec![];
-        let mut current_set = vec![];
-
-        for (package, dep) in tree.into_iter() {
-            trace!("Recursing for package: {:?}", package);
-            let mut sub_sets = inner(dep, shebang, image, phases, resources)?; // recursion!
-            sets.append(&mut sub_sets);
-            current_set.push(package);
-        }
-
-        debug!("Jobset for set: {:?}", current_set);
-        let jobset = JobSet {
-            set: current_set
-                .into_iter()
-                .map(|package| {
-                    Job::new(
-                        package,
-                        shebang.clone(),
-                        image.clone(),
-                        phases.to_vec(),
-                        resources.to_vec(),
-                    )
-                })
-                .collect(),
-        };
-        debug!("Jobset = {:?}", jobset);
-
-        // make sure the current recursion is added _before_ all other recursions
-        // which yields the highest level in the tree as _first_ element of the resulting vector
-        let mut result = Vec::new();
-        if !jobset.is_empty() {
-            debug!("Adding jobset: {:?}", jobset);
-            result.push(jobset)
-        }
-        result.append(&mut sets);
-        debug!("Result = {:?}", result);
-        Ok(result)
-    }
-
-    inner(tree, &shebang, &image, &phases, &resources).map(|mut v| {
-        // reverse, because the highest level in the tree is added as first element in the vector
-        // and the deepest level is last.
-        //
-        // After reversing, we have a chain of things to build. Awesome, huh?
-        v.reverse();
-        v
-    })
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use std::collections::BTreeMap;
-
-    use crate::package::tests::package;
-    use crate::package::tests::pname;
-    use crate::package::tests::pversion;
-    use crate::package::Dependencies;
-    use crate::package::Dependency;
-    use crate::package::PhaseName;
-    use crate::repository::Repository;
-    use crate::util::docker::ImageName;
-
-    use indicatif::ProgressBar;
-
-    fn setup_logging() {
-        let _ = ::env_logger::try_init();
-    }
-
-    #[test]
-    fn test_one_element_tree_to_jobsets() {
-        setup_logging();
-        let mut btree = BTreeMap::new();
-
-        let p1 = {
-            let name = "a";
-            let vers = "1";
-            let pack = package(name, vers, "https://rust-lang.org", "123");
-            btree.insert((pname(name), pversion(vers)), pack.clone());
-            pack
-        };
-
-        let repo = Repository::from(btree);
-        let progress = ProgressBar::hidden();
-
-        let mut tree = Tree::default();
-        let r = tree.add_package(p1, &repo, progress);
-        assert!(r.is_ok());
-
-        let image = ImageName::from(String::from("test"));
-        let phases = vec![PhaseName::from(String::from("testphase"))];
-        let shebang = Shebang::from(String::from("#!/bin/bash"));
-
-        let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
-        assert!(js.is_ok());
-        let js = js.unwrap();
-
-        assert_eq!(js.len(), 1, "There should be only one jobset if there is only one element in the dependency tree: {:?}", js);
-
-        let js = js.get(0).unwrap();
-        assert_eq!(
-            js.set.len(),
-            1,
-            "The jobset should contain exactly one job: {:?}",
-            js
-        );
-
-        let job = js.set.get(0).unwrap();
-        assert_eq!(
-            *job.package.name(),
-            pname("a"),
-            "The job should be for the package 'a': {:?}",
-            job
-        );
-    }
-
-    #[test]
-    fn test_two_element_tree_to_jobsets() {
-        setup_logging();
-        let mut btree = BTreeMap::new();
-
-        let p1 = {
-            let name = "a";
-            let vers = "1";
-            let pack = package(name, vers, "https://rust-lang.org", "123");
-            btree.insert((pname(name), pversion(vers)), pack.clone());
-            pack
-        };
-
-        let p2 = {
-            let name = "b";
-            let vers = "2";
-            let pack = package(name, vers, "https://rust-lang.org", "124");
-            btree.insert((pname(name), pversion(vers)), pack.clone());
-            pack
-        };
-
-        let repo = Repository::from(btree);
-        let progress = ProgressBar::hidden();
-
-        let mut tree = Tree::default();
-        let r = tree.add_package(p1, &repo, progress.clone());
-        assert!(r.is_ok());
-
-        let r = tree.add_package(p2, &repo, progress);
-        assert!(r.is_ok());
-
-        let image = ImageName::from(String::from("test"));
-        let phases = vec![PhaseName::from(String::from("testphase"))];
-        let shebang = Shebang::from(String::from("#!/bin/bash"));
-
-        let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
-        assert!(js.is_ok());
-        let js = js.unwrap();
-
-        assert_eq!(
-            js.len(),
-            1,
-            "There should be one set of jobs for two packages on the same level in the tree: {:?}",
-            js
-        );
-
-        let js = js.get(0).unwrap();
-        assert_eq!(
-            js.set.len(),
-            2,
-            "The jobset should contain exactly two jobs: {:?}",
-            js
-        );
-
-        let job = js.set.get(0).unwrap();
-        assert_eq!(
-            *job.package.name(),
-            pname("a"),
-            "The job should be for the package 'a': {:?}",
-            job
-        );
-
-        let job = js.set.get(1).unwrap();
-        assert_eq!(
-            *job.package.name(),
-            pname("b"),
-            "The job should be for the package 'a': {:?}",
-            job
-        );
-    }
-
-    #[test]
-    fn test_two_dependent_elements_to_jobsets() {
-        setup_logging();
-        let mut btree = BTreeMap::new();
-
-        let p1 = {
-            let name = "a";
-            let vers = "1";
-            let mut pack = package(name, vers, "https://rust-lang.org", "123");
-            {
-                let d1 = Dependency::from(String::from("b =2"));
-                let ds = Dependencies::with_runtime_dependencies(vec![d1]);
-                pack.set_dependencies(ds);
-            }
-            btree.insert((pname(name), pversion(vers)), pack.clone());
-            pack
-        };
-
-        let _ = {
-            let name = "b";
-            let vers = "2";
-            let pack = package(name, vers, "https://rust-lang.org", "124");
-            btree.insert((pname(name), pversion(vers)), pack.clone());
-            pack
-        };
-
-        let repo = Repository::from(btree);
-        let progress = ProgressBar::hidden();
-
-        let mut tree = Tree::default();
-        let r = tree.add_package(p1, &repo, progress);
-        assert!(r.is_ok());
-
-        let image = ImageName::from(String::from("test"));
-        let phases = vec![PhaseName::from(String::from("testphase"))];
-        let shebang = Shebang::from(String::from("#!/bin/bash"));
-
-        let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
-        assert!(js.is_ok());
-        let js = js.unwrap();
-
-        assert_eq!(
-            js.len(),
-            2,
-            "There should be two set of jobs for two packages where one depends on the other: {:?}",
-            js
-        );
-
-        {
-            let first_js = js.get(0).unwrap();
-            assert_eq!(
-                first_js.set.len(),
-                1,
-                "The first jobset should contain exactly one job: {:?}",
-                js
-            );
-
-            let job = first_js.set.get(0).unwrap();
-            assert_eq!(
-                *job.package.name(),
-                pname("b"),
-                "The job from the first set should be for the package 'b': {:?}",
-                job
-            );
-        }
-
-        {
-            let second_js = js.get(1).unwrap();
-            assert_eq!(
-                second_js.set.len(),
-                1,
-                "The second jobset should contain exactly one job: {:?}",
-                js
-            );
-
-            let job = second_js.set.get(0).unwrap();
-            assert_eq!(
-                *job.package.name(),
-                pname("a"),
-                "The job should be for the package 'a': {:?}",
-                job
-            );
-        }
-    }
-}
diff --git a/src/job/tree.rs b/src/job/tree.rs
new file mode 100644
index 0000000..883d93e
--- /dev/null
+++ b/src/job/tree.rs
@@ -0,0 +1,80 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::collections::BTreeMap;
+
+use uuid::Uuid;
+use getset::Getters;
+
+use crate::job::Job;
+use crate::job::JobResource;
+use crate::package::PhaseName;
+use crate::package::Shebang;
+use crate::util::docker::ImageName;
+
+#[derive(Debug, Getters)]
+pub struct Tree {
+    #[getset(get = "pub")]
+    inner: BTreeMap<Uuid, JobDefinition>,
+}
+
+impl Tree {
+    pub fn from_package_tree(pt: crate::package::Tree,
+        script_shebang: Shebang,
+        image: ImageName,
+        phases: Vec<PhaseName>,
+        resources: Vec<JobResource>,
+    ) -> Self {
+        Tree { inner: Self::build_tree(pt, script_shebang, image, phases, resources) }
+    }
+
+    fn build_tree(pt: crate::package::Tree,
+        script_shebang: Shebang,
+        image: ImageName,
+        phases: Vec<PhaseName>,
+        resources: Vec<JobResource>,
+    ) -> BTreeMap<Uuid, JobDefinition> {
+        let mut tree = BTreeMap::new();
+
+        for (package, dependencies) in pt.into_iter() {
+            let mut deps = Self::build_tree(dependencies,
+                script_shebang.clone(),
+                image.clone(),
+                phases.clone(),
+                resources.clone());
+
+            let deps_uuids = deps.keys().cloned().collect();
+            tree.append(&mut deps);
+
+            let job = Job::new(package,
+                script_shebang.clone(),
+                image.clone(),
+                phases.clone(),
+                resources.clone());
+
+            let job_uuid = job.uuid().clone();
+            let jdef = JobDefinition { job, dependencies: deps_uuids };
+
+            tree.insert(job_uuid, jdef);
+        }
+
+        tree
+    }
+
+}
+
+/// A job definition is the job itself and all UUIDs from jobs this job depends on.
+#[derive(Debug)]
+pub struct JobDefinition {
+    pub job: Job,
+
+    /// Uuids of the jobs where this job depends on the outputs
+    pub dependencies: Vec<Uuid>,
+}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 7fec555..03a4cd2 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -12,25 +12,27 @@
 use std::path::PathBuf;
 use std::sync::Arc;
 
 use anyhow::anyhow;
-use anyhow::Context;
 use anyhow::Error;
 use anyhow::Result;
 use diesel::PgConnection;
+use indicatif::ProgressBar;
 use log::trace;
 use tokio::sync::RwLock;
+use tokio::stream::StreamExt;
 use typed_builder::TypedBuilder;
 use uuid::Uuid;
 
 use crate::config::Configuration;
-use crate::db::models::Artifact;
-use crate::db::models::Submit;
+use crate::db::models as dbmodels;
 use crate::endpoint::EndpointConfiguration;
 use crate::endpoint::EndpointScheduler;
+use crate::filestore::Artifact;
 use crate::filestore::MergedStores;
 use crate::filestore::ReleaseStore;
 use crate::filestore::StagingStore;
-use crate::job::JobSet;
+use crate::job::JobDefinition;
 use crate::job::RunnableJob;
+use crate::job::Tree as JobTree;
 use crate::source::SourceCache;
 use crate::util::progress::ProgressBars;
 
@@ -39,8 +41,9 @@ pub struct Orchestrator<'a> {
     progress_generator: ProgressBars,
     merged_stores: MergedStores,
     source_cache: SourceCache,
-    jobsets: Vec<JobSet>,
+    jobtree: JobTree,
     config: &'a Configuration,
+    database: Arc<PgConnection>,
 }
 
 #[derive(TypedBuilder)]
@@ -50,9 +53,9 @@ pub struct OrchestratorSetup<'a> {
     staging_store: Arc<RwLock<StagingStore>>,
     release_store: Arc<RwLock<ReleaseStore>>,
     source_cache: SourceCache,
-    jobsets: Vec<JobSet>,
+    jobtree: JobTree,
     database: Arc<PgConnection>,
-    submit: Submit,
+    submit: dbmodels::Submit,
     log_dir: Option<PathBuf>,
     config: &'a Configuration,
 }
 
@@ -62,7 +65,7 @@ impl<'a> OrchestratorSetup<'a> {
         let scheduler = EndpointScheduler::setup(
             self.endpoint_config,
             self.staging_store.clone(),
-            self.database,
+            self.database.clone(),
             self.submit.clone(),
             self.log_dir,
         )
@@ -73,125 +76,162 @@ impl<'a> OrchestratorSetup<'a> {
             progress_generator: self.progress_generator,
             merged_stores: MergedStores::new(self.release_store, self.staging_store),
             source_cache: self.source_cache,
-            jobsets: self.jobsets,
+            jobtree: self.jobtree,
             config: self.config,
+            database: self.database,
         })
     }
 }
 
+/// Helper type
+///
+/// Represents a result that came from the run of a job inside a container
+///
+/// It is either a list of artifacts (with their respective database artifact objects)
+/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
+/// anyhow::Error that was issued.
+type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>;
+
 impl<'a> Orchestrator<'a> {
-    pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
-        for jobset in self.jobsets.into_iter() {
-            let errs = Self::run_jobset(
-                &self.scheduler,
-                &self.merged_stores,
-                &self.source_cache,
-                &self.config,
-                &self.progress_generator,
-                jobset,
-                output,
-            )
-            .await?;
+    pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
+        let (results, errors) = self.run_tree().await?;
+        output.extend(results.into_iter().map(|(_, dba)| dba));
+        Ok(errors)
+    }
+
+    async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
+        use futures::FutureExt;
+
+        let mut already_built = vec![];
+        let mut artifacts = vec![];
+        let mut errors = vec![];
+
+        loop {
+            // loop{}
+            // until for all elements of self.jobtree, the uuid exists in already_built
+            //
+            // for each element in jobtree
+            //     where dependencies(element) all in already_built
+            //     run_job_for(element)
+            //
+            // for results from run_job_for calls
+            //     remember UUID in already_built
+            //     put built artifacts in artifacts
+            //     if error, abort everything
+            //
+            //
+            let multibar = Arc::new(indicatif::MultiProgress::new());
+            let build_results = self.jobtree
+                .inner()
+                .iter()
+                .filter(|(uuid, jobdef)| { // select all jobs where all dependencies are in `already_built`
+                    trace!("Filtering job definition: {:?}", jobdef);
+                    jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid)
+                })
+                .map(|(uuid, jobdef)| {
+                    trace!("Running job {}", uuid);
+                    let bar = multibar.add(self.progress_generator.bar());
+                    let uuid = uuid.clone();
+                    self.run_job(jobdef, bar).map(move |r| (uuid, r))
+                })
+                .collect::<futures::stream::FuturesUnordered<_>>()
+                .collect::<Vec<(_, Result<JobResult>)>>();
+
+            let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+            let (_, build_results) = tokio::join!(multibar_block, build_results);
+
+            for (uuid, artifact_result) in build_results.into_iter() {
+                already_built.push(uuid);
+
+                match artifact_result {
+                    Ok(Ok(mut arts)) => artifacts.append(&mut arts),
+                    Ok(Err((uuid, e))) => { // error during job running
+                        log::error!("Error for job {} = {}", uuid, e);
+                        errors.push((uuid, e));
+                    },
+
+                    Err(e) => return Err(e), // error during container execution
+                }
+            }
+
+            if !errors.is_empty() {
+                break
+            }
 
-            if !errs.is_empty() {
-                return Ok(errs);
+            // already_built.sort(); // TODO: optimization for binary search in
+            // above and below contains() clause
+
+            if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) {
+                break
             }
         }
 
-        Ok(vec![])
+        Ok((artifacts, errors))
     }
 
-    async fn run_jobset(
-        scheduler: &EndpointScheduler,
-        merged_store: &MergedStores,
-        source_cache: &SourceCache,
-        config: &Configuration,
-        progress_generator: &ProgressBars,
-        jobset: JobSet,
-        output: &mut Vec<Artifact>,
-    ) -> Result<Vec<(Uuid, anyhow::Error)>> {
-        use tokio::stream::StreamExt;
-
-        let multibar = Arc::new(indicatif::MultiProgress::new());
-        let results = jobset // run the jobs in the set
-            .into_runables(&merged_store, source_cache, config)
-            .await?
-            .into_iter()
-            .map(|runnable| {
-                let bar = multibar.add(progress_generator.bar());
-
-                async {
-                    let uuid = *runnable.uuid();
-                    Self::run_runnable(runnable, scheduler, bar)
-                        .await
-                        .map_err(|e| (uuid, e))
-                }
-            })
-            .collect::<futures::stream::FuturesUnordered<_>>()
-            .collect::<Vec<std::result::Result<Vec<Artifact>, (Uuid, Error)>>>();
-
-        let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
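
Both the deleted run_jobset() and the new run_tree() drive the job futures and the indicatif progress bars together: MultiProgress::join() blocks until all bars are finished, so it is moved onto a blocking thread via tokio::task::spawn_blocking and awaited alongside the FuturesUnordered stream. A self-contained sketch of just this pattern, with a toy job in place of butido's container build; it assumes an indicatif version that still provides MultiProgress::join() and the futures crate's StreamExt:

use std::sync::Arc;

use futures::stream::{FuturesUnordered, StreamExt};
use indicatif::{MultiProgress, ProgressBar};

// Toy stand-in for a container job: tick a bar, then resolve to an id.
async fn run_job(id: usize, bar: ProgressBar) -> usize {
    for _ in 0..10 {
        bar.inc(1);
        tokio::task::yield_now().await;
    }
    bar.finish();
    id
}

#[tokio::main]
async fn main() {
    let multibar = Arc::new(MultiProgress::new());

    // Collect the futures first, so every bar is registered before join().
    let jobs = (0..4)
        .map(|id| run_job(id, multibar.add(ProgressBar::new(10))))
        .collect::<FuturesUnordered<_>>()
        .collect::<Vec<usize>>();

    // MultiProgress::join() blocks, so run it on a blocking thread while
    // the job futures make progress, and await both sides together.
    let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
    let (_, results) = tokio::join!(multibar_block, jobs);

    println!("finished jobs: {:?}", results);
}
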