summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/filestore/path.rs7
-rw-r--r--src/orchestrator/orchestrator.rs38
2 files changed, 34 insertions, 11 deletions
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index bbcc727..486532f 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -20,6 +20,8 @@ use resiter::AndThen;
use resiter::Filter;
use resiter::Map;
+use crate::filestore::staging::StagingStore;
+
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StoreRoot(PathBuf);
@@ -139,6 +141,11 @@ impl AsRef<Path> for ArtifactPath {
pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath);
impl<'a> FullArtifactPath<'a> {
+
+ pub fn is_in_staging_store(&self, store: &StagingStore) -> bool {
+ store.0.root == *self.0
+ }
+
pub fn artifact_path(&self) -> &ArtifactPath {
self.1
}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index f9ee86f..dff5651 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -547,18 +547,34 @@ impl<'a> JobTask<'a> {
let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()
- .map(|tpl| tpl.0.artifact_path().clone()) // We dont care about the release date here.
- .unique()
- .map(|artifact_path| async move {
- trace!("Searching for {:?} in stores", artifact_path);
- merged_stores
- .get(&artifact_path)
- .await
- .ok_or_else(|| anyhow!("BUG"))
+
+ // First of all, we sort by whether the artifact path is in the staging store,
+ // because we prefer staging store artifacts at this point.
+ .sorted_by(|(p1, _), (p2, _)| {
+ let r1 = p1.is_in_staging_store(&staging_store);
+ let r2 = p2.is_in_staging_store(&staging_store);
+ r1.cmp(&r2)
+ })
+
+ // We don't need duplicates here, so remove them by making the iterator unique
+ // If we have two artifacts that are the same, the one in the staging store will be
+ // preffered in the next step
+ .unique_by(|tpl| tpl.0.artifact_path().clone())
+
+ // Fetch the artifact from the staging store, if there is one.
+ // If there is none, try the release store.
+ // If there is none, there won't be a replacement artifact
+ .filter_map(|(full_artifact_path, _)| {
+ trace!("Searching for {:?} in stores", full_artifact_path.display());
+ if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) {
+ Some(ap.clone())
+ } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) {
+ Some(ap.clone())
+ } else {
+ None
+ }
})
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<Vec<ArtifactPath>>>()
- .await?;
+ .collect::<Vec<ArtifactPath>>();
if !artifacts.is_empty() {
received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);