summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-22 10:59:07 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-02-22 10:59:07 +0100
commitc791d23e2fdc8335e6734975ac3bb265b862e3b2 (patch)
treeceb564fb181415a2ef41ffba489d45e30e0aafb5 /src/orchestrator/orchestrator.rs
parent3c7f31f0fa8350f65cf3e74955a22eef0674cd4d (diff)
Remove MergedStores
The concept of the MergedStores type was okay in the beginning, but it got more and more complex to use properly and most of the time, we used the release/staging stores directly anyways. So this removes the MergedStores type, which is a preparation for the change to have multiple release stores. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs28
1 files changed, 16 insertions, 12 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e6e24c5..51c6d9a 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -34,7 +34,6 @@ use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
@@ -153,7 +152,8 @@ use crate::util::progress::ProgressBars;
pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
progress_generator: ProgressBars,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
config: &'a Configuration,
@@ -176,10 +176,10 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
- let merged_stores = MergedStores::new(self.release_store, self.staging_store);
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
- merged_stores.clone(),
+ self.staging_store.clone(),
+ self.release_store.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -188,7 +188,8 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
- merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
progress_generator: self.progress_generator,
source_cache: self.source_cache,
jobdag: self.jobdag,
@@ -245,7 +246,8 @@ impl<'a> Orchestrator<'a> {
config: self.config,
source_cache: &self.source_cache,
scheduler: &self.scheduler,
- merged_stores: &self.merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
database: self.database.clone(),
};
@@ -362,7 +364,8 @@ struct TaskPreparation<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
}
@@ -377,7 +380,8 @@ struct JobTask<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
/// Channel where the dependencies arrive
@@ -432,7 +436,8 @@ impl<'a> JobTask<'a> {
config: prep.config,
source_cache: prep.source_cache,
scheduler: prep.scheduler,
- merged_stores: prep.merged_stores,
+ staging_store: prep.staging_store,
+ release_store: prep.release_store,
database: prep.database.clone(),
receiver,
@@ -505,8 +510,8 @@ impl<'a> JobTask<'a> {
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
{
- let release_store = self.merged_stores.release().read().await;
- let staging_store = self.merged_stores.staging().read().await;
+ let release_store = self.release_store.read().await;
+ let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
//
@@ -545,7 +550,6 @@ impl<'a> JobTask<'a> {
debug!("[{}]: Found {} replacement artifacts", self.jobdef.job.uuid(), replacement_artifacts.len());
trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts);
- let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()