author    Matthias Beyer <matthias.beyer@atos.net>  2021-02-22 10:44:19 +0100
committer Matthias Beyer <mail@beyermatthias.de>    2021-02-25 10:40:16 +0100
commit    fe5b97425fa08854b4f1ce37451166f8a81d54d2 (patch)
tree      01f3c7f85b4599e16fff44425c93789b825aeccc /src/orchestrator/orchestrator.rs
parent    df1ab6c67de7591f849b14b8bdd94aadfc8fe961 (diff)
Multiple release stores
This patch adds the ability to have more than one release store. With this patch, a user must configure release store names in the configuration file and can then specify one of the configured names to release the artifacts to. This way, different release "channels" can be served, for example a stable channel and a rolling release channel (although "channel" is not part of our wording).

The code was adapted to fetch releases from multiple release directories in the crate::db::find_artifact implementation, so that re-using artifacts works across all release directories.

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
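The lookup change in the diff below boils down to falling back across all configured release stores instead of consulting a single one. A minimal, self-contained sketch of that pattern, assuming simplified stand-in types (the ReleaseStore and ArtifactPath definitions here are illustrative, not butido's actual ones):

    use std::collections::HashSet;
    use std::path::PathBuf;
    use std::sync::Arc;

    #[derive(Clone, Hash, PartialEq, Eq, Debug)]
    struct ArtifactPath(PathBuf);

    // Simplified stand-in for a release store that only knows which artifact
    // paths it contains.
    struct ReleaseStore {
        known: HashSet<ArtifactPath>,
    }

    impl ReleaseStore {
        fn get(&self, p: &ArtifactPath) -> Option<&ArtifactPath> {
            self.known.get(p)
        }
    }

    // Return the first match from any of the configured release stores; this
    // mirrors the `find_map` introduced in the diff below.
    fn find_in_release_stores(
        stores: &[Arc<ReleaseStore>],
        path: &ArtifactPath,
    ) -> Option<ArtifactPath> {
        stores.iter().find_map(|store| store.get(path)).cloned()
    }

    fn main() {
        let wanted = ArtifactPath(PathBuf::from("pkg-1.0.0.tar.gz"));
        let stable = Arc::new(ReleaseStore { known: HashSet::new() });
        let rolling = Arc::new(ReleaseStore {
            known: std::iter::once(wanted.clone()).collect(),
        });

        // The first store misses, the second one hits.
        assert_eq!(find_in_release_stores(&[stable, rolling], &wanted), Some(wanted));
    }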
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--  src/orchestrator/orchestrator.rs | 28
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 51c6d9a..52a0829 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -10,6 +10,7 @@
#![allow(unused)]
+use std::borrow::Borrow;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -28,6 +29,7 @@ use tokio::sync::mpsc::Sender;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use uuid::Uuid;
+use resiter::FilterMap;
use crate::config::Configuration;
use crate::db::models as dbmodels;
@@ -153,7 +155,7 @@ pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
progress_generator: ProgressBars,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
config: &'a Configuration,
@@ -165,7 +167,7 @@ pub struct OrchestratorSetup<'a> {
progress_generator: ProgressBars,
endpoint_config: Vec<EndpointConfiguration>,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
database: Arc<PgConnection>,
@@ -179,7 +181,7 @@ impl<'a> OrchestratorSetup<'a> {
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
self.staging_store.clone(),
- self.release_store.clone(),
+ self.release_stores.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -189,7 +191,7 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
staging_store: self.staging_store.clone(),
- release_store: self.release_store.clone(),
+ release_stores: self.release_stores.clone(),
progress_generator: self.progress_generator,
source_cache: self.source_cache,
jobdag: self.jobdag,
@@ -247,7 +249,7 @@ impl<'a> Orchestrator<'a> {
source_cache: &self.source_cache,
scheduler: &self.scheduler,
staging_store: self.staging_store.clone(),
- release_store: self.release_store.clone(),
+ release_stores: self.release_stores.clone(),
database: self.database.clone(),
};
@@ -365,7 +367,7 @@ struct TaskPreparation<'a> {
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
database: Arc<PgConnection>,
}
@@ -381,7 +383,7 @@ struct JobTask<'a> {
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
staging_store: Arc<RwLock<StagingStore>>,
- release_store: Arc<RwLock<ReleaseStore>>,
+ release_stores: Vec<Arc<ReleaseStore>>,
database: Arc<PgConnection>,
/// Channel where the dependencies arrive
@@ -437,7 +439,7 @@ impl<'a> JobTask<'a> {
source_cache: prep.source_cache,
scheduler: prep.scheduler,
staging_store: prep.staging_store,
- release_store: prep.release_store,
+ release_stores: prep.release_stores,
database: prep.database.clone(),
receiver,
@@ -510,7 +512,6 @@ impl<'a> JobTask<'a> {
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
{
- let release_store = self.release_store.read().await;
let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
@@ -528,7 +529,7 @@ impl<'a> JobTask<'a> {
self.database.clone(),
self.config,
self.jobdef.job.package(),
- &release_store,
+ &self.release_stores,
// We can simply pass the staging store here, because it doesn't hurt. There are
// two scenarios:
@@ -573,10 +574,11 @@ impl<'a> JobTask<'a> {
trace!("Searching for {:?} in stores", full_artifact_path.display());
if let Some(ap) = staging_store.get(full_artifact_path.artifact_path()) {
Some(ap.clone())
- } else if let Some(ap) = release_store.get(full_artifact_path.artifact_path()) {
- Some(ap.clone())
} else {
- None
+ self.release_stores
+ .iter()
+ .find_map(|rs| rs.get(full_artifact_path.artifact_path()))
+ .cloned()
}
})
.collect::<Vec<ArtifactPath>>();
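The commit message also states that the user picks one of the configured release store names when releasing. How that selection is wired up is not part of this file's diff; the following is only an illustrative sketch under that assumption (NamedReleaseStore and resolve_release_store are made-up names, not butido's API):

    use std::path::PathBuf;
    use std::sync::Arc;

    // Hypothetical pairing of a configured name with a release store root.
    struct NamedReleaseStore {
        name: String,
        root: PathBuf,
    }

    // Pick the release store whose configured name matches the one the user
    // asked to release to, or report the unknown name.
    fn resolve_release_store(
        configured: &[Arc<NamedReleaseStore>],
        requested: &str,
    ) -> Result<Arc<NamedReleaseStore>, String> {
        configured
            .iter()
            .find(|store| store.name == requested)
            .cloned()
            .ok_or_else(|| format!("no release store named '{}' is configured", requested))
    }

    fn main() {
        let stores = vec![
            Arc::new(NamedReleaseStore { name: "stable".into(), root: "/releases/stable".into() }),
            Arc::new(NamedReleaseStore { name: "rolling".into(), root: "/releases/rolling".into() }),
        ];

        let chosen = resolve_release_store(&stores, "rolling").expect("configured name");
        println!("releasing to {} ({})", chosen.name, chosen.root.display());
    }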