diff options
-rw-r--r-- | src/filestore/path.rs | 7 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 38 |
2 files changed, 34 insertions, 11 deletions
diff --git a/src/filestore/path.rs b/src/filestore/path.rs index bbcc727..486532f 100644 --- a/src/filestore/path.rs +++ b/src/filestore/path.rs @@ -20,6 +20,8 @@ use resiter::AndThen; use resiter::Filter; use resiter::Map; +use crate::filestore::staging::StagingStore; + #[derive(Clone, Debug, PartialEq, Eq)] pub struct StoreRoot(PathBuf); @@ -139,6 +141,11 @@ impl AsRef<Path> for ArtifactPath { pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath); impl<'a> FullArtifactPath<'a> { + + pub fn is_in_staging_store(&self, store: &StagingStore) -> bool { + store.0.root == *self.0 + } + pub fn artifact_path(&self) -> &ArtifactPath { self.1 } diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index f9ee86f..dff5651 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -547,18 +547,34 @@ impl<'a> JobTask<'a> { let merged_stores = self.merged_stores; let mut artifacts = replacement_artifacts .into_iter() - .map(|tpl| tpl.0.artifact_path().clone()) // We dont care about the release date here. - .unique() - .map(|artifact_path| async move { - trace!("Searching for {:?} in stores", artifact_path); - merged_stores - .get(&artifact_path) - .await - .ok_or_else(|| anyhow!("BUG")) + + // First of all, we sort by whether the artifact path is in the staging store, + // because we prefer staging store artifacts at this point. + .sorted_by(|(p1, _), (p2, _)| { + let r1 = p1.is_in_staging_store(&staging_store); + let r2 = p2.is_in_staging_store(&staging_store); + r1.cmp(&r2) + }) + + // We don't need duplicates here, so remove them by making the iterator unique + // If we have two artifacts that are the same, the one in the staging store will be + // preffered in the next step + .unique_by(|tpl| tpl.0.artifact_path().clone()) + + // Fetch the artifact from the staging store, if there is one. + // If there is none, try the release store. + // If there is none, there won't be a replacement artifact + .filter_map(|(full_artifact_path, _)| { + trace!("Searching for {:?} in stores", full_artifact_path.display()); + if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) { + Some(ap.clone()) + } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) { + Some(ap.clone()) + } else { + None + } }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Vec<ArtifactPath>>>() - .await?; + .collect::<Vec<ArtifactPath>>(); if !artifacts.is_empty() { received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); |