author     Matthias Beyer <matthias.beyer@atos.net>    2021-02-17 09:48:07 +0100
committer  Matthias Beyer <matthias.beyer@atos.net>    2021-02-17 09:59:34 +0100
commit     6b2e73a9cf24c80b3b86df68ff62392fdfc1e09c (patch)
tree       4a448aadc8ba97f94fec1b877c050f987a89c32d
parent     04e7602e34987b832e1d2e95405f61443b30a641 (diff)
Pass MergedStores to endpoint/jobs rather than only staging store
This patch changes the code so that the MergedStores object is known in the endpoints and job execution code. This is necessary because the jobs must be able to load artifacts from the release store as well, for example if a library was released in another submit and can be reused because the script for that library didn't change.

For that, the return type of StoreRoot::join() was changed:

- -> Result<FullArtifactPath<'a>>
+ -> Result<Option<FullArtifactPath<'a>>>

It now returns Ok(None) if the requested artifact does not exist. This makes it possible to try-join a path on the store root of the staging store and, if there is no such file, continue with try-joining the path on the release store. The calling code got a bit more complex with that change, though.

Next, MergedStores got a `derive(Clone)`, because clone()ing it is cheap (two `Arc`s) and necessary for passing it easily to the jobs.

Finally, each place where only the staging store was consulted was changed to consult the release store as well.

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
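As a rough sketch of the lookup pattern this enables (not code from this patch; the helper name find_full_path is made up for illustration, while StoreRoot, ArtifactPath, and FullArtifactPath are the types from src/filestore/path.rs as changed below), the Option-returning join() lets a caller fall back from the staging store to the release store without treating a miss as an error:

    use anyhow::{anyhow, Result};

    use crate::filestore::path::{ArtifactPath, FullArtifactPath, StoreRoot};

    // Hypothetical helper: look below the staging store root first,
    // then below the release store root.
    fn find_full_path<'a>(
        staging_root: &'a StoreRoot,
        release_root: &'a StoreRoot,
        art: &'a ArtifactPath,
    ) -> Result<FullArtifactPath<'a>> {
        match staging_root.join(art)? {
            // Ok(Some(_)): the file exists below the staging root.
            Some(fp) => Ok(fp),
            // Ok(None): no such file in staging -- no longer an error,
            // so the same artifact path can be retried on the release root.
            None => release_root
                .join(art)?
                .ok_or_else(|| anyhow!("Artifact not in staging or release store: {:?}", art)),
        }
    }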
-rw-r--r--  src/db/find_artifacts.rs          |  4
-rw-r--r--  src/endpoint/configured.rs        | 50
-rw-r--r--  src/endpoint/scheduler.rs         | 22
-rw-r--r--  src/filestore/merged.rs           | 15
-rw-r--r--  src/filestore/path.rs             |  6
-rw-r--r--  src/orchestrator/orchestrator.rs  |  5
6 files changed, 56 insertions(+), 46 deletions(-)
diff --git a/src/db/find_artifacts.rs b/src/db/find_artifacts.rs
index 613e63f..e3e418d 100644
--- a/src/db/find_artifacts.rs
+++ b/src/db/find_artifacts.rs
@@ -213,7 +213,7 @@ pub fn find_artifacts<'a>(
);
if let Some(art) = staging.get(&artpath) {
trace!("Found in staging: {:?}", art);
- return staging.root_path().join(art).map(|p| (p, ndt)).map(Some)
+ return staging.root_path().join(art).map(|p| p.map(|p| (p, ndt)))
}
}
@@ -222,7 +222,7 @@ pub fn find_artifacts<'a>(
// filesystem.
if let Some(art) = release_store.get(&artpath) {
trace!("Found in release: {:?}", art);
- return release_store.root_path().join(art).map(|p| (p, ndt)).map(Some)
+ return release_store.root_path().join(art).map(|p| p.map(|p| (p, ndt)))
}
Ok(None)
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index b908cd4..94f134d 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -11,7 +11,6 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
-use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
@@ -23,14 +22,13 @@ use log::trace;
use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
-use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::path::ArtifactPath;
-use crate::filestore::StagingStore;
+use crate::filestore::MergedStores;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::buffer_stream_to_line_stream;
@@ -221,9 +219,9 @@ impl Endpoint {
pub async fn prepare_container(
&self,
job: RunnableJob,
- staging: Arc<RwLock<StagingStore>>,
+ merged_stores: MergedStores,
) -> Result<PreparedContainer<'_>> {
- PreparedContainer::new(self, job, staging).await
+ PreparedContainer::new(self, job, merged_stores).await
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
@@ -250,7 +248,7 @@ impl<'a> PreparedContainer<'a> {
async fn new(
endpoint: &'a Endpoint,
job: RunnableJob,
- staging: Arc<RwLock<StagingStore>>,
+ merged_stores: MergedStores,
) -> Result<PreparedContainer<'a>> {
let script = job.script().clone();
let create_info = Self::build_container(endpoint, &job).await?;
@@ -258,7 +256,7 @@ impl<'a> PreparedContainer<'a> {
let (cpysrc, cpyart, cpyscr) = tokio::join!(
Self::copy_source_to_container(&container, &job),
- Self::copy_artifacts_to_container(&container, &job, staging),
+ Self::copy_artifacts_to_container(&container, &job, merged_stores),
Self::copy_script_to_container(&container, &script)
);
@@ -382,7 +380,7 @@ impl<'a> PreparedContainer<'a> {
async fn copy_artifacts_to_container<'ca>(
container: &Container<'ca>,
job: &RunnableJob,
- staging: Arc<RwLock<StagingStore>>,
+ merged_stores: MergedStores,
) -> Result<()> {
job.resources()
.iter()
@@ -405,19 +403,25 @@ impl<'a> PreparedContainer<'a> {
container.id(),
destination.display()
);
- let buf = staging
- .read()
- .await
- .root_path()
- .join(&art)?
- .read()
- .await
- .with_context(|| {
- anyhow!(
- "Reading artifact {}, so it can be copied to container",
- art.display()
- )
- })?;
+ let staging_read = merged_stores.staging().read().await;
+ let release_read = merged_stores.release().read().await;
+ let buf = match staging_read.root_path().join(&art)? {
+ Some(fp) => fp,
+ None => {
+ release_read
+ .root_path()
+ .join(&art)?
+ .ok_or_else(|| anyhow!("Not found in staging or release store: {:?}", art))?
+ },
+ }
+ .read()
+ .await
+ .with_context(|| {
+ anyhow!(
+ "Reading artifact {}, so it can be copied to container",
+ art.display()
+ )
+ })?;
let r = container
.copy_file_into(&destination, &buf)
@@ -615,7 +619,7 @@ impl<'a> ExecutedContainer<'a> {
&self.script
}
- pub async fn finalize(self, staging: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
+ pub async fn finalize(self, merged_stores: MergedStores) -> Result<FinalizedContainer> {
let (exit_info, artifacts) = match self.exit_info {
Some((false, msg)) => {
let err = anyhow!("Error during container run: '{msg}'", msg = msg.as_deref().unwrap_or(""));
@@ -640,7 +644,7 @@ impl<'a> ExecutedContainer<'a> {
.map_err(Error::from)
});
- let mut writelock = staging.write().await;
+ let mut writelock = merged_stores.staging().write().await;
let artifacts = writelock
.write_files_from_tar_stream(tar_stream)
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index cb9d04f..17f9f24 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -31,7 +31,7 @@ use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ArtifactPath;
-use crate::filestore::StagingStore;
+use crate::filestore::MergedStores;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
@@ -40,7 +40,7 @@ pub struct EndpointScheduler {
log_dir: Option<PathBuf>,
endpoints: Vec<Arc<RwLock<Endpoint>>>,
- staging_store: Arc<RwLock<StagingStore>>,
+ merged_stores: MergedStores,
db: Arc<PgConnection>,
submit: crate::db::models::Submit,
}
@@ -48,7 +48,7 @@ pub struct EndpointScheduler {
impl EndpointScheduler {
pub async fn setup(
endpoints: Vec<EndpointConfiguration>,
- staging_store: Arc<RwLock<StagingStore>>,
+ merged_stores: MergedStores,
db: Arc<PgConnection>,
submit: crate::db::models::Submit,
log_dir: Option<PathBuf>,
@@ -58,7 +58,7 @@ impl EndpointScheduler {
Ok(EndpointScheduler {
log_dir,
endpoints,
- staging_store,
+ merged_stores,
db,
submit,
})
@@ -94,7 +94,7 @@ impl EndpointScheduler {
bar,
endpoint,
job,
- staging_store: self.staging_store.clone(),
+ merged_stores: self.merged_stores.clone(),
db: self.db.clone(),
submit: self.submit.clone(),
})
@@ -136,7 +136,7 @@ pub struct JobHandle {
job: RunnableJob,
bar: ProgressBar,
db: Arc<PgConnection>,
- staging_store: Arc<RwLock<StagingStore>>,
+ merged_stores: MergedStores,
submit: crate::db::models::Submit,
}
@@ -157,7 +157,7 @@ impl JobHandle {
let job_id = *self.job.uuid();
trace!("Running on Job {} on Endpoint {}", job_id, ep.name());
let prepared_container = ep
- .prepare_container(self.job, self.staging_store.clone())
+ .prepare_container(self.job, self.merged_stores.clone())
.await?;
let container_id = prepared_container.create_info().id.clone();
let running_container = prepared_container
@@ -219,7 +219,7 @@ impl JobHandle {
}
let res: crate::endpoint::FinalizedContainer = run_container
- .finalize(self.staging_store.clone())
+ .finalize(self.merged_stores.clone())
.await
.context("Finalizing container")
.with_context(|| {
@@ -256,15 +256,13 @@ impl JobHandle {
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
- let staging_store_lock = self.staging_store.read().await;
for p in paths.iter() {
- use std::ops::Deref;
trace!("DB: Creating artifact entry for path: {}", p.display());
let _ = dbmodels::Artifact::create(&self.db, p, &job)?;
r.push({
- staging_store_lock
- .deref()
+ self.merged_stores
.get(p)
+ .await
.ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))?
.clone()
});
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 7363eb2..1503870 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -12,16 +12,17 @@
// the rewrite
#![allow(unused)]
-
use std::sync::Arc;
use std::path::Path;
+use anyhow::anyhow;
use anyhow::Result;
use getset::Getters;
use log::trace;
use tokio::sync::RwLock;
use crate::filestore::path::ArtifactPath;
+use crate::filestore::path::FullArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
@@ -31,7 +32,7 @@ use crate::filestore::StagingStore;
/// The stores are not actually merged (on disk or in memory), but the querying mechanism works in
/// a way where it _always_ prefers the staging store over the release store.
///
-#[derive(Getters)]
+#[derive(Clone, Getters)]
pub struct MergedStores {
#[getset(get = "pub")]
release: Arc<RwLock<ReleaseStore>>,
@@ -50,7 +51,10 @@ impl MergedStores {
let artifact_path = ArtifactPath::new(p.to_path_buf())?;
let staging = &mut self.staging.write().await.0;
- let staging_path = staging.root_path().join(&artifact_path)?;
+ let staging_path = staging
+ .root_path()
+ .join(&artifact_path)?
+ .ok_or_else(|| anyhow!("Does not exist in staging store: {:?}", artifact_path))?;
trace!("staging_path = {:?}", staging_path.display());
if staging_path.exists() {
@@ -65,7 +69,9 @@ impl MergedStores {
}
let release = &mut self.release.write().await.0;
- let release_path = release.root_path().join(&artifact_path)?;
+ let release_path = release.root_path()
+ .join(&artifact_path)?
+ .ok_or_else(|| anyhow!("Not found in release store: {:?}", artifact_path))?;
trace!("release_path = {:?}", release_path);
if release_path.exists() {
@@ -88,4 +94,5 @@ impl MergedStores {
self.release.read().await.get(p).cloned()
}
+
}
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index 486532f..19016bd 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -44,15 +44,15 @@ impl StoreRoot {
}
}
- pub fn join<'a>(&'a self, ap: &'a ArtifactPath) -> Result<FullArtifactPath<'a>> {
+ pub fn join<'a>(&'a self, ap: &'a ArtifactPath) -> Result<Option<FullArtifactPath<'a>>> {
let join = self.0.join(&ap.0);
if join.is_file() {
- Ok(FullArtifactPath(&self, ap))
+ Ok(Some(FullArtifactPath(&self, ap)))
} else if join.is_dir() {
Err(anyhow!("Cannot load non-file path: {}", join.display()))
} else {
- Err(anyhow!("Path does not exist: {}", join.display()))
+ Ok(None)
}
}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index dff5651..e6e24c5 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -176,9 +176,10 @@ pub struct OrchestratorSetup<'a> {
impl<'a> OrchestratorSetup<'a> {
pub async fn setup(self) -> Result<Orchestrator<'a>> {
+ let merged_stores = MergedStores::new(self.release_store, self.staging_store);
let scheduler = EndpointScheduler::setup(
self.endpoint_config,
- self.staging_store.clone(),
+ merged_stores.clone(),
self.database.clone(),
self.submit.clone(),
self.log_dir,
@@ -187,8 +188,8 @@ impl<'a> OrchestratorSetup<'a> {
Ok(Orchestrator {
scheduler,
+ merged_stores,
progress_generator: self.progress_generator,
- merged_stores: MergedStores::new(self.release_store, self.staging_store),
source_cache: self.source_cache,
jobdag: self.jobdag,
config: self.config,
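
A closing note on the derive(Clone) mentioned in the commit message: because MergedStores holds only two Arc handles, clone() just bumps two reference counts, so handing a clone to every job is cheap. A self-contained sketch with stand-in store types (not the real butido types):

    use std::sync::Arc;
    use tokio::sync::RwLock;

    // Stand-ins for the real ReleaseStore/StagingStore types.
    struct ReleaseStore;
    struct StagingStore;

    // Mirrors the shape of MergedStores: two Arcs, so #[derive(Clone)]
    // only clones the handles, never the stores behind them.
    #[derive(Clone)]
    struct MergedStores {
        release: Arc<RwLock<ReleaseStore>>,
        staging: Arc<RwLock<StagingStore>>,
    }

    fn main() {
        let stores = MergedStores {
            release: Arc::new(RwLock::new(ReleaseStore)),
            staging: Arc::new(RwLock::new(StagingStore)),
        };
        let for_job = stores.clone(); // cheap: two Arc::clone()s
        assert!(Arc::ptr_eq(&stores.release, &for_job.release));
    }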