diff options
-rw-r--r-- | src/commands/build.rs | 5 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 3 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 15 | ||||
-rw-r--r-- | src/job/job.rs | 4 | ||||
-rw-r--r-- | src/job/resource.rs | 8 | ||||
-rw-r--r-- | src/job/set.rs | 21 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 3 |
7 files changed, 32 insertions, 27 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs index 1522570..5014a6a 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -17,6 +17,7 @@ use tokio::stream::StreamExt; use crate::config::*; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; +use crate::job::JobResource; use crate::job::JobSet; use crate::orchestrator::OrchestratorSetup; use crate::package::PackageName; @@ -214,7 +215,8 @@ pub async fn build(matches: &ArgMatches, trace!("Creating Submit in database finished successfully: {:?}", submit); trace!("Setting up job sets"); - let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone())?; + let resources: Vec<JobResource> = additional_env.into_iter().map(JobResource::from).collect(); + let jobsets = JobSet::sets_from_tree(tree, image_name, phases.clone(), resources)?; trace!("Setting up job sets finished successfully"); trace!("Setting up Orchestrator"); @@ -225,7 +227,6 @@ pub async fn build(matches: &ArgMatches, .release_store(release_dir) .database(database_connection) .source_cache(source_cache) - .additional_env(additional_env) .submit(submit) .log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None }) .jobsets(jobsets) diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index fc43aa2..533b499 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -175,14 +175,13 @@ impl Endpoint { .map(|_| ()) } - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { let staging_store_path = staging.read().await.root_path().to_path_buf(); let (container_id, _warnings) = { let envs = job.environment() .into_iter() .chain(job.package_environment().into_iter()) - .chain(additional_env.into_iter()) .map(|(k, v)| format!("{}={}", k, v)) .collect::<Vec<_>>(); trace!("Job resources: Environment variables = {:?}", envs); diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 57b8043..8742663 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -20,6 +20,7 @@ use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; use crate::filestore::StagingStore; +use crate::job::JobResource; use crate::job::RunnableJob; use crate::log::LogItem; use crate::util::progress::ProgressBars; @@ -33,12 +34,11 @@ pub struct EndpointScheduler { db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, - additional_env: Vec<(String, String)>, } impl EndpointScheduler { - pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, additional_env: Vec<(String, String)>) -> Result<Self> { + pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> { let endpoints = Self::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { @@ -48,7 +48,6 @@ impl EndpointScheduler { db, progressbars, submit, - additional_env, }) } @@ -84,7 +83,6 @@ impl EndpointScheduler { staging_store: self.staging_store.clone(), db: self.db.clone(), submit: self.submit.clone(), - additional_env: self.additional_env.clone(), }) } @@ -126,7 +124,6 @@ pub struct JobHandle { db: Arc<PgConnection>, staging_store: Arc<RwLock<StagingStore>>, submit: crate::db::models::Submit, - additional_env: Vec<(String, String)>, } impl std::fmt::Debug for JobHandle { @@ -147,7 +144,7 @@ impl JobHandle { let job_id = self.job.uuid().clone(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); let res = ep - .run_job(self.job, log_sender, self.staging_store, self.additional_env); + .run_job(self.job, log_sender, self.staging_store); let logres = LogReceiver { log_dir: self.log_dir.as_ref(), @@ -179,7 +176,7 @@ impl JobHandle { fn create_env_in_db(&self) -> Result<Vec<dbmodels::EnvVar>> { trace!("Creating environment in database"); trace!("Hardcoded = {:?}", self.job.package().environment()); - trace!("Dynamic = {:?}", self.additional_env); + trace!("Dynamic = {:?}", self.job.resources()); self.job .package() .environment() @@ -195,8 +192,10 @@ impl JobHandle { .into_iter() .map(Ok) .chain({ - self.additional_env + self.job + .resources() .iter() + .filter_map(JobResource::env) .inspect(|(k, v)| trace!("Creating environment variable in database: {} = {}", k, v)) .map(|(k, v)| dbmodels::EnvVar::create_or_fetch(&self.db, k, v)) }) diff --git a/src/job/job.rs b/src/job/job.rs index 1046b5d..865fa69 100644 --- a/src/job/job.rs +++ b/src/job/job.rs @@ -32,7 +32,7 @@ pub struct Job { impl Job { - pub fn new(pkg: Package, image: ImageName, phases: Vec<PhaseName>) -> Self { + pub fn new(pkg: Package, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Self { let uuid = Uuid::new_v4(); Job { @@ -41,7 +41,7 @@ impl Job { image, script_shebang: String::from("#!/bin/bash"), // TODO Dont hardcode script_phases: phases, - resources: Vec::new(), + resources, } } diff --git a/src/job/resource.rs b/src/job/resource.rs index 6dba1ba..7e255f6 100644 --- a/src/job/resource.rs +++ b/src/job/resource.rs @@ -1,12 +1,18 @@ use crate::filestore::Artifact; /// TODO implement -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum JobResource { Environment(String, String), Artifact(Artifact) } +impl From<(String, String)> for JobResource { + fn from(tpl: (String, String)) -> Self { + JobResource::Environment(tpl.0, tpl.1) + } +} + impl JobResource { pub fn env(&self) -> Option<(&String, &String)> { match self { diff --git a/src/job/set.rs b/src/job/set.rs index a428cce..233110a 100644 --- a/src/job/set.rs +++ b/src/job/set.rs @@ -4,6 +4,7 @@ use tokio::stream::StreamExt; use crate::config::Configuration; use crate::filestore::MergedStores; use crate::job::Job; +use crate::job::JobResource; use crate::job::RunnableJob; use crate::package::Tree; use crate::phase::PhaseName; @@ -17,8 +18,8 @@ pub struct JobSet { } impl JobSet { - pub fn sets_from_tree(t: Tree, image: ImageName, phases: Vec<PhaseName>) -> Result<Vec<JobSet>> { - tree_into_jobsets(t, image, phases) + pub fn sets_from_tree(t: Tree, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Result<Vec<JobSet>> { + tree_into_jobsets(t, image, phases, resources) } fn is_empty(&self) -> bool { @@ -37,8 +38,8 @@ impl JobSet { } /// Get the tree as sets of jobs, the deepest level of the tree first -fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Result<Vec<JobSet>> { - fn inner(tree: Tree, image: &ImageName, phases: &Vec<PhaseName>) -> Result<Vec<JobSet>> { +fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>, resources: Vec<JobResource>) -> Result<Vec<JobSet>> { + fn inner(tree: Tree, image: &ImageName, phases: &Vec<PhaseName>, resources: &Vec<JobResource>) -> Result<Vec<JobSet>> { trace!("Creating jobsets for tree: {:?}", tree); let mut sets = vec![]; @@ -46,7 +47,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re for (package, dep) in tree.into_iter() { trace!("Recursing for package: {:?}", package); - let mut sub_sets = inner(dep, image, phases)?; // recursion! + let mut sub_sets = inner(dep, image, phases, resources)?; // recursion! sets.append(&mut sub_sets); current_set.push(package); } @@ -56,7 +57,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re set: current_set .into_iter() .map(|package| { - Job::new(package, image.clone(), phases.clone()) + Job::new(package, image.clone(), phases.clone(), resources.clone()) }) .collect(), }; @@ -74,7 +75,7 @@ fn tree_into_jobsets(tree: Tree, image: ImageName, phases: Vec<PhaseName>) -> Re Ok(result) } - inner(tree, &image, &phases).map(|mut v| { + inner(tree, &image, &phases, &resources).map(|mut v| { // reverse, because the highest level in the tree is added as first element in the vector // and the deepest level is last. // @@ -128,7 +129,7 @@ mod tests { let image = ImageName::from(String::from("test")); let phases = vec![PhaseName::from(String::from("testphase"))]; - let js = JobSet::sets_from_tree(tree, image, phases); + let js = JobSet::sets_from_tree(tree, image, phases, vec![]); assert!(js.is_ok()); let js = js.unwrap(); @@ -175,7 +176,7 @@ mod tests { let image = ImageName::from(String::from("test")); let phases = vec![PhaseName::from(String::from("testphase"))]; - let js = JobSet::sets_from_tree(tree, image, phases); + let js = JobSet::sets_from_tree(tree, image, phases, vec![]); assert!(js.is_ok()); let js = js.unwrap(); @@ -227,7 +228,7 @@ mod tests { let image = ImageName::from(String::from("test")); let phases = vec![PhaseName::from(String::from("testphase"))]; - let js = JobSet::sets_from_tree(tree, image, phases); + let js = JobSet::sets_from_tree(tree, image, phases, vec![]); assert!(js.is_ok()); let js = js.unwrap(); diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 3d98ee5..c6bcc77 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -40,7 +40,6 @@ pub struct OrchestratorSetup<'a> { staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, - additional_env: Vec<(String, String)>, jobsets: Vec<JobSet>, database: PgConnection, submit: Submit, @@ -51,7 +50,7 @@ pub struct OrchestratorSetup<'a> { impl<'a> OrchestratorSetup<'a> { pub async fn setup(self) -> Result<Orchestrator<'a>> { let db = Arc::new(self.database); - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir, self.additional_env).await?; + let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db, self.progress_generator, self.submit.clone(), self.log_dir).await?; Ok(Orchestrator { scheduler: scheduler, |