From 34291a0442ac6fb66967543f1bbf465c0b3dd731 Mon Sep 17 00:00:00 2001 From: Matthias Beyer <mail@beyermatthias.de> Date: Fri, 22 Jan 2021 16:28:15 +0100 Subject: Let the JobHandle::run() return a Vec<Artifact> Before that change, it returned the dbmodels::Artifact objects, for which we needed to fetch the filestore::Artifact again. This change removes that restriction (improving runtime, of course). Signed-off-by: Matthias Beyer <mail@beyermatthias.de> Tested-by: Matthias Beyer <mail@beyermatthias.de> --- src/commands/build.rs | 2 +- src/endpoint/scheduler.rs | 14 +++++++++-- src/filestore/merged.rs | 6 +++++ src/filestore/path.rs | 6 +++++ src/filestore/staging.rs | 5 ++++ src/orchestrator/orchestrator.rs | 53 +++++++++------------------------------- 6 files changed, 41 insertions(+), 45 deletions(-) (limited to 'src') diff --git a/src/commands/build.rs b/src/commands/build.rs index c9a564b..86f205b 100644 --- a/src/commands/build.rs +++ b/src/commands/build.rs @@ -339,7 +339,7 @@ pub async fn build( writeln!(outlock, "Packages created:")?; } artifacts.into_iter().try_for_each(|artifact| { - writeln!(outlock, "-> {}", staging_dir.join(artifact.path).display()).map_err(Error::from) + writeln!(outlock, "-> {}", staging_dir.join(artifact.path()).display()).map_err(Error::from) })?; let mut had_error = false; diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs index 3326a0a..a907257 100644 --- a/src/endpoint/scheduler.rs +++ b/src/endpoint/scheduler.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use crate::db::models as dbmodels; use crate::endpoint::Endpoint; use crate::endpoint::EndpointConfiguration; +use crate::filestore::Artifact; use crate::filestore::StagingStore; use crate::job::JobResource; use crate::job::RunnableJob; @@ -145,7 +146,7 @@ impl std::fmt::Debug for JobHandle { } impl JobHandle { - pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> { + pub async fn run(self) -> Result<Vec<Artifact>> { let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>(); let ep = self.endpoint.read().await; let endpoint =
dbmodels::Endpoint::create_or_fetch(&self.db, ep.name())?; @@ -241,9 +242,18 @@ impl JobHandle { // Have to do it the ugly way here because of borrowing semantics let mut r = vec![]; + let staging_store_lock = self.staging_store.read().await; for p in paths.iter() { + use std::ops::Deref; trace!("DB: Creating artifact entry for path: {}", p.display()); - r.push(dbmodels::Artifact::create(&self.db, p, &job)?); + let _ = dbmodels::Artifact::create(&self.db, p, &job)?; + r.push({ + staging_store_lock + .deref() + .get(p) + .ok_or_else(|| anyhow!("Artifact not in store: {:?}", p))? + .clone() + }); } Ok(r) } diff --git a/src/filestore/merged.rs b/src/filestore/merged.rs index 3ed59dd..0b19d85 100644 --- a/src/filestore/merged.rs +++ b/src/filestore/merged.rs @@ -8,6 +8,11 @@ // SPDX-License-Identifier: EPL-2.0 // +// TODO: The MergedStores is not used at all anymore, because we removed the feature while doing +// the rewrite +#![allow(unused)] + + use std::sync::Arc; use std::path::Path; @@ -21,6 +26,7 @@ use crate::filestore::path::ArtifactPath; use crate::filestore::ReleaseStore; use crate::filestore::StagingStore; + /// A type that merges the release store and the staging store /// /// The stores are not actually merged (on disk or in memory), but the querying mechanism works in diff --git a/src/filestore/path.rs b/src/filestore/path.rs index ab0655b..cfa999f 100644 --- a/src/filestore/path.rs +++ b/src/filestore/path.rs @@ -149,6 +149,12 @@ impl ArtifactPath { } } +impl AsRef<Path> for ArtifactPath { + fn as_ref(&self) -> &Path { + &self.0 + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct FullArtifactPath<'a>(&'a StoreRoot, &'a ArtifactPath); diff --git a/src/filestore/staging.rs b/src/filestore/staging.rs index b944d84..788cd02 100644 --- a/src/filestore/staging.rs +++ b/src/filestore/staging.rs @@ -19,6 +19,7 @@ use indicatif::ProgressBar; use log::trace; use result_inspect::ResultInspect; +use crate::filestore::Artifact; use
crate::filestore::path::ArtifactPath; use crate::filestore::path::StoreRoot; use crate::filestore::util::FileStoreImpl; @@ -100,4 +101,8 @@ impl StagingStore { pub fn root_path(&self) -> &StoreRoot { self.0.root_path() } + + pub fn get(&self, p: &ArtifactPath) -> Option<&Artifact> { + self.0.get(p) + } } diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index cbb783c..f5e8905 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -94,16 +94,16 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts (with their respective database artifact objects) /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<(Uuid, Vec<(Artifact, dbmodels::Artifact)>), Vec<(Uuid, Error)>>; +type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>; impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> { + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { let (results, errors) = self.run_tree().await?; - output.extend(results.into_iter().map(|(_, dba)| dba)); + output.extend(results.into_iter()); Ok(errors) } - async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> { + async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> { let multibar = Arc::new(indicatif::MultiProgress::new()); // For each job in the jobtree, built a tuple with @@ -274,7 +274,7 @@ impl<'a> JobTask<'a> { // A list of job run results from dependencies that were received from the tasks for the // dependencies - let mut received_dependencies: Vec<(Uuid, Vec<(Artifact, dbmodels::Artifact)>)> = vec![]; + let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![]; // A list of errors that were received from the tasks for the dependencies let mut received_errors: Vec<(Uuid, Error)> = vec![]; @@ -342,14 +342,14 @@ impl<'a> JobTask<'a> { } // Map the list of
received dependencies from - // Vec<(Uuid, Vec<(Artifact)>)> + // Vec<(Uuid, Vec<Artifact>)> // to // Vec<Artifact> let dependency_artifacts = received_dependencies .iter() .map(|tpl| tpl.1.iter()) .flatten() - .map(|tpl| tpl.0.clone()) + .cloned() .collect(); trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts); self.bar.set_message("Preparing..."); @@ -377,32 +377,11 @@ impl<'a> JobTask<'a> { // if the scheduler run reports success, // it returns the database artifact objects it created! - Ok(db_artifacts) => { - trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, db_artifacts); - // we take these artifacts and - let results: JobResult = db_artifacts.into_iter() - .map(|db_artifact| async { - trace!("Getting store Artifact for db Artifact: {:?}", db_artifact); - - // get the appropriate filesystem artifact for it - let art = self.get_store_artifact_for(&db_artifact).await?; - trace!("Store Artifact: {:?}", art); - Ok(Ok((art, db_artifact))) - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<Result<Vec<_>, _>>>() - .await? - .map(|mut v| { - // Also send out the artifact of our dependencies, because we need to - // propagate them upwards through the tree - v.extend(received_dependencies.into_iter().map(|tpl| tpl.1.into_iter()).flatten()); - (self.uuid, v) - }); // and we add the UUID of the job of this task to it - - trace!("[{}]: sending artifacts to parent", self.uuid); - + Ok(mut artifacts) => { + trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts); + artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten()); self.sender - .send(results) + .send(Ok((self.uuid, artifacts))) .await?; }, } @@ -410,15 +389,5 @@ impl<'a> JobTask<'a> { trace!("[{}]: Finished successfully", self.uuid); Ok(()) } - - async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> { - let p = PathBuf::from(&db_artifact.path); - self.merged_stores - .get_artifact_by_path(&p) - .await?
- .ok_or_else(|| { - anyhow!("Artifact not found in {}", p.display()) - }) - } } -- cgit v1.2.3