-rw-r--r--  src/endpoint/configured.rs        | 29
-rw-r--r--  src/endpoint/scheduler.rs         | 26
-rw-r--r--  src/filestore/merged.rs           | 98
-rw-r--r--  src/filestore/mod.rs              |  3
-rw-r--r--  src/filestore/path.rs             |  4
-rw-r--r--  src/orchestrator/orchestrator.rs  | 28
6 files changed, 49 insertions(+), 139 deletions(-)
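
This change removes the MergedStores wrapper and passes the two underlying
stores explicitly wherever they are needed. A minimal sketch of the resulting
calling convention (the stub types and consume_stores are hypothetical
stand-ins, not code from this patch):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Stand-ins for the real store types in src/filestore.
    struct StagingStore;
    struct ReleaseStore;

    // Consumers now take both stores explicitly instead of one wrapper;
    // cloning an Arc only bumps a reference count, so handing a pair of
    // handles to every consumer stays cheap.
    async fn consume_stores(
        staging_store: Arc<RwLock<StagingStore>>,
        release_store: Arc<RwLock<ReleaseStore>>,
    ) {
        let _staging = staging_store.read().await; // guard drops at end of scope
        let _release = release_store.read().await;
    }
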
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 94f134d..b90263e 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -11,6 +11,7 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
+use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
@@ -23,16 +24,18 @@ use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use crate::endpoint::EndpointConfiguration;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
use crate::filestore::path::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::job::JobResource;
use crate::job::RunnableJob;
-use crate::log::buffer_stream_to_line_stream;
use crate::log::LogItem;
+use crate::log::buffer_stream_to_line_stream;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
@@ -219,9 +222,10 @@ impl Endpoint {
pub async fn prepare_container(
&self,
job: RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<PreparedContainer<'_>> {
- PreparedContainer::new(self, job, merged_stores).await
+ PreparedContainer::new(self, job, staging_store, release_store).await
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
@@ -248,7 +252,8 @@ impl<'a> PreparedContainer<'a> {
async fn new(
endpoint: &'a Endpoint,
job: RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<PreparedContainer<'a>> {
let script = job.script().clone();
let create_info = Self::build_container(endpoint, &job).await?;
@@ -256,7 +261,7 @@ impl<'a> PreparedContainer<'a> {
let (cpysrc, cpyart, cpyscr) = tokio::join!(
Self::copy_source_to_container(&container, &job),
- Self::copy_artifacts_to_container(&container, &job, merged_stores),
+ Self::copy_artifacts_to_container(&container, &job, staging_store, release_store),
Self::copy_script_to_container(&container, &script)
);
@@ -380,7 +385,8 @@ impl<'a> PreparedContainer<'a> {
async fn copy_artifacts_to_container<'ca>(
container: &Container<'ca>,
job: &RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<()> {
job.resources()
.iter()
@@ -403,8 +409,8 @@ impl<'a> PreparedContainer<'a> {
container.id(),
destination.display()
);
- let staging_read = merged_stores.staging().read().await;
- let release_read = merged_stores.release().read().await;
+ let staging_read = staging_store.read().await;
+ let release_read = release_store.read().await;
let buf = match staging_read.root_path().join(&art)? {
Some(fp) => fp,
None => {
@@ -619,7 +625,7 @@ impl<'a> ExecutedContainer<'a> {
&self.script
}
- pub async fn finalize(self, merged_stores: MergedStores) -> Result<FinalizedContainer> {
+ pub async fn finalize(self, staging_store: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
let (exit_info, artifacts) = match self.exit_info {
Some((false, msg)) => {
let err = anyhow!("Error during container run: '{msg}'", msg = msg.as_deref().unwrap_or(""));
@@ -644,8 +650,7 @@ impl<'a> ExecutedContainer<'a> {
.map_err(Error::from)
});
- let mut writelock = merged_stores.staging().write().await;
-
+ let mut writelock = staging_store.write().await;
let artifacts = writelock
.write_files_from_tar_stream(tar_stream)
.await
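
The hunks in configured.rs settle into a simple locking discipline: copying
artifacts takes short-lived read guards on both stores, while finalize only
ever writes to the staging store. A condensed sketch of that split, with
hypothetical stand-in types:

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Stand-ins for the real store types.
    struct StagingStore;
    struct ReleaseStore;

    // Lookup path (copy_artifacts_to_container): read guards on both
    // stores, staging consulted before release.
    async fn lookup(staging: Arc<RwLock<StagingStore>>, release: Arc<RwLock<ReleaseStore>>) {
        let _staging_read = staging.read().await;
        let _release_read = release.read().await;
        // ... resolve the artifact path against staging, then release ...
    } // both read guards drop here

    // Finalize path: only the staging store is ever mutated by a job.
    async fn finalize(staging: Arc<RwLock<StagingStore>>) {
        let _writelock = staging.write().await;
        // ... write_files_from_tar_stream(...) through the write guard ...
    }
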
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 17f9f24..db0b826 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -31,7 +31,8 @@ use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
-use crate::filestore::MergedStores;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
@@ -40,7 +41,8 @@ pub struct EndpointScheduler {
log_dir: Option<PathBuf>,
endpoints: Vec<Arc<RwLock<Endpoint>>>,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
db: Arc<PgConnection>,
submit: crate::db::models::Submit,
}
@@ -48,7 +50,8 @@ pub struct EndpointScheduler {
impl EndpointScheduler {
pub async fn setup(
endpoints: Vec<EndpointConfiguration>,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
db: Arc<PgConnection>,
submit: crate::db::models::Submit,
log_dir: Option<PathBuf>,
@@ -58,7 +61,8 @@ impl EndpointScheduler {
Ok(EndpointScheduler {
log_dir,
endpoints,
- merged_stores,
+ staging_store,
+ release_store,
db,
submit,
})
@@ -94,7 +98,8 @@ impl EndpointScheduler {
bar,
endpoint,
job,
- merged_stores: self.merged_stores.clone(),
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
db: self.db.clone(),
submit: self.submit.clone(),
})
@@ -136,7 +141,8 @@ pub struct JobHandle {
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
submit: crate::db::models::Submit,
}
@@ -157,7 +163,7 @@ impl JobHandle {
let job_id = *self.job.uuid();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let prepared_container = ep
- .prepare_container(self.job, self.merged_stores.clone())
+ .prepare_container(self.job, self.staging_store.clone(), self.release_store.clone())
.await?;
let container_id = prepared_container.create_info().id.clone();
let running_container = prepared_container
@@ -219,7 +225,7 @@ impl JobHandle {
}
let res: crate::endpoint::FinalizedContainer = run_container
- .finalize(self.merged_stores.clone())
+ .finalize(self.staging_store.clone())
.await
.context("Finalizing container")
.with_context(|| {
@@ -256,13 +262,13 @@ impl JobHandle {
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
+ let staging_read = self.staging_store.read().await;
for p in paths.iter() {
trace!("DB: Creating artifact entry for path: {}", p.display());
let _ = dbmodels::Artifact::create(&self.db, p, &job)?;
r.push({
- self.merged_stores
+ staging_read
.get(p)
- .await
.ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))?
.clone()
});
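
In the last hunk above, the per-path lookup loses its await: one read guard
on the staging store is taken before the loop, and every get borrows from
that same guard. A sketch of the pattern, using a stand-in store keyed by
strings instead of the real artifact-path types:

    use std::sync::Arc;

    use anyhow::{anyhow, Result};
    use tokio::sync::RwLock;

    // Stand-in store mapping string paths to entries.
    struct StagingStore {
        entries: Vec<String>,
    }

    impl StagingStore {
        // Stand-in for the real lookup by artifact path.
        fn get(&self, p: &str) -> Option<&String> {
            self.entries.iter().find(|e| e.as_str() == p)
        }
    }

    // One read guard taken before the loop: no lock traffic per iteration,
    // and every lookup borrows from the same guard.
    async fn collect(staging: Arc<RwLock<StagingStore>>, paths: &[&str]) -> Result<Vec<String>> {
        let staging_read = staging.read().await;
        paths
            .iter()
            .map(|p| {
                staging_read
                    .get(p)
                    .cloned()
                    .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))
            })
            .collect()
    }
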
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
deleted file mode 100644
index 1503870..0000000
--- a/src/filestore/merged.rs
+++ /dev/null
@@ -1,98 +0,0 @@
-//
-// Copyright (c) 2020-2021 science+computing ag and other contributors
-//
-// This program and the accompanying materials are made
-// available under the terms of the Eclipse Public License 2.0
-// which is available at https://www.eclipse.org/legal/epl-2.0/
-//
-// SPDX-License-Identifier: EPL-2.0
-//
-
-// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing
-// the rewrite
-#![allow(unused)]
-
-use std::sync::Arc;
-use std::path::Path;
-
-use anyhow::anyhow;
-use anyhow::Result;
-use getset::Getters;
-use log::trace;
-use tokio::sync::RwLock;
-
-use crate::filestore::path::ArtifactPath;
-use crate::filestore::path::FullArtifactPath;
-use crate::filestore::ReleaseStore;
-use crate::filestore::StagingStore;
-
-
-/// A type that merges the release store and the staging store
-///
-/// The stores are not actually merged (on disk or in memory), but the querying mechanism works in
-// a way where it _always_ prefers the staging store over the release store.
-///
-#[derive(Clone, Getters)]
-pub struct MergedStores {
- #[getset(get = "pub")]
- release: Arc<RwLock<ReleaseStore>>,
-
- #[getset(get = "pub")]
- staging: Arc<RwLock<StagingStore>>,
-}
-
-impl MergedStores {
- pub fn new(release: Arc<RwLock<ReleaseStore>>, staging: Arc<RwLock<StagingStore>>) -> Self {
- MergedStores { release, staging }
- }
-
- pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<ArtifactPath>> {
- trace!("Fetching artifact from path: {:?}", p.display());
- let artifact_path = ArtifactPath::new(p.to_path_buf())?;
-
- let staging = &mut self.staging.write().await.0;
- let staging_path = staging
- .root_path()
- .join(&artifact_path)?
- .ok_or_else(|| anyhow!("Does not exist in staging store: {:?}", artifact_path))?;
- trace!("staging_path = {:?}", staging_path.display());
-
- if staging_path.exists() {
- let art = if let Some(art) = staging.get(&artifact_path) {
- art
- } else {
- trace!("Loading path from staging store: {:?}", artifact_path.display());
- staging.load_from_path(&artifact_path)
- };
-
- return Ok(Some(art.clone()))
- }
-
- let release = &mut self.release.write().await.0;
- let release_path = release.root_path()
- .join(&artifact_path)?
- .ok_or_else(|| anyhow!("Not found in release store: {:?}", artifact_path))?;
- trace!("release_path = {:?}", release_path);
-
- if release_path.exists() {
- let art = if let Some(art) = release.get(&artifact_path) {
- art
- } else {
- trace!("Loading path from release store: {:?}", artifact_path.display());
- release.load_from_path(&artifact_path)
- };
- return Ok(Some(art.clone()))
- }
-
- Ok(None)
- }
-
- pub async fn get(&self, p: &ArtifactPath) -> Option<ArtifactPath> {
- if let Some(a) = self.staging.read().await.get(p).cloned() {
- return Some(a)
- }
-
- self.release.read().await.get(p).cloned()
- }
-
-}
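
For reference, the behaviour the deleted file provided was small: both
get_artifact_by_path and get preferred the staging store and fell back to the
release store only on a miss. A condensed sketch of that lookup order (a
single stand-in Store in place of the two real store types):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    #[derive(Clone, Debug, PartialEq)]
    struct ArtifactPath(String);

    // One stand-in Store where the real code has StagingStore and ReleaseStore.
    struct Store {
        entries: Vec<ArtifactPath>,
    }

    impl Store {
        fn get(&self, p: &ArtifactPath) -> Option<&ArtifactPath> {
            self.entries.iter().find(|e| *e == p)
        }
    }

    // The essence of the removed MergedStores::get: staging wins, release
    // is only consulted on a staging miss.
    async fn merged_get(
        staging: &Arc<RwLock<Store>>,
        release: &Arc<RwLock<Store>>,
        p: &ArtifactPath,
    ) -> Option<ArtifactPath> {
        if let Some(a) = staging.read().await.get(p).cloned() {
            return Some(a);
        }
        release.read().await.get(p).cloned()
    }

After this patch, call sites that need the staging-over-release preference
perform this fallback inline instead of going through a wrapper type.
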
diff --git a/src/filestore/mod.rs b/src/filestore/mod.rs
index 29eb476..8d788e2 100644
--- a/src/filestore/mod.rs
+++ b/src/filestore/mod.rs
@@ -14,9 +14,6 @@ pub use release::*;
mod staging;
pub use staging::*;
-mod merged;
-pub use merged::*;
-
pub mod path;
pub use path::ArtifactPath;
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index 19016bd..4d75855 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -154,10 +154,6 @@ impl<'a> FullArtifactPath<'a> {
self.0 .0.join(&self.1 .0)
}
- pub fn exists(&self) -> bool {
- self.joined().exists()
- }
-
pub fn display(&self) -> FullArtifactPathDisplay<'a> {
FullArtifactPathDisplay(self.0, self.1)
}
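
In path.rs, FullArtifactPath pairs a store root with an artifact-relative
path, so joined() is a plain PathBuf join of the two inner values; the
removed exists() helper was a thin wrapper over it whose remaining callers
went away with merged.rs. A stand-in sketch (the newtype shapes are assumed
from this diff):

    use std::path::PathBuf;

    // Stand-ins for the StoreRoot and ArtifactPath newtypes around PathBuf.
    struct StoreRoot(PathBuf);
    struct ArtifactPath(PathBuf);

    // A full path is the pair (store root, artifact-relative path).
    struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath);

    impl<'a> FullArtifactPath<'a> {
        fn joined(&self) -> PathBuf {
            self.0 .0.join(&self.1 .0)
        }

        // The removed exists() was equivalent to self.joined().exists();
        // a caller that still needs the check can do that inline.
    }
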
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index e6e24c5..51c6d9a 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -34,7 +34,6 @@ use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
use crate::filestore::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::job::JobDefinition;
@@ -153,7 +152,8 @@ use crate::util::progress::ProgressBars;
pub struct Orchestrator<'a> {
scheduler: EndpointScheduler,
progress_generator: ProgressBars,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
source_cache: SourceCache,
jobdag: Dag,
config: &'a Configuration,
@@ -176,10 +176,10 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
- let merged_stores = MergedStores::new(self.release_store, self.staging_store);
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
- merged_stores.clone(),
+ self.staging_store.clone(),
+ self.release_store.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -188,7 +188,8 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
- merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
progress_generator: self.progress_generator,
source_cache: self.source_cache,
jobdag: self.jobdag,
@@ -245,7 +246,8 @@ impl<'a> Orchestrator<'a> {
config: self.config,
source_cache: &self.source_cache,
scheduler: &self.scheduler,
- merged_stores: &self.merged_stores,
+ staging_store: self.staging_store.clone(),
+ release_store: self.release_store.clone(),
database: self.database.clone(),
};
@@ -362,7 +364,8 @@ struct TaskPreparation<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
}
@@ -377,7 +380,8 @@ struct JobTask<'a> {
config: &'a Configuration,
source_cache: &'a SourceCache,
scheduler: &'a EndpointScheduler,
- merged_stores: &'a MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
database: Arc<PgConnection>,
/// Channel where the dependencies arrive
@@ -432,7 +436,8 @@ impl<'a> JobTask<'a> {
config: prep.config,
source_cache: prep.source_cache,
scheduler: prep.scheduler,
- merged_stores: prep.merged_stores,
+ staging_store: prep.staging_store,
+ release_store: prep.release_store,
database: prep.database.clone(),
receiver,
@@ -505,8 +510,8 @@ impl<'a> JobTask<'a> {
// check if a job that looks very similar to this job has already produced artifacts.
// If it has, simply return those (plus the received ones)
{
- let release_store = self.merged_stores.release().read().await;
- let staging_store = self.merged_stores.staging().read().await;
+ let release_store = self.release_store.read().await;
+ let staging_store = self.staging_store.read().await;
// Use the environment of the job definition, as it appears in the job DAG.
//
@@ -545,7 +550,6 @@ impl<'a> JobTask<'a> {
debug!("[{}]: Found {} replacement artifacts", self.jobdef.job.uuid(), replacement_artifacts.len());
trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts);
- let merged_stores = self.merged_stores;
let mut artifacts = replacement_artifacts
.into_iter()