summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-16 07:36:01 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-02-16 08:00:30 +0100
commit09f5560442b13f8bf58152b36d8819ec309363fc (patch)
treefb8cea73aba9d17a0b7165e0a1e751c769cc325b
parent48317703c85ed55c4757aa6a966b7c5f5cdd1d14 (diff)
Rewrite artifact searching in replacement algorithm
This patch rewrites the replacement searching algorithm, to try the staging store first and then the release store. It does so by sorting the artifacts by whether they are in the staging store or not (hence the FullArtifactPath::is_in_staging_store() function). It filters out not-found artifacts and returns only ones that were found in either store. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--src/filestore/path.rs7
-rw-r--r--src/orchestrator/orchestrator.rs38
2 files changed, 34 insertions, 11 deletions
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index bbcc727..486532f 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -20,6 +20,8 @@ use resiter::AndThen;
use resiter::Filter;
use resiter::Map;
+use crate::filestore::staging::StagingStore;
+
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StoreRoot(PathBuf);
@@ -139,6 +141,11 @@ impl AsRef<Path> for ArtifactPath {
pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath);
impl<'a> FullArtifactPath<'a> {
+
+ pub fn is_in_staging_store(&self, store: &StagingStore) -> bool {
+ store.0.root == *self.0
+ }
+
pub fn artifact_path(&self) -> &ArtifactPath {
self.1
}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index f9ee86f..dff5651 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -547,18 +547,34 @@ impl<'a> JobTask<'a> {
let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()
- .map(|tpl| tpl.0.artifact_path().clone()) // We dont care about the release date here.
- .unique()
- .map(|artifact_path| async move {
- trace!("Searching for {:?} in stores", artifact_path);
- merged_stores
- .get(&artifact_path)
- .await
- .ok_or_else(|| anyhow!("BUG"))
+
+ // First of all, we sort by whether the artifact path is in the staging store,
+ // because we prefer staging store artifacts at this point.
+ .sorted_by(|(p1, _), (p2, _)| {
+ let r1 = p1.is_in_staging_store(&staging_store);
+ let r2 = p2.is_in_staging_store(&staging_store);
+ r1.cmp(&r2)
+ })
+
+ // We don't need duplicates here, so remove them by making the iterator unique
+ // If we have two artifacts that are the same, the one in the staging store will be
+ // preffered in the next step
+ .unique_by(|tpl| tpl.0.artifact_path().clone())
+
+ // Fetch the artifact from the staging store, if there is one.
+ // If there is none, try the release store.
+ // If there is none, there won't be a replacement artifact
+ .filter_map(|(full_artifact_path, _)| {
+ trace!("Searching for {:?} in stores", full_artifact_path.display());
+ if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) {
+ Some(ap.clone())
+ } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) {
+ Some(ap.clone())
+ } else {
+ None
+ }
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<ArtifactPath>>>()
- .await?;
+ .collect::<Vec<ArtifactPath>>();
if !artifacts.is_empty() {
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);