summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-22 10:59:07 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-02-22 10:59:07 +0100
commitc791d23e2fdc8335e6734975ac3bb265b862e3b2 (patch)
treeceb564fb181415a2ef41ffba489d45e30e0aafb5
parent3c7f31f0fa8350f65cf3e74955a22eef0674cd4d (diff)
Remove MergedStores
The concept of the MergedStores type was okay in the beginning, but it became more and more complex to use properly, and most of the time we used the release/staging stores directly anyway. So this removes the MergedStores type, which is a preparation for the change to have multiple release stores. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--src/endpoint/configured.rs29
-rw-r--r--src/endpoint/scheduler.rs26
-rw-r--r--src/filestore/merged.rs98
-rw-r--r--src/filestore/mod.rs3
-rw-r--r--src/filestore/path.rs4
-rw-r--r--src/orchestrator/orchestrator.rs28
6 files changed, 49 insertions, 139 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 94f134d..b90263e 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -11,6 +11,7 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
+use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
@@ -23,16 +24,18 @@ use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use crate::endpoint::EndpointConfiguration;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
use crate::filestore::path::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::job::JobResource;
use crate::job::RunnableJob;
-use crate::log::buffer_stream_to_line_stream;
use crate::log::LogItem;
+use crate::log::buffer_stream_to_line_stream;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
@@ -219,9 +222,10 @@ impl Endpoint {
pub async fn prepare_container(
&self,
job: RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<PreparedContainer<'_>> {
- PreparedContainer::new(self, job, merged_stores).await
+ PreparedContainer::new(self, job, staging_store, release_store).await
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
@@ -248,7 +252,8 @@ impl<'a> PreparedContainer<'a> {
async fn new(
endpoint: &'a Endpoint,
job: RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<PreparedContainer<'a>> {
let script = job.script().clone();
let create_info = Self::build_container(endpoint, &job).await?;
@@ -256,7 +261,7 @@ impl<'a> PreparedContainer<'a> {
let (cpysrc, cpyart, cpyscr) = tokio::join!(
Self::copy_source_to_container(&container, &job),
- Self::copy_artifacts_to_container(&container, &job, merged_stores),
+ Self::copy_artifacts_to_container(&container, &job, staging_store, release_store),
Self::copy_script_to_container(&container, &script)
);
@@ -380,7 +385,8 @@ impl<'a> PreparedContainer<'a> {
async fn copy_artifacts_to_container<'ca>(
container: &Container<'ca>,
job: &RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<()> {
job.resources()
.iter()
@@ -403,8 +409,8 @@ impl<'a> PreparedContainer<'a> {
container.id(),
destination.display()
);
- let staging_read = merged_stores.staging().read().await;
- let release_read = merged_stores.release().read().await;
+ let staging_read = staging_store.read().await;
+ let release_read = release_store.read().await;
let buf = match staging_read.root_path().join(&art)? {
Some(fp) => fp,
None => {
@@ -619,7 +625,7 @@ impl<'a> ExecutedContainer<'a> {
&self.script
}
- pub async fn finalize(self, merged_stores: MergedStores) -> Result<FinalizedContainer> {
+ pub async fn finalize(self, staging_store: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
let (exit_info, artifacts) = match self.exit_info {
Some((false, msg)) => {
let err = anyhow!("Error during container run: '{msg}'", msg = msg.as_deref().unwrap_or(""));
@@ -644,8 +650,7 @@ impl<'a> ExecutedContainer<'a> {
.map_err(Error::from)
});
- let mut writelock = merged_stores.staging().write().await;
-
+ let mut writelock = staging_store.write().await;
let artifacts = writelock
.write_files_from_tar_stream(tar_stream)
.await
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 17f9f24..db0b826 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -31,7 +31,8 @@ use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
-use crate::filestore::MergedStores;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
@@ -40,7 +41,8 @@ pub struct EndpointScheduler {
log_dir: Option<PathBuf>,
endpoints: Vec<Arc<RwLock<Endpoint>>>,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
db: Arc<PgConnection>,
submit: crate::db::models::Submit,
}
@@ -48,7 +50,8 @@ pub struct EndpointScheduler {
impl EndpointScheduler {
pub async fn setup(
endpoints: Vec<EndpointConfiguration>,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
db: Arc<PgConnection>,
submit: crate::db::models::Submit,
log_dir: Option<PathBuf>,
@@ -58,7 +61,8 @@ impl EndpointScheduler {
Ok(EndpointScheduler {
log_dir,
endpoints,
- merged_stores,
+ staging_store,
+ release_store,
db,
submit,
})
@@ -94,7 +98,8 @@ impl EndpointScheduler {
bar,
endpoint,
job,
- merged_stores: self.merged_stores.clone(),
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
db: self.db.clone(),
submit: self.submit.clone(),
})
@@ -136,7 +141,8 @@ pub struct JobHandle {
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
submit: crate::db::models::Submit,
}
@@ -157,7 +163,7 @@ impl JobHandle {
let job_id = *self.job.uuid();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let prepared_container = ep
- .prepare_container(self.job, self.merged_stores.clone())
+ .prepare_container(self.job, self.staging_store.clone(), self.release_store.clone())
.await?;
let container_id = prepared_container.create_info().id.clone();
let running_container = prepared_container
@@ -219,7 +225,7 @@ impl JobHandle {
}
let res: crate::endpoint::FinalizedContainer = run_container
- .finalize(self.merged_stores.clone())
+ .finalize(self.staging_store.clone())
.await
.context("Finalizing container")
.with_context(|| {
@@ -256,13 +262,13 @@ impl JobHandle {
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
+ let staging_read = self.staging_store.read().await;
for p in paths.iter() {
trace!("DB: Creating artifact entry for path: {}", p.display());
let _ = dbmodels::Artifact::create(&self.db, p, &job)?;
r.push({
- self.merged_stores
+ staging_read
.get(p)
- .await
.ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))?
.clone()
});
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
deleted file mode 100644
index 1503870..0000000
--- a/src/filestore/merged.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-//
-// Copyright (c) 2020-2021 science+computing ag and other contributors
-//
-// This program and the accompanying materials are made
-// available under the terms of the Eclipse Public License 2.0
-// which is available at https://www.eclipse.org/legal/epl-2.0/
-//
-// SPDX-License-Identifier: EPL-2.0
-//
-
-// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing
-// the rewrite
-#![allow(unused)]
-
-use std::sync::Arc;
-use std::path::Path;
-
-use anyhow::anyhow;
-use anyhow::Result;
-use getset::Getters;
-use log::trace;
-use tokio::sync::RwLock;
-
-use crate::filestore::path::ArtifactPath;
-use crate::filestore::path::FullArtifactPath;
-use crate::filestore::ReleaseStore;
-use crate::filestore::StagingStore;
-
-
-/// A type that merges the release store and the staging store
-///
-/// The stores are not actually merged (on disk or in memory), but the querying mechanism works in
-/// a way where it _always_ preferes the staging store over the release store.
-///
-#[derive(Clone, Getters)]
-pub struct MergedStores {
- #[getset(get = "pub")]
- release: Arc<RwLock<ReleaseStore>>,
-
- #[getset(get = "pub")]
- staging: Arc<RwLock<StagingStore>>,
-}
-
-impl MergedStores {
- pub fn new(release: Arc<RwLock<ReleaseStore>>, staging: Arc<RwLock<StagingStore>>) -> Self {
- MergedStores { release, staging }
- }
-
- pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<ArtifactPath>> {
- trace!("Fetching artifact from path: {:?}", p.display());
- let artifact_path = ArtifactPath::new(p.to_path_buf())?;
-
- let staging = &mut self.staging.write().await.0;
- let staging_path = staging
- .root_path()
- .join(&artifact_path)?
- .ok_or_else(|| anyhow!("Does not exist in staging store: {:?}", artifact_path))?;
- trace!("staging_path = {:?}", staging_path.display());
-
- if staging_path.exists() {
- let art = if let Some(art) = staging.get(&artifact_path) {
- art
- } else {
- trace!("Loading path from staging store: {:?}", artifact_path.display());
- staging.load_from_path(&artifact_path)
- };
-
- return Ok(Some(art.clone()))
- }
-
- let release = &mut self.release.write().await.0;
- let release_path = release.root_path()
- .join(&artifact_path)?
- .ok_or_else(|| anyhow!("Not found in release store: {:?}", artifact_path))?;
- trace!("release_path = {:?}", release_path);
-
- if release_path.exists() {
- let art = if let Some(art) = release.get(&artifact_path) {
- art
- } else {
- trace!("Loading path from release store: {:?}", artifact_path.display());
- release.load_from_path(&artifact_path)
- };
- return Ok(Some(art.clone()))
- }
-
- Ok(None)
- }
-
- pub async fn get(&self, p: &ArtifactPath) -> Option<ArtifactPath> {
- if let Some(a) = self.staging.read().await.get(p).cloned() {
- return Some(a)
- }
-
- self.release.read().await.get(p).cloned()
- }
-
-}
diff --git a/src/filestore/mod.rs b/src/filestore/mod.rs
index 29eb476..8d788e2 100644
--- a/src/filestore/mod.rs
+++ b/src/filestore/mod.rs
@@ -14,9 +14,6 @@ pub use release::*;
mod staging;
pub use staging::*;
-mod merged;
-pub use merged::*;
-
pub mod path;
pub use path::ArtifactPath;
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index 19016bd..4d75855 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -154,10 +154,6 @@ impl<'a> FullArtifactPath<'a> {
self.0 .0.join(&self.1 .0)
}
- pub fn exists(&self) -> bool {
- self.joined().exists()
- }
-
pub fn display(&self) -> FullArtifactPathDisplay<'a> {
FullArtifactPathDisplay(self.0, self.1)
}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e6e24c5..51c6d9a 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -34,7 +34,6 @@ use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
@@ -153,7 +152,8 @@ use crate::util::progress::ProgressBars;
pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
progress_generator: ProgressBars,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
config: &'a Configuration,
@@ -176,10 +176,10 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
- let merged_stores = MergedStores::new(self.release_store, self.staging_store);
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
- merged_stores.clone(),
+ self.staging_store.clone(),
+ self.release_store.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -188,7 +188,8 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
- merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
progress_generator: self.progress_generator,
source_cache: self.source_cache,
jobdag: self.jobdag,
@@ -245,7 +246,8 @@ impl<'a> Orchestrator<'a> {
config: self.config,
source_cache: &self.source_cache,
scheduler: &self.scheduler,
- merged_stores: &self.merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
database: self.database.clone(),
};
@@ -362,7 +364,8 @@ struct TaskPreparation<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
}
@@ -377,7 +380,8 @@ struct JobTask<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
/// Channel where the dependencies arrive
@@ -432,7 +436,8 @@ impl<'a> JobTask<'a> {
config: prep.config,
source_cache: prep.source_cache,
scheduler: prep.scheduler,
- merged_stores: prep.merged_stores,
+ staging_store: prep.staging_store,
+ release_store: prep.release_store,
database: prep.database.clone(),
receiver,
@@ -505,8 +510,8 @@ impl<'a> JobTask<'a> {
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
{
- let release_store = self.merged_stores.release().read().await;
- let staging_store = self.merged_stores.staging().read().await;
+ let release_store = self.release_store.read().await;
+ let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
//
@@ -545,7 +550,6 @@ impl<'a> JobTask<'a> {
debug!("[{}]: Found {} replacement artifacts", self.jobdef.job.uuid(), replacement_artifacts.len());
trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts);
- let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()