summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-07-27 16:36:35 +0200
committerMatthias Beyer <matthias.beyer@atos.net>2021-07-27 16:36:35 +0200
commit61a6427fe0fdaf894f713cac9cf23418e9708846 (patch)
tree34baead6e36d8262ec59ae946dd5e7850eda87c3
parent9e12e114ef8bc1120f87431b85331df50cd31181 (diff)
parent507aeeb8899a714fe794814615b61eeef91c7ed2 (diff)
Merge branch 'fix-dont-reuse-with-new-dependency-bug'
-rw-r--r--src/db/find_artifacts.rs20
-rw-r--r--src/orchestrator/orchestrator.rs70
2 files changed, 64 insertions, 26 deletions
diff --git a/src/db/find_artifacts.rs b/src/db/find_artifacts.rs
index 4f5401f..8434530 100644
--- a/src/db/find_artifacts.rs
+++ b/src/db/find_artifacts.rs
@@ -24,7 +24,6 @@ use diesel::RunQueryDsl;
use log::trace;
use resiter::AndThen;
use resiter::FilterMap;
-use resiter::Map;
use crate::config::Configuration;
use crate::db::models as dbmodels;
@@ -33,7 +32,6 @@ use crate::filestore::path::FullArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::package::Package;
-use crate::package::ParseDependency;
use crate::package::ScriptBuilder;
use crate::package::Shebang;
use crate::schema;
@@ -73,24 +71,6 @@ pub fn find_artifacts<'a>(
};
let package_environment = pkg.environment();
- let build_dependencies_names = pkg
- .dependencies()
- .build()
- .iter()
- .map(|d| d.parse_as_name_and_version())
- .map_ok(|tpl| tpl.0) // TODO: We only filter by dependency NAME right now, not by version constraint
- .collect::<Result<Vec<_>>>()?;
-
- let runtime_dependencies_names = pkg
- .dependencies()
- .runtime()
- .iter()
- .map(|d| d.parse_as_name_and_version())
- .map_ok(|tpl| tpl.0) // TODO: We only filter by dependency NAME right now, not by version constraint
- .collect::<Result<Vec<_>>>()?;
-
- trace!("Build dependency names: {:?}", build_dependencies_names);
- trace!("Runtime dependency names: {:?}", runtime_dependencies_names);
let mut query = schema::packages::table
.filter({
// The package with pkg.name() and pkg.version()
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 836b0f0..7e2799d 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -214,7 +214,47 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<HashMap<Uuid, Vec<ArtifactPath>>, HashMap<Uuid, Error>>;
+///
+/// The artifacts are encapsulated into a `ProducedArtifact`, see the documentation of the type for
+/// why.
+type JobResult = std::result::Result<HashMap<Uuid, Vec<ProducedArtifact>>, HashMap<Uuid, Error>>;
+
+/// A type that represents whether an artifact was built or reused from an old job
+///
+/// This is necessary to decide in dependent jobs whether a package needs to be rebuilt even though
+/// the script and environment did not change.
+///
+/// E.g.: If libA depends on libB, and libB changed and needs to be rebuilt, we need to rebuild
+/// all packages that depend (directly or indirectly) on that library.
+#[derive(Clone, Debug)]
+enum ProducedArtifact {
+ Built(ArtifactPath),
+ Reused(ArtifactPath),
+}
+
+impl ProducedArtifact {
+ /// Returns `true` if the ProducedArtifact was built, `false` if it was reused from another job
+ fn was_build(&self) -> bool {
+ std::matches!(self, ProducedArtifact::Built(_))
+ }
+
+ /// Unpack the ProducedArtifact object into the ArtifactPath object it contains
+ fn unpack(self) -> ArtifactPath {
+ match self {
+ ProducedArtifact::Built(a) => a,
+ ProducedArtifact::Reused(a) => a,
+ }
+ }
+}
+
+impl Borrow<ArtifactPath> for ProducedArtifact {
+ fn borrow(&self) -> &ArtifactPath {
+ match self {
+ ProducedArtifact::Built(a) => a,
+ ProducedArtifact::Reused(a) => a,
+ }
+ }
+}
impl<'a> Orchestrator<'a> {
pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> {
@@ -387,7 +427,11 @@ impl<'a> Orchestrator<'a> {
match root_receiver.recv().await {
None => Err(anyhow!("No result received...")),
Some(Ok(results)) => {
- let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
+ let results = results.into_iter()
+ .map(|tpl| tpl.1.into_iter())
+ .flatten()
+ .map(ProducedArtifact::unpack)
+ .collect();
Ok((results, HashMap::with_capacity(0)))
},
Some(Err(errors)) => Ok((vec![], errors)),
@@ -517,7 +561,7 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: HashMap<Uuid, Vec<ArtifactPath>> = HashMap::new();
+ let mut received_dependencies: HashMap<Uuid, Vec<ProducedArtifact>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());
@@ -569,9 +613,17 @@ impl<'a> JobTask<'a> {
}
}
+ // Check if any of the received dependencies was built (and not reused).
+ // If any dependency was built, we need to build as well.
+ let any_dependency_was_built = received_dependencies.values()
+ .map(|v| v.iter())
+ .flatten()
+ .any(ProducedArtifact::was_build);
+
+ // If no dependency was built, we can check for replacements for this job as well, so
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
- {
+ if !any_dependency_was_built {
let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
@@ -643,7 +695,8 @@ impl<'a> JobTask<'a> {
.cloned()
}
})
- .collect::<Vec<ArtifactPath>>();
+ .map(ProducedArtifact::Reused)
+ .collect::<Vec<ProducedArtifact>>();
if !artifacts.is_empty() {
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
@@ -675,6 +728,7 @@ impl<'a> JobTask<'a> {
.values()
.map(|v| v.iter())
.flatten()
+ .map(ProducedArtifact::borrow)
.cloned()
.collect::<Vec<ArtifactPath>>();
trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts);
@@ -724,6 +778,10 @@ impl<'a> JobTask<'a> {
// it returns the database artifact objects it created!
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
+
+ // mark the produced artifacts as "built" (rather than reused)
+ let artifacts = artifacts.into_iter().map(ProducedArtifact::Built).collect();
+
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
for s in self.sender.iter() {
s.send(Ok(received_dependencies.clone())).await?;
@@ -743,7 +801,7 @@ impl<'a> JobTask<'a> {
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving or if the channel is
/// empty and there were errors collected
- async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ProducedArtifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an