summaryrefslogtreecommitdiffstats
path: root/src/endpoint/configured.rs
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-22 10:59:07 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-02-22 10:59:07 +0100
commitc791d23e2fdc8335e6734975ac3bb265b862e3b2 (patch)
treeceb564fb181415a2ef41ffba489d45e30e0aafb5 /src/endpoint/configured.rs
parent3c7f31f0fa8350f65cf3e74955a22eef0674cd4d (diff)
Remove MergedStores
The concept of the MergedStores type was okay in the beginning, but it got more and more complex to use properly and most of the time, we used the release/staging stores directly anyways. So this removes the MergedStores type, which is a preparation for the change to have multiple release stores. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--src/endpoint/configured.rs29
1 files changed, 17 insertions, 12 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 94f134d..b90263e 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -11,6 +11,7 @@
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
+use std::sync::Arc;
use anyhow::anyhow;
use anyhow::Context;
@@ -23,16 +24,18 @@ use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use crate::endpoint::EndpointConfiguration;
+use crate::filestore::ReleaseStore;
+use crate::filestore::StagingStore;
use crate::filestore::path::ArtifactPath;
-use crate::filestore::MergedStores;
use crate::job::JobResource;
use crate::job::RunnableJob;
-use crate::log::buffer_stream_to_line_stream;
use crate::log::LogItem;
+use crate::log::buffer_stream_to_line_stream;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
@@ -219,9 +222,10 @@ impl Endpoint {
pub async fn prepare_container(
&self,
job: RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<PreparedContainer<'_>> {
- PreparedContainer::new(self, job, merged_stores).await
+ PreparedContainer::new(self, job, staging_store, release_store).await
}
pub async fn number_of_running_containers(&self) -> Result<usize> {
@@ -248,7 +252,8 @@ impl<'a> PreparedContainer<'a> {
async fn new(
endpoint: &'a Endpoint,
job: RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<PreparedContainer<'a>> {
let script = job.script().clone();
let create_info = Self::build_container(endpoint, &job).await?;
@@ -256,7 +261,7 @@ impl<'a> PreparedContainer<'a> {
let (cpysrc, cpyart, cpyscr) = tokio::join!(
Self::copy_source_to_container(&container, &job),
- Self::copy_artifacts_to_container(&container, &job, merged_stores),
+ Self::copy_artifacts_to_container(&container, &job, staging_store, release_store),
Self::copy_script_to_container(&container, &script)
);
@@ -380,7 +385,8 @@ impl<'a> PreparedContainer<'a> {
async fn copy_artifacts_to_container<'ca>(
container: &Container<'ca>,
job: &RunnableJob,
- merged_stores: MergedStores,
+ staging_store: Arc<RwLock<StagingStore>>,
+ release_store: Arc<RwLock<ReleaseStore>>,
) -> Result<()> {
job.resources()
.iter()
@@ -403,8 +409,8 @@ impl<'a> PreparedContainer<'a> {
container.id(),
destination.display()
);
- let staging_read = merged_stores.staging().read().await;
- let release_read = merged_stores.release().read().await;
+ let staging_read = staging_store.read().await;
+ let release_read = release_store.read().await;
let buf = match staging_read.root_path().join(&art)? {
Some(fp) => fp,
None => {
@@ -619,7 +625,7 @@ impl<'a> ExecutedContainer<'a> {
&self.script
}
- pub async fn finalize(self, merged_stores: MergedStores) -> Result<FinalizedContainer> {
+ pub async fn finalize(self, staging_store: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> {
let (exit_info, artifacts) = match self.exit_info {
Some((false, msg)) => {
let err = anyhow!("Error during container run: '{msg}'", msg = msg.as_deref().unwrap_or(""));
@@ -644,8 +650,7 @@ impl<'a> ExecutedContainer<'a> {
.map_err(Error::from)
});
- let mut writelock = merged_stores.staging().write().await;
-
+ let mut writelock = staging_store.write().await;
let artifacts = writelock
.write_files_from_tar_stream(tar_stream)
.await