diff options
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index e6e24c5..51c6d9a 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -34,7 +34,6 @@ use crate::db::models as dbmodels; use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; use crate::filestore::ArtifactPath; -use crate::filestore::MergedStores; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; use crate::job::JobDefinition; @@ -153,7 +152,8 @@ use crate::util::progress::ProgressBars; pub struct Orchestrator<'a> { scheduler: EndpointScheduler, progress_generator: ProgressBars, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, jobdag: Dag, config: &'a Configuration, @@ -176,10 +176,10 @@ pub struct OrchestratorSetup<'a> { impl<'a> OrchestratorSetup<'a> { pub async fn setup(self) -> Result<Orchestrator<'a>> { - let merged_stores = MergedStores::new(self.release_store, self.staging_store); let scheduler = EndpointScheduler::setup( self.endpoint_config, - merged_stores.clone(), + self.staging_store.clone(), + self.release_store.clone(), self.database.clone(), self.submit.clone(), self.log_dir, @@ -188,7 +188,8 @@ impl<'a> OrchestratorSetup<'a> { Ok(Orchestrator { scheduler, - merged_stores, + staging_store: self.staging_store.clone(), + release_store: self.release_store.clone(), progress_generator: self.progress_generator, source_cache: self.source_cache, jobdag: self.jobdag, @@ -245,7 +246,8 @@ impl<'a> Orchestrator<'a> { config: self.config, source_cache: &self.source_cache, scheduler: &self.scheduler, - merged_stores: &self.merged_stores, + staging_store: self.staging_store.clone(), + release_store: self.release_store.clone(), database: self.database.clone(), }; @@ -362,7 +364,8 @@ struct TaskPreparation<'a> { config: &'a Configuration, source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, - merged_stores: &'a MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, database: Arc<PgConnection>, } @@ -377,7 +380,8 @@ struct JobTask<'a> { config: &'a Configuration, source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, - merged_stores: &'a MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, database: Arc<PgConnection>, /// Channel where the dependencies arrive @@ -432,7 +436,8 @@ impl<'a> JobTask<'a> { config: prep.config, source_cache: prep.source_cache, scheduler: prep.scheduler, - merged_stores: prep.merged_stores, + staging_store: prep.staging_store, + release_store: prep.release_store, database: prep.database.clone(), receiver, @@ -505,8 +510,8 @@ impl<'a> JobTask<'a> { // check if a job that looks very similar to this job has already produced artifacts. // If it has, simply return those (plus the received ones) { - let release_store = self.merged_stores.release().read().await; - let staging_store = self.merged_stores.staging().read().await; + let release_store = self.release_store.read().await; + let staging_store = self.staging_store.read().await; // Use the environment of the job definition, as it appears in the job DAG. // @@ -545,7 +550,6 @@ impl<'a> JobTask<'a> { debug!("[{}]: Found {} replacement artifacts", self.jobdef.job.uuid(), replacement_artifacts.len()); trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts); - let merged_stores = self.merged_stores; let mut artifacts = replacement_artifacts .into_iter() |