summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cli.rs86
-rw-r--r--src/commands/db.rs17
-rw-r--r--src/commands/find_artifact.rs138
-rw-r--r--src/commands/mod.rs3
-rw-r--r--src/db/find_artifacts.rs224
-rw-r--r--src/db/mod.rs3
-rw-r--r--src/db/models/artifact.rs13
-rw-r--r--src/db/models/envvar.rs2
-rw-r--r--src/db/models/job.rs11
-rw-r--r--src/filestore/merged.rs9
-rw-r--r--src/filestore/path.rs8
-rw-r--r--src/filestore/release.rs9
-rw-r--r--src/filestore/util.rs5
-rw-r--r--src/main.rs6
-rw-r--r--src/orchestrator/orchestrator.rs62
-rw-r--r--src/package/name.rs6
-rw-r--r--src/package/version.rs6
17 files changed, 603 insertions, 5 deletions
diff --git a/src/cli.rs b/src/cli.rs
index 5be3aa8..9877edd 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -106,6 +106,14 @@ pub fn cli<'a>() -> App<'a> {
.subcommand(App::new("artifacts")
.about("List artifacts from the DB")
+ .arg(Arg::new("limit")
+ .required(false)
+ .multiple(false)
+ .long("limit")
+ .takes_value(true)
+ .default_value("50")
+ .about("Limit number of output entries")
+ )
.arg(Arg::new("csv")
.required(false)
.multiple(false)
@@ -126,6 +134,14 @@ pub fn cli<'a>() -> App<'a> {
.subcommand(App::new("envvars")
.about("List envvars from the DB")
+ .arg(Arg::new("limit")
+ .required(false)
+ .multiple(false)
+ .long("limit")
+ .takes_value(true)
+ .default_value("50")
+ .about("Limit number of output entries")
+ )
.arg(Arg::new("csv")
.required(false)
.multiple(false)
@@ -137,6 +153,14 @@ pub fn cli<'a>() -> App<'a> {
.subcommand(App::new("images")
.about("List images from the DB")
+ .arg(Arg::new("limit")
+ .required(false)
+ .multiple(false)
+ .long("limit")
+ .takes_value(true)
+ .default_value("50")
+ .about("Limit number of output entries")
+ )
.arg(Arg::new("csv")
.required(false)
.multiple(false)
@@ -148,6 +172,14 @@ pub fn cli<'a>() -> App<'a> {
.subcommand(App::new("submits")
.about("List submits from the DB")
+ .arg(Arg::new("limit")
+ .required(false)
+ .multiple(false)
+ .long("limit")
+ .takes_value(true)
+ .default_value("50")
+ .about("Limit number of output entries")
+ )
.arg(Arg::new("csv")
.required(false)
.multiple(false)
@@ -177,6 +209,14 @@ pub fn cli<'a>() -> App<'a> {
.subcommand(App::new("jobs")
.about("List jobs from the DB")
+ .arg(Arg::new("limit")
+ .required(false)
+ .multiple(false)
+ .long("limit")
+ .takes_value(true)
+ .default_value("50")
+ .about("Limit number of output entries")
+ )
.arg(Arg::new("csv")
.required(false)
.multiple(false)
@@ -425,6 +465,52 @@ pub fn cli<'a>() -> App<'a> {
.about("A version constraint to search for (optional), E.G. '=1.0.0'")
)
)
+
+ .subcommand(App::new("find-artifact")
+ .about("Find artifacts for packages")
+ .arg(Arg::new("package_name_regex")
+ .required(true)
+ .multiple(false)
+ .index(1)
+ .value_name("REGEX")
+ .about("The regex to match the package name against")
+ )
+ .arg(Arg::new("package_version_constraint")
+ .required(false)
+ .multiple(false)
+ .index(2)
+ .value_name("VERSION_CONSTRAINT")
+ .about("A version constraint to search for (optional), E.G. '=1.0.0'")
+ )
+ .arg(Arg::new("no_script_filter")
+ .long("no-script-filter")
+ .short('S')
+ .required(false)
+ .multiple(false)
+ .takes_value(false)
+ .about("Don't check for script equality. Can cause unexact results.")
+ )
+ .arg(Arg::new("staging_dir")
+ .required(false)
+ .multiple(false)
+ .long("staging-dir")
+ .takes_value(true)
+ .value_name("PATH")
+ .validator(dir_exists_validator)
+ .about("Also consider this staging dir when searching for artifacts")
+ )
+ .arg(Arg::new("env_filter")
+ .required(false)
+ .multiple(true)
+ .long("env")
+ .short('E')
+ .takes_value(true)
+ .value_name("KV")
+ .validator(env_pass_validator)
+ .about("Filter for this \"key=value\" environment variable")
+ )
+ )
+
.subcommand(App::new("find-pkg")
.about("Find a package by regex")
.arg(Arg::new("package_name_regex")
diff --git a/src/commands/db.rs b/src/commands/db.rs
index eb37fa8..6c13c39 100644
--- a/src/commands/db.rs
+++ b/src/commands/db.rs
@@ -12,6 +12,7 @@ use std::fmt::Display;
use std::io::Write;
use std::path::PathBuf;
use std::process::Command;
+use std::str::FromStr;
use anyhow::anyhow;
use anyhow::Context;
@@ -138,6 +139,7 @@ fn cli(db_connection_config: DbConnectionConfig, matches: &ArgMatches) -> Result
fn artifacts(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::artifacts::dsl;
+ let limit = matches.value_of("limit").map(i64::from_str).transpose()?.unwrap_or(50);
let csv = matches.is_present("csv");
let hdrs = mk_header(vec!["id", "path", "released", "job id"]);
let conn = crate::db::establish_connection(conn_cfg)?;
@@ -150,6 +152,7 @@ fn artifacts(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
.inner_join(schema::jobs::table)
.left_join(schema::releases::table)
.filter(schema::jobs::dsl::uuid.eq(job_uuid))
+ .limit(limit)
.load::<(models::Artifact, models::Job, Option<models::Release>)>(&conn)
.map_err(Error::from)
})
@@ -157,6 +160,8 @@ fn artifacts(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
dsl::artifacts
.inner_join(schema::jobs::table)
.left_join(schema::releases::table)
+ .limit(limit)
+ .order_by(schema::artifacts::id.asc())
.load::<(models::Artifact, models::Job, Option<models::Release>)>(&conn)
.map_err(Error::from)
})?
@@ -186,10 +191,12 @@ fn artifacts(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
fn envvars(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::envvars::dsl;
+ let limit = matches.value_of("limit").map(i64::from_str).transpose()?.unwrap_or(50);
let csv = matches.is_present("csv");
let hdrs = mk_header(vec!["id", "name", "value"]);
let conn = crate::db::establish_connection(conn_cfg)?;
let data = dsl::envvars
+ .limit(limit)
.load::<models::EnvVar>(&conn)?
.into_iter()
.map(|evar| vec![format!("{}", evar.id), evar.name, evar.value])
@@ -207,10 +214,12 @@ fn envvars(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
fn images(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::images::dsl;
+ let limit = matches.value_of("limit").map(i64::from_str).transpose()?.unwrap_or(50);
let csv = matches.is_present("csv");
let hdrs = mk_header(vec!["id", "name"]);
let conn = crate::db::establish_connection(conn_cfg)?;
let data = dsl::images
+ .limit(limit)
.load::<models::Image>(&conn)?
.into_iter()
.map(|image| vec![format!("{}", image.id), image.name])
@@ -226,6 +235,7 @@ fn images(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
}
fn submits(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
+ let limit = matches.value_of("limit").map(i64::from_str).transpose()?.unwrap_or(50);
let csv = matches.is_present("csv");
let hdrs = mk_header(vec!["id", "time", "uuid"]);
let conn = crate::db::establish_connection(conn_cfg)?;
@@ -272,6 +282,7 @@ fn submits(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
} else {
// default: Get all submits
schema::submits::table
+ .limit(limit)
.load::<models::Submit>(&conn)?
.into_iter()
.map(submit_to_vec)
@@ -290,6 +301,7 @@ fn submits(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
use crate::schema::jobs::dsl;
+ let limit = matches.value_of("limit").map(i64::from_str).transpose()?.unwrap_or(50);
let csv = matches.is_present("csv");
let hdrs = mk_header(vec![
"id",
@@ -327,6 +339,7 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
.eq(env_name.as_ref())
.and(schema::envvars::dsl::value.eq(env_value))
})
+ .limit(limit)
.load::<(
models::Job,
models::Submit,
@@ -336,7 +349,8 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
)>(&conn)
.map_err(Error::from)
} else {
- sel.load::<(
+ sel.limit(limit)
+ .load::<(
models::Job,
models::Submit,
models::Endpoint,
@@ -352,6 +366,7 @@ fn jobs(conn_cfg: DbConnectionConfig, matches: &ArgMatches) -> Result<()> {
.inner_join(crate::schema::endpoints::table)
.inner_join(crate::schema::packages::table)
.left_outer_join(schema::job_envs::table.inner_join(schema::envvars::table))
+ .limit(limit)
.load::<(
models::Job,
models::Submit,
diff --git a/src/commands/find_artifact.rs b/src/commands/find_artifact.rs
new file mode 100644
index 0000000..bf48130
--- /dev/null
+++ b/src/commands/find_artifact.rs
@@ -0,0 +1,138 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::path::PathBuf;
+use std::io::Write;
+use std::sync::Arc;
+
+use anyhow::Context;
+use anyhow::Error;
+use anyhow::Result;
+use clap::ArgMatches;
+use diesel::PgConnection;
+use itertools::Itertools;
+use log::debug;
+use log::trace;
+
+use crate::config::Configuration;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
+use crate::filestore::path::StoreRoot;
+use crate::package::PackageVersionConstraint;
+use crate::repository::Repository;
+use crate::util::progress::ProgressBars;
+
+/// Implementation of the "find_artifact" subcommand
+pub async fn find_artifact(matches: &ArgMatches, config: &Configuration, progressbars: ProgressBars, repo: Repository, database_connection: PgConnection, max_packages: u64) -> Result<()> {
+ let package_name_regex = crate::commands::util::mk_package_name_regex({
+ matches.value_of("package_name_regex").unwrap() // safe by clap
+ })?;
+
+ let package_version_constraint = matches
+ .value_of("package_version_constraint")
+ .map(String::from)
+ .map(PackageVersionConstraint::new)
+ .transpose()
+ .context("Parsing package version constraint")
+ .context("A valid package version constraint looks like this: '=1.0.0'")?;
+
+ let env_filter = matches.values_of("env_filter")
+ .map(|vals| vals.map(crate::util::env::parse_to_env).collect::<Result<Vec<_>>>())
+ .transpose()?
+ .unwrap_or_default();
+
+ log::debug!("Finding artifacts for '{:?}' '{:?}'", package_name_regex, package_version_constraint);
+
+ let release_store = {
+ let bar_release_loading = progressbars.bar();
+ bar_release_loading.set_length(max_packages);
+
+ let p = config.releases_directory();
+ debug!("Loading release directory: {}", p.display());
+ let r = ReleaseStore::load(StoreRoot::new(p.clone())?, bar_release_loading.clone());
+ if r.is_ok() {
+ bar_release_loading.finish_with_message("Loaded releases successfully");
+ } else {
+ bar_release_loading.finish_with_message("Failed to load releases");
+ }
+ r?
+ };
+ let staging_store = if let Some(p) = matches.value_of("staging_dir").map(PathBuf::from) {
+ let bar_staging_loading = progressbars.bar();
+ bar_staging_loading.set_length(max_packages);
+
+ if !p.is_dir() {
+ let _ = tokio::fs::create_dir_all(&p).await?;
+ }
+
+ debug!("Loading staging directory: {}", p.display());
+ let r = StagingStore::load(StoreRoot::new(p.clone())?, bar_staging_loading.clone());
+ if r.is_ok() {
+ bar_staging_loading.finish_with_message("Loaded staging successfully");
+ } else {
+ bar_staging_loading.finish_with_message("Failed to load staging");
+ }
+ Some(r?)
+ } else {
+ None
+ };
+
+ let database = Arc::new(database_connection);
+ repo.packages()
+ .filter(|p| package_name_regex.captures(p.name()).is_some())
+ .filter(|p| {
+ package_version_constraint
+ .as_ref()
+ .map(|v| v.matches(p.version()))
+ .unwrap_or(true)
+ })
+ .inspect(|pkg| trace!("Found package: {:?}", pkg))
+ .map(|pkg| {
+ let script_filter = !matches.is_present("no_script_filter");
+ let pathes = crate::db::find_artifacts(database.clone(), config, &pkg, &release_store, staging_store.as_ref(), &env_filter, script_filter)?;
+
+ pathes.iter()
+ .map(|tpl| (tpl.0.joined(), tpl.1))
+ .sorted_by(|tpla, tplb| {
+ use std::cmp::Ordering;
+
+ // Sort the iterator elements, so that if there is a release date, we always
+ // prefer the entry with the release date AS LONG AS the path is equal.
+ match (tpla, tplb) {
+ ((a, Some(ta)), (b, Some(tb))) => match a.cmp(b) {
+ Ordering::Equal => ta.cmp(tb),
+ other => other,
+ },
+
+ ((a, Some(_)), (b, None)) => match a.cmp(b) {
+ Ordering::Equal => Ordering::Greater,
+ other => other,
+ },
+ ((a, None), (b, Some(_))) => match a.cmp(b) {
+ Ordering::Equal => Ordering::Less,
+ other => other,
+ },
+ ((a, None), (b, None)) => a.cmp(b),
+ }
+ })
+ .unique_by(|tpl| tpl.0.clone()) // TODO: Don't clone()
+ .try_for_each(|(path, releasetime)| {
+ if let Some(time) = releasetime {
+ writeln!(std::io::stdout(), "[{}] {}", time, path.display())
+ } else {
+ writeln!(std::io::stdout(), "[unknown] {}", path.display())
+ }.map_err(Error::from)
+ })
+ })
+ .inspect(|r| trace!("Query resulted in: {:?}", r))
+ .collect::<Vec<Result<()>>>()
+ .into_iter()
+ .collect()
+}
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 9696573..473596a 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -17,6 +17,9 @@ pub use db::db;
mod env_of;
pub use env_of::env_of;
+mod find_artifact;
+pub use find_artifact::find_artifact;
+
mod find_pkg;
pub use find_pkg::find_pkg;
diff --git a/src/db/find_artifacts.rs b/src/db/find_artifacts.rs
new file mode 100644
index 0000000..47184b9
--- /dev/null
+++ b/src/db/find_artifacts.rs
@@ -0,0 +1,224 @@
+//
+// Copyright (c) 2020-2021 science+computing ag and other contributors
+//
+// This program and the accompanying materials are made
+// available under the terms of the Eclipse Public License 2.0
+// which is available at https://www.eclipse.org/legal/epl-2.0/
+//
+// SPDX-License-Identifier: EPL-2.0
+//
+
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use anyhow::Error;
+use anyhow::Result;
+use chrono::NaiveDateTime;
+use diesel::BoolExpressionMethods;
+use diesel::ExpressionMethods;
+use diesel::JoinOnDsl;
+use diesel::PgConnection;
+use diesel::QueryDsl;
+use diesel::RunQueryDsl;
+use log::trace;
+use resiter::AndThen;
+use resiter::Map;
+
+use crate::config::Configuration;
+use crate::db::models as dbmodels;
+use crate::filestore::path::ArtifactPath;
+use crate::filestore::path::FullArtifactPath;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
+use crate::package::Package;
+use crate::package::ParseDependency;
+use crate::package::ScriptBuilder;
+use crate::package::Shebang;
+use crate::schema;
+use crate::util::EnvironmentVariableName;
+
+/// Find an artifact by a job description
+///
+/// This function finds artifacts for a job description and environment that is equal to the passed
+/// one.
+/// The package is not the only parameter that influences a build, so this function gets all the
+/// things: The Package, the Release store, the Staging store (optionally), additional environment
+/// variables,...
+/// to find artifacts for a job that looks the very same.
+///
+/// If the artifact was released, the return value contains a Some(NaiveDateTime), marking the date
+/// of the release.
+/// Releases are returned preferably, if multiple equal paths for an artifact are found.
+pub fn find_artifacts<'a>(
+ database_connection: Arc<PgConnection>,
+ config: &Configuration,
+ pkg: &Package,
+ release_store: &'a ReleaseStore,
+ staging_store: Option<&'a StagingStore>,
+ additional_env: &[(EnvironmentVariableName, String)],
+ script_filter: bool,
+) -> Result<Vec<(FullArtifactPath<'a>, Option<NaiveDateTime>)>> {
+ let shebang = Shebang::from(config.shebang().clone());
+ let script = if script_filter {
+ let script = ScriptBuilder::new(&shebang).build(
+ pkg,
+ config.available_phases(),
+ *config.strict_script_interpolation(),
+ )?;
+ Some(script)
+ } else {
+ None
+ };
+
+ let package_environment = pkg.environment();
+ let build_dependencies_names = pkg
+ .dependencies()
+ .build()
+ .iter()
+ .map(|d| d.parse_as_name_and_version())
+ .map_ok(|tpl| tpl.0) // TODO: We only filter by dependency NAME right now, not by version constraint
+ .collect::<Result<Vec<_>>>()?;
+
+ let runtime_dependencies_names = pkg
+ .dependencies()
+ .runtime()
+ .iter()
+ .map(|d| d.parse_as_name_and_version())
+ .map_ok(|tpl| tpl.0) // TODO: We only filter by dependency NAME right now, not by version constraint
+ .collect::<Result<Vec<_>>>()?;
+
+ trace!("Build dependency names: {:?}", build_dependencies_names);
+ trace!("Runtime dependency names: {:?}", runtime_dependencies_names);
+ let mut query = schema::packages::table
+ .filter({
+ // The package with pkg.name() and pkg.version()
+ let package_name_filter = schema::packages::name.eq(pkg.name().as_ref() as &str);
+ let package_version_filter =
+ schema::packages::version.eq(pkg.version().as_ref() as &str);
+
+ package_name_filter.and(package_version_filter)
+ })
+
+ // TODO: Only select from submits where the submit contained jobs that are in the
+ // dependencies of `pkg`.
+ .inner_join(schema::jobs::table.inner_join(schema::submits::table))
+ .inner_join(schema::artifacts::table.on(schema::jobs::id.eq(schema::artifacts::job_id)))
+
+ // TODO: We do not yet have a method to "left join" properly, because diesel only has
+ // left_outer_join (left_join is an alias)
+ // So do not include release dates here, for now
+ //.left_outer_join(schema::releases::table.on(schema::releases::artifact_id.eq(schema::artifacts::id)))
+ .inner_join(schema::images::table.on(schema::submits::requested_image_id.eq(schema::images::id)))
+ .into_boxed();
+
+ if let Some(allowed_images) = pkg.allowed_images() {
+ trace!("Filtering with allowed_images = {:?}", allowed_images);
+ let imgs = allowed_images
+ .iter()
+ .map(AsRef::<str>::as_ref)
+ .collect::<Vec<_>>();
+ query = query.filter(schema::images::name.eq_any(imgs));
+ }
+
+ if let Some(denied_images) = pkg.denied_images() {
+ trace!("Filtering with denied_images = {:?}", denied_images);
+ let imgs = denied_images
+ .iter()
+ .map(AsRef::<str>::as_ref)
+ .collect::<Vec<_>>();
+ query = query.filter(schema::images::name.ne_all(imgs));
+ }
+
+ if let Some(script_text) = script.as_ref() {
+ query = query.filter(schema::jobs::script_text.eq(script_text.as_ref()));
+ }
+
+ trace!("Query = {}", diesel::debug_query(&query));
+
+ query
+ .select({
+ let arts = schema::artifacts::all_columns;
+ let jobs = schema::jobs::all_columns;
+ //let rels = schema::releases::release_date.nullable();
+
+ (arts, jobs)
+ })
+ .load::<(dbmodels::Artifact, dbmodels::Job)>(
+ &*database_connection,
+ )
+ .map_err(Error::from)
+ .and_then(|results: Vec<_>| {
+ results
+ .into_iter()
+ .inspect(|(art, job)| log::debug!("Filtering further: {:?}, job {:?}", art, job.id))
+ //
+ // Filter by environment variables
+ // All environment variables of the package must be present in the loaded
+ // package, so that we can be sure that the loaded package was built with
+ // the same ENV.
+ //
+ // TODO:
+ // Doing this in the database query would be way nicer, but I was not able
+ // to implement it.
+ //
+ .map(|tpl| -> Result<(_, _)> {
+ // This is a Iterator::filter() but because our condition here might fail, we
+ // map() and do the actual filtering later.
+
+ let job = tpl.1;
+ let job_env: Vec<(String, String)> = job
+ .env(&*database_connection)?
+ .into_iter()
+ .map(|var: dbmodels::EnvVar| (var.name, var.value))
+ .collect();
+
+ trace!("The job we found had env: {:?}", job_env);
+ if let Some(pkg_env) = package_environment.as_ref() {
+ let filter_result = job_env.iter()
+ .all(|(k, v)| {
+ pkg_env
+ .iter()
+ .chain(additional_env.iter().map(|tpl| (&tpl.0, &tpl.1)))
+ .any(|(key, value)| k == key.as_ref() && v == value)
+ });
+
+ Ok((tpl.0, filter_result))
+ } else {
+ Ok((tpl.0, true))
+ }
+ })
+ .filter(|r| match r { // the actual filtering from above
+ Err(_) => true,
+ Ok((_, bl)) => *bl,
+ })
+ .and_then_ok(|(art, _)| {
+ if let Some(release) = art.get_release(&*database_connection)? {
+ Ok((art, Some(release.release_date)))
+ } else {
+ Ok((art, None))
+ }
+ })
+ .and_then_ok(|(p, ndt)| ArtifactPath::new(PathBuf::from(p.path)).map(|a| (a, ndt)))
+ .and_then_ok(|(artpath, ndt)| {
+ if let Some(staging) = staging_store.as_ref() {
+ trace!(
+ "Searching in staging: {:?} for {:?}",
+ staging.root_path(),
+ artpath
+ );
+ if let Some(art) = staging.get(&artpath) {
+ trace!("Found in staging: {:?}", art);
+ return staging.root_path().join(art).map(|p| (p, ndt));
+ }
+ }
+
+ let art = release_store
+ .get(&artpath)
+ .ok_or_else(|| anyhow!("Failed to find artifact for: {:?}", artpath))?;
+ trace!("Found in release: {:?}", art);
+ release_store.root_path().join(art).map(|p| (p, ndt))
+ })
+ .collect::<Result<Vec<(FullArtifactPath<'a>, Option<NaiveDateTime>)>>>()
+ })
+}
diff --git a/src/db/mod.rs b/src/db/mod.rs
index 0917929..8f29c0e 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -11,4 +11,7 @@
mod connection;
pub use connection::*;
+mod find_artifacts;
+pub use find_artifacts::find_artifacts;
+
pub mod models;
diff --git a/src/db/models/artifact.rs b/src/db/models/artifact.rs
index 1050b30..11e2cec 100644
--- a/src/db/models/artifact.rs
+++ b/src/db/models/artifact.rs
@@ -20,6 +20,7 @@ use diesel::prelude::*;
use diesel::PgConnection;
use crate::db::models::Job;
+use crate::db::models::Release;
use crate::schema::artifacts;
use crate::schema::artifacts::*;
@@ -51,6 +52,18 @@ impl Artifact {
crate::db::models::Release::create(database_connection, &self, release_date)
}
+ pub fn get_release(&self, database_connection: &PgConnection) -> Result<Option<Release>> {
+ use crate::schema;
+
+ schema::artifacts::table
+ .inner_join(schema::releases::table)
+ .filter(schema::releases::artifact_id.eq(self.id))
+ .select(schema::releases::all_columns)
+ .first::<Release>(database_connection)
+ .optional()
+ .map_err(Error::from)
+ }
+
pub fn create(
database_connection: &PgConnection,
art_path: &ArtifactPath,
diff --git a/src/db/models/envvar.rs b/src/db/models/envvar.rs
index f334969..b5fcc4f 100644
--- a/src/db/models/envvar.rs
+++ b/src/db/models/envvar.rs
@@ -17,7 +17,7 @@ use crate::schema::envvars;
use crate::schema::envvars::*;
use crate::util::EnvironmentVariableName;
-#[derive(Identifiable, Queryable)]
+#[derive(Debug, Identifiable, Queryable)]
#[table_name = "envvars"]
pub struct EnvVar {
pub id: i32,
diff --git a/src/db/models/job.rs b/src/db/models/job.rs
index aa7b7f9..12af2a9 100644
--- a/src/db/models/job.rs
+++ b/src/db/models/job.rs
@@ -86,4 +86,15 @@ impl Job {
.first::<Job>(database_connection)
.map_err(Error::from)
}
+
+ pub fn env(&self, database_connection: &PgConnection) -> Result<Vec<crate::db::models::EnvVar>> {
+ use crate::schema;
+
+ schema::job_envs::table
+ .inner_join(schema::envvars::table)
+ .filter(schema::job_envs::job_id.eq(self.id))
+ .select(schema::envvars::all_columns)
+ .load::<crate::db::models::EnvVar>(database_connection)
+ .map_err(Error::from)
+ }
}
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 4c7a38b..7363eb2 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -33,6 +33,7 @@ use crate::filestore::StagingStore;
///
#[derive(Getters)]
pub struct MergedStores {
+ #[getset(get = "pub")]
release: Arc<RwLock<ReleaseStore>>,
#[getset(get = "pub")]
@@ -79,4 +80,12 @@ impl MergedStores {
Ok(None)
}
+
+ pub async fn get(&self, p: &ArtifactPath) -> Option<ArtifactPath> {
+ if let Some(a) = self.staging.read().await.get(p).cloned() {
+ return Some(a)
+ }
+
+ self.release.read().await.get(p).cloned()
+ }
}
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index a03ab83..eeb946d 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -98,7 +98,7 @@ impl StoreRoot {
pub struct ArtifactPath(PathBuf);
impl ArtifactPath {
- pub(in crate::filestore) fn new(p: PathBuf) -> Result<Self> {
+ pub fn new(p: PathBuf) -> Result<Self> {
if p.is_relative() {
Ok(ArtifactPath(p))
} else {
@@ -139,7 +139,11 @@ impl AsRef<Path> for ArtifactPath {
pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath);
impl<'a> FullArtifactPath<'a> {
- fn joined(&self) -> PathBuf {
+ pub fn artifact_path(&self) -> &ArtifactPath {
+ self.1
+ }
+
+ pub fn joined(&self) -> PathBuf {
self.0 .0.join(&self.1 .0)
}
diff --git a/src/filestore/release.rs b/src/filestore/release.rs
index d7a7232..f19f69a 100644
--- a/src/filestore/release.rs
+++ b/src/filestore/release.rs
@@ -13,6 +13,7 @@ use std::fmt::Debug;
use anyhow::Result;
use indicatif::ProgressBar;
+use crate::filestore::path::ArtifactPath;
use crate::filestore::path::StoreRoot;
use crate::filestore::util::FileStoreImpl;
@@ -29,4 +30,12 @@ impl ReleaseStore {
pub fn load(root: StoreRoot, progress: ProgressBar) -> Result<Self> {
FileStoreImpl::load(root, progress).map(ReleaseStore)
}
+
+ pub fn root_path(&self) -> &StoreRoot {
+ self.0.root_path()
+ }
+
+ pub fn get(&self, p: &ArtifactPath) -> Option<&ArtifactPath> {
+ self.0.get(p)
+ }
}
diff --git a/src/filestore/util.rs b/src/filestore/util.rs
index 0d98404..53f5919 100644
--- a/src/filestore/util.rs
+++ b/src/filestore/util.rs
@@ -35,7 +35,10 @@ impl FileStoreImpl {
pub fn load(root: StoreRoot, progress: ProgressBar) -> Result<Self> {
let store = root
.find_artifacts_recursive()
- .inspect(|_| progress.tick())
+ .inspect(|path| {
+ log::trace!("Found artifact path: {:?}", path);
+ progress.tick();
+ })
.collect::<Result<HashSet<ArtifactPath>>>()?;
Ok(FileStoreImpl { root, store })
diff --git a/src/main.rs b/src/main.rs
index a75a9d3..606a951 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -161,6 +161,12 @@ async fn main() -> Result<()> {
crate::commands::env_of(matches, repo).await?
}
+ Some(("find-artifact", matches)) => {
+ let repo = load_repo()?;
+ let conn = crate::db::establish_connection(db_connection_config)?;
+ crate::commands::find_artifact(matches, &config, progressbars, repo, conn, max_packages).await?
+ }
+
Some(("find-pkg", matches)) => {
let repo = load_repo()?;
crate::commands::find_pkg(matches, &config, repo).await?
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 70699ad..2860748 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -501,6 +501,68 @@ impl<'a> JobTask<'a> {
}
}
+ // check if a job that looks very similar to this job has already produced artifacts.
+ // If it has, simply return those (plus the received ones)
+ {
+ let release_store = self.merged_stores.release().read().await;
+ let staging_store = self.merged_stores.staging().read().await;
+ let additional_env = vec![];
+
+ let replacement_artifacts = crate::db::find_artifacts(
+ self.database.clone(),
+ self.config,
+ self.jobdef.job.package(),
+ &release_store,
+
+ // We can simply pass the staging store here, because it doesn't hurt. There are
+ // two scenarios: