summary | refs | log | tree | commit | diff | stats
path: root/src
diff options
context:
space:
mode:
author: Matthias Beyer <mail@beyermatthias.de> 2020-12-04 10:23:27 +0100
committer: Matthias Beyer <mail@beyermatthias.de> 2020-12-07 13:21:54 +0100
commit 4f0eb2b11ac44ce4f3b2273dcd863c5e173689c1 (patch)
tree b4ef4f7b0a66e66f9c52ef89c6e9d9cfe50d7bd4 /src
parent 89e48c21245e1b09678363fcf157094c1234b277 (diff)
Remove passing of additional env variables
This patch removes the passing around of additional environment variables that were specified on the command line and adds them directly to the Job object instance upon creation. This does not result in a net reduction of code, but it does result in a net reduction of complexity. For this to be possible, we had to derive Clone for `JobResource`, which we have to clone when creating the `Job` objects during the creation of the jobsets from the `Tree` object.

Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r--  src/commands/build.rs          |  5
-rw-r--r--  src/endpoint/configured.rs     |  3
-rw-r--r--  src/endpoint/scheduler.rs      | 15
-rw-r--r--  src/job/job.rs                 |  4
-rw-r--r--  src/job/resource.rs            |  8
-rw-r--r--  src/job/set.rs                 | 21
-rw-r--r--  src/orchestrator/orchestrator.rs |  3
7 files changed, 32 insertions(+), 27 deletions(-)
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 1522570..5014a6a 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -17,6 +17,7 @@ use tokio::stream::StreamExt;
use crate::config::*;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
+use crate::job::JobResource;
use crate::job::JobSet;
use crate::orchestrator::OrchestratorSetup;
use crate::package::PackageName;
@@ -214,7 +215,8 @@ pub async fn build(matches: &ArgMatches,
trace!("Creating Submit in database finished successfully: {:?}", submit);
trace!("Setting up job sets");
- let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone())?;
+ let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect();
+ let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone(), resources)?;
trace!("Setting up job sets finished successfully");
trace!("Setting up Orchestrator");
@@ -225,7 +227,6 @@ pub async fn build(matches: &ArgMatches,
.release_store(release_dir)
.database(database_connection)
.source_cache(source_cache)
- .additional_env(additional_env)
.submit(submit)
.log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None })
.jobsets(jobsets)
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index fc43aa2..533b499 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -175,14 +175,13 @@ impl Endpoint {
.map(|_| ())
}
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
let staging_store_path = staging.read().await.root_path().to_path_buf();
let (container_id, _warnings) = {
let envs = job.environment()
.into_iter()
.chain(job.package_environment().into_iter())
- .chain(additional_env.into_iter())
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>();
trace!("Job resources: Environment variables = {:?}", envs);
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 57b8043..8742663 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -20,6 +20,7 @@ use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::StagingStore;
+use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
use crate::util::progress::ProgressBars;
@@ -33,12 +34,11 @@ pub struct EndpointScheduler {
db: Arc<PgConnection>,
progressbars: ProgressBars,
submit: crate::db::models::Submit,
- additional_env: Vec<(String, String)>,
}
impl EndpointScheduler {
- pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, additional_env: Vec<(String, String)>) -> Result<Self> {
+ pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> {
let endpoints = Self::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
@@ -48,7 +48,6 @@ impl EndpointScheduler {
db,
progressbars,
submit,
- additional_env,
})
}
@@ -84,7 +83,6 @@ impl EndpointScheduler {
staging_store: self.staging_store.clone(),
db: self.db.clone(),
submit: self.submit.clone(),
- additional_env: self.additional_env.clone(),
})
}
@@ -126,7 +124,6 @@ pub struct JobHandle {
db: Arc<PgConnection>,
staging_store: Arc<RwLock<StagingStore>>,
submit: crate::db::models::Submit,
- additional_env: Vec<(String, String)>,
}
impl std::fmt::Debug for JobHandle {
@@ -147,7 +144,7 @@ impl JobHandle {
let job_id = self.job.uuid().clone();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let res = ep
- .run_job(self.job, log_sender, self.staging_store, self.additional_env);
+ .run_job(self.job, log_sender, self.staging_store);
let logres = LogReceiver {
log_dir: self.log_dir.as_ref(),
@@ -179,7 +176,7 @@ impl JobHandle {
fn create_env_in_db(&self) -> Result<Vec<dbmodels::EnvVar>> {
trace!("Creating environment in database");
trace!("Hardcoded = {:?}", self.job.package().environment());
- trace!("Dynamic = {:?}", self.additional_env);
+ trace!("Dynamic = {:?}", self.job.resources());
self.job
.package()
.environment()
@@ -195,8 +192,10 @@ impl JobHandle {
.into_iter()
.map(Ok)
.chain({
- self.additional_env
+ self.job
+ .resources()
.iter()
+ .filter_map(JobResource::env)
.inspect(|(k, v)| trace!("Creating environment variable in database: {} = {}", k, v))
.map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&self.db, k, v))
})
diff --git a/src/job/job.rs b/src/job/job.rs
index 1046b5d..865fa69 100644
--- a/src/job/job.rs
+++ b/src/job/job.rs
@@ -32,7 +32,7 @@ pub struct Job {
impl Job {
- pub fn new(pkg: Package, image: ImageName, phases: Vec<PhaseName>) -> Self {
+ pub fn new(pkg: Package, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Self {
let uuid = Uuid::new_v4();
Job {
@@ -41,7 +41,7 @@ impl Job {
image,
script_shebang: String::from("#!/bin/bash"), // TODO Dont hardcode
script_phases: phases,
- resources: Vec::new(),
+ resources,
}
}
diff --git a/src/job/resource.rs b/src/job/resource.rs
index 6dba1ba..7e255f6 100644
--- a/src/job/resource.rs
+++ b/src/job/resource.rs
@@ -1,12 +1,18 @@
use crate::filestore::Artifact;
/// TODO implement
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub enum JobResource {
Environment(String, String),
Artifact(Artifact)
}
+impl From<(String, String)> for JobResource {
+ fn from(tpl: (String, String)) -> Self {
+ JobResource::Environment(tpl.0, tpl.1)
+ }
+}
+
impl JobResource {
pub fn env(&self) -> Option<(&String, &String)> {
match self {
diff --git a/src/job/set.rs b/src/job/set.rs
index a428cce..233110a 100644
--- a/src/job/set.rs
+++ b/src/job/set.rs
@@ -4,6 +4,7 @@ use tokio::stream::StreamExt;
use crate::config::Configuration;
use crate::filestore::MergedStores;
use crate::job::Job;
+use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::package::Tree;
use crate::phase::PhaseName;
@@ -17,8 +18,8 @@ pub struct JobSet {
}
impl JobSet {
- pub fn sets_from_tree(t: Tree, image: ImageName, phases: Vec<PhaseName>) -> Result<Vec<JobSet>> {
- tree_into_jobsets(t, image, phases)
+ pub fn sets_from_tree(t: Tree, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Result<Vec<JobSet>> {
+ tree_into_jobsets(t, image, phases, resources)
}
fn is_empty(&self) -> bool {
@@ -37,8 +38,8 @@ impl JobSet {
}
/// Get the tree as sets of jobs, the deepest level of the tree first
-fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Result<Vec<JobSet>> {
- fn inner(tree: Tree, image: &ImageName, phases: &Vec<PhaseName>) -> Result<Vec<JobSet>> {
+fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Result<Vec<JobSet>> {
+ fn inner(tree: Tree, image: &ImageName, phases: &Vec<PhaseName>, resources: &Vec<JobResource>) -> Result<Vec<JobSet>> {
trace!("Creating jobsets for tree: {:?}", tree);
let mut sets = vec![];
@@ -46,7 +47,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re
for (package, dep) in tree.into_iter() {
trace!("Recursing for package: {:?}", package);
- let mut sub_sets = inner(dep, image, phases)?; // recursion!
+ let mut sub_sets = inner(dep, image, phases, resources)?; // recursion!
sets.append(&mut sub_sets);
current_set.push(package);
}
@@ -56,7 +57,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re
set: current_set
.into_iter()
.map(|package| {
- Job::new(package, image.clone(), phases.clone())
+ Job::new(package, image.clone(), phases.clone(), resources.clone())
})
.collect(),
};
@@ -74,7 +75,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re
Ok(result)
}
- inner(tree, &image, &phases).map(|mut v| {
+ inner(tree, &image, &phases, &resources).map(|mut v| {
// reverse, because the highest level in the tree is added as first element in the vector
// and the deepest level is last.
//
@@ -128,7 +129,7 @@ mod tests {
let image = ImageName::from(String::from("test"));
let phases = vec![PhaseName::from(String::from("testphase"))];
- let js = JobSet::sets_from_tree(tree, image, phases);
+ let js = JobSet::sets_from_tree(tree, image, phases, vec![]);
assert!(js.is_ok());
let js = js.unwrap();
@@ -175,7 +176,7 @@ mod tests {
let image = ImageName::from(String::from("test"));
let phases = vec![PhaseName::from(String::from("testphase"))];
- let js = JobSet::sets_from_tree(tree, image, phases);
+ let js = JobSet::sets_from_tree(tree, image, phases, vec![]);
assert!(js.is_ok());
let js = js.unwrap();
@@ -227,7 +228,7 @@ mod tests {
let image = ImageName::from(String::from("test"));
let phases = vec![PhaseName::from(String::from("testphase"))];
- let js = JobSet::sets_from_tree(tree, image, phases);
+ let js = JobSet::sets_from_tree(tree, image, phases, vec![]);
assert!(js.is_ok());
let js = js.unwrap();
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 3d98ee5..c6bcc77 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -40,7 +40,6 @@ pub struct OrchestratorSetup<'a> {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
- additional_env: Vec<(String, String)>,
jobsets: Vec<JobSet>,
database: PgConnection,
submit: Submit,
@@ -51,7 +50,7 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir, self.additional_env).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir).await?;
Ok(Orchestrator {
scheduler: scheduler,