summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/commands/build.rs5
-rw-r--r--src/filestore/merged.rs68
-rw-r--r--src/filestore/path.rs4
-rw-r--r--src/filestore/staging.rs5
-rw-r--r--src/filestore/util.rs9
-rw-r--r--src/job/mod.rs4
-rw-r--r--src/job/runnable.rs77
-rw-r--r--src/job/set.rs351
-rw-r--r--src/job/tree.rs80
-rw-r--r--src/orchestrator/orchestrator.rs262
10 files changed, 272 insertions, 593 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 5cc0dc0..c9a564b 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -33,7 +33,6 @@ use crate::filestore::path::StoreRoot;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobResource;
-use crate::job::JobSet;
use crate::log::LogItem;
use crate::orchestrator::OrchestratorSetup;
use crate::package::PackageName;
@@ -306,7 +305,7 @@ pub async fn build(
trace!("Setting up job sets");
let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
- let jobsets = JobSet::sets_from_tree(tree, shebang, image_name, phases.clone(), resources)?;
+ let jobtree = crate::job::Tree::from_package_tree(tree, shebang, image_name, phases.clone(), resources);
trace!("Setting up job sets finished successfully");
trace!("Setting up Orchestrator");
@@ -324,7 +323,7 @@ pub async fn build(
} else {
None
})
- .jobsets(jobsets)
+ .jobtree(jobtree)
.config(config)
.build()
.setup()
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index d1bcfe7..c8ed90d 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -9,18 +9,17 @@
//
use std::sync::Arc;
-
-use log::trace;
-use tokio::sync::RwLock;
+use std::path::Path;
use anyhow::Result;
use getset::Getters;
+use log::trace;
+use tokio::sync::RwLock;
use crate::filestore::Artifact;
+use crate::filestore::path::ArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
-use crate::package::PackageName;
-use crate::package::PackageVersionConstraint;
/// A type that merges the release store and the staging store
///
@@ -40,55 +39,22 @@ impl MergedStores {
MergedStores { release, staging }
}
- pub async fn get_artifact_by_name_and_version(
- &self,
- name: &PackageName,
- version: &PackageVersionConstraint,
- ) -> Result<Vec<Artifact>> {
- let v = self
- .staging
- .read()
- .await
- .0
- .values()
- .filter(|a| {
- trace!(
- "Checking {:?} == {:?} && {:?} == {:?}",
- a.name(),
- name,
- version,
- a.version()
- );
- a.name() == name && version.matches(a.version())
- })
- .cloned()
- .collect::<Vec<_>>();
-
- if v.is_empty() {
- Ok({
- self.release
- .read()
- .await
- .0
- .values()
- .filter(|a| a.name() == name && version.matches(a.version()))
- .cloned()
- .collect()
- })
- } else {
- Ok(v)
- }
- }
-
pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<Artifact>> {
+ trace!("Fetching artifact from path: {:?}", p.display());
let artifact_path = ArtifactPath::new(p.to_path_buf())?;
let staging = &mut self.staging.write().await.0;
let staging_path = staging.root_path().join(&artifact_path)?;
+ trace!("staging_path = {:?}", staging_path.display());
if staging_path.exists() {
- let art_path = ArtifactPath::new(p.to_path_buf())?;
- let art = staging.load_from_path(&artifact_path)?;
+ let art = if let Some(art) = staging.get(&artifact_path) {
+ art
+ } else {
+ trace!("Loading path from staging store: {:?}", artifact_path.display());
+ staging.load_from_path(&artifact_path)?
+ };
+
return Ok(Some(art.clone()))
}
@@ -96,9 +62,15 @@ impl MergedStores {
let release = &mut self.release.write().await.0;
let release_path = release.root_path().join(&artifact_path)?;
+ trace!("release_path = {:?}", release_path);
if release_path.exists() {
- let art = release.load_from_path(&artifact_path)?;
+ let art = if let Some(art) = release.get(&artifact_path) {
+ art
+ } else {
+ trace!("Loading path from release store: {:?}", artifact_path.display());
+ release.load_from_path(&artifact_path)?
+ };
return Ok(Some(art.clone()))
}
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index 9262d1d..9ddc989 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -73,10 +73,6 @@ impl StoreRoot {
FullArtifactPath(self, ap)
}
- pub(in crate::filestore) fn is_file(&self, subpath: &Path) -> bool {
- self.0.join(subpath).is_file()
- }
-
pub(in crate::filestore) fn is_dir(&self, subpath: &Path) -> bool {
self.0.join(subpath).is_dir()
}
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index 995768c..b944d84 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -9,7 +9,6 @@
//
use std::fmt::Debug;
-use std::path::Path;
use anyhow::anyhow;
use anyhow::Context;
@@ -101,8 +100,4 @@ impl StagingStore {
pub fn root_path(&self) -> &StoreRoot {
self.0.root_path()
}
-
- pub fn path_exists_in_store_root(&self, path: &Path) -> bool {
- self.0.path_exists_in_store_root(path)
- }
}
diff --git a/src/filestore/util.rs b/src/filestore/util.rs
index c931545..2fc1483 100644
--- a/src/filestore/util.rs
+++ b/src/filestore/util.rs
@@ -12,7 +12,6 @@
//!
use std::collections::BTreeMap;
-use std::path::Path;
use anyhow::anyhow;
use anyhow::Result;
@@ -51,12 +50,8 @@ impl FileStoreImpl {
&self.root
}
- pub fn path_exists_in_store_root(&self, p: &Path) -> bool {
- self.root.is_file(p)
- }
-
- pub(in crate::filestore) fn values(&self) -> impl Iterator<Item = &Artifact> {
- self.store.values()
+ pub fn get(&self, artifact_path: &ArtifactPath) -> Option<&Artifact> {
+ self.store.get(artifact_path)
}
pub(in crate::filestore) fn load_from_path(
diff --git a/src/job/mod.rs b/src/job/mod.rs
index 719d2da..e684935 100644
--- a/src/job/mod.rs
+++ b/src/job/mod.rs
@@ -12,8 +12,8 @@
mod job;
pub use job::*;
-mod set;
-pub use set::*;
+mod tree;
+pub use tree::*;
mod resource;
pub use resource::*;
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 7a0fc19..620df73 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -12,25 +12,22 @@ use std::collections::HashMap;
use anyhow::anyhow;
use anyhow::Context;
-use anyhow::Error;
use anyhow::Result;
use getset::Getters;
-use log::{debug, trace, warn};
-use tokio::stream::StreamExt;
+use log::debug;
use uuid::Uuid;
use crate::config::Configuration;
-use crate::filestore::MergedStores;
+use crate::filestore::Artifact;
use crate::job::Job;
use crate::job::JobResource;
use crate::package::Package;
-use crate::package::ParseDependency;
use crate::package::Script;
use crate::package::ScriptBuilder;
use crate::source::SourceCache;
use crate::source::SourceEntry;
-use crate::util::docker::ImageName;
use crate::util::EnvironmentVariableName;
+use crate::util::docker::ImageName;
/// A job configuration that can be run. All inputs are clear here.
#[derive(Debug, Getters)]
@@ -56,39 +53,22 @@ pub struct RunnableJob {
impl RunnableJob {
pub async fn build_from_job(
- job: Job,
- merged_stores: &MergedStores,
+ job: &Job,
source_cache: &SourceCache,
config: &Configuration,
+ dependencies: Vec<Artifact>,
) -> Result<Self> {
- trace!("Preparing build dependencies");
- let resources = {
- let mut resources = job
- .package()
- .dependencies()
- .build()
- .iter()
- .map(|dep| Self::build_resource(dep, merged_stores))
- .chain({
- job.package()
- .dependencies()
- .runtime()
- .iter()
- .map(|dep| Self::build_resource(dep, merged_stores))
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<JobResource>>>()
- .await?;
-
- resources.extend({
+ // Add the environment from the original Job object to the resources
+ let resources = dependencies
+ .into_iter()
+ .map(JobResource::from)
+ .chain({
job.resources()
.iter()
.filter(|jr| jr.env().is_some())
.cloned()
- });
-
- resources
- };
+ })
+ .collect();
if config.containers().check_env_names() {
debug!("Checking environment if all variables are allowed!");
@@ -121,9 +101,9 @@ impl RunnableJob {
)?;
Ok(RunnableJob {
- uuid: job.uuid,
- package: job.package,
- image: job.image,
+ uuid: job.uuid.clone(),
+ package: job.package.clone(),
+ image: job.image.clone(),
resources,
source_cache: source_cache.clone(),
@@ -179,31 +159,4 @@ impl RunnableJob {
]
}
- async fn build_resource(
- dep: &dyn ParseDependency,
- merged_stores: &MergedStores,
- ) -> Result<JobResource> {
- let (name, vers) = dep.parse_as_name_and_version()?;
- trace!("Copying dep: {:?} {:?}", name, vers);
- let mut a = merged_stores
- .get_artifact_by_name_and_version(&name, &vers)
- .await?;
-
- if a.is_empty() {
- Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers))
- .context("Building a runnable job")
- .map_err(Error::from)
- } else {
- a.sort();
- let a_len = a.len();
- let found_dependency = a.into_iter().next().unwrap(); // save by above check
- if a_len > 1 {
- warn!("Found more than one dependency for {:?} {:?}", name, vers);
- warn!("Using: {:?}", found_dependency);
- warn!("Please investigate, this might be a BUG");
- }
-
- Ok(JobResource::Artifact(found_dependency))
- }
- }
}
diff --git a/src/job/set.rs b/src/job/set.rs
deleted file mode 100644
index b9e3881..0000000
--- a/src/job/set.rs
+++ /dev/null
@@ -1,351 +0,0 @@
-//
-// Copyright (c) 2020-2021 science+computing ag and other contributors
-//
-// This program and the accompanying materials are made
-// available under the terms of the Eclipse Public License 2.0
-// which is available at https://www.eclipse.org/legal/epl-2.0/
-//
-// SPDX-License-Identifier: EPL-2.0
-//
-
-use anyhow::Result;
-use log::{debug, trace};
-use tokio::stream::StreamExt;
-
-use crate::config::Configuration;
-use crate::filestore::MergedStores;
-use crate::job::Job;
-use crate::job::JobResource;
-use crate::job::RunnableJob;
-use crate::package::PhaseName;
-use crate::package::Shebang;
-use crate::package::Tree;
-use crate::source::SourceCache;
-use crate::util::docker::ImageName;
-
-/// A set of jobs that could theoretically be run in parallel
-#[derive(Debug)]
-pub struct JobSet {
- set: Vec<Job>,
-}
-
-impl JobSet {
- pub fn sets_from_tree(
- t: Tree,
- shebang: Shebang,
- image: ImageName,
- phases: Vec<PhaseName>,
- resources: Vec<JobResource>,
- ) -> Result<Vec<JobSet>> {
- tree_into_jobsets(t, shebang, image, phases, resources)
- }
-
- fn is_empty(&self) -> bool {
- self.set.is_empty()
- }
-
- pub async fn into_runables<'a>(
- self,
- merged_stores: &'a MergedStores,
- source_cache: &'a SourceCache,
- config: &Configuration,
- ) -> Result<Vec<RunnableJob>> {
- self.set
- .into_iter()
- .map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache, config))
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<RunnableJob>>>()
- .await
- }
-}
-
-/// Get the tree as sets of jobs, the deepest level of the tree first
-fn tree_into_jobsets(
- tree: Tree,
- shebang: Shebang,
- image: ImageName,
- phases: Vec<PhaseName>,
- resources: Vec<JobResource>,
-) -> Result<Vec<JobSet>> {
- fn inner(
- tree: Tree,
- shebang: &Shebang,
- image: &ImageName,
- phases: &[PhaseName],
- resources: &[JobResource],
- ) -> Result<Vec<JobSet>> {
- trace!("Creating jobsets for tree: {:?}", tree);
-
- let mut sets = vec![];
- let mut current_set = vec![];
-
- for (package, dep) in tree.into_iter() {
- trace!("Recursing for package: {:?}", package);
- let mut sub_sets = inner(dep, shebang, image, phases, resources)?; // recursion!
- sets.append(&mut sub_sets);
- current_set.push(package);
- }
-
- debug!("Jobset for set: {:?}", current_set);
- let jobset = JobSet {
- set: current_set
- .into_iter()
- .map(|package| {
- Job::new(
- package,
- shebang.clone(),
- image.clone(),
- phases.to_vec(),
- resources.to_vec(),
- )
- })
- .collect(),
- };
- debug!("Jobset = {:?}", jobset);
-
- // make sure the current recursion is added _before_ all other recursions
- // which yields the highest level in the tree as _first_ element of the resulting vector
- let mut result = Vec::new();
- if !jobset.is_empty() {
- debug!("Adding jobset: {:?}", jobset);
- result.push(jobset)
- }
- result.append(&mut sets);
- debug!("Result = {:?}", result);
- Ok(result)
- }
-
- inner(tree, &shebang, &image, &phases, &resources).map(|mut v| {
- // reverse, because the highest level in the tree is added as first element in the vector
- // and the deepest level is last.
- //
- // After reversing, we have a chain of things to build. Awesome, huh?
- v.reverse();
- v
- })
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use std::collections::BTreeMap;
-
- use crate::package::tests::package;
- use crate::package::tests::pname;
- use crate::package::tests::pversion;
- use crate::package::Dependencies;
- use crate::package::Dependency;
- use crate::package::PhaseName;
- use crate::repository::Repository;
- use crate::util::docker::ImageName;
-
- use indicatif::ProgressBar;
-
- fn setup_logging() {
- let _ = ::env_logger::try_init();
- }
-
- #[test]
- fn test_one_element_tree_to_jobsets() {
- setup_logging();
- let mut btree = BTreeMap::new();
-
- let p1 = {
- let name = "a";
- let vers = "1";
- let pack = package(name, vers, "https://rust-lang.org", "123");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let repo = Repository::from(btree);
- let progress = ProgressBar::hidden();
-
- let mut tree = Tree::default();
- let r = tree.add_package(p1, &repo, progress);
- assert!(r.is_ok());
-
- let image = ImageName::from(String::from("test"));
- let phases = vec![PhaseName::from(String::from("testphase"))];
- let shebang = Shebang::from(String::from("#!/bin/bash"));
-
- let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
- assert!(js.is_ok());
- let js = js.unwrap();
-
- assert_eq!(js.len(), 1, "There should be only one jobset if there is only one element in the dependency tree: {:?}", js);
-
- let js = js.get(0).unwrap();
- assert_eq!(
- js.set.len(),
- 1,
- "The jobset should contain exactly one job: {:?}",
- js
- );
-
- let job = js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("a"),
- "The job should be for the package 'a': {:?}",
- job
- );
- }
-
- #[test]
- fn test_two_element_tree_to_jobsets() {
- setup_logging();
- let mut btree = BTreeMap::new();
-
- let p1 = {
- let name = "a";
- let vers = "1";
- let pack = package(name, vers, "https://rust-lang.org", "123");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let p2 = {
- let name = "b";
- let vers = "2";
- let pack = package(name, vers, "https://rust-lang.org", "124");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let repo = Repository::from(btree);
- let progress = ProgressBar::hidden();
-
- let mut tree = Tree::default();
- let r = tree.add_package(p1, &repo, progress.clone());
- assert!(r.is_ok());
-
- let r = tree.add_package(p2, &repo, progress);
- assert!(r.is_ok());
-
- let image = ImageName::from(String::from("test"));
- let phases = vec![PhaseName::from(String::from("testphase"))];
- let shebang = Shebang::from(String::from("#!/bin/bash"));
-
- let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
- assert!(js.is_ok());
- let js = js.unwrap();
-
- assert_eq!(
- js.len(),
- 1,
- "There should be one set of jobs for two packages on the same level in the tree: {:?}",
- js
- );
-
- let js = js.get(0).unwrap();
- assert_eq!(
- js.set.len(),
- 2,
- "The jobset should contain exactly two jobs: {:?}",
- js
- );
-
- let job = js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("a"),
- "The job should be for the package 'a': {:?}",
- job
- );
-
- let job = js.set.get(1).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("b"),
- "The job should be for the package 'a': {:?}",
- job
- );
- }
-
- #[test]
- fn test_two_dependent_elements_to_jobsets() {
- setup_logging();
- let mut btree = BTreeMap::new();
-
- let p1 = {
- let name = "a";
- let vers = "1";
- let mut pack = package(name, vers, "https://rust-lang.org", "123");
- {
- let d1 = Dependency::from(String::from("b =2"));
- let ds = Dependencies::with_runtime_dependencies(vec![d1]);
- pack.set_dependencies(ds);
- }
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let _ = {
- let name = "b";
- let vers = "2";
- let pack = package(name, vers, "https://rust-lang.org", "124");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let repo = Repository::from(btree);
- let progress = ProgressBar::hidden();
-
- let mut tree = Tree::default();
- let r = tree.add_package(p1, &repo, progress);
- assert!(r.is_ok());
-
- let image = ImageName::from(String::from("test"));
- let phases = vec![PhaseName::from(String::from("testphase"))];
- let shebang = Shebang::from(String::from("#!/bin/bash"));
-
- let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
- assert!(js.is_ok());
- let js = js.unwrap();
-
- assert_eq!(
- js.len(),
- 2,
- "There should be two set of jobs for two packages where one depends on the other: {:?}",
- js
- );
-
- {
- let first_js = js.get(0).unwrap();
- assert_eq!(
- first_js.set.len(),
- 1,
- "The first jobset should contain exactly one job: {:?}",
- js
- );
-
- let job = first_js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("b"),
- "The job from the first set should be for the package 'b': {:?}",
- job
- );
- }
-
- {
- let second_js = js.get(1).unwrap();
- assert_eq!(
- second_js.set.len(),
- 1,
- "The second jobset should contain exactly one job: {:?}",
- js
- );
-
- let job = second_js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("a"),
- "The job should be for the package 'a': {:?}",
- job
- );
- }
- }
-}
diff --git a/src/job/tree.rs b/src/job/tree.rs
new file mode 100644
index 0000000..883d93e
--- /dev/null
+++ b/src/job/tree.rs
@@ -0,0 +1,80 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::collections::BTreeMap;
+
+use uuid::Uuid;
+use getset::Getters;
+
+use crate::job::Job;
+use crate::job::JobResource;
+use crate::package::PhaseName;
+use crate::package::Shebang;
+use crate::util::docker::ImageName;
+
+#[derive(Debug, Getters)]
+pub struct Tree {
+ #[getset(get = "pub")]
+ inner: BTreeMap<Uuid, JobDefinition>,
+}
+
+impl Tree {
+ pub fn from_package_tree(pt: crate::package::Tree,
+ script_shebang: Shebang,
+ image: ImageName,
+ phases: Vec<PhaseName>,
+ resources: Vec<JobResource>,
+ ) -> Self {
+ Tree { inner: Self::build_tree(pt, script_shebang, image, phases, resources) }
+ }
+
+ fn build_tree(pt: crate::package::Tree,
+ script_shebang: Shebang,
+ image: ImageName,
+ phases: Vec<PhaseName>,
+ resources: Vec<JobResource>,
+ ) -> BTreeMap<Uuid, JobDefinition> {
+ let mut tree = BTreeMap::new();
+
+ for (package, dependencies) in pt.into_iter() {
+ let mut deps = Self::build_tree(dependencies,
+ script_shebang.clone(),
+ image.clone(),
+ phases.clone(),
+ resources.clone());
+
+ let deps_uuids = deps.keys().cloned().collect();
+ tree.append(&mut deps);
+
+ let job = Job::new(package,
+ script_shebang.clone(),
+ image.clone(),
+ phases.clone(),
+ resources.clone());
+
+ let job_uuid = job.uuid().clone();
+ let jdef = JobDefinition { job, dependencies: deps_uuids };
+
+ tree.insert(job_uuid, jdef);
+ }
+
+ tree
+ }
+
+}
+
+/// A job definition is the job itself and all UUIDs from jobs this job depends on.
+#[derive(Debug)]
+pub struct JobDefinition {
+ pub job: Job,
+
+ /// Uuids of the jobs where this job depends on the outputs
+ pub dependencies: Vec<Uuid>,
+}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 7fec555..03a4cd2 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -12,25 +12,27 @@ use std::path::PathBuf;
use std::sync::Arc;
use anyhow::anyhow;
-use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use diesel::PgConnection;
+use indicatif::ProgressBar;
use log::trace;
use tokio::sync::RwLock;
+use tokio::stream::StreamExt;
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::config::Configuration;
-use crate::db::models::Artifact;
-use crate::db::models::Submit;
+use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
+use crate::filestore::Artifact;
use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
-use crate::job::JobSet;
+use crate::job::JobDefinition;
use crate::job::RunnableJob;
+use crate::job::Tree as JobTree;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
@@ -39,8 +41,9 @@ pub struct Orchestrator<'a> {
progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
- jobsets: Vec<JobSet>,
+ jobtree: JobTree,
config: &'a Configuration,
+ database: Arc<PgConnection>,
}
#[derive(TypedBuilder)]
@@ -50,9 +53,9 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- jobsets: Vec<JobSet>,
+ jobtree: JobTree,
database: Arc<PgConnection>,
- submit: Submit,
+ submit: dbmodels::Submit,
log_dir: Option<PathBuf>,
config: &'a Configuration,
}
@@ -62,7 +65,7 @@ impl<'a> OrchestratorSetup<'a> {
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
self.staging_store.clone(),
- self.database,
+ self.database.clone(),
self.submit.clone(),
self.log_dir,
)
@@ -73,125 +76,162 @@ impl<'a> OrchestratorSetup<'a> {
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
- jobsets: self.jobsets,
+ jobtree: self.jobtree,
config: self.config,
+ database: self.database,
})
}
}
+/// Helper type
+///
+/// Represents a result that came from the run of a job inside a container
+///
+/// It is either a list of artifacts (with their respective database artifact objects)
+/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
+/// anyhow::Error that was issued.
+type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>;
+
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
- for jobset in self.jobsets.into_iter() {
- let errs = Self::run_jobset(
- &self.scheduler,
- &self.merged_stores,
- &self.source_cache,
- &self.config,
- &self.progress_generator,
- jobset,
- output,
- )
- .await?;
+ pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ let (results, errors) = self.run_tree().await?;
+ output.extend(results.into_iter().map(|(_, dba)| dba));
+ Ok(errors)
+ }
+
+ async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
+ use futures::FutureExt;
+
+ let mut already_built = vec![];
+ let mut artifacts = vec![];
+ let mut errors = vec![];
+
+ loop {
+ // loop{}
+ // until for all elements of self.jobtree, the uuid exists in already_built
+ //
+ // for each element in jobtree
+ // where dependencies(element) all in already_built
+ // run_job_for(element)
+ //
+ // for results from run_job_for calls
+ // remember UUID in already_built
+ // put built artifacts in artifacts
+ // if error, abort everything
+ //
+ //
+ let multibar = Arc::new(indicatif::MultiProgress::new());
+ let build_results = self.jobtree
+ .inner()
+ .iter()
+ .filter(|(uuid, jobdef)| { // select all jobs where all dependencies are in `already_built`
+ trace!("Filtering job definition: {:?}", jobdef);
+ jobdef.dependencies.iter().all(|d| already_built.contains(d)) && !already_built.contains(uuid)
+ })
+ .map(|(uuid, jobdef)| {
+ trace!("Running job {}", uuid);
+ let bar = multibar.add(self.progress_generator.bar());
+ let uuid = uuid.clone();
+ self.run_job(jobdef, bar).map(move |r| (uuid, r))
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<(_, Result<JobResult>)>>();
+
+ let multibar_block = tokio::task::spawn_blocking(move || multibar.join());
+ let (_, build_results) = tokio::join!(multibar_block, build_results);
+
+ for (uuid, artifact_result) in build_results.into_iter() {
+ already_built.push(uuid);
+
+ match artifact_result {
+ Ok(Ok(mut arts)) => artifacts.append(&mut arts),
+ Ok(Err((uuid, e))) => { // error during job running
+ log::error!("Error for job {} = {}", uuid, e);
+ errors.push((uuid, e));
+ },
+
+ Err(e) => return Err(e), // error during container execution
+ }
+ }
+
+ if !errors.is_empty() {
+ break
+ }
- if !errs.is_empty() {
- return Ok(errs);
+ // already_built.sort(); // TODO: optimization for binary search in
+ // above and below contains() clause
+
+ if self.jobtree.inner().iter().all(|(uuid, _)| already_built.contains(uuid)) {
+ break
}
}
- Ok(vec![])
+ Ok((artifacts, errors))
}
- async fn run_jobset(
- scheduler: &EndpointScheduler,
- merged_store: &MergedStores,
- source_cache: &SourceCache,
- config: &Configuration,
- progress_generator: &ProgressBars,
- jobset: JobSet,
- output: &mut Vec<Artifact>,
- ) -> Result<Vec<(Uuid, anyhow::Error)>> {
- use tokio::stream::StreamExt;
-
- let multibar = Arc::new(indicatif::MultiProgress::new());
- let results = jobset // run the jobs in the set
- .into_runables(&merged_store, source_cache, config)
- .await?
- .into_iter()
- .map(|runnable| {
- let bar = multibar.add(progress_generator.bar());
-
- async {
- let uuid = *runnable.uuid();
- Self::run_runnable(runnable, scheduler, bar)
- .await
- .map_err(|e| (uuid, e))
- }
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<std::result::Result<Vec<Artifact>, (Uuid, Error)>>>();
-
- let multibar_block = tokio::task::spawn_blocking(move || multibar.join());<