diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-22 14:18:36 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-22 14:18:36 +0100 |
commit | 261b30c9ba1ed11ae28dad222aaa69319c8d7336 (patch) | |
tree | 97fe8bec3cc8f9571ba91e44686e1d5aaf1972c8 /src | |
parent | 010c61f6140513b0dc2582e6bb11cd5913bf4881 (diff) | |
parent | c791d23e2fdc8335e6734975ac3bb265b862e3b2 (diff) |
Merge branch 'remove-merged-store'
Diffstat (limited to 'src')
-rw-r--r-- | src/endpoint/configured.rs | 29 | ||||
-rw-r--r-- | src/endpoint/scheduler.rs | 26 | ||||
-rw-r--r-- | src/filestore/merged.rs | 98 | ||||
-rw-r--r-- | src/filestore/mod.rs | 3 | ||||
-rw-r--r-- | src/filestore/path.rs | 4 | ||||
-rw-r--r-- | src/orchestrator/orchestrator.rs | 28 |
6 files changed, 49 insertions, 139 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 94f134d..b90263e 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -11,6 +11,7 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; use anyhow::anyhow; use anyhow::Context; @@ -23,16 +24,18 @@ use shiplift::Container; use shiplift::Docker; use shiplift::ExecContainerOptions; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::RwLock; use tokio_stream::StreamExt; use typed_builder::TypedBuilder; use crate::endpoint::EndpointConfiguration; +use crate::filestore::ReleaseStore; +use crate::filestore::StagingStore; use crate::filestore::path::ArtifactPath; -use crate::filestore::MergedStores; use crate::job::JobResource; use crate::job::RunnableJob; -use crate::log::buffer_stream_to_line_stream; use crate::log::LogItem; +use crate::log::buffer_stream_to_line_stream; use crate::package::Script; use crate::util::docker::ContainerHash; use crate::util::docker::ImageName; @@ -219,9 +222,10 @@ impl Endpoint { pub async fn prepare_container( &self, job: RunnableJob, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, ) -> Result<PreparedContainer<'_>> { - PreparedContainer::new(self, job, merged_stores).await + PreparedContainer::new(self, job, staging_store, release_store).await } pub async fn number_of_running_containers(&self) -> Result<usize> { @@ -248,7 +252,8 @@ impl<'a> PreparedContainer<'a> { async fn new( endpoint: &'a Endpoint, job: RunnableJob, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, ) -> Result<PreparedContainer<'a>> { let script = job.script().clone(); let create_info = Self::build_container(endpoint, &job).await?; @@ -256,7 +261,7 @@ impl<'a> PreparedContainer<'a> { let (cpysrc, cpyart, cpyscr) = tokio::join!( Self::copy_source_to_container(&container, &job), - Self::copy_artifacts_to_container(&container, &job, merged_stores), + Self::copy_artifacts_to_container(&container, &job, staging_store, release_store), Self::copy_script_to_container(&container, &script) ); @@ -380,7 +385,8 @@ impl<'a> PreparedContainer<'a> { async fn copy_artifacts_to_container<'ca>( container: &Container<'ca>, job: &RunnableJob, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, ) -> Result<()> { job.resources() .iter() @@ -403,8 +409,8 @@ impl<'a> PreparedContainer<'a> { container.id(), destination.display() ); - let staging_read = merged_stores.staging().read().await; - let release_read = merged_stores.release().read().await; + let staging_read = staging_store.read().await; + let release_read = release_store.read().await; let buf = match staging_read.root_path().join(&art)? { Some(fp) => fp, None => { @@ -619,7 +625,7 @@ impl<'a> ExecutedContainer<'a> { &self.script } - pub async fn finalize(self, merged_stores: MergedStores) -> Result<FinalizedContainer> { + pub async fn finalize(self, staging_store: Arc<RwLock<StagingStore>>) -> Result<FinalizedContainer> { let (exit_info, artifacts) = match self.exit_info { Some((false, msg)) => { let err = anyhow!("Error during container run: '{msg}'", msg = msg.as_deref().unwrap_or("")); @@ -644,8 +650,7 @@ impl<'a> ExecutedContainer<'a> { .map_err(Error::from) }); - let mut writelock = merged_stores.staging().write().await; - + let mut writelock = staging_store.write().await; let artifacts = writelock .write_files_from_tar_stream(tar_stream) .await diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 17f9f24..db0b826 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -31,7 +31,8 @@ use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; use crate::filestore::ArtifactPath; -use crate::filestore::MergedStores; +use crate::filestore::ReleaseStore; +use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; use crate::log::LogItem; @@ -40,7 +41,8 @@ pub struct EndpointScheduler { log_dir: Option<PathBuf>, endpoints: Vec<Arc<RwLock<Endpoint>>>, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, db: Arc<PgConnection>, submit: crate::db::models::Submit, } @@ -48,7 +50,8 @@ pub struct EndpointScheduler { impl EndpointScheduler { pub async fn setup( endpoints: Vec<EndpointConfiguration>, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, db: Arc<PgConnection>, submit: crate::db::models::Submit, log_dir: Option<PathBuf>, @@ -58,7 +61,8 @@ impl EndpointScheduler { Ok(EndpointScheduler { log_dir, endpoints, - merged_stores, + staging_store, + release_store, db, submit, }) @@ -94,7 +98,8 @@ impl EndpointScheduler { bar, endpoint, job, - merged_stores: self.merged_stores.clone(), + staging_store: self.staging_store.clone(), + release_store: self.release_store.clone(), db: self.db.clone(), submit: self.submit.clone(), }) @@ -136,7 +141,8 @@ pub struct JobHandle { job: RunnableJob, bar: ProgressBar, db: Arc<PgConnection>, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, submit: crate::db::models::Submit, } @@ -157,7 +163,7 @@ impl JobHandle { let job_id = *self.job.uuid(); trace!("Running on Job {} on Endpoint {}", job_id, ep.name()); let prepared_container = ep - .prepare_container(self.job, self.merged_stores.clone()) + .prepare_container(self.job, self.staging_store.clone(), self.release_store.clone()) .await?; let container_id = prepared_container.create_info().id.clone(); let running_container = prepared_container @@ -219,7 +225,7 @@ impl JobHandle { } let res: crate::endpoint::FinalizedContainer = run_container - .finalize(self.merged_stores.clone()) + .finalize(self.staging_store.clone()) .await .context("Finalizing container") .with_context(|| { @@ -256,13 +262,13 @@ impl JobHandle { // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; + let staging_read = self.staging_store.read().await; for p in paths.iter() { trace!("DB: Creating artifact entry for path: {}", p.display()); let _ = dbmodels::Artifact::create(&self.db, p, &job)?; r.push({ - self.merged_stores + staging_read .get(p) - .await .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))? .clone() }); diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs deleted file mode 100644 index 1503870..0000000 --- a/src/filestore/merged.rs +++ /dev/null @@ -1,98 +0,0 @@ -// -// Copyright (c) 2020-2021 science+computing ag and other contributors -// -// This program and the accompanying materials are made -// available under the terms of the Eclipse Public License 2.0 -// which is available at https://www.eclipse.org/legal/epl-2.0/ -// -// SPDX-License-Identifier: EPL-2.0 -// - -// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing -// the rewrite -#![allow(unused)] - -use std::sync::Arc; -use std::path::Path; - -use anyhow::anyhow; -use anyhow::Result; -use getset::Getters; -use log::trace; -use tokio::sync::RwLock; - -use crate::filestore::path::ArtifactPath; -use crate::filestore::path::FullArtifactPath; -use crate::filestore::ReleaseStore; -use crate::filestore::StagingStore; - - -/// A type that merges the release store and the staging store -/// -/// The stores are not actually merged (on disk or in memory), but the querying mechanism works in -/// a way where it _always_ preferes the staging store over the release store. -/// -#[derive(Clone, Getters)] -pub struct MergedStores { - #[getset(get = "pub")] - release: Arc<RwLock<ReleaseStore>>, - - #[getset(get = "pub")] - staging: Arc<RwLock<StagingStore>>, -} - -impl MergedStores { - pub fn new(release: Arc<RwLock<ReleaseStore>>, staging: Arc<RwLock<StagingStore>>) -> Self { - MergedStores { release, staging } - } - - pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<ArtifactPath>> { - trace!("Fetching artifact from path: {:?}", p.display()); - let artifact_path = ArtifactPath::new(p.to_path_buf())?; - - let staging = &mut self.staging.write().await.0; - let staging_path = staging - .root_path() - .join(&artifact_path)? - .ok_or_else(|| anyhow!("Does not exist in staging store: {:?}", artifact_path))?; - trace!("staging_path = {:?}", staging_path.display()); - - if staging_path.exists() { - let art = if let Some(art) = staging.get(&artifact_path) { - art - } else { - trace!("Loading path from staging store: {:?}", artifact_path.display()); - staging.load_from_path(&artifact_path) - }; - - return Ok(Some(art.clone())) - } - - let release = &mut self.release.write().await.0; - let release_path = release.root_path() - .join(&artifact_path)? - .ok_or_else(|| anyhow!("Not found in release store: {:?}", artifact_path))?; - trace!("release_path = {:?}", release_path); - - if release_path.exists() { - let art = if let Some(art) = release.get(&artifact_path) { - art - } else { - trace!("Loading path from release store: {:?}", artifact_path.display()); - release.load_from_path(&artifact_path) - }; - return Ok(Some(art.clone())) - } - - Ok(None) - } - - pub async fn get(&self, p: &ArtifactPath) -> Option<ArtifactPath> { - if let Some(a) = self.staging.read().await.get(p).cloned() { - return Some(a) - } - - self.release.read().await.get(p).cloned() - } - -} diff --git a/src/filestore/mod.rs b/src/filestore/mod.rs index 29eb476..8d788e2 100644 --- a/src/filestore/mod.rs +++ b/src/filestore/mod.rs @@ -14,9 +14,6 @@ pub use release::*; mod staging; pub use staging::*; -mod merged; -pub use merged::*; - pub mod path; pub use path::ArtifactPath; diff --git a/src/filestore/path.rs b/src/filestore/path.rs index 19016bd..4d75855 100644 --- a/src/filestore/path.rs +++ b/src/filestore/path.rs @@ -154,10 +154,6 @@ impl<'a> FullArtifactPath<'a> { self.0 .0.join(&self.1 .0) } - pub fn exists(&self) -> bool { - self.joined().exists() - } - pub fn display(&self) -> FullArtifactPathDisplay<'a> { FullArtifactPathDisplay(self.0, self.1) } diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index e6e24c5..51c6d9a 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -34,7 +34,6 @@ use crate::db::models as dbmodels; use crate::endpoint::EndpointConfiguration; use crate::endpoint::EndpointScheduler; use crate::filestore::ArtifactPath; -use crate::filestore::MergedStores; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; use crate::job::JobDefinition; @@ -153,7 +152,8 @@ use crate::util::progress::ProgressBars; pub struct Orchestrator<'a> { scheduler: EndpointScheduler, progress_generator: ProgressBars, - merged_stores: MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, source_cache: SourceCache, jobdag: Dag, config: &'a Configuration, @@ -176,10 +176,10 @@ pub struct OrchestratorSetup<'a> { impl<'a> OrchestratorSetup<'a> { pub async fn setup(self) -> Result<Orchestrator<'a>> { - let merged_stores = MergedStores::new(self.release_store, self.staging_store); let scheduler = EndpointScheduler::setup( self.endpoint_config, - merged_stores.clone(), + self.staging_store.clone(), + self.release_store.clone(), self.database.clone(), self.submit.clone(), self.log_dir, @@ -188,7 +188,8 @@ impl<'a> OrchestratorSetup<'a> { Ok(Orchestrator { scheduler, - merged_stores, + staging_store: self.staging_store.clone(), + release_store: self.release_store.clone(), progress_generator: self.progress_generator, source_cache: self.source_cache, jobdag: self.jobdag, @@ -245,7 +246,8 @@ impl<'a> Orchestrator<'a> { config: self.config, source_cache: &self.source_cache, scheduler: &self.scheduler, - merged_stores: &self.merged_stores, + staging_store: self.staging_store.clone(), + release_store: self.release_store.clone(), database: self.database.clone(), }; @@ -362,7 +364,8 @@ struct TaskPreparation<'a> { config: &'a Configuration, source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, - merged_stores: &'a MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, database: Arc<PgConnection>, } @@ -377,7 +380,8 @@ struct JobTask<'a> { config: &'a Configuration, source_cache: &'a SourceCache, scheduler: &'a EndpointScheduler, - merged_stores: &'a MergedStores, + staging_store: Arc<RwLock<StagingStore>>, + release_store: Arc<RwLock<ReleaseStore>>, database: Arc<PgConnection>, /// Channel where the dependencies arrive @@ -432,7 +436,8 @@ impl<'a> JobTask<'a> { config: prep.config, source_cache: prep.source_cache, scheduler: prep.scheduler, - merged_stores: prep.merged_stores, + staging_store: prep.staging_store, + release_store: prep.release_store, database: prep.database.clone(), receiver, @@ -505,8 +510,8 @@ impl<'a> JobTask<'a> { // check if a job that looks very similar to this job has already produced artifacts. // If it has, simply return those (plus the received ones) { - let release_store = self.merged_stores.release().read().await; - let staging_store = self.merged_stores.staging().read().await; + let release_store = self.release_store.read().await; + let staging_store = self.staging_store.read().await; // Use the environment of the job definition, as it appears in the job DAG. // @@ -545,7 +550,6 @@ impl<'a> JobTask<'a> { debug!("[{}]: Found {} replacement artifacts", self.jobdef.job.uuid(), replacement_artifacts.len()); trace!("[{}]: Found replacement artifacts: {:?}", self.jobdef.job.uuid(), replacement_artifacts); - let merged_stores = self.merged_stores; let mut artifacts = replacement_artifacts .into_iter() |