diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-02-16 07:36:01 +0100 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-02-16 08:00:30 +0100 |
commit | 09f5560442b13f8bf58152b36d8819ec309363fc (patch) | |
tree | fb8cea73aba9d17a0b7165e0a1e751c769cc325b /src/orchestrator/orchestrator.rs | |
parent | 48317703c85ed55c4757aa6a966b7c5f5cdd1d14 (diff) |
Rewrite artifact searching in replacement algorithm
This patch rewrites the replacement searching algorithm, to try the staging
store first and then the release store.
It does so by sorting the artifacts by whether they are in the staging store or
not (hence the FullArtifactPath::is_in_staging_store() function).
It filters out not-found artifacts and returns only ones that were found in
either store.
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Tested-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index f9ee86f..dff5651 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -547,18 +547,34 @@ impl<'a> JobTask<'a> { let merged_stores = self.merged_stores; let mut artifacts = replacement_artifacts .into_iter() - .map(|tpl| tpl.0.artifact_path().clone()) // We dont care about the release date here. - .unique() - .map(|artifact_path| async move { - trace!("Searching for {:?} in stores", artifact_path); - merged_stores - .get(&artifact_path) - .await - .ok_or_else(|| anyhow!("BUG")) + + // First of all, we sort by whether the artifact path is in the staging store, + // because we prefer staging store artifacts at this point. + .sorted_by(|(p1, _), (p2, _)| { + let r1 = p1.is_in_staging_store(&staging_store); + let r2 = p2.is_in_staging_store(&staging_store); + r1.cmp(&r2) + }) + + // We don't need duplicates here, so remove them by making the iterator unique + // If we have two artifacts that are the same, the one in the staging store will be + // preffered in the next step + .unique_by(|tpl| tpl.0.artifact_path().clone()) + + // Fetch the artifact from the staging store, if there is one. + // If there is none, try the release store. + // If there is none, there won't be a replacement artifact + .filter_map(|(full_artifact_path, _)| { + trace!("Searching for {:?} in stores", full_artifact_path.display()); + if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) { + Some(ap.clone()) + } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) { + Some(ap.clone()) + } else { + None + } }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Vec<ArtifactPath>>>() - .await?; + .collect::<Vec<ArtifactPath>>(); if !artifacts.is_empty() { received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); |