diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-25 10:40:44 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-25 10:40:44 +0100 |
commit | 6ebc1f4c8d36b6a19205a294553010ee7f3e7f74 (patch) | |
tree | 6e49fe0d00260f7f079ae1f2b143ae555e6cc070 /src/orchestrator | |
parent | f0aeeca3e5edac302aa67ca87297fe6f0ecb7473 (diff) | |
parent | fe5b97425fa08854b4f1ce37451166f8a81d54d2 (diff) |
Merge branch 'multiple-release-stores'
Diffstat (limited to 'src/orchestrator')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index de50b36..dc32925 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -10,6 +10,7 @@ #![allow(unused)] +use std::borrow::Borrow; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -28,6 +29,7 @@ use tokio::sync::mpsc::Sender; use tokio_stream::StreamExt; use typed_builder::TypedBuilder; use uuid::Uuid; +use resiter::FilterMap; use crate::config::Configuration; use crate::db::models as dbmodels; @@ -153,7 +155,7 @@ pub struct Orchestrator<'a> { scheduler: EndpointScheduler, progress_generator: ProgressBars, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, source_cache: SourceCache, jobdag: Dag, config: &'a Configuration, @@ -165,7 +167,7 @@ pub struct OrchestratorSetup<'a> { progress_generator: ProgressBars, endpoint_config: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, source_cache: SourceCache, jobdag: Dag, database: Arc<PgConnection>, @@ -179,7 +181,7 @@ impl<'a> OrchestratorSetup<'a> { let scheduler = EndpointScheduler::setup( self.endpoint_config, self.staging_store.clone(), - self.release_store.clone(), + self.release_stores.clone(), self.database.clone(), self.submit.clone(), self.log_dir, @@ -189,7 +191,7 @@ impl<'a> OrchestratorSetup<'a> { Ok(Orchestrator { scheduler, staging_store: self.staging_store.clone(), - release_store: self.release_store.clone(), + release_stores: self.release_stores.clone(), progress_generator: self.progress_generator, source_cache: self.source_cache, jobdag: self.jobdag, @@ -253,7 +255,7 @@ impl<'a> Orchestrator<'a> { source_cache: &self.source_cache, scheduler: &self.scheduler, staging_store: self.staging_store.clone(), - release_store: self.release_store.clone(), + release_stores: self.release_stores.clone(), database: self.database.clone(), }; @@ -371,7 +373,7 @@ struct TaskPreparation<'a> { source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, database: Arc<PgConnection>, } @@ -387,7 +389,7 @@ struct JobTask<'a> { source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, database: Arc<PgConnection>, /// Channel where the dependencies arrive @@ -443,7 +445,7 @@ impl<'a> JobTask<'a> { source_cache: prep.source_cache, scheduler: prep.scheduler, staging_store: prep.staging_store, - release_store: prep.release_store, + release_stores: prep.release_stores, database: prep.database.clone(), receiver, @@ -516,7 +518,6 @@ impl<'a> JobTask<'a> { // check if a job that looks very similar to this job has already produced artifacts. // If it has, simply return those (plus the received ones) { - let release_store = self.release_store.read().await; let staging_store = self.staging_store.read().await; // Use the environment of the job definition, as it appears in the job DAG. @@ -534,7 +535,7 @@ impl<'a> JobTask<'a> { self.database.clone(), self.config, self.jobdef.job.package(), - &release_store, + &self.release_stores, // We can simply pass the staging store here, because it doesn't hurt. There are // two scenarios: @@ -579,10 +580,11 @@ impl<'a> JobTask<'a> { trace!("Searching for {:?} in stores", full_artifact_path.display()); if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) { Some(ap.clone()) - } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) { - Some(ap.clone()) } else { - None + self.release_stores + .iter() + .find_map(|rs| rs.get(full_artifact_path.artifact_path())) + .cloned() } }) .collect::<Vec<ArtifactPath>>(); |