summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-07 12:35:33 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-08 09:37:04 +0100
commitd319579fc89b5ac09f10abf4bfc77c21b0fc65ce (patch)
treee4fb352f3c1cd98383955d08c20fc0d311db51b7
parent31fb8c89fefd3167b8b928cd75ddc008da60b434 (diff)
Remove `Artifact` type
This patch follows-up on the shrinking of the `Artifact` type and removes it entirely. The type is not needed. Only the `ArtifactPath` type is needed, which is a thin wrapper around `PathBuf`, ensuring that the path is relative to the store root. The `Artifact` type used `pom` to parse the name and version of the package from the `ArtifactPath` object it contained, which resulted in the restriction that the path must always be <name>-<version>... Which should not be a requirement and actually caused issues with a package named "foo-bar" (as an example). Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--src/commands/build.rs4
-rw-r--r--src/endpoint/configured.rs11
-rw-r--r--src/endpoint/scheduler.rs4
-rw-r--r--src/filestore/artifact.rs30
-rw-r--r--src/filestore/merged.rs7
-rw-r--r--src/filestore/mod.rs4
-rw-r--r--src/filestore/path.rs2
-rw-r--r--src/filestore/staging.rs19
-rw-r--r--src/filestore/util.rs37
-rw-r--r--src/job/resource.rs10
-rw-r--r--src/job/runnable.rs4
-rw-r--r--src/orchestrator/orchestrator.rs20
12 files changed, 53 insertions, 99 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index f5c9b5a..b653e29 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -334,8 +334,8 @@ pub async fn build(
if !artifacts.is_empty() {
writeln!(outlock, "Packages created:")?;
}
- artifacts.into_iter().try_for_each(|artifact| {
- writeln!(outlock, "-> {}", staging_dir.join(artifact.path()).display()).map_err(Error::from)
+ artifacts.into_iter().try_for_each(|artifact_path| {
+ writeln!(outlock, "-> {}", staging_dir.join(artifact_path).display()).map_err(Error::from)
})?;
let mut had_error = false;
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 83d51b4..6979e71 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -390,9 +390,8 @@ impl<'a> PreparedContainer<'a> {
.cloned()
.map(|art| async {
let artifact_file_name = art
- .path()
.file_name()
- .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.path().display()))
+ .ok_or_else(|| anyhow!("BUG: artifact {} is not a file", art.display()))
.with_context(|| {
anyhow!(
"Collecting artifacts for copying to container {}",
@@ -402,7 +401,7 @@ impl<'a> PreparedContainer<'a> {
let destination = PathBuf::from("/inputs/").join(artifact_file_name);
trace!(
"Copying {} to container: {}:{}",
- art.path().display(),
+ art.display(),
container.id(),
destination.display()
);
@@ -410,13 +409,13 @@ impl<'a> PreparedContainer<'a> {
.read()
.await
.root_path()
- .join(art.path())?
+ .join(&art)?
.read()
.await
.with_context(|| {
anyhow!(
"Reading artifact {}, so it can be copied to container",
- art.path().display()
+ art.display()
)
})?;
@@ -426,7 +425,7 @@ impl<'a> PreparedContainer<'a> {
.with_context(|| {
anyhow!(
"Copying artifact {} to container {} at {}",
- art.path().display(),
+ art.display(),
container.id(),
destination.display()
)
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index d3e4857..0b404de 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,7 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
-use crate::filestore::Artifact;
+use crate::filestore::ArtifactPath;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
@@ -146,7 +146,7 @@ impl std::fmt::Debug for JobHandle {
}
impl JobHandle {
- pub async fn run(self) -> Result<Vec<Artifact>> {
+ pub async fn run(self) -> Result<Vec<ArtifactPath>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint.read().await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
diff --git a/src/filestore/artifact.rs b/src/filestore/artifact.rs
deleted file mode 100644
index 2f3477a..0000000
--- a/src/filestore/artifact.rs
+++ /dev/null
@@ -1,30 +0,0 @@
-//
-// Copyright (c) 2020-2021 science+computing ag and other contributors
-//
-// This program and the accompanying materials are made
-// available under the terms of the Eclipse Public License 2.0
-// which is available at https://www.eclipse.org/legal/epl-2.0/
-//
-// SPDX-License-Identifier: EPL-2.0
-//
-
-use anyhow::Result;
-use getset::Getters;
-
-use crate::filestore::path::ArtifactPath;
-use crate::filestore::path::StoreRoot;
-
-#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Getters)]
-pub struct Artifact {
- #[getset(get = "pub")]
- path: ArtifactPath,
-}
-
-impl Artifact {
- pub fn load(_root: &StoreRoot, path: ArtifactPath) -> Result<Self> {
- Ok(Artifact {
- path,
- })
- }
-}
-
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 0b19d85..4c7a38b 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -21,7 +21,6 @@ use getset::Getters;
use log::trace;
use tokio::sync::RwLock;
-use crate::filestore::Artifact;
use crate::filestore::path::ArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
@@ -45,7 +44,7 @@ impl MergedStores {
MergedStores { release, staging }
}
- pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<Artifact>> {
+ pub async fn get_artifact_by_path(&self, p: &Path) -> Result<Option<ArtifactPath>> {
trace!("Fetching artifact from path: {:?}", p.display());
let artifact_path = ArtifactPath::new(p.to_path_buf())?;
@@ -58,7 +57,7 @@ impl MergedStores {
art
} else {
trace!("Loading path from staging store: {:?}", artifact_path.display());
- staging.load_from_path(&artifact_path)?
+ staging.load_from_path(&artifact_path)
};
return Ok(Some(art.clone()))
@@ -73,7 +72,7 @@ impl MergedStores {
art
} else {
trace!("Loading path from release store: {:?}", artifact_path.display());
- release.load_from_path(&artifact_path)?
+ release.load_from_path(&artifact_path)
};
return Ok(Some(art.clone()))
}
diff --git a/src/filestore/mod.rs b/src/filestore/mod.rs
index 0d19d02..29eb476 100644
--- a/src/filestore/mod.rs
+++ b/src/filestore/mod.rs
@@ -8,9 +8,6 @@
// SPDX-License-Identifier: EPL-2.0
//
-mod artifact;
-pub use artifact::*;
-
mod release;
pub use release::*;
@@ -21,5 +18,6 @@ mod merged;
pub use merged::*;
pub mod path;
+pub use path::ArtifactPath;
mod util;
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index c38c114..a03ab83 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -94,7 +94,7 @@ impl StoreRoot {
}
}
-#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactPath(PathBuf);
impl ArtifactPath {
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index 788cd02..1c2a848 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -10,16 +10,15 @@
use std::fmt::Debug;
-use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
+use anyhow::anyhow;
use futures::stream::Stream;
use indicatif::ProgressBar;
use log::trace;
use result_inspect::ResultInspect;
-use crate::filestore::Artifact;
use crate::filestore::path::ArtifactPath;
use crate::filestore::path::StoreRoot;
use crate::filestore::util::FileStoreImpl;
@@ -84,14 +83,12 @@ impl StagingStore {
None
} else {
Some({
- ArtifactPath::new(path).and_then(|ap| {
- self.0
- .load_from_path(&ap)
- .inspect(|r| trace!("Loaded from path {} = {:?}", ap.display(), r))
- .with_context(|| anyhow!("Loading from path: {}", ap.display()))
- .map_err(Error::from)
- .map(|art| art.path().clone())
- })
+ // Clippy doesn't detect this properly
+ #[allow(clippy::redundant_clone)]
+ ArtifactPath::new(path.to_path_buf())
+ .inspect(|r| trace!("Loaded from path {} = {:?}", path.display(), r))
+ .with_context(|| anyhow!("Loading from path: {}", path.display()))
+ .map(|ap| self.0.load_from_path(&ap).clone())
})
}
})
@@ -102,7 +99,7 @@ impl StagingStore {
self.0.root_path()
}
- pub fn get(&self, p: &ArtifactPath) -> Option<&Artifact> {
+ pub fn get(&self, p: &ArtifactPath) -> Option<&ArtifactPath> {
self.0.get(p)
}
}
diff --git a/src/filestore/util.rs b/src/filestore/util.rs
index 2fc1483..0d98404 100644
--- a/src/filestore/util.rs
+++ b/src/filestore/util.rs
@@ -11,15 +11,13 @@
//! Module containing utilities for the filestore implementation
//!
-use std::collections::BTreeMap;
+use std::collections::HashSet;
-use anyhow::anyhow;
use anyhow::Result;
use indicatif::ProgressBar;
-use resiter::AndThen;
-use crate::filestore::path::*;
-use crate::filestore::Artifact;
+use crate::filestore::path::ArtifactPath;
+use crate::filestore::path::StoreRoot;
/// The actual filestore implementation
///
@@ -29,19 +27,16 @@ use crate::filestore::Artifact;
/// It can then be wrapped into the actual interface of this module with specialized functionality.
pub struct FileStoreImpl {
pub(in crate::filestore) root: StoreRoot,
- store: BTreeMap<ArtifactPath, Artifact>,
+ store: HashSet<ArtifactPath>,
}
impl FileStoreImpl {
- /// Loads the passed path recursively into a Path => Artifact mapping
+ /// Loads the passed path recursively
pub fn load(root: StoreRoot, progress: ProgressBar) -> Result<Self> {
let store = root
.find_artifacts_recursive()
- .and_then_ok(|artifact_path| {
- progress.tick();
- Artifact::load(&root, artifact_path.clone()).map(|a| (artifact_path, a))
- })
- .collect::<Result<BTreeMap<ArtifactPath, Artifact>>>()?;
+ .inspect(|_| progress.tick())
+ .collect::<Result<HashSet<ArtifactPath>>>()?;
Ok(FileStoreImpl { root, store })
}
@@ -50,21 +45,17 @@ impl FileStoreImpl {
&self.root
}
- pub fn get(&self, artifact_path: &ArtifactPath) -> Option<&Artifact> {
+ pub fn get(&self, artifact_path: &ArtifactPath) -> Option<&ArtifactPath> {
self.store.get(artifact_path)
}
- pub(in crate::filestore) fn load_from_path(
+ pub(in crate::filestore) fn load_from_path<'a>(
&mut self,
- artifact_path: &ArtifactPath,
- ) -> Result<&Artifact> {
- if self.store.get(&artifact_path).is_some() {
- Err(anyhow!("Entry exists: {}", artifact_path.display()))
- } else {
- Ok(self
- .store
- .entry(artifact_path.clone())
- .or_insert(Artifact::load(&self.root, artifact_path.clone())?))
+ artifact_path: &'a ArtifactPath,
+ ) -> &'a ArtifactPath {
+ if !self.store.contains(artifact_path) {
+ self.store.insert(artifact_path.clone());
}
+ artifact_path
}
}
diff --git a/src/job/resource.rs b/src/job/resource.rs
index d4fa654..d657599 100644
--- a/src/job/resource.rs
+++ b/src/job/resource.rs
@@ -8,13 +8,13 @@
// SPDX-License-Identifier: EPL-2.0
//
-use crate::filestore::Artifact;
+use crate::filestore::ArtifactPath;
use crate::util::EnvironmentVariableName;
#[derive(Clone, Debug)]
pub enum JobResource {
Environment(EnvironmentVariableName, String),
- Artifact(Artifact),
+ Artifact(ArtifactPath),
}
impl From<(EnvironmentVariableName, String)> for JobResource {
@@ -23,8 +23,8 @@ impl From<(EnvironmentVariableName, String)> for JobResource {
}
}
-impl From<Artifact> for JobResource {
- fn from(a: Artifact) -> Self {
+impl From<ArtifactPath> for JobResource {
+ fn from(a: ArtifactPath) -> Self {
JobResource::Artifact(a)
}
}
@@ -36,7 +36,7 @@ impl JobResource {
_ => None,
}
}
- pub fn artifact(&self) -> Option<&Artifact> {
+ pub fn artifact(&self) -> Option<&ArtifactPath> {
match self {
JobResource::Artifact(a) => Some(a),
_ => None,
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index 9806939..6361a49 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -19,7 +19,7 @@ use log::trace;
use uuid::Uuid;
use crate::config::Configuration;
-use crate::filestore::Artifact;
+use crate::filestore::ArtifactPath;
use crate::job::Job;
use crate::job::JobResource;
use crate::package::Package;
@@ -57,7 +57,7 @@ impl RunnableJob {
job: &Job,
source_cache: &SourceCache,
config: &Configuration,
- dependencies: Vec<Artifact>,
+ dependencies: Vec<ArtifactPath>,
) -> Result<Self> {
// Add the environment from the original Job object to the resources
let resources = dependencies
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index a1f8a99..79f769e 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -32,7 +32,7 @@ use crate::config::Configuration;
use crate::db::models as dbmodels;
use crate::endpoint::EndpointConfiguration;
use crate::endpoint::EndpointScheduler;
-use crate::filestore::Artifact;
+use crate::filestore::ArtifactPath;
use crate::filestore::MergedStores;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
@@ -203,16 +203,16 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>;
+type JobResult = std::result::Result<HashMap<Uuid, Vec<ArtifactPath>>, HashMap<Uuid, Error>>;
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> {
+ pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> {
let (results, errors) = self.run_tree().await?;
output.extend(results.into_iter());
Ok(errors)
}
- async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> {
+ async fn run_tree(self) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
// For each job in the jobdag, built a tuple with
@@ -403,7 +403,7 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new();
+ let mut received_dependencies: HashMap<Uuid, Vec<ArtifactPath>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());
@@ -464,15 +464,15 @@ impl<'a> JobTask<'a> {
}
// Map the list of received dependencies from
- // Vec<(Uuid, Vec<Artifact>)>
+ // Vec<(Uuid, Vec<ArtifactPath>)>
// to
- // Vec<Artifact>
+ // Vec<ArtifactPath>
let dependency_artifacts = received_dependencies
.values()
.map(|v| v.iter())
.flatten()
.cloned()
- .collect();
+ .collect::<Vec<ArtifactPath>>();
trace!("[{}]: Dependency artifacts = {:?}", self.jobdef.job.uuid(), dependency_artifacts);
self.bar.set_message(&format!("[{} {} {}]: Preparing...",
self.jobdef.job.uuid(),
@@ -531,11 +531,11 @@ impl<'a> JobTask<'a> {
///
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving
- async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
- // (uuid of the job, [Artifact])
+ // (uuid of the job, [ArtifactPath])
trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v);
received_dependencies.extend(v);
Ok(true)