summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-16 07:36:01 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-02-16 08:00:30 +0100
commit09f5560442b13f8bf58152b36d8819ec309363fc (patch)
treefb8cea73aba9d17a0b7165e0a1e751c769cc325b /src/orchestrator/orchestrator.rs
parent48317703c85ed55c4757aa6a966b7c5f5cdd1d14 (diff)
Rewrite artifact searching in replacement algorithm
This patch rewrites the replacement searching algorithm, to try the staging store first and then the release store. It does so by sorting the artifacts by whether they are in the staging store or not (hence the FullArtifactPath::is_in_staging_store() function). It filters out not-found artifacts and returns only ones that were found in either store. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs38
1 files changed, 27 insertions, 11 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index f9ee86f..dff5651 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -547,18 +547,34 @@ impl<'a> JobTask<'a> {
let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()
- .map(|tpl| tpl.0.artifact_path().clone()) // We dont care about the release date here.
- .unique()
- .map(|artifact_path| async move {
- trace!("Searching for {:?} in stores", artifact_path);
- merged_stores
- .get(&artifact_path)
- .await
- .ok_or_else(|| anyhow!("BUG"))
+
+ // First of all, we sort by whether the artifact path is in the staging store,
+ // because we prefer staging store artifacts at this point.
+ .sorted_by(|(p1, _), (p2, _)| {
+ let r1 = p1.is_in_staging_store(&staging_store);
+ let r2 = p2.is_in_staging_store(&staging_store);
+ r1.cmp(&r2)
+ })
+
+ // We don't need duplicates here, so remove them by making the iterator unique
+ // If we have two artifacts that are the same, the one in the staging store will be
+ // preffered in the next step
+ .unique_by(|tpl| tpl.0.artifact_path().clone())
+
+ // Fetch the artifact from the staging store, if there is one.
+ // If there is none, try the release store.
+ // If there is none, there won't be a replacement artifact
+ .filter_map(|(full_artifact_path, _)| {
+ trace!("Searching for {:?} in stores", full_artifact_path.display());
+ if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) {
+ Some(ap.clone())
+ } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) {
+ Some(ap.clone())
+ } else {
+ None
+ }
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<ArtifactPath>>>()
- .await?;
+ .collect::<Vec<ArtifactPath>>();
if !artifacts.is_empty() {
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);