summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/commands/build.rs5
-rw-r--r--src/endpoint/configured.rs3
-rw-r--r--src/endpoint/scheduler.rs15
-rw-r--r--src/job/job.rs4
-rw-r--r--src/job/resource.rs8
-rw-r--r--src/job/set.rs21
-rw-r--r--src/orchestrator/orchestrator.rs3
7 files changed, 32 insertions, 27 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 1522570..5014a6a 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -17,6 +17,7 @@ use tokio::stream::StreamExt;
use crate::config::*;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
+use crate::job::JobResource;
use crate::job::JobSet;
use crate::orchestrator::OrchestratorSetup;
use crate::package::PackageName;
@@ -214,7 +215,8 @@ pub async fn build(matches: &ArgMatches,
trace!("Creating Submit in database finished successfully: {:?}", submit);
trace!("Setting up job sets");
- let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone())?;
+ let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
+ let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone(), resources)?;
trace!("Setting up job sets finished successfully");
trace!("Setting up Orchestrator");
@@ -225,7 +227,6 @@ pub async fn build(matches: &ArgMatches,
.release_store(release_dir)
.database(database_connection)
.source_cache(source_cache)
- .additional_env(additional_env)
.submit(submit)
.log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None })
.jobsets(jobsets)
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index fc43aa2..533b499 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -175,14 +175,13 @@ impl Endpoint {
.map(|_| ())
}
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
let staging_store_path = staging.read().await.root_path().to_path_buf();
let (container_id, _warnings) = {
let envs = job.environment()
.into_iter()
.chain(job.package_environment().into_iter())
- .chain(additional_env.into_iter())
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>();
trace!("Job resources: Environment variables = {:?}", envs);
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 57b8043..8742663 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -20,6 +20,7 @@ use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::StagingStore;
+use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
use crate::util::progress::ProgressBars;
@@ -33,12 +34,11 @@ pub struct EndpointScheduler {
db: Arc<PgConnection>,
progressbars: ProgressBars,
submit: crate::db::models::Submit,
- additional_env: Vec<(String, String)>,
}
impl EndpointScheduler {
- pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, additional_env: Vec<(String, String)>) -> Result<Self> {
+ pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> {
let endpoints = Self::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
@@ -48,7 +48,6 @@ impl EndpointScheduler {
db,
progressbars,
submit,
- additional_env,
})
}
@@ -84,7 +83,6 @@ impl EndpointScheduler {
staging_store: self.staging_store.clone(),
db: self.db.clone(),
submit: self.submit.clone(),
- additional_env: self.additional_env.clone(),
})
}
@@ -126,7 +124,6 @@ pub struct JobHandle {
db: Arc<PgConnection>,
staging_store: Arc<RwLock<StagingStore>>,
submit: crate::db::models::Submit,
- additional_env: Vec<(String, String)>,
}
impl std::fmt::Debug for JobHandle {
@@ -147,7 +144,7 @@ impl JobHandle {
let job_id = self.job.uuid().clone();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let res = ep
- .run_job(self.job, log_sender, self.staging_store, self.additional_env);
+ .run_job(self.job, log_sender, self.staging_store);
let logres = LogReceiver {
log_dir: self.log_dir.as_ref(),
@@ -179,7 +176,7 @@ impl JobHandle {
fn create_env_in_db(&self) -> Result<Vec<dbmodels::EnvVar>> {
trace!("Creating environment in database");
trace!("Hardcoded = {:?}", self.job.package().environment());
- trace!("Dynamic = {:?}", self.additional_env);
+ trace!("Dynamic = {:?}", self.job.resources());
self.job
.package()
.environment()
@@ -195,8 +192,10 @@ impl JobHandle {
.into_iter()
.map(Ok)
.chain({
- self.additional_env
+ self.job
+ .resources()
.iter()
+ .filter_map(JobResource::env)
.inspect(|(k, v)| trace!("Creating environment variable in database: {} = {}", k, v))
.map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&self.db, k, v))
})
diff --git a/src/job/job.rs b/src/job/job.rs
index 1046b5d..865fa69 100644
--- a/src/job/job.rs
+++ b/src/job/job.rs
@@ -32,7 +32,7 @@ pub struct Job {
impl Job {
- pub fn new(pkg: Package, image: ImageName, phases: Vec<PhaseName>) -> Self {
+ pub fn new(pkg: Package, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Self {
let uuid = Uuid::new_v4();
Job {
@@ -41,7 +41,7 @@ impl Job {
image,
script_shebang: String::from("#!/bin/bash"), // TODO Dont hardcode
script_phases: phases,
- resources: Vec::new(),
+ resources,
}
}
diff --git a/src/job/resource.rs b/src/job/resource.rs
index 6dba1ba..7e255f6 100644
--- a/src/job/resource.rs
+++ b/src/job/resource.rs
@@ -1,12 +1,18 @@
use crate::filestore::Artifact;
/// TODO implement
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub enum JobResource {
Environment(String, String),
Artifact(Artifact)
}
+impl From<(String, String)> for JobResource {
+ fn from(tpl: (String, String)) -> Self {
+ JobResource::Environment(tpl.0, tpl.1)
+ }
+}
+
impl JobResource {
pub fn env(&self) -> Option<(&String, &String)> {
match self {
diff --git a/src/job/set.rs b/src/job/set.rs
index a428cce..233110a 100644
--- a/src/job/set.rs
+++ b/src/job/set.rs
@@ -4,6 +4,7 @@ use tokio::stream::StreamExt;
use crate::config::Configuration;
use crate::filestore::MergedStores;
use crate::job::Job;
+use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::package::Tree;
use crate::phase::PhaseName;
@@ -17,8 +18,8 @@ pub struct JobSet {
}
impl JobSet {
- pub fn sets_from_tree(t: Tree, image: ImageName, phases: Vec<PhaseName>) -> Result<Vec<JobSet>> {
- tree_into_jobsets(t, image, phases)
+ pub fn sets_from_tree(t: Tree, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Result<Vec<JobSet>> {
+ tree_into_jobsets(t, image, phases, resources)
}
fn is_empty(&self) -> bool {
@@ -37,8 +38,8 @@ impl JobSet {
}
/// Get the tree as sets of jobs, the deepest level of the tree first
-fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Result<Vec<JobSet>> {
- fn inner(tree: Tree, image: &ImageName, phases: &Vec<PhaseName>) -> Result<Vec<JobSet>> {
+fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Result<Vec<JobSet>> {
+ fn inner(tree: Tree, image: &ImageName, phases: &Vec<PhaseName>, resources: &Vec<JobResource>) -> Result<Vec<JobSet>> {
trace!("Creating jobsets for tree: {:?}", tree);
let mut sets = vec![];
@@ -46,7 +47,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re
for (package, dep) in tree.into_iter() {
trace!("Recursing for package: {:?}", package);
- let mut sub_sets = inner(dep, image, phases)?; // recursion!
+ let mut sub_sets = inner(dep, image, phases, resources)?; // recursion!
sets.append(&mut sub_sets);
current_set.push(package);
}
@@ -56,7 +57,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re
set: current_set
.into_iter()
.map(|package| {
- Job::new(package, image.clone(), phases.clone())
+ Job::new(package, image.clone(), phases.clone(), resources.clone())
})
.collect(),
};
@@ -74,7 +75,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re
Ok(result)
}
- inner(tree, &image, &phases).map(|mut v| {
+ inner(tree, &image, &phases, &resources).map(|mut v| {
// reverse, because the highest level in the tree is added as first element in the vector
// and the deepest level is last.
//
@@ -128,7 +129,7 @@ mod tests {
let image = ImageName::from(String::from("test"));
let phases = vec![PhaseName::from(String::from("testphase"))];
- let js = JobSet::sets_from_tree(tree, image, phases);
+ let js = JobSet::sets_from_tree(tree, image, phases, vec![]);
assert!(js.is_ok());
let js = js.unwrap();
@@ -175,7 +176,7 @@ mod tests {
let image = ImageName::from(String::from("test"));
let phases = vec![PhaseName::from(String::from("testphase"))];
- let js = JobSet::sets_from_tree(tree, image, phases);
+ let js = JobSet::sets_from_tree(tree, image, phases, vec![]);
assert!(js.is_ok());
let js = js.unwrap();
@@ -227,7 +228,7 @@ mod tests {
let image = ImageName::from(String::from("test"));
let phases = vec![PhaseName::from(String::from("testphase"))];
- let js = JobSet::sets_from_tree(tree, image, phases);
+ let js = JobSet::sets_from_tree(tree, image, phases, vec![]);
assert!(js.is_ok());
let js = js.unwrap();
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 3d98ee5..c6bcc77 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -40,7 +40,6 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- additional_env: Vec<(String, String)>,
jobsets: Vec<JobSet>,
database: PgConnection,
submit: Submit,
@@ -51,7 +50,7 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir, self.additional_env).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir).await?;
Ok(Orchestrator {
scheduler: scheduler,