author    Matthias Beyer <matthias.beyer@atos.net>  2021-06-30 19:32:18 +0200
committer Matthias Beyer <matthias.beyer@atos.net>  2021-06-30 19:35:36 +0200
commit    507aeeb8899a714fe794814615b61eeef91c7ed2 (patch)
tree      66c310a5f44c098a2ff09541016238e238b1c333
parent    defb2626ef69d270cc38f044ba5fb1305e1931a8 (diff)
Fix: Propagate information whether an Artifact was built or reused
This patch fixes a bug where butido did not rebuild a package if one of its dependencies was rebuilt. In a dependency chain libA -> libB, if libB gets rebuilt, libA needs to be rebuilt as well.

For this, a new (wrapper) type `ProducedArtifact` was added (private to the orchestrator module) that holds this information. The information is only needed between the individual build workers: if all dependencies report that their artifacts were reused, we can check the database for a similar job and reuse artifacts from that job as well; if any dependency was built, the current package needs to be rebuilt too.

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--  src/orchestrator/orchestrator.rs  70
1 file changed, 64 insertions, 6 deletions
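The decision logic described in the commit message can be illustrated with a minimal, self-contained Rust sketch (not part of the patch): the `ProducedArtifact` enum mirrors the diff below, while the `needs_rebuild` helper, the type aliases, and the example data are hypothetical stand-ins.

    // Sketch only: illustrates the "rebuild if any dependency was built" rule.
    // Type aliases stand in for uuid::Uuid and butido's ArtifactPath.
    use std::collections::HashMap;

    type Uuid = u64;
    type ArtifactPath = String;

    /// Whether an artifact was freshly built or reused from an earlier job.
    #[derive(Clone, Debug)]
    enum ProducedArtifact {
        Built(ArtifactPath),
        Reused(ArtifactPath),
    }

    impl ProducedArtifact {
        fn was_built(&self) -> bool {
            matches!(self, ProducedArtifact::Built(_))
        }
    }

    /// A job must be rebuilt if any artifact received from its dependencies was
    /// built (rather than reused), even if its own script and environment did
    /// not change.
    fn needs_rebuild(received: &HashMap<Uuid, Vec<ProducedArtifact>>) -> bool {
        received.values().flatten().any(ProducedArtifact::was_built)
    }

    fn main() {
        let mut received = HashMap::new();
        // libB was rebuilt, so the package depending on it must be rebuilt too.
        received.insert(1, vec![ProducedArtifact::Built("libB-1.0.tar".to_string())]);
        received.insert(2, vec![ProducedArtifact::Reused("libC-2.3.tar".to_string())]);
        assert!(needs_rebuild(&received));
    }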
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index dd1322d..4a6cf7d 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -214,7 +214,47 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<HashMap<Uuid, Vec<ArtifactPath>>, HashMap<Uuid, Error>>;
+///
+/// The artifacts are wrapped in a `ProducedArtifact`; see the documentation of that type for
+/// why.
+type JobResult = std::result::Result<HashMap<Uuid, Vec<ProducedArtifact>>, HashMap<Uuid, Error>>;
+
+/// A type that represents whether an artifact was built or reused from an old job
+///
+/// This is necessary to decide in dependent jobs whether a package needs to be rebuilt even though
+/// the script and environment did not change.
+///
+/// E.g.: if libA depends on libB and libB changed and needs to be rebuilt, we need to rebuild
+/// all packages that depend (directly or indirectly) on that library.
+#[derive(Clone, Debug)]
+enum ProducedArtifact {
+ Built(ArtifactPath),
+ Reused(ArtifactPath),
+}
+
+impl ProducedArtifact {
+ /// Get whether the ProducedArtifact was built or reused from another job
+ fn was_build(&self) -> bool {
+ std::matches!(self, ProducedArtifact::Built(_))
+ }
+
+ /// Unpack the ProducedArtifact object into the ArtifactPath object it contains
+ fn unpack(self) -> ArtifactPath {
+ match self {
+ ProducedArtifact::Built(a) => a,
+ ProducedArtifact::Reused(a) => a,
+ }
+ }
+}
+
+impl Borrow<ArtifactPath> for ProducedArtifact {
+ fn borrow(&self) -> &ArtifactPath {
+ match self {
+ ProducedArtifact::Built(a) => a,
+ ProducedArtifact::Reused(a) => a,
+ }
+ }
+}
impl<'a> Orchestrator<'a> {
pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> {
@@ -385,7 +425,11 @@ impl<'a> Orchestrator<'a> {
match root_receiver.recv().await {
None => Err(anyhow!("No result received...")),
Some(Ok(results)) => {
- let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
+ let results = results.into_iter()
+ .map(|tpl| tpl.1.into_iter())
+ .flatten()
+ .map(ProducedArtifact::unpack)
+ .collect();
Ok((results, HashMap::with_capacity(0)))
},
Some(Err(errors)) => Ok((vec![], errors)),
@@ -506,7 +550,7 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: HashMap<Uuid, Vec<ArtifactPath>> = HashMap::new();
+ let mut received_dependencies: HashMap<Uuid, Vec<ProducedArtifact>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());
@@ -557,9 +601,17 @@ impl<'a> JobTask<'a> {
}
}
+ // Check if any of the received dependencies was built (and not reused).
+ // If any dependency was built, we need to build as well.
+ let any_dependency_was_built = received_dependencies.values()
+ .map(|v| v.iter())
+ .flatten()
+ .any(ProducedArtifact::was_build);
+
+ // If no dependency was built, we can check for replacements for this job as well, so
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
- {
+ if !any_dependency_was_built {
let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
@@ -631,7 +683,8 @@ impl<'a> JobTask<'a> {
.cloned()
}
})
- .collect::<Vec<ArtifactPath>>();
+ .map(ProducedArtifact::Reused)
+ .collect::<Vec<ProducedArtifact>>();
if !artifacts.is_empty() {
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
@@ -663,6 +716,7 @@ impl<'a> JobTask<'a> {
.values()
.map(|v| v.iter())
.flatten()
+ .map(ProducedArtifact::borrow)
.cloned()
.collect::<Vec<ArtifactPath>>();
trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts);
@@ -712,6 +766,10 @@ impl<'a> JobTask<'a> {
// it returns the database artifact objects it created!
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
+
+ // mark the produced artifacts as "built" (rather than reused)
+ let artifacts = artifacts.into_iter().map(ProducedArtifact::Built).collect();
+
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
for s in self.sender.iter() {
s.send(Ok(received_dependencies.clone())).await?;
@@ -731,7 +789,7 @@ impl<'a> JobTask<'a> {
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving or if the channel is
/// empty and there were errors collected
- async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ProducedArtifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an