diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-02-17 09:48:07 +0100 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@atos.net> | 2021-02-17 09:59:34 +0100 |
commit | 6b2e73a9cf24c80b3b86df68ff62392fdfc1e09c (patch) | |
tree | 4a448aadc8ba97f94fec1b877c050f987a89c32d /src/endpoint/scheduler.rs | |
parent | 04e7602e34987b832e1d2e95405f61443b30a641 (diff) |
Pass MergedStores to endpoint/jobs rather than only staging store
This patch changes the code so that the MergedStores object is known in the
endpoints and job execution code.
This is necessary, because the jobs must be able to load artifacts from the
release store as well, for example if a library was released in another submit
and we can reuse it because the script for that library didn't change.
For that, the interface of StoreRoot::join() was changed
- -> Result<FullArtifactPath<'a>>
+ -> Result<Option<FullArtifactPath<'a>>>
where it now returns Ok(None) if the requested artifact does not exist.
This was necessary to allow try-joining a path on the store root of the staging
store and, if there is no such file, to continue with try-joining the path on
the release store.
The calling code got a bit more complex with that change, though.
Next, the MergedStores got a `derive(Clone)` because clone()ing it is cheap
(two `Arc`s) and necessary to pass it easily to the jobs.
Each instance of the code where the staging store was consulted was changed to
consult the release store as well.
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint/scheduler.rs')
-rw-r--r-- | src/endpoint/scheduler.rs | 22 |
1 files changed, 10 insertions, 12 deletions
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index cb9d04f..17f9f24 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -31,7 +31,7 @@ use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; use crate::filestore::ArtifactPath; -use crate::filestore::StagingStore; +use crate::filestore::MergedStores; use crate::job::JobResource; use crate::job::RunnableJob; use crate::log::LogItem; @@ -40,7 +40,7 @@ pub struct EndpointScheduler { log_dir: Option<PathBuf>, endpoints: Vec<Arc<RwLock<Endpoint>>>, - staging_store: Arc<RwLock<StagingStore>>, + merged_stores: MergedStores, db: Arc<PgConnection>, submit: crate::db::models::Submit, } @@ -48,7 +48,7 @@ pub struct EndpointScheduler { impl EndpointScheduler { pub async fn setup( endpoints: Vec<EndpointConfiguration>, - staging_store: Arc<RwLock<StagingStore>>, + merged_stores: MergedStores, db: Arc<PgConnection>, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, @@ -58,7 +58,7 @@ impl EndpointScheduler { Ok(EndpointScheduler { log_dir, endpoints, - staging_store, + merged_stores, db, submit, }) @@ -94,7 +94,7 @@ impl EndpointScheduler { bar, endpoint, job, - staging_store: self.staging_store.clone(), + merged_stores: self.merged_stores.clone(), db: self.db.clone(), submit: self.submit.clone(), }) @@ -136,7 +136,7 @@ pub struct JobHandle { job: RunnableJob, bar: ProgressBar, db: Arc<PgConnection>, - staging_store: Arc<RwLock<StagingStore>>, + merged_stores: MergedStores, submit: crate::db::models::Submit, } @@ -157,7 +157,7 @@ impl JobHandle { let job_id = *self.job.uuid(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); let prepared_container = ep - .prepare_container(self.job, self.staging_store.clone()) + .prepare_container(self.job, self.merged_stores.clone()) .await?; let container_id = prepared_container.create_info().id.clone(); let running_container = prepared_container @@ -219,7 +219,7 @@ impl 
JobHandle { } let res: crate::endpoint::FinalizedContainer = run_container - .finalize(self.staging_store.clone()) + .finalize(self.merged_stores.clone()) .await .context("Finalizing container") .with_context(|| { @@ -256,15 +256,13 @@ impl JobHandle { // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; - let staging_store_lock = self.staging_store.read().await; for p in paths.iter() { - use std::ops::Deref; trace!("DB: Creating artifact entry for path: {}", p.display()); let _ = dbmodels::Artifact::create(&self.db, p, &job)?; r.push({ - staging_store_lock - .deref() + self.merged_stores .get(p) + .await .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))? .clone() }); |