From 5392cd0a1d2e22d970b27d2f4345db062ca21687 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Thu, 28 Jan 2021 11:37:45 +0100 Subject: Implement JobTask artifact substitution Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 62 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) (limited to 'src/orchestrator/orchestrator.rs') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 70699ad..2860748 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -501,6 +501,68 @@ impl<'a> JobTask<'a> { } } + // check if a job that looks very similar to this job has already produced artifacts. + // If it has, simply return those (plus the received ones) + { + let release_store = self.merged_stores.release().read().await; + let staging_store = self.merged_stores.staging().read().await; + let additional_env = vec![]; + + let replacement_artifacts = crate::db::find_artifacts( + self.database.clone(), + self.config, + self.jobdef.job.package(), + &release_store, + + // We can simply pass the staging store here, because it doesn't hurt. There are + // two scenarios: + // + // 1. We are in a fresh build for a package. In this case, the artifacts for this + // very build are not in there yet, and there won't be any artifacts from the + // staging store (possibly from the release store, which would be fine). + // 2. We are in a re-build, where the user passed the staging store to the build + // subcommand. In this case, there might be an artifact for this job in the + // staging store. In this case, we want to use it as a replacement, of course. + // + // The fact that released artifacts are returned prefferably from this function + // call does not change anything, because if there is an artifact that's a released + // one that matches this job, we should use it anyways. + Some(&staging_store), + &additional_env, + true + )?; + + trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts); + let merged_stores = self.merged_stores; + let mut artifacts = replacement_artifacts + .into_iter() + .map(|tpl| tpl.0.artifact_path().clone()) // We dont care about the release date here. + .unique() + .map(|artifact_path| async move { + trace!("Searching for {:?} in stores", artifact_path); + merged_stores + .get(&artifact_path) + .await + .ok_or_else(|| anyhow!("BUG")) + }) + .collect::>() + .collect::>>() + .await?; + + if !artifacts.is_empty() { + received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); + trace!("[{}]: Sending to parent: {:?}", self.jobdef.job.uuid(), received_dependencies); + for s in self.sender.iter() { + s.send(Ok(received_dependencies.clone())).await?; + } + self.bar.finish_with_message(&format!("[{} {} {}] Reusing artifact", + self.jobdef.job.uuid(), + self.jobdef.job.package().name(), + self.jobdef.job.package().version())); + return Ok(()) + } + } + // Map the list of received dependencies from // Vec<(Uuid, Vec)> // to -- cgit v1.2.3