summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs28
1 files changed, 16 insertions, 12 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e6e24c5..51c6d9a 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -34,7 +34,6 @@ use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
@@ -153,7 +152,8 @@ use crate::util::progress::ProgressBars;
pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
progress_generator: ProgressBars,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
config: &'a Configuration,
@@ -176,10 +176,10 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
- let merged_stores = MergedStores::new(self.release_store, self.staging_store);
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
- merged_stores.clone(),
+ self.staging_store.clone(),
+ self.release_store.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -188,7 +188,8 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
- merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
progress_generator: self.progress_generator,
source_cache: self.source_cache,
jobdag: self.jobdag,
@@ -245,7 +246,8 @@ impl<'a> Orchestrator<'a> {
config: self.config,
source_cache: &self.source_cache,
scheduler: &self.scheduler,
- merged_stores: &self.merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
database: self.database.clone(),
};
@@ -362,7 +364,8 @@ struct TaskPreparation<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
}
@@ -377,7 +380,8 @@ struct JobTask<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
/// Channel where the dependencies arrive
@@ -432,7 +436,8 @@ impl<'a> JobTask<'a> {
config: prep.config,
source_cache: prep.source_cache,
scheduler: prep.scheduler,
- merged_stores: prep.merged_stores,
+ staging_store: prep.staging_store,
+ release_store: prep.release_store,
database: prep.database.clone(),
receiver,
@@ -505,8 +510,8 @@ impl<'a> JobTask<'a> {
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
{
- let release_store = self.merged_stores.release().read().await;
- let staging_store = self.merged_stores.staging().read().await;
+ let release_store = self.release_store.read().await;
+ let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
//
@@ -545,7 +550,6 @@ impl<'a> JobTask<'a> {
debug!("[{}]: Found {} replacement artifacts", self.jobdef.job.uuid(), replacement_artifacts.len());
trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts);
- let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()