diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-02-22 10:44:19 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-25 10:40:16 +0100 |
commit | fe5b97425fa08854b4f1ce37451166f8a81d54d2 (patch) | |
tree | 01f3c7f85b4599e16fff44425c93789b825aeccc /src/orchestrator/orchestrator.rs | |
parent | df1ab6c67de7591f849b14b8bdd94aadfc8fe961 (diff) |
Multiple release stores
This patch adds the ability to have more than one release store.
With this patch, a user can (has to) configure release store names in the
configuration file, and can then specify one of the configured names to release
the artifacts to.
This way, different release "channels" can be served, for example a stable
channel and a rolling release channel (although "channel" is not in our wording).
The code was adapted to be able to fetch releases from multiple release
directories, in the crate::db::find_artifact implementation, so that re-using
artifacts works across all release directories.
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 51c6d9a..52a0829 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -10,6 +10,7 @@ #![allow(unused)] +use std::borrow::Borrow; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -28,6 +29,7 @@ use tokio::sync::mpsc::Sender; use tokio_stream::StreamExt; use typed_builder::TypedBuilder; use uuid::Uuid; +use resiter::FilterMap; use crate::config::Configuration; use crate::db::models as dbmodels; @@ -153,7 +155,7 @@ pub struct Orchestrator<'a> { scheduler: EndpointScheduler, progress_generator: ProgressBars, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, source_cache: SourceCache, jobdag: Dag, config: &'a Configuration, @@ -165,7 +167,7 @@ pub struct OrchestratorSetup<'a> { progress_generator: ProgressBars, endpoint_config: Vec<EndpointConfiguration>, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, source_cache: SourceCache, jobdag: Dag, database: Arc<PgConnection>, @@ -179,7 +181,7 @@ impl<'a> OrchestratorSetup<'a> { let scheduler = EndpointScheduler::setup( self.endpoint_config, self.staging_store.clone(), - self.release_store.clone(), + self.release_stores.clone(), self.database.clone(), self.submit.clone(), self.log_dir, @@ -189,7 +191,7 @@ impl<'a> OrchestratorSetup<'a> { Ok(Orchestrator { scheduler, staging_store: self.staging_store.clone(), - release_store: self.release_store.clone(), + release_stores: self.release_stores.clone(), progress_generator: self.progress_generator, source_cache: self.source_cache, jobdag: self.jobdag, @@ -247,7 +249,7 @@ impl<'a> Orchestrator<'a> { source_cache: &self.source_cache, scheduler: &self.scheduler, staging_store: self.staging_store.clone(), - release_store: self.release_store.clone(), + release_stores: self.release_stores.clone(), database: self.database.clone(), }; @@ -365,7 +367,7 @@ struct TaskPreparation<'a> { source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, database: Arc<PgConnection>, } @@ -381,7 +383,7 @@ struct JobTask<'a> { source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, staging_store: Arc<RwLock<StagingStore>>, - release_store: Arc<RwLock<ReleaseStore>>, + release_stores: Vec<Arc<ReleaseStore>>, database: Arc<PgConnection>, /// Channel where the dependencies arrive @@ -437,7 +439,7 @@ impl<'a> JobTask<'a> { source_cache: prep.source_cache, scheduler: prep.scheduler, staging_store: prep.staging_store, - release_store: prep.release_store, + release_stores: prep.release_stores, database: prep.database.clone(), receiver, @@ -510,7 +512,6 @@ impl<'a> JobTask<'a> { // check if a job that looks very similar to this job has already produced artifacts. // If it has, simply return those (plus the received ones) { - let release_store = self.release_store.read().await; let staging_store = self.staging_store.read().await; // Use the environment of the job definition, as it appears in the job DAG. @@ -528,7 +529,7 @@ impl<'a> JobTask<'a> { self.database.clone(), self.config, self.jobdef.job.package(), - &release_store, + &self.release_stores, // We can simply pass the staging store here, because it doesn't hurt. There are // two scenarios: @@ -573,10 +574,11 @@ impl<'a> JobTask<'a> { trace!("Searching for {:?} in stores", full_artifact_path.display()); if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) { Some(ap.clone()) - } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) { - Some(ap.clone()) } else { - None + self.release_stores + .iter() + .find_map(|rs| rs.get(full_artifact_path.artifact_path())) + .cloned() } }) .collect::<Vec<ArtifactPath>>(); |