diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-07-27 16:36:35 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-07-27 16:36:35 +0200 |
commit | 61a6427fe0fdaf894f713cac9cf23418e9708846 (patch) | |
tree | 34baead6e36d8262ec59ae946dd5e7850eda87c3 /src/orchestrator/orchestrator.rs | |
parent | 9e12e114ef8bc1120f87431b85331df50cd31181 (diff) | |
parent | 507aeeb8899a714fe794814615b61eeef91c7ed2 (diff) |
Merge branch 'fix-dont-reuse-with-new-dependency-bug'
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 70 |
1 files changed, 64 insertions, 6 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 836b0f0..7e2799d 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -214,7 +214,47 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts with the UUID of the job they were produced by, /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<HashMap<Uuid, Vec<ArtifactPath>>, HashMap<Uuid, Error>>; +/// +/// The artifacts are encapsulated into a `ProducedArtifact`, see the documentation of the type for +/// why. +type JobResult = std::result::Result<HashMap<Uuid, Vec<ProducedArtifact>>, HashMap<Uuid, Error>>; + +/// A type that represents whether an artifact was built or reused from an old job +/// +/// This is necessary to decide in dependent jobs whether a package needs to be rebuilt even though +/// the script and environment did not change. +/// +/// E.G.: If a libA depends on libB, if libB changed and needs to be rebuilt, we need to rebuild +/// all packages that depend (directly or indirectly) on that library. 
+#[derive(Clone, Debug)] +enum ProducedArtifact { + Built(ArtifactPath), + Reused(ArtifactPath), +} + +impl ProducedArtifact { + /// Get whether the ProducedArtifact was built or reused from another job + fn was_build(&self) -> bool { + std::matches!(self, ProducedArtifact::Built(_)) + } + + /// Unpack the ProducedArtifact object into the ArtifactPath object it contains + fn unpack(self) -> ArtifactPath { + match self { + ProducedArtifact::Built(a) => a, + ProducedArtifact::Reused(a) => a, + } + } +} + +impl Borrow<ArtifactPath> for ProducedArtifact { + fn borrow(&self) -> &ArtifactPath { + match self { + ProducedArtifact::Built(a) => a, + ProducedArtifact::Reused(a) => a, + } + } +} impl<'a> Orchestrator<'a> { pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> { @@ -387,7 +427,11 @@ impl<'a> Orchestrator<'a> { match root_receiver.recv().await { None => Err(anyhow!("No result received...")), Some(Ok(results)) => { - let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect(); + let results = results.into_iter() + .map(|tpl| tpl.1.into_iter()) + .flatten() + .map(ProducedArtifact::unpack) + .collect(); Ok((results, HashMap::with_capacity(0))) }, Some(Err(errors)) => Ok((vec![], errors)), @@ -517,7 +561,7 @@ impl<'a> JobTask<'a> { // A list of job run results from dependencies that were received from the tasks for the // dependencies - let mut received_dependencies: HashMap<Uuid, Vec<ArtifactPath>> = HashMap::new(); + let mut received_dependencies: HashMap<Uuid, Vec<ProducedArtifact>> = HashMap::new(); // A list of errors that were received from the tasks for the dependencies let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len()); @@ -569,9 +613,17 @@ impl<'a> JobTask<'a> { } } + // Check if any of the received dependencies was built (and not reused). + // If any dependency was built, we need to build as well. 
+ let any_dependency_was_built = received_dependencies.values() + .map(|v| v.iter()) + .flatten() + .any(ProducedArtifact::was_build); + + // If no dependency was built, we can check for replacements for this job as well, so // check if a job that looks very similar to this job has already produced artifacts. // If it has, simply return those (plus the received ones) - { + if !any_dependency_was_built { let staging_store = self.staging_store.read().await; // Use the environment of the job definition, as it appears in the job DAG. @@ -643,7 +695,8 @@ impl<'a> JobTask<'a> { .cloned() } }) - .collect::<Vec<ArtifactPath>>(); + .map(ProducedArtifact::Reused) + .collect::<Vec<ProducedArtifact>>(); if !artifacts.is_empty() { received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); @@ -675,6 +728,7 @@ impl<'a> JobTask<'a> { .values() .map(|v| v.iter()) .flatten() + .map(ProducedArtifact::borrow) .cloned() .collect::<Vec<ArtifactPath>>(); trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts); @@ -724,6 +778,10 @@ impl<'a> JobTask<'a> { // it returns the database artifact objects it created! 
Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); + + // mark the produced artifacts as "built" (rather than reused) + let artifacts = artifacts.into_iter().map(ProducedArtifact::Built).collect(); + received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); for s in self.sender.iter() { s.send(Ok(received_dependencies.clone())).await?; @@ -743,7 +801,7 @@ impl<'a> JobTask<'a> { /// Return Ok(true) if we should continue operation /// Return Ok(false) if the channel is empty and we're done receiving or if the channel is /// empty and there were errors collected - async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { + async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ProducedArtifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { match self.receiver.recv().await { Some(Ok(mut v)) => { // The task we depend on succeeded and returned an |