diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-16 18:01:52 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-16 18:01:52 +0100 |
commit | eecd7b291c779cf34508687a586b58ca39239b67 (patch) | |
tree | f3b4ed31e6adea55af569c9d79ec15799d9c9a50 | |
parent | cf000f862b53847fa213375d7a14d20a3dd2630b (diff) | |
parent | f79e3b0dea31d48bce02deefa806b7661ba44c4b (diff) |
Merge branch 'container-env'
-rw-r--r-- | examples/packages/example_3/b/pkg.toml | 4 | ||||
-rw-r--r-- | src/cli.rs | 8 | ||||
-rw-r--r-- | src/commands/build.rs | 33 | ||||
-rw-r--r-- | src/commands/db.rs | 37 | ||||
-rw-r--r-- | src/db/models/endpoint.rs | 2 | ||||
-rw-r--r-- | src/db/models/envvar.rs | 3 | ||||
-rw-r--r-- | src/db/models/image.rs | 2 | ||||
-rw-r--r-- | src/db/models/job.rs | 16 | ||||
-rw-r--r-- | src/db/models/job_env.rs | 37 | ||||
-rw-r--r-- | src/db/models/mod.rs | 3 | ||||
-rw-r--r-- | src/db/models/package.rs | 2 | ||||
-rw-r--r-- | src/db/models/submit.rs | 9 | ||||
-rw-r--r-- | src/endpoint/configured.rs | 9 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 41 | ||||
-rw-r--r-- | src/job/runnable.rs | 34 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 3 |
16 files changed, 216 insertions, 27 deletions
diff --git a/examples/packages/example_3/b/pkg.toml b/examples/packages/example_3/b/pkg.toml index f197abf..2ca66e2 100644 --- a/examples/packages/example_3/b/pkg.toml +++ b/examples/packages/example_3/b/pkg.toml @@ -1,6 +1,10 @@ name = "b" version = "2" +[environment] +FOO = "bar" +BAZ = "bla" + [phases] package.script = ''' mkdir /outputs @@ -185,6 +185,14 @@ pub fn cli<'a>() -> App<'a> { .help("Show the script") ) + .arg(Arg::with_name("show_env") + .required(false) + .multiple(false) + .long("env") + .short('E') + .help("Show the environment of the job") + ) + .arg(Arg::with_name("script_disable_highlighting") .required(false) .multiple(false) diff --git a/src/commands/build.rs b/src/commands/build.rs index 6fc2312..122dd57 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -12,6 +12,7 @@ use clap_v3::ArgMatches; use diesel::PgConnection; use logcrate::debug; use tokio::sync::RwLock; +use tokio::stream::StreamExt; use crate::config::*; use crate::filestore::ReleaseStore; @@ -36,6 +37,7 @@ pub async fn build<'a>(matches: &ArgMatches, -> Result<()> { use crate::db::models::{ + EnvVar, Package, GitHash, Image, @@ -85,6 +87,17 @@ pub async fn build<'a>(matches: &ArgMatches, .map(PackageVersion::from); info!("We want {} ({:?})", pname, pvers); + let additional_env = matches.values_of("env") + .unwrap_or_default() + .map(|s| { + let v = s.split("=").collect::<Vec<_>>(); + Ok(( + String::from(*v.get(0).ok_or_else(|| anyhow!("Environment variable has no key: {}", s))?), + String::from(*v.get(1).ok_or_else(|| anyhow!("Environment variable has no key: {}", s))?) + )) + }) + .collect::<Result<Vec<(String, String)>>>()?; + let packages = if let Some(pvers) = pvers { repo.find(&pname, &pvers) } else { @@ -157,15 +170,28 @@ pub async fn build<'a>(matches: &ArgMatches, let db_package = async { Package::create_or_fetch(&database_connection, &package) }; let db_githash = async { GitHash::create_or_fetch(&database_connection, &hash_str) }; let db_image = async { Image::create_or_fetch(&database_connection, &image_name) }; + let db_envs = async { + additional_env.clone() + .into_iter() + .map(|(k, v)| async { + let k: String = k; // hack to work around move semantics + let v: String = v; // hack to work around move semantics + EnvVar::create_or_fetch(&database_connection, &k, &v) + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Result<Vec<EnvVar>>>() + .await + }; trace!("Running database jobs for Package, GitHash, Image"); - let (db_package, db_githash, db_image) = tokio::join!( + let (db_package, db_githash, db_image, db_envs) = tokio::join!( db_package, db_githash, - db_image + db_image, + db_envs ); - let (db_package, db_githash, db_image) = (db_package?, db_githash?, db_image?); + let (db_package, db_githash, db_image, db_envs) = (db_package?, db_githash?, db_image?, db_envs?); trace!("Database jobs for Package, GitHash, Image finished successfully"); trace!("Creating Submit in database"); @@ -192,6 +218,7 @@ pub async fn build<'a>(matches: &ArgMatches, .release_store(release_dir) .database(database_connection) .source_cache(source_cache) + .additional_env(additional_env) .submit(submit) .log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None }) .jobsets(jobsets) diff --git a/src/commands/db.rs b/src/commands/db.rs index b788f71..ea9bb9a 100644 --- a/src/commands/db.rs +++ b/src/commands/db.rs @@ -8,6 +8,7 @@ use anyhow::Error; use anyhow::Result; use anyhow::anyhow; use clap::ArgMatches; +use diesel::BelongingToDsl; use diesel::ExpressionMethods; use diesel::JoinOnDsl; use diesel::QueryDsl; @@ -227,13 +228,16 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> { .transpose()? .map(|submit_uuid| { use crate::schema; + use diesel::BelongingToDsl; - schema::jobs::table.inner_join({ - schema::submits::table.on(schema::submits::uuid.eq(submit_uuid)) - }) - .inner_join(schema::endpoints::table) - .inner_join(schema::packages::table) - .load::<(models::Job, models::Submit, models::Endpoint, models::Package)>(&conn) + let submit = models::Submit::with_id(&conn, &submit_uuid)?; + + models::Job::belonging_to(&submit) + .inner_join(schema::submits::table) + .inner_join(schema::endpoints::table) + .inner_join(schema::packages::table) + .load::<(models::Job, models::Submit, models::Endpoint, models::Package)>(&conn) + .map_err(Error::from) }) .unwrap_or_else(|| { dsl::jobs @@ -241,6 +245,7 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> { .inner_join(crate::schema::endpoints::table) .inner_join(crate::schema::packages::table) .load::<(models::Job, models::Submit, models::Endpoint, models::Package)>(&conn) + .map_err(Error::from) })?; let data = jobs.into_iter() @@ -292,6 +297,21 @@ fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &A .inner_join(schema::images::table) .first::<(models::Job, models::Submit, models::Endpoint, models::Package, models::Image)>(&conn)?; + let env_vars = if matches.is_present("show_env") { + Some({ + models::JobEnv::belonging_to(&data.0) + .inner_join(schema::envvars::table) + .load::<(models::JobEnv, models::EnvVar)>(&conn)? + .into_iter() + .map(|tpl| tpl.1) + .enumerate() + .map(|(i, env)| format!("\t{:>3}. {}={}", i, env.name, env.value)) + .join("\n") + }) + } else { + None + }; + let parsed_log = crate::log::ParsedLog::build_from(&data.0.log_text)?; let success = parsed_log.is_successfull(); let s = indoc::formatdoc!(r#" @@ -308,6 +328,10 @@ fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &A --- + {envs} + + --- + {script_text} --- @@ -333,6 +357,7 @@ fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &A container_hash = data.0.container_hash.cyan(), script_len = format!("{:<4}", data.0.script_text.lines().count()).cyan(), log_len = format!("{:<4}", data.0.log_text.lines().count()).cyan(), + envs = env_vars.unwrap_or_else(|| String::from("<env vars hidden>")), script_text = if show_script { if let Some(configured_theme) = configured_theme { if highlighting_disabled { diff --git a/src/db/models/endpoint.rs b/src/db/models/endpoint.rs index ab30192..d81fbfd 100644 --- a/src/db/models/endpoint.rs +++ b/src/db/models/endpoint.rs @@ -6,7 +6,7 @@ use diesel::prelude::*; use crate::schema::endpoints::*; use crate::schema::endpoints; -#[derive(Queryable)] +#[derive(Identifiable, Queryable)] pub struct Endpoint { pub id: i32, pub name: String, diff --git a/src/db/models/envvar.rs b/src/db/models/envvar.rs index f883661..51ad32a 100644 --- a/src/db/models/envvar.rs +++ b/src/db/models/envvar.rs @@ -6,7 +6,8 @@ use diesel::prelude::*; use crate::schema::envvars::*; use crate::schema::envvars; -#[derive(Queryable)] +#[derive(Identifiable, Queryable)] +#[table_name="envvars"] pub struct EnvVar { pub id: i32, pub name: String, diff --git a/src/db/models/image.rs b/src/db/models/image.rs index 225bac7..5032039 100644 --- a/src/db/models/image.rs +++ b/src/db/models/image.rs @@ -7,7 +7,7 @@ use crate::schema::images::*; use crate::schema::images; use crate::util::docker::ImageName; -#[derive(Queryable)] +#[derive(Identifiable, Queryable)] pub struct Image { pub id: i32, pub name: String, diff --git a/src/db/models/job.rs b/src/db/models/job.rs index 12fba7b..90d9732 100644 --- a/src/db/models/job.rs +++ b/src/db/models/job.rs @@ -9,7 +9,12 @@ use crate::schema::jobs::*; use crate::schema::jobs; use crate::util::docker::ContainerHash; -#[derive(Queryable)] +#[derive(Identifiable, Queryable, Associations)] +#[belongs_to(Submit)] +#[belongs_to(Endpoint)] +#[belongs_to(Package)] +#[belongs_to(Image)] +#[table_name="jobs"] pub struct Job { pub id: i32, pub submit_id: i32, @@ -45,7 +50,7 @@ impl Job { container: &ContainerHash, script: &Script, log: &str, - ) -> Result<()> { + ) -> Result<Job> { let new_job = NewJob { uuid: job_uuid, submit_id: submit.id, @@ -61,9 +66,12 @@ impl Job { diesel::insert_into(jobs::table) .values(&new_job) .on_conflict_do_nothing() - .execute(database_connection) + .execute(database_connection)?; + + dsl::jobs + .filter(uuid.eq(job_uuid)) + .first::<Job>(database_connection) .map_err(Error::from) - .map(|_| ()) } } diff --git a/src/db/models/job_env.rs b/src/db/models/job_env.rs new file mode 100644 index 0000000..3ef1ae3 --- /dev/null +++ b/src/db/models/job_env.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use diesel::PgConnection; +use diesel::prelude::*; + +use crate::schema::job_envs::*; +use crate::schema::job_envs; +use crate::db::models::Job; +use crate::db::models::EnvVar; + +#[derive(Identifiable, Queryable, Associations)] +#[belongs_to(Job)] +#[belongs_to(EnvVar, foreign_key = "env_id")] +#[table_name="job_envs"] +pub struct JobEnv { + pub id: i32, + pub job_id: i32, + pub env_id: i32, +} + +#[derive(Insertable)] +#[table_name="job_envs"] +struct NewJobEnv { + pub job_id: i32, + pub env_id: i32, +} + +impl JobEnv { + pub fn create(database_connection: &PgConnection, job: &Job, env: &EnvVar) -> Result<()> { + let new_jobenv = NewJobEnv { job_id: job.id, env_id: env.id }; + + diesel::insert_into(job_envs::table) + .values(&new_jobenv) + .execute(database_connection)?; + Ok(()) + } +} + diff --git a/src/db/models/mod.rs b/src/db/models/mod.rs index 26875bc..8e1eb8e 100644 --- a/src/db/models/mod.rs +++ b/src/db/models/mod.rs @@ -13,6 +13,9 @@ pub use image::*; mod job; pub use job::*; +mod job_env; +pub use job_env::*; + mod githash; pub use githash::*; diff --git a/src/db/models/package.rs b/src/db/models/package.rs index 8f14607..b0b99ee 100644 --- a/src/db/models/package.rs +++ b/src/db/models/package.rs @@ -8,7 +8,7 @@ use diesel::prelude::*; use crate::schema::packages::*; use crate::schema::packages; -#[derive(Queryable)] +#[derive(Identifiable, Queryable)] pub struct Package { pub id: i32, pub name: String, diff --git a/src/db/models/submit.rs b/src/db/models/submit.rs index 102d7f2..731012a 100644 --- a/src/db/models/submit.rs +++ b/src/db/models/submit.rs @@ -12,7 +12,7 @@ use crate::db::models::Package; use crate::schema::submits::*; use crate::schema::submits; -#[derive(Clone, Debug, Queryable)] +#[derive(Clone, Debug, Identifiable, Queryable)] pub struct Submit { pub id: i32, pub uuid: ::uuid::Uuid, @@ -43,7 +43,7 @@ impl Submit { requested_package: &Package, repo_hash: &GitHash) -> Result<Submit> -{ + { let tree_json = serde_json::to_value(t) .context("Converting tree to JSON string") .with_context(|| anyhow!("Tree = {:#?}", t))?; @@ -63,11 +63,16 @@ impl Submit { .execute(database_connection) .context("Inserting new submit into submits table")?; + Self::with_id(database_connection, submit_id) + } + + pub fn with_id(database_connection: &PgConnection, submit_id: &::uuid::Uuid) -> Result<Submit> { dsl::submits .filter(submits::uuid.eq(submit_id)) .first::<Submit>(database_connection) .context("Loading submit") .map_err(Error::from) } + } diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 2dadb0d..9e9171b 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -176,15 +176,16 @@ impl Endpoint { .map(|_| ()) } - pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { + pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> { use crate::log::buffer_stream_to_line_stream; use tokio::stream::StreamExt; use futures::FutureExt; let (container_id, _warnings) = { - let envs = job.resources() - .iter() - .filter_map(JobResource::env) + let envs = job.environment() + .into_iter() + .chain(job.package_environment().into_iter()) + .chain(additional_env.into_iter()) .map(|(k, v)| format!("{}={}", k, v)) .collect::<Vec<_>>(); trace!("Job resources: Environment variables = {:?}", envs); diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 3b4b7c6..cbc0bd9 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -33,11 +33,12 @@ pub struct EndpointScheduler { db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, + additional_env: Vec<(String, String)>, } impl EndpointScheduler { - pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> { + pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, additional_env: Vec<(String, String)>) -> Result<Self> { let endpoints = Self::setup_endpoints(endpoints).await?; Ok(EndpointScheduler { @@ -47,6 +48,7 @@ impl EndpointScheduler { db, progressbars, submit, + additional_env, }) } @@ -82,6 +84,7 @@ impl EndpointScheduler { staging_store: self.staging_store.clone(), db: self.db.clone(), submit: self.submit.clone(), + additional_env: self.additional_env.clone(), }) } @@ -118,6 +121,7 @@ pub struct JobHandle { db: Arc<PgConnection>, staging_store: Arc<RwLock<StagingStore>>, submit: crate::db::models::Submit, + additional_env: Vec<(String, String)>, } impl std::fmt::Debug for JobHandle { @@ -138,10 +142,37 @@ impl JobHandle { let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?; let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?; + let envs = { + trace!("Creating environment in database"); + trace!("Hardcoded = {:?}", self.job.package().environment()); + trace!("Dynamic = {:?}", self.additional_env); + let mut hardcoded_env = if let Some(hm) = self.job.package().environment().as_ref() { + hm.iter() + .map(|(k, v)| { + trace!("Creating environment variable in database: {} = {}", k, v); + dbmodels::EnvVar::create_or_fetch(&self.db, k, v) + }) + .collect::<Result<Vec<_>>>()? + } else { + Vec::new() + }; + + let mut additionals = self.additional_env + .iter() + .map(|(k, v)| { + trace!("Creating environment variable in database: {} = {}", k, v); + dbmodels::EnvVar::create_or_fetch(&self.db, k, v) + }) + .collect::<Result<Vec<_>>>()?; + + hardcoded_env.append(&mut additionals); + hardcoded_env + }; + let job_id = self.job.uuid().clone(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); let res = ep - .run_job(self.job, log_sender, self.staging_store); + .run_job(self.job, log_sender, self.staging_store, self.additional_env); let logres = LogReceiver { log_dir: self.log_dir.as_ref(), @@ -157,7 +188,11 @@ impl JobHandle { let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?; let (paths, container_hash, script) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?; - dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?; + let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?; + for env in envs { + let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?; + } + Ok(paths) } diff --git a/src/job/runnable.rs b/src/job/runnable.rs index 6de5622..67219fb 100644 --- a/src/job/runnable.rs +++ b/src/job/runnable.rs @@ -84,6 +84,40 @@ impl RunnableJob { self.source_cache.source_for(&self.package()) } + pub fn environment(&self) -> Vec<(String, String)> { + let iter = self.resources + .iter() + .filter_map(JobResource::env) + .map(|(k, v)| (k.clone(), v.clone())); + + if let Some(hm) = self.package().environment() { + iter.chain({ + hm.iter().map(|(k, v)| (k.clone(), v.clone())) + }).collect() + } else { + iter.collect() + } + } + + pub fn package_environment(&self) -> Vec<(String, String)> { + vec![ + ( + String::from("BUTIDO_PACKAGE_NAME"), + self.package().name().clone().to_string() + ), + + ( + String::from("BUTIDO_PACKAGE_VERSION"), + self.package().version().clone().to_string() + ), + + ( + String::from("BUTIDO_PACKAGE_VERSION_IS_SEMVER"), + String::from(if *self.package().version_is_semver() { "true" } else { "false" }) + ), + ] + } + async fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> { let (name, vers) = dep.parse_as_name_and_version()?; trace!("Copying dep: {:?} {:?}", name, vers); diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index e7b5ba6..c45b5b3 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -44,6 +44,7 @@ pub struct OrchestratorSetup { staging_store: Arc<RwLock<StagingStore>>, release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, + additional_env: Vec<(String, String)>, jobsets: Vec<JobSet>, database: PgConnection, submit: Submit, @@ -53,7 +54,7 @@ pub struct OrchestratorSetup { impl OrchestratorSetup { pub async fn setup(self) -> Result<Orchestrator> { let db = Arc::new(self.database); - let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir).await?; + let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir, self.additional_env).await?; Ok(Orchestrator { progress_generator: self.progress_generator, |