summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-25 10:40:44 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-25 10:40:44 +0100
commit6ebc1f4c8d36b6a19205a294553010ee7f3e7f74 (patch)
tree6e49fe0d00260f7f079ae1f2b143ae555e6cc070 /src/orchestrator/orchestrator.rs
parentf0aeeca3e5edac302aa67ca87297fe6f0ecb7473 (diff)
parentfe5b97425fa08854b4f1ce37451166f8a81d54d2 (diff)
Merge branch 'multiple-release-stores'
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs28
1 files changed, 15 insertions, 13 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index de50b36..dc32925 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -10,6 +10,7 @@
#![allow(unused)]
+use std::borrow::Borrow;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -28,6 +29,7 @@ use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use uuid::Uuid;
+use resiter::FilterMap;
use crate::config::Configuration;
use crate::db::models as dbmodels;
@@ -153,7 +155,7 @@ pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
progress_generator: ProgressBars,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
config: &'a Configuration,
@@ -165,7 +167,7 @@ pub struct OrchestratorSetup<'a> {
progress_generator: ProgressBars,
endpoint_config: Vec<EndpointConfiguration>,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
database: Arc<PgConnection>,
@@ -179,7 +181,7 @@ impl<'a> OrchestratorSetup<'a> {
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
self.staging_store.clone(),
- self.release_store.clone(),
+ self.release_stores.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -189,7 +191,7 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
staging_store: self.staging_store.clone(),
- release_store: self.release_store.clone(),
+ release_stores: self.release_stores.clone(),
progress_generator: self.progress_generator,
source_cache: self.source_cache,
jobdag: self.jobdag,
@@ -253,7 +255,7 @@ impl<'a> Orchestrator<'a> {
source_cache: &self.source_cache,
scheduler: &self.scheduler,
staging_store: self.staging_store.clone(),
- release_store: self.release_store.clone(),
+ release_stores: self.release_stores.clone(),
database: self.database.clone(),
};
@@ -371,7 +373,7 @@ struct TaskPreparation<'a> {
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
database: Arc<PgConnection>,
}
@@ -387,7 +389,7 @@ struct JobTask<'a> {
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
database: Arc<PgConnection>,
/// Channel where the dependencies arrive
@@ -443,7 +445,7 @@ impl<'a> JobTask<'a> {
source_cache: prep.source_cache,
scheduler: prep.scheduler,
staging_store: prep.staging_store,
- release_store: prep.release_store,
+ release_stores: prep.release_stores,
database: prep.database.clone(),
receiver,
@@ -516,7 +518,6 @@ impl<'a> JobTask<'a> {
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
{
- let release_store = self.release_store.read().await;
let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
@@ -534,7 +535,7 @@ impl<'a> JobTask<'a> {
self.database.clone(),
self.config,
self.jobdef.job.package(),
- &release_store,
+ &self.release_stores,
// We can simply pass the staging store here, because it doesn't hurt. There are
// two scenarios:
@@ -579,10 +580,11 @@ impl<'a> JobTask<'a> {
trace!("Searching for {:?} in stores", full_artifact_path.display());
if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) {
Some(ap.clone())
- } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) {
- Some(ap.clone())
} else {
- None
+ self.release_stores
+ .iter()
+ .find_map(|rs| rs.get(full_artifact_path.artifact_path()))
+ .cloned()
}
})
.collect::<Vec<ArtifactPath>>();