From b4b617a42b2c160b577272c1c16b330a6d45c921 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Mon, 16 Nov 2020 14:45:34 +0100 Subject: Add passing of additional env This patch adds the code to pass the additional environment to the container job. Signed-off-by: Matthias Beyer --- src/commands/build.rs | 1 + src/endpoint/configured.rs | 3 ++- src/endpoint/scheduler.rs | 8 ++++++-- src/orchestrator/orchestrator.rs | 3 ++- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/commands/build.rs b/src/commands/build.rs index dac7c0f..122dd57 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -218,6 +218,7 @@ pub async fn build<'a>(matches: &ArgMatches, .release_store(release_dir) .database(database_connection) .source_cache(source_cache) + .additional_env(additional_env) .submit(submit) .log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None }) .jobsets(jobsets) diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index f9a04d5..9e9171b 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -176,7 +176,7 @@ impl Endpoint { .map(|_| ()) } - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender, staging: Arc>) -> RResult<(Vec, ContainerHash, Script), ContainerError> { + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender, staging: Arc>, additional_env: Vec<(String, String)>) -> RResult<(Vec, ContainerHash, Script), ContainerError> { use crate::log::buffer_stream_to_line_stream; use tokio::stream::StreamExt; use futures::FutureExt; @@ -185,6 +185,7 @@ impl Endpoint { let envs = job.environment() .into_iter() .chain(job.package_environment().into_iter()) + .chain(additional_env.into_iter()) .map(|(k, v)| format!("{}={}", k, v)) .collect::>(); trace!("Job resources: Environment variables = {:?}", envs); diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 3b4b7c6..c5044c8 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -33,11 +33,12 @@ pub struct EndpointScheduler { db: Arc, progressbars: ProgressBars, submit: crate::db::models::Submit, + additional_env: Vec<(String, String)>, } impl EndpointScheduler { - pub async fn setup(endpoints: Vec, staging_store: Arc>, db: Arc, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option) -> Result { + pub async fn setup(endpoints: Vec, staging_store: Arc>, db: Arc, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option, additional_env: Vec<(String, String)>) -> Result { let endpoints = Self::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { @@ -47,6 +48,7 @@ impl EndpointScheduler { db, progressbars, submit, + additional_env, }) } @@ -82,6 +84,7 @@ impl EndpointScheduler { staging_store: self.staging_store.clone(), db: self.db.clone(), submit: self.submit.clone(), + additional_env: self.additional_env.clone(), }) } @@ -118,6 +121,7 @@ pub struct JobHandle { db: Arc, staging_store: Arc>, submit: crate::db::models::Submit, + additional_env: Vec<(String, String)>, } impl std::fmt::Debug for JobHandle { @@ -141,7 +145,7 @@ impl JobHandle { let job_id = self.job.uuid().clone(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); let res = ep - .run_job(self.job, log_sender, self.staging_store); + .run_job(self.job, log_sender, self.staging_store, self.additional_env); let logres = LogReceiver { log_dir: self.log_dir.as_ref(), diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index e7b5ba6..c45b5b3 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -44,6 +44,7 @@ pub struct OrchestratorSetup { staging_store: Arc>, release_store: Arc>, source_cache: SourceCache, + additional_env: Vec<(String, String)>, jobsets: Vec, database: PgConnection, submit: Submit, @@ -53,7 +54,7 @@ pub struct OrchestratorSetup { impl OrchestratorSetup { pub async fn setup(self) -> Result { let db = Arc::new(self.database); - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir).await?; + let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir, self.additional_env).await?; Ok(Orchestrator { progress_generator: self.progress_generator, -- cgit v1.2.3