author     Matthias Beyer <matthias.beyer@atos.net>   2021-01-20 11:08:29 +0100
committer  Matthias Beyer <matthias.beyer@atos.net>   2021-01-21 14:56:03 +0100
commit     889649ac16367fe671ce61363bb6ce82531e5a6b (patch)
tree       4b24f1e6036a7b14343101778c26b55a6386fecb /src
parent     4cb9ace3e4506acb2d60dbc8688b29c9b3f85a69 (diff)
Reimplement: Orchestrator::run()
This patch reimplements the running of the computed jobs.

The old implementation was structured as follows:

1. Compute a Tree of dependencies for the requested package
2. Make sets of this tree (see below)
3. For each set
   3.1. Run the set in parallel by submitting each job in the set to the scheduler
   3.2. Collect outputs and errors
   3.3. Record outputs and return errors (if any)

The complexity here was not only the computing of the JobSets, but also the running of each job in a set in parallel. The code was non-trivial to understand. But that's not even the biggest concern with this approach. Consider the following tree of jobs:

          A
         / \
        B   E
       / \   \
      C   D   F
             / \
            G   H
                 \
                  I

Each node here represents a package; the edges represent dependencies on the lower-hanging package. This tree would result in 5 sets of jobs:

    [
        [ I ]
        [ G, H ]
        [ C, D, F ]
        [ B, E ]
        [ A ]
    ]

because each "layer" in the tree would be run in parallel. It can easily be seen that, in the tree above, the jobs for [ I, G, D, C ] could run in parallel right away, because they have no dependencies.

The reimplementation also has another (crucial) benefit: the implementation no longer depends on a structure of artifact path names. Before, the artifacts needed to have a name of the form

    <name of the package>-<version of the package>.<something>

which was extremely restrictive. With the changes from this patch, the implementation does not depend on such a format anymore. Instead, dependencies are associated with a job via the outputs of the jobs run for the packages it depends on. That means, considering the above tree of packages:

    deps_of(B) = outputs_of(job_for(C)) + outputs_of(job_for(D))

In text: the dependencies of package B are the outputs of the job run for package C plus the outputs of the job run for package D. With that change in place, the outputs of a job run for a package can have arbitrary file names, and as long as the build script for the package can process them, everything is fine.

The new algorithm, which solves that issue, is rather simple (see the sketch below):

1. Hold a list of errors
2. Hold a list of artifacts that were built
3. Hold a list of jobs that were run
4. Iterate over all jobs, filtered by
   - If the job appears in the "already run jobs" list, ignore it
   - If a job has dependencies (on outputs of other jobs) that do not appear in the "already run jobs" list, ignore it (for now)
5. Run these jobs, and for each job:
   5.1. Take the job UUID and put it in the "already run jobs" list
   5.2. Take the result of the job:
        5.2.1. If it is an error, put it in the "list of errors"
        5.2.2. If it is ok, put the artifact in the "list of artifacts"
6. If the list of errors is not empty, goto 9
7. If all jobs are in the "already run jobs" list, goto 9
8. goto 4
9. Return all artifacts and all errors
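As a compact illustration of steps 1 to 9, here is a minimal, self-contained sketch of that loop. It uses stand-in types and a synchronous `run` callback; the real implementation uses the actual `Uuid` and `Artifact` types and submits jobs to the async scheduler:

    use std::collections::{HashMap, HashSet};

    // Stand-ins for uuid::Uuid and the real Artifact type.
    type Uuid = u64;
    type Artifact = String;

    struct JobDefinition {
        job: Uuid,
        dependencies: Vec<Uuid>, // jobs whose outputs this job needs
    }

    // Steps 1-9 from above; assumes the dependency graph is acyclic.
    fn run_tree(
        tree: &HashMap<Uuid, JobDefinition>,
        run: impl Fn(Uuid) -> Result<Artifact, String>,
    ) -> (Vec<Artifact>, Vec<(Uuid, String)>) {
        let mut errors = Vec::new();          // step 1
        let mut artifacts = Vec::new();       // step 2
        let mut already_run = HashSet::new(); // step 3

        loop {
            // Step 4: all jobs not yet run whose dependencies have all run.
            let ready: Vec<Uuid> = tree
                .values()
                .filter(|jd| !already_run.contains(&jd.job))
                .filter(|jd| jd.dependencies.iter().all(|d| already_run.contains(d)))
                .map(|jd| jd.job)
                .collect();

            // Step 5 (sequential here, parallel in the real implementation).
            for job in ready {
                already_run.insert(job); // step 5.1
                match run(job) {
                    Ok(a) => artifacts.push(a),      // step 5.2.2
                    Err(e) => errors.push((job, e)), // step 5.2.1
                }
            }

            // Steps 6 and 7; step 8 is the loop itself.
            if !errors.is_empty() || already_run.len() == tree.len() {
                return (artifacts, errors); // step 9
            }
        }
    }

On the example tree, this runs { C, D, G, I } first, then { B, H }, then { F }, { E }, and finally { A }: the four independent leaves start immediately instead of waiting for their "layer" to be scheduled.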
Because this approach is fundamentally different from the previous one, a lot of things had to be rewritten:

- The `JobSet` type was completely removed.

- There is a new type `crate::job::Tree` that gets built from the `crate::package::Tree`. It is a mapping of a UUID (the job UUID) to a `JobDefinition`.

  The `JobDefinition` type is

  - a Job
  - a list of UUIDs of other jobs on whose outputs this job depends

  It is therefore a mapping of `Job -> outputs(jobs_of(dependencies))`.

  The `crate::job::Tree` type is now responsible for building a `Job` object for each `crate::package::Package` object from the `crate::package::Tree` object.

  Because the `crate::package::Tree` object contains all required packages for the complete build, the implementation of `crate::job::Tree::build_tree()` does not check sanity. It is assumed that the input tree to the function contains all mappings.

  Despite the name `crate::job::Tree` ("Tree"), the actual structure stored in the type is not a real tree.

- The `MergedStores::get_artifact_by_path()` function was adapted because the previous implementation used `StagingStore::load_from_path()`, which tried to load the file from the filesystem and put it into the internal map, and failed if it was already there. The adaptation checks whether the artifact already exists in the internal map and returns that object instead. (The release store is handled accordingly.)

- The interface of the `RunnableJob::build_from_job()` function was adapted, as this function no longer needs to access the `MergedStores` object to load dependency artifacts from the filesystem. Instead, these artifacts are now passed to the function.

- The Orchestrator code

  - got a type alias `JobResult`, which represents the result of a job run and is either

    - a number of artifacts (for optimization reasons, with their associated database artifact entries)
    - or an error with the UUID of the job that failed (again, for optimization reasons)

  - got an implementation of the algorithm described above

  - got a new implementation of run_job(), which

    - fetches the paths of dependency artifacts from the database, using the job UUIDs from the `JobDefinition` object
    - creates the `RunnableJob` object from them
    - schedules the `RunnableJob` object in the scheduler
    - for each output artifact (the database object representing it), gets the filesystem `Artifact` object for it

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Tested-by: Matthias Beyer <matthias.beyer@atos.net>
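The `MergedStores` adaptation in the diff below boils down to a get-or-load pattern. A minimal sketch of that pattern, with placeholder types rather than the real store API (which works on the staging and release store maps and returns `Result`s):

    use std::collections::BTreeMap;

    type ArtifactPath = String; // stand-in for the real ArtifactPath
    #[derive(Clone, Debug)]
    struct Artifact;

    // Ask the in-memory map first; only load (and cache) on a miss.
    // Loading unconditionally, as the old code did, failed whenever the
    // artifact was already present in the map.
    fn get_or_load(
        store: &mut BTreeMap<ArtifactPath, Artifact>,
        path: &ArtifactPath,
        load_from_path: impl FnOnce() -> Artifact,
    ) -> Artifact {
        if let Some(art) = store.get(path) {
            return art.clone();
        }
        let art = load_from_path();
        store.insert(path.clone(), art.clone());
        art
    }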
Diffstat (limited to 'src')
-rw-r--r--  src/commands/build.rs               5
-rw-r--r--  src/filestore/merged.rs            68
-rw-r--r--  src/filestore/path.rs               4
-rw-r--r--  src/filestore/staging.rs            5
-rw-r--r--  src/filestore/util.rs               9
-rw-r--r--  src/job/mod.rs                      4
-rw-r--r--  src/job/runnable.rs                77
-rw-r--r--  src/job/set.rs                    351
-rw-r--r--  src/job/tree.rs                    80
-rw-r--r--  src/orchestrator/orchestrator.rs  262
10 files changed, 272 insertions, 593 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 5cc0dc0..c9a564b 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -33,7 +33,6 @@ use crate::filestore::path::StoreRoot;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobResource;
-use crate::job::JobSet;
use crate::log::LogItem;
use crate::orchestrator::OrchestratorSetup;
use crate::package::PackageName;
@@ -306,7 +305,7 @@ pub async fn build(
trace!("Setting up job sets");
let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
- let jobsets = JobSet::sets_from_tree(tree, shebang, image_name, phases.clone(), resources)?;
+ let jobtree = crate::job::Tree::from_package_tree(tree, shebang, image_name, phases.clone(), resources);
trace!("Setting up job sets finished successfully");
trace!("Setting up Orchestrator");
@@ -324,7 +323,7 @@ pub async fn build(
} else {
None
})
- .jobsets(jobsets)
+ .jobtree(jobtree)
.config(config)
.build()
.setup()
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index d1bcfe7..c8ed90d 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -9,18 +9,17 @@
//
use std::sync::Arc;
-
-use log::trace;
-use tokio::sync::RwLock;
+use std::path::Path;
use anyhow::Result;
use getset::Getters;
+use log::trace;
+use tokio::sync::RwLock;
use crate::filestore::Artifact;
+use crate::filestore::path::ArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
-use crate::package::PackageName;
-use crate::package::PackageVersionConstraint;
/// A type that merges the release store and the staging store
///
@@ -40,55 +39,22 @@ impl MergedStores {
MergedStores { release, staging }
}
- pub async fn get_artifact_by_name_and_version(
- &self,
- name: &PackageName,
- version: &PackageVersionConstraint,
- ) -> Result<Vec<Artifact>> {
- let v = self
- .staging
- .read()
- .await
- .0
- .values()
- .filter(|a| {
- trace!(
- "Checking {:?} == {:?} && {:?} == {:?}",
- a.name(),
- name,
- version,
- a.version()
- );
- a.name() == name && version.matches(a.version())
- })
- .cloned()
- .collect::<Vec<_>>();
-
- if v.is_empty() {
- Ok({
- self.release
- .read()
- .await
- .0
- .values()
- .filter(|a| a.name() == name && version.matches(a.version()))
- .cloned()
- .collect()
- })
- } else {
- Ok(v)
- }
- }
-
pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<Artifact>> {
+ trace!("Fetching artifact from path: {:?}", p.display());
let artifact_path = ArtifactPath::new(p.to_path_buf())?;
let staging = &mut self.staging.write().await.0;
let staging_path = staging.root_path().join(&artifact_path)?;
+ trace!("staging_path = {:?}", staging_path.display());
if staging_path.exists() {
- let art_path = ArtifactPath::new(p.to_path_buf())?;
- let art = staging.load_from_path(&artifact_path)?;
+ let art = if let Some(art) = staging.get(&artifact_path) {
+ art
+ } else {
+ trace!("Loading path from staging store: {:?}", artifact_path.display());
+ staging.load_from_path(&artifact_path)?
+ };
+
return Ok(Some(art.clone()))
}
@@ -96,9 +62,15 @@ impl MergedStores {
let release = &mut self.release.write().await.0;
let release_path = release.root_path().join(&artifact_path)?;
+ trace!("release_path = {:?}", release_path);
if release_path.exists() {
- let art = release.load_from_path(&artifact_path)?;
+ let art = if let Some(art) = release.get(&artifact_path) {
+ art
+ } else {
+ trace!("Loading path from release store: {:?}", artifact_path.display());
+ release.load_from_path(&artifact_path)?
+ };
return Ok(Some(art.clone()))
}
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index 9262d1d..9ddc989 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -73,10 +73,6 @@ impl StoreRoot {
FullArtifactPath(self, ap)
}
- pub(in crate::filestore) fn is_file(&self, subpath: &Path) -> bool {
- self.0.join(subpath).is_file()
- }
-
pub(in crate::filestore) fn is_dir(&self, subpath: &Path) -> bool {
self.0.join(subpath).is_dir()
}
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index 995768c..b944d84 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -9,7 +9,6 @@
//
use std::fmt::Debug;
-use std::path::Path;
use anyhow::anyhow;
use anyhow::Context;
@@ -101,8 +100,4 @@ impl StagingStore {
pub fn root_path(&self) -> &StoreRoot {
self.0.root_path()
}
-
- pub fn path_exists_in_store_root(&self, path: &Path) -> bool {
- self.0.path_exists_in_store_root(path)
- }
}
diff --git a/src/filestore/util.rs b/src/filestore/util.rs
index c931545..2fc1483 100644
--- a/src/filestore/util.rs
+++ b/src/filestore/util.rs
@@ -12,7 +12,6 @@
//!
use std::collections::BTreeMap;
-use std::path::Path;
use anyhow::anyhow;
use anyhow::Result;
@@ -51,12 +50,8 @@ impl FileStoreImpl {
&self.root
}
- pub fn path_exists_in_store_root(&self, p: &Path) -> bool {
- self.root.is_file(p)
- }
-
- pub(in crate::filestore) fn values(&self) -> impl Iterator<Item = &Artifact> {
- self.store.values()
+ pub fn get(&self, artifact_path: &ArtifactPath) -> Option<&Artifact> {
+ self.store.get(artifact_path)
}
pub(in crate::filestore) fn load_from_path(
diff --git a/src/job/mod.rs b/src/job/mod.rs
index 719d2da..e684935 100644
--- a/src/job/mod.rs
+++ b/src/job/mod.rs
@@ -12,8 +12,8 @@
mod job;
pub use job::*;
-mod set;
-pub use set::*;
+mod tree;
+pub use tree::*;
mod resource;
pub use resource::*;
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 7a0fc19..620df73 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -12,25 +12,22 @@ use std::collections::HashMap;
use anyhow::anyhow;
use anyhow::Context;
-use anyhow::Error;
use anyhow::Result;
use getset::Getters;
-use log::{debug, trace, warn};
-use tokio::stream::StreamExt;
+use log::debug;
use uuid::Uuid;
use crate::config::Configuration;
-use crate::filestore::MergedStores;
+use crate::filestore::Artifact;
use crate::job::Job;
use crate::job::JobResource;
use crate::package::Package;
-use crate::package::ParseDependency;
use crate::package::Script;
use crate::package::ScriptBuilder;
use crate::source::SourceCache;
use crate::source::SourceEntry;
-use crate::util::docker::ImageName;
use crate::util::EnvironmentVariableName;
+use crate::util::docker::ImageName;
/// A job configuration that can be run. All inputs are clear here.
#[derive(Debug, Getters)]
@@ -56,39 +53,22 @@ pub struct RunnableJob {
impl RunnableJob {
pub async fn build_from_job(
- job: Job,
- merged_stores: &MergedStores,
+ job: &Job,
source_cache: &SourceCache,
config: &Configuration,
+ dependencies: Vec<Artifact>,
) -> Result<Self> {
- trace!("Preparing build dependencies");
- let resources = {
- let mut resources = job
- .package()
- .dependencies()
- .build()
- .iter()
- .map(|dep| Self::build_resource(dep, merged_stores))
- .chain({
- job.package()
- .dependencies()
- .runtime()
- .iter()
- .map(|dep| Self::build_resource(dep, merged_stores))
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<JobResource>>>()
- .await?;
-
- resources.extend({
+ // Add the environment from the original Job object to the resources
+ let resources = dependencies
+ .into_iter()
+ .map(JobResource::from)
+ .chain({
job.resources()
.iter()
.filter(|jr| jr.env().is_some())
.cloned()
- });
-
- resources
- };
+ })
+ .collect();
if config.containers().check_env_names() {
debug!("Checking environment if all variables are allowed!");
@@ -121,9 +101,9 @@ impl RunnableJob {
)?;
Ok(RunnableJob {
- uuid: job.uuid,
- package: job.package,
- image: job.image,
+ uuid: job.uuid.clone(),
+ package: job.package.clone(),
+ image: job.image.clone(),
resources,
source_cache: source_cache.clone(),
@@ -179,31 +159,4 @@ impl RunnableJob {
]
}
- async fn build_resource(
- dep: &dyn ParseDependency,
- merged_stores: &MergedStores,
- ) -> Result<JobResource> {
- let (name, vers) = dep.parse_as_name_and_version()?;
- trace!("Copying dep: {:?} {:?}", name, vers);
- let mut a = merged_stores
- .get_artifact_by_name_and_version(&name, &vers)
- .await?;
-
- if a.is_empty() {
- Err(anyhow!("Cannot find dependency: {:?} {:?}", name, vers))
- .context("Building a runnable job")
- .map_err(Error::from)
- } else {
- a.sort();
- let a_len = a.len();
- let found_dependency = a.into_iter().next().unwrap(); // save by above check
- if a_len > 1 {
- warn!("Found more than one dependency for {:?} {:?}", name, vers);
- warn!("Using: {:?}", found_dependency);
- warn!("Please investigate, this might be a BUG");
- }
-
- Ok(JobResource::Artifact(found_dependency))
- }
- }
}
diff --git a/src/job/set.rs b/src/job/set.rs
deleted file mode 100644
index b9e3881..0000000
--- a/src/job/set.rs
+++ /dev/null
@@ -1,351 +0,0 @@
-//
-// Copyright (c) 2020-2021 science+computing ag and other contributors
-//
-// This program and the accompanying materials are made
-// available under the terms of the Eclipse Public License 2.0
-// which is available at https://www.eclipse.org/legal/epl-2.0/
-//
-// SPDX-License-Identifier: EPL-2.0
-//
-
-use anyhow::Result;
-use log::{debug, trace};
-use tokio::stream::StreamExt;
-
-use crate::config::Configuration;
-use crate::filestore::MergedStores;
-use crate::job::Job;
-use crate::job::JobResource;
-use crate::job::RunnableJob;
-use crate::package::PhaseName;
-use crate::package::Shebang;
-use crate::package::Tree;
-use crate::source::SourceCache;
-use crate::util::docker::ImageName;
-
-/// A set of jobs that could theoretically be run in parallel
-#[derive(Debug)]
-pub struct JobSet {
- set: Vec<Job>,
-}
-
-impl JobSet {
- pub fn sets_from_tree(
- t: Tree,
- shebang: Shebang,
- image: ImageName,
- phases: Vec<PhaseName>,
- resources: Vec<JobResource>,
- ) -> Result<Vec<JobSet>> {
- tree_into_jobsets(t, shebang, image, phases, resources)
- }
-
- fn is_empty(&self) -> bool {
- self.set.is_empty()
- }
-
- pub async fn into_runables<'a>(
- self,
- merged_stores: &'a MergedStores,
- source_cache: &'a SourceCache,
- config: &Configuration,
- ) -> Result<Vec<RunnableJob>> {
- self.set
- .into_iter()
- .map(move |j| RunnableJob::build_from_job(j, merged_stores, source_cache, config))
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<RunnableJob>>>()
- .await
- }
-}
-
-/// Get the tree as sets of jobs, the deepest level of the tree first
-fn tree_into_jobsets(
- tree: Tree,
- shebang: Shebang,
- image: ImageName,
- phases: Vec<PhaseName>,
- resources: Vec<JobResource>,
-) -> Result<Vec<JobSet>> {
- fn inner(
- tree: Tree,
- shebang: &Shebang,
- image: &ImageName,
- phases: &[PhaseName],
- resources: &[JobResource],
- ) -> Result<Vec<JobSet>> {
- trace!("Creating jobsets for tree: {:?}", tree);
-
- let mut sets = vec![];
- let mut current_set = vec![];
-
- for (package, dep) in tree.into_iter() {
- trace!("Recursing for package: {:?}", package);
- let mut sub_sets = inner(dep, shebang, image, phases, resources)?; // recursion!
- sets.append(&mut sub_sets);
- current_set.push(package);
- }
-
- debug!("Jobset for set: {:?}", current_set);
- let jobset = JobSet {
- set: current_set
- .into_iter()
- .map(|package| {
- Job::new(
- package,
- shebang.clone(),
- image.clone(),
- phases.to_vec(),
- resources.to_vec(),
- )
- })
- .collect(),
- };
- debug!("Jobset = {:?}", jobset);
-
- // make sure the current recursion is added _before_ all other recursions
- // which yields the highest level in the tree as _first_ element of the resulting vector
- let mut result = Vec::new();
- if !jobset.is_empty() {
- debug!("Adding jobset: {:?}", jobset);
- result.push(jobset)
- }
- result.append(&mut sets);
- debug!("Result = {:?}", result);
- Ok(result)
- }
-
- inner(tree, &shebang, &image, &phases, &resources).map(|mut v| {
- // reverse, because the highest level in the tree is added as first element in the vector
- // and the deepest level is last.
- //
- // After reversing, we have a chain of things to build. Awesome, huh?
- v.reverse();
- v
- })
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- use std::collections::BTreeMap;
-
- use crate::package::tests::package;
- use crate::package::tests::pname;
- use crate::package::tests::pversion;
- use crate::package::Dependencies;
- use crate::package::Dependency;
- use crate::package::PhaseName;
- use crate::repository::Repository;
- use crate::util::docker::ImageName;
-
- use indicatif::ProgressBar;
-
- fn setup_logging() {
- let _ = ::env_logger::try_init();
- }
-
- #[test]
- fn test_one_element_tree_to_jobsets() {
- setup_logging();
- let mut btree = BTreeMap::new();
-
- let p1 = {
- let name = "a";
- let vers = "1";
- let pack = package(name, vers, "https://rust-lang.org", "123");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let repo = Repository::from(btree);
- let progress = ProgressBar::hidden();
-
- let mut tree = Tree::default();
- let r = tree.add_package(p1, &repo, progress);
- assert!(r.is_ok());
-
- let image = ImageName::from(String::from("test"));
- let phases = vec![PhaseName::from(String::from("testphase"))];
- let shebang = Shebang::from(String::from("#!/bin/bash"));
-
- let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
- assert!(js.is_ok());
- let js = js.unwrap();
-
- assert_eq!(js.len(), 1, "There should be only one jobset if there is only one element in the dependency tree: {:?}", js);
-
- let js = js.get(0).unwrap();
- assert_eq!(
- js.set.len(),
- 1,
- "The jobset should contain exactly one job: {:?}",
- js
- );
-
- let job = js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("a"),
- "The job should be for the package 'a': {:?}",
- job
- );
- }
-
- #[test]
- fn test_two_element_tree_to_jobsets() {
- setup_logging();
- let mut btree = BTreeMap::new();
-
- let p1 = {
- let name = "a";
- let vers = "1";
- let pack = package(name, vers, "https://rust-lang.org", "123");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let p2 = {
- let name = "b";
- let vers = "2";
- let pack = package(name, vers, "https://rust-lang.org", "124");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let repo = Repository::from(btree);
- let progress = ProgressBar::hidden();
-
- let mut tree = Tree::default();
- let r = tree.add_package(p1, &repo, progress.clone());
- assert!(r.is_ok());
-
- let r = tree.add_package(p2, &repo, progress);
- assert!(r.is_ok());
-
- let image = ImageName::from(String::from("test"));
- let phases = vec![PhaseName::from(String::from("testphase"))];
- let shebang = Shebang::from(String::from("#!/bin/bash"));
-
- let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
- assert!(js.is_ok());
- let js = js.unwrap();
-
- assert_eq!(
- js.len(),
- 1,
- "There should be one set of jobs for two packages on the same level in the tree: {:?}",
- js
- );
-
- let js = js.get(0).unwrap();
- assert_eq!(
- js.set.len(),
- 2,
- "The jobset should contain exactly two jobs: {:?}",
- js
- );
-
- let job = js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("a"),
- "The job should be for the package 'a': {:?}",
- job
- );
-
- let job = js.set.get(1).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("b"),
- "The job should be for the package 'a': {:?}",
- job
- );
- }
-
- #[test]
- fn test_two_dependent_elements_to_jobsets() {
- setup_logging();
- let mut btree = BTreeMap::new();
-
- let p1 = {
- let name = "a";
- let vers = "1";
- let mut pack = package(name, vers, "https://rust-lang.org", "123");
- {
- let d1 = Dependency::from(String::from("b =2"));
- let ds = Dependencies::with_runtime_dependencies(vec![d1]);
- pack.set_dependencies(ds);
- }
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let _ = {
- let name = "b";
- let vers = "2";
- let pack = package(name, vers, "https://rust-lang.org", "124");
- btree.insert((pname(name), pversion(vers)), pack.clone());
- pack
- };
-
- let repo = Repository::from(btree);
- let progress = ProgressBar::hidden();
-
- let mut tree = Tree::default();
- let r = tree.add_package(p1, &repo, progress);
- assert!(r.is_ok());
-
- let image = ImageName::from(String::from("test"));
- let phases = vec![PhaseName::from(String::from("testphase"))];
- let shebang = Shebang::from(String::from("#!/bin/bash"));
-
- let js = JobSet::sets_from_tree(tree, shebang, image, phases, vec![]);
- assert!(js.is_ok());
- let js = js.unwrap();
-
- assert_eq!(
- js.len(),
- 2,
- "There should be two set of jobs for two packages where one depends on the other: {:?}",
- js
- );
-
- {
- let first_js = js.get(0).unwrap();
- assert_eq!(
- first_js.set.len(),
- 1,
- "The first jobset should contain exactly one job: {:?}",
- js
- );
-
- let job = first_js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("b"),
- "The job from the first set should be for the package 'b': {:?}",
- job
- );
- }
-
- {
- let second_js = js.get(1).unwrap();
- assert_eq!(
- second_js.set.len(),
- 1,
- "The second jobset should contain exactly one job: {:?}",
- js
- );
-
- let job = second_js.set.get(0).unwrap();
- assert_eq!(
- *job.package.name(),
- pname("a"),
- "The job should be for the package 'a': {:?}",
- job
- );
- }
- }
-}
diff --git a/src/job/tree.rs b/src/job/tree.rs
new file mode 100644
index 0000000..883d93e
--- /dev/null
+++ b/src/job/tree.rs
@@ -0,0 +1,80 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::collections::BTreeMap;
+
+use uuid::Uuid;
+use getset::Getters;
+
+use crate::job::Job;
+use crate::job::JobResource;
+use crate::package::PhaseName;
+use crate::package::Shebang;
+use crate::util::docker::ImageName;
+
+#[derive(Debug, Getters)]
+pub struct Tree {
+ #[getset(get = "pub")]
+ inner: BTreeMap<Uuid, JobDefinition>,
+}
+
+impl Tree {
+ pub fn from_package_tree(pt: crate::package::Tree,
+ script_shebang: Shebang,
+ image: ImageName,
+ phases: Vec<PhaseName>,
+ resources: Vec<JobResource>,
+ ) -> Self {
+ Tree { inner: Self::build_tree(pt, script_shebang, image, phases, resources) }
+ }
+
+ fn build_tree(pt: crate::package::Tree,
+ script_shebang: Shebang,
+ image: ImageName,
+ phases: Vec<PhaseName>,
+ resources: Vec<JobResource>,
+ ) -> BTreeMap<Uuid, JobDefinition> {
+ let mut tree = BTreeMap::new();
+
+ for (package, dependencies) in pt.into_iter() {
+ let mut deps = Self::build_tree(dependencies,
+ script_shebang.clone(),
+ image.clone(),
+ phases.clone(),
+ resources.clone());
+
+ let deps_uuids = deps.keys().cloned().collect();
+ tree.append(&mut deps);
+
+ let job = Job::new(package,
+ script_shebang.clone(),
+ image.clone(),
+ phases.clone(),
+ resources.clone());
+
+ let job_uuid = job.uuid().clone();
+ let jdef = JobDefinition { job, dependencies: deps_uuids };
+
+ tree.insert(job_uuid, jdef);
+ }
+
+ tree
+ }
+
+}
+
+/// A job definition is the job itself and all UUIDs from jobs this job depends on.
+#[derive(Debug)]
+pub struct JobDefinition {
+ pub job: Job,
+
+ /// Uuids of the jobs where this job depends on the outputs
+ pub dependencies: Vec<Uuid>,
+}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 7fec555..03a4cd2 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -12,25 +12,27 @@ use std::path::PathBuf;
use std::sync::Arc;
use anyhow::anyhow;
-use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use diesel::PgConnection;
+use indicatif::ProgressBar;
use log::trace;
use tokio::sync::RwLock;
+use tokio::stream::StreamExt;
use typed_builder::TypedBuilder;
use uuid::Uuid;
use crate::config::Configuration;
-use crate::db::models::Artifact;
-use crate::db::models::Submit;
+use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
+use crate::filestore::Artifact;
use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
-use crate::job::JobSet;
+use crate::job::JobDefinition;
use crate::job::RunnableJob;
+use crate::job::Tree as JobTree;
use crate::source::SourceCache;
use crate::util::progress::ProgressBars;
@@ -39,8 +41,9 @@ pub struct Orchestrator<'a> {
progress_generator: ProgressBars,
merged_stores: MergedStores,
source_cache: SourceCache,
- jobsets: Vec<JobSet>,
+ jobtree: JobTree,
config: &'a Configuration,
+ database: Arc<PgConnection>,
}
#[derive(TypedBuilder)]
@@ -50,9 +53,9 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- jobsets: Vec<JobSet>,
+ jobtree: JobTree,
database: Arc<PgConnection>,
- submit: Submit,
+ submit: dbmodels::Submit,
log_dir: Option<PathBuf>,
config: &'a Configuration,
}
@@ -62,7 +65,7 @@ impl<'a> OrchestratorSetup<'a> {
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
self.staging_store.clone(),
- self.database,
+ self.database.clone(),
self.submit.clone(),
self.log_dir,
)
@@ -73,125 +76,162 @@ impl<'a> OrchestratorSetup<'a> {
progress_generator: self.progress_generator,
merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
- jobsets: self.jobsets,
+ jobtree: self.jobtree,
config: self.config,
+ database: self.database,
})
}
}
+/// Helper type
+///
+/// Represents a result that came from the run of a job inside a container
+///
+/// It is either a list of artifacts (with their respective database artifact objects)
+/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
+/// anyhow::Error that was issued.
+type JobResult = std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, (Uuid, Error)>;
+
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, anyhow::Error)>> {
- for jobset in self.jobsets.into_iter() {
- let errs = Self::run_jobset(
- &self.scheduler,
- &self.merged_stores,
- &self.source_cache,
- &self.config,
- &self.progress_generator,
- jobset,
- output,
- )
- .await?;
+ pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ let (results, errors) = self.run_tree().await?;
+ output.extend(results.into_iter().map(|(_, dba)| dba));
+ Ok(errors)
+ }
+
+ async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
+ use futures::FutureExt;
+
+ let mut already_built = vec![];
+ let mut artifacts = vec![];
+ let mut errors = vec![];
+
+ loop {
+ // loop{}
+ // until for all elements of self.jobtree, the uuid exists in already_built
+ //