author     Matthias Beyer <matthias.beyer@atos.net>  2021-02-17 09:48:07 +0100
committer  Matthias Beyer <matthias.beyer@atos.net>  2021-02-17 09:59:34 +0100
commit     6b2e73a9cf24c80b3b86df68ff62392fdfc1e09c (patch)
tree       4a448aadc8ba97f94fec1b877c050f987a89c32d /src/endpoint/configured.rs
parent     04e7602e34987b832e1d2e95405f61443b30a641 (diff)
Pass MergedStores to endpoint/jobs rather than only staging store
This patch changes the code so that the MergedStores object is known in the
endpoints and job execution code. This is necessary because the jobs must be
able to load artifacts from the release store as well, for example if a
library was released in another submit and we can reuse it because the script
for that library didn't change.

For that, the interface of StoreRoot::join() was changed:

-    -> Result<FullArtifactPath<'a>>
+    -> Result<Option<FullArtifactPath<'a>>>

It now returns Ok(None) if the requested artifact does not exist. This was
necessary to try-join a path on the store root of the staging store and, if
there is no such file, continue with try-joining the path on the release
store. The calling code got a bit more complex with that change, though.

Next, MergedStores got a derive(Clone), because clone()ing it is cheap (two
Arcs) and necessary to pass it easily to the jobs.

Each instance of the code where the staging store was consulted was changed
to consult the release store as well.

Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
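For illustration, a minimal sketch of the new StoreRoot::join() contract
described above. The simplified types here are stand-ins for the real ones in
src/filestore/path.rs (the real FullArtifactPath carries a lifetime), and the
body is an assumption, not the actual implementation:

    use std::path::{Path, PathBuf};

    use anyhow::Result;

    // Stand-ins for the real store types (sketch only).
    pub struct StoreRoot(PathBuf);
    pub struct FullArtifactPath(PathBuf);

    impl StoreRoot {
        // Before this patch, a missing artifact surfaced as an error.
        // Now Ok(None) signals "not in this store", so a caller can
        // fall through to another store.
        pub fn join(&self, art: &Path) -> Result<Option<FullArtifactPath>> {
            let path = self.0.join(art);
            if path.is_file() {
                Ok(Some(FullArtifactPath(path)))
            } else {
                Ok(None)
            }
        }
    }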
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--   src/endpoint/configured.rs   50
1 file changed, 27 insertions(+), 23 deletions(-)
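Likewise, a sketch of the staging-then-release fallback that a cheaply
clone()able MergedStores enables, building on the StoreRoot and
FullArtifactPath stand-ins from the sketch above. The find_artifact() helper
is hypothetical, but the hunk in copy_artifacts_to_container below inlines
this same pattern:

    use std::path::Path;
    use std::sync::Arc;

    use anyhow::{anyhow, Result};
    use tokio::sync::RwLock;

    pub struct Store {
        root: StoreRoot,
    }

    // clone() is cheap: only two Arc handles are duplicated, which is
    // why the commit adds derive(Clone) so the stores can be handed to
    // each job.
    #[derive(Clone)]
    pub struct MergedStores {
        staging: Arc<RwLock<Store>>,
        release: Arc<RwLock<Store>>,
    }

    impl MergedStores {
        // Hypothetical helper: consult the staging store first, fall
        // back to the release store, and error only if neither store
        // has the artifact.
        pub async fn find_artifact(&self, art: &Path) -> Result<FullArtifactPath> {
            if let Some(fp) = self.staging.read().await.root.join(art)? {
                return Ok(fp);
            }
            self.release
                .read()
                .await
                .root
                .join(art)?
                .ok_or_else(|| anyhow!("Not found in staging or release store: {:?}", art))
        }
    }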
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index b908cd4..94f134d 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -11,7 +11,6 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
-use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
@@ -23,14 +22,13 @@ use log::trace;
use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
-use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::path::ArtifactPath;
-use crate::filestore::StagingStore;
+use crate::filestore::MergedStores;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::buffer_stream_to_line_stream;
@@ -221,9 +219,9 @@ impl Endpoint {
    pub async fn prepare_container(
        &self,
        job: RunnableJob,
-       staging: Arc<RwLock<StagingStore>>,
+       merged_stores: MergedStores,
    ) -> Result<PreparedContainer<'_>> {
-       PreparedContainer::new(self, job, staging).await
+       PreparedContainer::new(self, job, merged_stores).await
    }

    pub async fn number_of_running_containers(&self) -> Result<usize> {
@@ -250,7 +248,7 @@ impl<'a> PreparedContainer<'a> {
    async fn new(
        endpoint: &'a Endpoint,
        job: RunnableJob,
-       staging: Arc<RwLock<StagingStore>>,
+       merged_stores: MergedStores,
    ) -> Result<PreparedContainer<'a>> {
        let script = job.script().clone();
        let create_info = Self::build_container(endpoint, &job).await?;
@@ -258,7 +256,7 @@ impl<'a> PreparedContainer<'a> {
        let (cpysrc, cpyart, cpyscr) = tokio::join!(
            Self::copy_source_to_container(&container, &job),
-           Self::copy_artifacts_to_container(&container, &job, staging),
+           Self::copy_artifacts_to_container(&container, &job, merged_stores),
            Self::copy_script_to_container(&container, &script)
        );
@@ -382,7 +380,7 @@ impl<'a> PreparedContainer<'a> {
    async fn copy_artifacts_to_container<'ca>(
        container: &Container<'ca>,
        job: &RunnableJob,
-       staging: Arc<RwLock<StagingStore>>,
+       merged_stores: MergedStores,
    ) -> Result<()> {
        job.resources()
            .iter()
@@ -405,19 +403,25 @@ impl<'a> PreparedContainer<'a> {
                    container.id(),
                    destination.display()
                );
-               let buf = staging
-                   .read()
-                   .await
-                   .root_path()
-                   .join(&art)?
-                   .read()
-                   .await
-                   .with_context(|| {
-                       anyhow!(
-                           "Reading artifact {}, so it can be copied to container",
-                           art.display()
-                       )
-                   })?;
+               let staging_read = merged_stores.staging().read().await;
+               let release_read = merged_stores.release().read().await;
+               let buf = match staging_read.root_path().join(&art)? {
+                   Some(fp) => fp,
+                   None => {
+                       release_read
+                           .root_path()
+                           .join(&art)?
+                           .ok_or_else(|| anyhow!("Not found in staging or release store: {:?}", art))?
+                   },
+               }
+               .read()
+               .await
+               .with_context(|| {
+                   anyhow!(
+                       "Reading artifact {}, so it can be copied to container",
+                       art.display()
+                   )
+               })?;

                let r = container
                    .copy_file_into(&destination, &buf)
@@ -615,7 +619,7 @@ impl<'a> ExecutedContainer<'a> {
        &self.script
    }

-   pub async fn finalize(self, staging: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
+   pub async fn finalize(self, merged_stores: MergedStores) -> Result<FinalizedContainer> {
        let (exit_info, artifacts) = match self.exit_info {
            Some((false, msg)) => {
                let err = anyhow!("Error during container run: '{msg}'", msg = msg.as_deref().unwrap_or(""));
@@ -640,7 +644,7 @@ impl<'a> ExecutedContainer<'a> {
                .map_err(Error::from)
        });

-       let mut writelock = staging.write().await;
+       let mut writelock = merged_stores.staging().write().await;
        let artifacts = writelock
            .write_files_from_tar_stream(tar_stream)