summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-16 18:01:52 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-16 18:01:52 +0100
commiteecd7b291c779cf34508687a586b58ca39239b67 (patch)
treef3b4ed31e6adea55af569c9d79ec15799d9c9a50
parentcf000f862b53847fa213375d7a14d20a3dd2630b (diff)
parentf79e3b0dea31d48bce02deefa806b7661ba44c4b (diff)
Merge branch 'container-env'
-rw-r--r--examples/packages/example_3/b/pkg.toml4
-rw-r--r--src/cli.rs8
-rw-r--r--src/commands/build.rs33
-rw-r--r--src/commands/db.rs37
-rw-r--r--src/db/models/endpoint.rs2
-rw-r--r--src/db/models/envvar.rs3
-rw-r--r--src/db/models/image.rs2
-rw-r--r--src/db/models/job.rs16
-rw-r--r--src/db/models/job_env.rs37
-rw-r--r--src/db/models/mod.rs3
-rw-r--r--src/db/models/package.rs2
-rw-r--r--src/db/models/submit.rs9
-rw-r--r--src/endpoint/configured.rs9
-rw-r--r--src/endpoint/scheduler.rs41
-rw-r--r--src/job/runnable.rs34
-rw-r--r--src/orchestrator/orchestrator.rs3
16 files changed, 216 insertions, 27 deletions
diff --git a/examples/packages/example_3/b/pkg.toml b/examples/packages/example_3/b/pkg.toml
index f197abf..2ca66e2 100644
--- a/examples/packages/example_3/b/pkg.toml
+++ b/examples/packages/example_3/b/pkg.toml
@@ -1,6 +1,10 @@
name = "b"
version = "2"
+[environment]
+FOO = "bar"
+BAZ = "bla"
+
[phases]
package.script = '''
mkdir /outputs
diff --git a/src/cli.rs b/src/cli.rs
index 00beb5b..6cffb76 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -185,6 +185,14 @@ pub fn cli<'a>() -> App<'a> {
.help("Show the script")
)
+ .arg(Arg::with_name("show_env")
+ .required(false)
+ .multiple(false)
+ .long("env")
+ .short('E')
+ .help("Show the environment of the job")
+ )
+
.arg(Arg::with_name("script_disable_highlighting")
.required(false)
.multiple(false)
diff --git a/src/commands/build.rs b/src/commands/build.rs
index 6fc2312..122dd57 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -12,6 +12,7 @@ use clap_v3::ArgMatches;
use diesel::PgConnection;
use logcrate::debug;
use tokio::sync::RwLock;
+use tokio::stream::StreamExt;
use crate::config::*;
use crate::filestore::ReleaseStore;
@@ -36,6 +37,7 @@ pub async fn build<'a>(matches: &ArgMatches,
-> Result<()>
{
use crate::db::models::{
+ EnvVar,
Package,
GitHash,
Image,
@@ -85,6 +87,17 @@ pub async fn build<'a>(matches: &ArgMatches,
.map(PackageVersion::from);
info!("We want {} ({:?})", pname, pvers);
+ let additional_env = matches.values_of("env")
+ .unwrap_or_default()
+ .map(|s| {
+ let v = s.split("=").collect::<Vec<_>>();
+ Ok((
+ String::from(*v.get(0).ok_or_else(|| anyhow!("Environment variable has no key: {}", s))?),
+ String::from(*v.get(1).ok_or_else(|| anyhow!("Environment variable has no value: {}", s))?)
+ ))
+ })
+ .collect::<Result<Vec<(String, String)>>>()?;
+
let packages = if let Some(pvers) = pvers {
repo.find(&pname, &pvers)
} else {
@@ -157,15 +170,28 @@ pub async fn build<'a>(matches: &ArgMatches,
let db_package = async { Package::create_or_fetch(&database_connection, &package) };
let db_githash = async { GitHash::create_or_fetch(&database_connection, &hash_str) };
let db_image = async { Image::create_or_fetch(&database_connection, &image_name) };
+ let db_envs = async {
+ additional_env.clone()
+ .into_iter()
+ .map(|(k, v)| async {
+ let k: String = k; // hack to work around move semantics
+ let v: String = v; // hack to work around move semantics
+ EnvVar::create_or_fetch(&database_connection, &k, &v)
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Result<Vec<EnvVar>>>()
+ .await
+ };
trace!("Running database jobs for Package, GitHash, Image");
- let (db_package, db_githash, db_image) = tokio::join!(
+ let (db_package, db_githash, db_image, db_envs) = tokio::join!(
db_package,
db_githash,
- db_image
+ db_image,
+ db_envs
);
- let (db_package, db_githash, db_image) = (db_package?, db_githash?, db_image?);
+ let (db_package, db_githash, db_image, db_envs) = (db_package?, db_githash?, db_image?, db_envs?);
trace!("Database jobs for Package, GitHash, Image finished successfully");
trace!("Creating Submit in database");
@@ -192,6 +218,7 @@ pub async fn build<'a>(matches: &ArgMatches,
.release_store(release_dir)
.database(database_connection)
.source_cache(source_cache)
+ .additional_env(additional_env)
.submit(submit)
.log_dir(if matches.is_present("write-log-file") { Some(config.log_dir().clone()) } else { None })
.jobsets(jobsets)
diff --git a/src/commands/db.rs b/src/commands/db.rs
index b788f71..ea9bb9a 100644
--- a/src/commands/db.rs
+++ b/src/commands/db.rs
@@ -8,6 +8,7 @@ use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use clap::ArgMatches;
+use diesel::BelongingToDsl;
use diesel::ExpressionMethods;
use diesel::JoinOnDsl;
use diesel::QueryDsl;
@@ -227,13 +228,16 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
.transpose()?
.map(|submit_uuid| {
use crate::schema;
+ use diesel::BelongingToDsl;
- schema::jobs::table.inner_join({
- schema::submits::table.on(schema::submits::uuid.eq(submit_uuid))
- })
- .inner_join(schema::endpoints::table)
- .inner_join(schema::packages::table)
- .load::<(models::Job, models::Submit, models::Endpoint, models::Package)>(&conn)
+ let submit = models::Submit::with_id(&conn, &submit_uuid)?;
+
+ models::Job::belonging_to(&submit)
+ .inner_join(schema::submits::table)
+ .inner_join(schema::endpoints::table)
+ .inner_join(schema::packages::table)
+ .load::<(models::Job, models::Submit, models::Endpoint, models::Package)>(&conn)
+ .map_err(Error::from)
})
.unwrap_or_else(|| {
dsl::jobs
@@ -241,6 +245,7 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
.inner_join(crate::schema::endpoints::table)
.inner_join(crate::schema::packages::table)
.load::<(models::Job, models::Submit, models::Endpoint, models::Package)>(&conn)
+ .map_err(Error::from)
})?;
let data = jobs.into_iter()
@@ -292,6 +297,21 @@ fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &A
.inner_join(schema::images::table)
.first::<(models::Job, models::Submit, models::Endpoint, models::Package, models::Image)>(&conn)?;
+ let env_vars = if matches.is_present("show_env") {
+ Some({
+ models::JobEnv::belonging_to(&data.0)
+ .inner_join(schema::envvars::table)
+ .load::<(models::JobEnv, models::EnvVar)>(&conn)?
+ .into_iter()
+ .map(|tpl| tpl.1)
+ .enumerate()
+ .map(|(i, env)| format!("\t{:>3}. {}={}", i, env.name, env.value))
+ .join("\n")
+ })
+ } else {
+ None
+ };
+
let parsed_log = crate::log::ParsedLog::build_from(&data.0.log_text)?;
let success = parsed_log.is_successfull();
let s = indoc::formatdoc!(r#"
@@ -308,6 +328,10 @@ fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &A
---
+ {envs}
+
+ ---
+
{script_text}
---
@@ -333,6 +357,7 @@ fn job<'a>(conn_cfg: DbConnectionConfig, config: &Configuration<'a>, matches: &A
container_hash = data.0.container_hash.cyan(),
script_len = format!("{:<4}", data.0.script_text.lines().count()).cyan(),
log_len = format!("{:<4}", data.0.log_text.lines().count()).cyan(),
+ envs = env_vars.unwrap_or_else(|| String::from("<env vars hidden>")),
script_text = if show_script {
if let Some(configured_theme) = configured_theme {
if highlighting_disabled {
diff --git a/src/db/models/endpoint.rs b/src/db/models/endpoint.rs
index ab30192..d81fbfd 100644
--- a/src/db/models/endpoint.rs
+++ b/src/db/models/endpoint.rs
@@ -6,7 +6,7 @@ use diesel::prelude::*;
use crate::schema::endpoints::*;
use crate::schema::endpoints;
-#[derive(Queryable)]
+#[derive(Identifiable, Queryable)]
pub struct Endpoint {
pub id: i32,
pub name: String,
diff --git a/src/db/models/envvar.rs b/src/db/models/envvar.rs
index f883661..51ad32a 100644
--- a/src/db/models/envvar.rs
+++ b/src/db/models/envvar.rs
@@ -6,7 +6,8 @@ use diesel::prelude::*;
use crate::schema::envvars::*;
use crate::schema::envvars;
-#[derive(Queryable)]
+#[derive(Identifiable, Queryable)]
+#[table_name="envvars"]
pub struct EnvVar {
pub id: i32,
pub name: String,
diff --git a/src/db/models/image.rs b/src/db/models/image.rs
index 225bac7..5032039 100644
--- a/src/db/models/image.rs
+++ b/src/db/models/image.rs
@@ -7,7 +7,7 @@ use crate::schema::images::*;
use crate::schema::images;
use crate::util::docker::ImageName;
-#[derive(Queryable)]
+#[derive(Identifiable, Queryable)]
pub struct Image {
pub id: i32,
pub name: String,
diff --git a/src/db/models/job.rs b/src/db/models/job.rs
index 12fba7b..90d9732 100644
--- a/src/db/models/job.rs
+++ b/src/db/models/job.rs
@@ -9,7 +9,12 @@ use crate::schema::jobs::*;
use crate::schema::jobs;
use crate::util::docker::ContainerHash;
-#[derive(Queryable)]
+#[derive(Identifiable, Queryable, Associations)]
+#[belongs_to(Submit)]
+#[belongs_to(Endpoint)]
+#[belongs_to(Package)]
+#[belongs_to(Image)]
+#[table_name="jobs"]
pub struct Job {
pub id: i32,
pub submit_id: i32,
@@ -45,7 +50,7 @@ impl Job {
container: &ContainerHash,
script: &Script,
log: &str,
- ) -> Result<()> {
+ ) -> Result<Job> {
let new_job = NewJob {
uuid: job_uuid,
submit_id: submit.id,
@@ -61,9 +66,12 @@ impl Job {
diesel::insert_into(jobs::table)
.values(&new_job)
.on_conflict_do_nothing()
- .execute(database_connection)
+ .execute(database_connection)?;
+
+ dsl::jobs
+ .filter(uuid.eq(job_uuid))
+ .first::<Job>(database_connection)
.map_err(Error::from)
- .map(|_| ())
}
}
diff --git a/src/db/models/job_env.rs b/src/db/models/job_env.rs
new file mode 100644
index 0000000..3ef1ae3
--- /dev/null
+++ b/src/db/models/job_env.rs
@@ -0,0 +1,37 @@
+use anyhow::Result;
+use diesel::PgConnection;
+use diesel::prelude::*;
+
+use crate::schema::job_envs::*;
+use crate::schema::job_envs;
+use crate::db::models::Job;
+use crate::db::models::EnvVar;
+
+#[derive(Identifiable, Queryable, Associations)]
+#[belongs_to(Job)]
+#[belongs_to(EnvVar, foreign_key = "env_id")]
+#[table_name="job_envs"]
+pub struct JobEnv {
+ pub id: i32,
+ pub job_id: i32,
+ pub env_id: i32,
+}
+
+#[derive(Insertable)]
+#[table_name="job_envs"]
+struct NewJobEnv {
+ pub job_id: i32,
+ pub env_id: i32,
+}
+
+impl JobEnv {
+ pub fn create(database_connection: &PgConnection, job: &Job, env: &EnvVar) -> Result<()> {
+ let new_jobenv = NewJobEnv { job_id: job.id, env_id: env.id };
+
+ diesel::insert_into(job_envs::table)
+ .values(&new_jobenv)
+ .execute(database_connection)?;
+ Ok(())
+ }
+}
+
diff --git a/src/db/models/mod.rs b/src/db/models/mod.rs
index 26875bc..8e1eb8e 100644
--- a/src/db/models/mod.rs
+++ b/src/db/models/mod.rs
@@ -13,6 +13,9 @@ pub use image::*;
mod job;
pub use job::*;
+mod job_env;
+pub use job_env::*;
+
mod githash;
pub use githash::*;
diff --git a/src/db/models/package.rs b/src/db/models/package.rs
index 8f14607..b0b99ee 100644
--- a/src/db/models/package.rs
+++ b/src/db/models/package.rs
@@ -8,7 +8,7 @@ use diesel::prelude::*;
use crate::schema::packages::*;
use crate::schema::packages;
-#[derive(Queryable)]
+#[derive(Identifiable, Queryable)]
pub struct Package {
pub id: i32,
pub name: String,
diff --git a/src/db/models/submit.rs b/src/db/models/submit.rs
index 102d7f2..731012a 100644
--- a/src/db/models/submit.rs
+++ b/src/db/models/submit.rs
@@ -12,7 +12,7 @@ use crate::db::models::Package;
use crate::schema::submits::*;
use crate::schema::submits;
-#[derive(Clone, Debug, Queryable)]
+#[derive(Clone, Debug, Identifiable, Queryable)]
pub struct Submit {
pub id: i32,
pub uuid: ::uuid::Uuid,
@@ -43,7 +43,7 @@ impl Submit {
requested_package: &Package,
repo_hash: &GitHash)
-> Result<Submit>
-{
+ {
let tree_json = serde_json::to_value(t)
.context("Converting tree to JSON string")
.with_context(|| anyhow!("Tree = {:#?}", t))?;
@@ -63,11 +63,16 @@ impl Submit {
.execute(database_connection)
.context("Inserting new submit into submits table")?;
+ Self::with_id(database_connection, submit_id)
+ }
+
+ pub fn with_id(database_connection: &PgConnection, submit_id: &::uuid::Uuid) -> Result<Submit> {
dsl::submits
.filter(submits::uuid.eq(submit_id))
.first::<Submit>(database_connection)
.context("Loading submit")
.map_err(Error::from)
}
+
}
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 2dadb0d..9e9171b 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -176,15 +176,16 @@ impl Endpoint {
.map(|_| ())
}
- pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
+ pub async fn run_job(&self, job: RunnableJob, logsink: UnboundedSender<LogItem>, staging: Arc<RwLock<StagingStore>>, additional_env: Vec<(String, String)>) -> RResult<(Vec<PathBuf>, ContainerHash, Script), ContainerError> {
use crate::log::buffer_stream_to_line_stream;
use tokio::stream::StreamExt;
use futures::FutureExt;
let (container_id, _warnings) = {
- let envs = job.resources()
- .iter()
- .filter_map(JobResource::env)
+ let envs = job.environment()
+ .into_iter()
+ .chain(job.package_environment().into_iter())
+ .chain(additional_env.into_iter())
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>();
trace!("Job resources: Environment variables = {:?}", envs);
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 3b4b7c6..cbc0bd9 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -33,11 +33,12 @@ pub struct EndpointScheduler {
db: Arc<PgConnection>,
progressbars: ProgressBars,
submit: crate::db::models::Submit,
+ additional_env: Vec<(String, String)>,
}
impl EndpointScheduler {
- pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>) -> Result<Self> {
+ pub async fn setup(endpoints: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, db: Arc<PgConnection>, progressbars: ProgressBars, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, additional_env: Vec<(String, String)>) -> Result<Self> {
let endpoints = Self::setup_endpoints(endpoints).await?;
Ok(EndpointScheduler {
@@ -47,6 +48,7 @@ impl EndpointScheduler {
db,
progressbars,
submit,
+ additional_env,
})
}
@@ -82,6 +84,7 @@ impl EndpointScheduler {
staging_store: self.staging_store.clone(),
db: self.db.clone(),
submit: self.submit.clone(),
+ additional_env: self.additional_env.clone(),
})
}
@@ -118,6 +121,7 @@ pub struct JobHandle {
db: Arc<PgConnection>,
staging_store: Arc<RwLock<StagingStore>>,
submit: crate::db::models::Submit,
+ additional_env: Vec<(String, String)>,
}
impl std::fmt::Debug for JobHandle {
@@ -138,10 +142,37 @@ impl JobHandle {
let package = dbmodels::Package::create_or_fetch(&self.db, self.job.package())?;
let image = dbmodels::Image::create_or_fetch(&self.db, self.job.image())?;
+ let envs = {
+ trace!("Creating environment in database");
+ trace!("Hardcoded = {:?}", self.job.package().environment());
+ trace!("Dynamic = {:?}", self.additional_env);
+ let mut hardcoded_env = if let Some(hm) = self.job.package().environment().as_ref() {
+ hm.iter()
+ .map(|(k, v)| {
+ trace!("Creating environment variable in database: {} = {}", k, v);
+ dbmodels::EnvVar::create_or_fetch(&self.db, k, v)
+ })
+ .collect::<Result<Vec<_>>>()?
+ } else {
+ Vec::new()
+ };
+
+ let mut additionals = self.additional_env
+ .iter()
+ .map(|(k, v)| {
+ trace!("Creating environment variable in database: {} = {}", k, v);
+ dbmodels::EnvVar::create_or_fetch(&self.db, k, v)
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ hardcoded_env.append(&mut additionals);
+ hardcoded_env
+ };
+
let job_id = self.job.uuid().clone();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let res = ep
- .run_job(self.job, log_sender, self.staging_store);
+ .run_job(self.job, log_sender, self.staging_store, self.additional_env);
let logres = LogReceiver {
log_dir: self.log_dir.as_ref(),
@@ -157,7 +188,11 @@ impl JobHandle {
let log = logres.with_context(|| anyhow!("Collecting logs for job on '{}'", ep.name()))?;
let (paths, container_hash, script) = res.with_context(|| anyhow!("Running job on '{}'", ep.name()))?;
- dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?;
+ let job = dbmodels::Job::create(&self.db, &job_id, &self.submit, &endpoint, &package, &image, &container_hash, &script, &log)?;
+ for env in envs {
+ let _ = dbmodels::JobEnv::create(&self.db, &job, &env)?;
+ }
+
Ok(paths)
}
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 6de5622..67219fb 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -84,6 +84,40 @@ impl RunnableJob {
self.source_cache.source_for(&self.package())
}
+ pub fn environment(&self) -> Vec<(String, String)> {
+ let iter = self.resources
+ .iter()
+ .filter_map(JobResource::env)
+ .map(|(k, v)| (k.clone(), v.clone()));
+
+ if let Some(hm) = self.package().environment() {
+ iter.chain({
+ hm.iter().map(|(k, v)| (k.clone(), v.clone()))
+ }).collect()
+ } else {
+ iter.collect()
+ }
+ }
+
+ pub fn package_environment(&self) -> Vec<(String, String)> {
+ vec![
+ (
+ String::from("BUTIDO_PACKAGE_NAME"),
+ self.package().name().clone().to_string()
+ ),
+
+ (
+ String::from("BUTIDO_PACKAGE_VERSION"),
+ self.package().version().clone().to_string()
+ ),
+
+ (
+ String::from("BUTIDO_PACKAGE_VERSION_IS_SEMVER"),
+ String::from(if *self.package().version_is_semver() { "true" } else { "false" })
+ ),
+ ]
+ }
+
async fn build_resource(dep: &dyn ParseDependency, merged_stores: &MergedStores) -> Result<JobResource> {
let (name, vers) = dep.parse_as_name_and_version()?;
trace!("Copying dep: {:?} {:?}", name, vers);
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e7b5ba6..c45b5b3 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -44,6 +44,7 @@ pub struct OrchestratorSetup {
staging_store: Arc<RwLock<StagingStore>>,
release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
+ additional_env: Vec<(String, String)>,
jobsets: Vec<JobSet>,
database: PgConnection,
submit: Submit,
@@ -53,7 +54,7 @@ pub struct OrchestratorSetup {
impl OrchestratorSetup {
pub async fn setup(self) -> Result<Orchestrator> {
let db = Arc::new(self.database);
- let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir).await?;
+ let scheduler = EndpointScheduler::setup(self.endpoint_config, self.staging_store.clone(), db.clone(), self.progress_generator.clone(), self.submit.clone(), self.log_dir, self.additional_env).await?;
Ok(Orchestrator {
progress_generator: self.progress_generator,