summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-01-22 16:28:15 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-01-25 15:35:21 +0100
commit34291a0442ac6fb66967543f1bbf465c0b3dd731 (patch)
treee4d2feb6fef85527710b6d3d4431903f041eab72
parent5a51e23ba57491d100f4ffeac5c8657aaa1b011b (diff)
Let the JobHandle::run() return a Vec<Artifact>
Before that change, it returned the dbmodels::Artifact objects, for which we needed to fetch the filestore::Artifact again. This change removes that restriction (improving runtime, of course). Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
-rw-r--r--src/commands/build.rs2
-rw-r--r--src/endpoint/scheduler.rs14
-rw-r--r--src/filestore/merged.rs6
-rw-r--r--src/filestore/path.rs6
-rw-r--r--src/filestore/staging.rs5
-rw-r--r--src/orchestrator/orchestrator.rs53
6 files changed, 41 insertions, 45 deletions
diff --git a/src/commands/build.rs b/src/commands/build.rs
index c9a564b..86f205b 100644
--- a/src/commands/build.rs
+++ b/src/commands/build.rs
@@ -339,7 +339,7 @@ pub async fn build(
writeln!(outlock, "Packages created:")?;
}
artifacts.into_iter().try_for_each(|artifact| {
- writeln!(outlock, "-> {}", staging_dir.join(artifact.path).display()).map_err(Error::from)
+ writeln!(outlock, "-> {}", staging_dir.join(artifact.path()).display()).map_err(Error::from)
})?;
let mut had_error = false;
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 3326a0a..a907257 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -29,6 +29,7 @@ use uuid::Uuid;
use crate::db::models as dbmodels;
use crate::endpoint::Endpoint;
use crate::endpoint::EndpointConfiguration;
+use crate::filestore::Artifact;
use crate::filestore::StagingStore;
use crate::job::JobResource;
use crate::job::RunnableJob;
@@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle {
}
impl JobHandle {
- pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> {
+ pub async fn run(self) -> Result<Vec<Artifact>> {
let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
let ep = self.endpoint.read().await;
let endpoint = dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?;
@@ -241,9 +242,18 @@ impl JobHandle {
// Have to do it the ugly way here because of borrowing semantics
let mut r = vec![];
+ let staging_store_lock = self.staging_store.read().await;
for p in paths.iter() {
+ use std::ops::Deref;
trace!("DB: Creating artifact entry for path: {}", p.display());
- r.push(dbmodels::Artifact::create(&self.db, p, &job)?);
+ let _ = dbmodels::Artifact::create(&self.db, p, &job)?;
+ r.push({
+ staging_store_lock
+ .deref()
+ .get(p)
+ .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))?
+ .clone()
+ });
}
Ok(r)
}
diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs
index 3ed59dd..0b19d85 100644
--- a/src/filestore/merged.rs
+++ b/src/filestore/merged.rs
@@ -8,6 +8,11 @@
// SPDX-License-Identifier: EPL-2.0
//
+// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing
+// the rewrite
+#![allow(unused)]
+
+
use std::sync::Arc;
use std::path::Path;
@@ -21,6 +26,7 @@ use crate::filestore::path::ArtifactPath;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
+
/// A type that merges the release store and the staging store
///
/// The stores are not actually merged (on disk or in memory), but the querying mechanism works in
diff --git a/src/filestore/path.rs b/src/filestore/path.rs
index ab0655b..cfa999f 100644
--- a/src/filestore/path.rs
+++ b/src/filestore/path.rs
@@ -149,6 +149,12 @@ impl ArtifactPath {
}
}
+impl AsRef<Path> for ArtifactPath {
+ fn as_ref(&self) -> &Path {
+ &self.0
+ }
+}
+
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath);
diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs
index b944d84..788cd02 100644
--- a/src/filestore/staging.rs
+++ b/src/filestore/staging.rs
@@ -19,6 +19,7 @@ use indicatif::ProgressBar;
use log::trace;
use result_inspect::ResultInspect;
+use crate::filestore::Artifact;
use crate::filestore::path::ArtifactPath;
use crate::filestore::path::StoreRoot;
use crate::filestore::util::FileStoreImpl;
@@ -100,4 +101,8 @@ impl StagingStore {
pub fn root_path(&self) -> &StoreRoot {
self.0.root_path()
}
+
+ pub fn get(&self, p: &ArtifactPath) -> Option<&Artifact> {
+ self.0.get(p)
+ }
}
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index cbb783c..f5e8905 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -94,16 +94,16 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts (with their respective database artifact objects)
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<(Uuid, Vec<(Artifact, dbmodels::Artifact)>), Vec<(Uuid, Error)>>;
+type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>;
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
let (results, errors) = self.run_tree().await?;
- output.extend(results.into_iter().map(|(_, dba)| dba));
+ output.extend(results.into_iter());
Ok(errors)
}
- async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> {
+ async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
// For each job in the jobtree, built a tuple with
@@ -274,7 +274,7 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: Vec<(Uuid, Vec<(Artifact, dbmodels::Artifact)>)> = vec![];
+ let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![];
// A list of errors that were received from the tasks for the dependencies
let mut received_errors: Vec<(Uuid, Error)> = vec![];
@@ -342,14 +342,14 @@ impl<'a> JobTask<'a> {
}
// Map the list of received dependencies from
- // Vec<(Uuid, Vec<(Artifact)>)>
+ // Vec<(Uuid, Vec<Artifact>)>
// to
// Vec<Artifact>
let dependency_artifacts = received_dependencies
.iter()
.map(|tpl| tpl.1.iter())
.flatten()
- .map(|tpl| tpl.0.clone())
+ .cloned()
.collect();
trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts);
self.bar.set_message("Preparing...");
@@ -377,32 +377,11 @@ impl<'a> JobTask<'a> {
// if the scheduler run reports success,
// it returns the database artifact objects it created!
- Ok(db_artifacts) => {
- trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, db_artifacts);
- // we take these artifacts and
- let results: JobResult = db_artifacts.into_iter()
- .map(|db_artifact| async {
- trace!("Getting store Artifact for db Artifact: {:?}", db_artifact);
-
- // get the appropriate filesystem artifact for it
- let art = self.get_store_artifact_for(&db_artifact).await?;
- trace!("Store Artifact: {:?}", art);
- Ok(Ok((art, db_artifact)))
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Result<std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, _>>>()
- .await?
- .map(|mut v| {
- // Also send out the artifact of our dependencies, because we need to
- // propagate them upwards through the tree
- v.extend(received_dependencies.into_iter().map(|tpl| tpl.1.into_iter()).flatten());
- (self.uuid, v)
- }); // and we add the UUID of the job of this task to it
-
- trace!("[{}]: sending artifacts to parent", self.uuid);
-
+ Ok(mut artifacts) => {
+ trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts);
+ artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten());
self.sender
- .send(results)
+ .send(Ok((self.uuid, artifacts)))
.await?;
},
}
@@ -410,15 +389,5 @@ impl<'a> JobTask<'a> {
trace!("[{}]: Finished successfully", self.uuid);
Ok(())
}
-
- async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> {
- let p = PathBuf::from(&db_artifact.path);
- self.merged_stores
- .get_artifact_by_path(&p)
- .await?
- .ok_or_else(|| {
- anyhow!("Artifact not found in {}", p.display())
- })
- }
}