diff options
author | Matthias Beyer <matthias.beyer@atos.net> | 2021-01-22 16:28:15 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-01-25 15:35:21 +0100 |
commit | 34291a0442ac6fb66967543f1bbf465c0b3dd731 (patch) | |
tree | e4d2feb6fef85527710b6d3d4431903f041eab72 /src/orchestrator/orchestrator.rs | |
parent | 5a51e23ba57491d100f4ffeac5c8657aaa1b011b (diff) |
Let the JobHandle::run() return a Vec<Artifact>
Before that change, it returned the dbmodels::Artifact objects, for which we
needed to fetch the filestore::Artifact again.
This change removes that restriction (improving runtime, of course).
Signed-off-by: Matthias Beyer <matthias.beyer@atos.net>
Tested-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 53 |
1 files changed, 11 insertions, 42 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index cbb783c..f5e8905 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -94,16 +94,16 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts (with their respective database artifact objects) /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<(Uuid, Vec<(Artifact, dbmodels::Artifact)>), Vec<(Uuid, Error)>>; +type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>; impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<dbmodels::Artifact>) -> Result<Vec<(Uuid, Error)>> { + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { let (results, errors) = self.run_tree().await?; - output.extend(results.into_iter().map(|(_, dba)| dba)); + output.extend(results.into_iter()); Ok(errors) } - async fn run_tree(self) -> Result<(Vec<(Artifact, dbmodels::Artifact)>, Vec<(Uuid, Error)>)> { + async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> { let multibar = Arc::new(indicatif::MultiProgress::new()); // For each job in the jobtree, built a tuple with @@ -274,7 +274,7 @@ impl<'a> JobTask<'a> { // A list of job run results from dependencies that were received from the tasks for the // dependencies - let mut received_dependencies: Vec<(Uuid, Vec<(Artifact, dbmodels::Artifact)>)> = vec![]; + let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![]; // A list of errors that were received from the tasks for the dependencies let mut received_errors: Vec<(Uuid, Error)> = vec![]; @@ -342,14 +342,14 @@ impl<'a> JobTask<'a> { } // Map the list of received dependencies from - // Vec<(Uuid, Vec<(Artifact)>)> + // Vec<(Uuid, Vec<Artifact>)> // to // Vec<Artifact> let dependency_artifacts = received_dependencies .iter() .map(|tpl| tpl.1.iter()) .flatten() - .map(|tpl| tpl.0.clone()) + .cloned() .collect(); trace!("[{}]: Dependency artifacts = {:?}", self.uuid, dependency_artifacts); self.bar.set_message("Preparing..."); @@ -377,32 +377,11 @@ impl<'a> JobTask<'a> { // if the scheduler run reports success, // it returns the database artifact objects it created! - Ok(db_artifacts) => { - trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, db_artifacts); - // we take these artifacts and - let results: JobResult = db_artifacts.into_iter() - .map(|db_artifact| async { - trace!("Getting store Artifact for db Artifact: {:?}", db_artifact); - - // get the appropriate filesystem artifact for it - let art = self.get_store_artifact_for(&db_artifact).await?; - trace!("Store Artifact: {:?}", art); - Ok(Ok((art, db_artifact))) - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Result<std::result::Result<Vec<(Artifact, dbmodels::Artifact)>, _>>>() - .await? - .map(|mut v| { - // Also send out the artifact of our dependencies, because we need to - // propagate them upwards through the tree - v.extend(received_dependencies.into_iter().map(|tpl| tpl.1.into_iter()).flatten()); - (self.uuid, v) - }); // and we add the UUID of the job of this task to it - - trace!("[{}]: sending artifacts to parent", self.uuid); - + Ok(mut artifacts) => { + trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts); + artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten()); self.sender - .send(results) + .send(Ok((self.uuid, artifacts))) .await?; }, } @@ -410,15 +389,5 @@ impl<'a> JobTask<'a> { trace!("[{}]: Finished successfully", self.uuid); Ok(()) } - - async fn get_store_artifact_for(&self, db_artifact: &dbmodels::Artifact) -> Result<Artifact> { - let p = PathBuf::from(&db_artifact.path); - self.merged_stores - .get_artifact_by_path(&p) - .await? - .ok_or_else(|| { - anyhow!("Artifact not found in {}", p.display()) - }) - } } |