summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-02-04 08:32:18 +0100
committerMatthias Beyer <matthias.beyer@atos.net>2021-02-04 09:20:33 +0100
commit8e670094f88693830d2d1d871d8d31412ddfc539 (patch)
tree430d6710f5e78e7d20fc18eb4920ae43049e3391 /src
parent1ab306272ca181cc6955014519fcaca7a7333969 (diff)
Fix: Ensure job UUIDs are propagated through whole tree
This patch changes the propagation of results, so that the UUIDs of the jobs producing the artifacts are propagated through the whole tree. This issue at hand was that when having a dependency tree like this: C -> B -> A The results from A were propagated to B and the results from B where propagated to C. But, because the implementation did not do this, the results from A where included in the results from B and the UUID from A was dropped. This was an issue because the implementation waited for _all_ dependencies (direct and transitive) by their job UUID. This means that C waited on a UUID that described the Job for A, but never received it, which caused everything to fail. This patch changes the algorithm, to not only report the own UUID and all artifacts of a job, but all artifacts with their UUID attached, which solves the issue. The root of the tree (the `Orchestrator`) simply drops the UUIDs before returning the artifacts to its caller. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> Tested-by: Matthias Beyer <matthias.beyer@atos.net>
Diffstat (limited to 'src')
-rw-r--r--src/orchestrator/orchestrator.rs19
1 files changed, 11 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index d57324b..42e3b63 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -199,10 +199,10 @@ impl<'a> OrchestratorSetup<'a> {
///
/// Represents a result that came from the run of a job inside a container
///
-/// It is either a list of artifacts (with their respective database artifact objects)
+/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<(Uuid, Vec<Artifact>), Vec<(Uuid, Error)>>;
+type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>;
impl<'a> Orchestrator<'a> {
pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
@@ -323,7 +323,10 @@ impl<'a> Orchestrator<'a> {
let _ = jobs_result?;
match root_receiver.recv().await {
None => Err(anyhow!("No result received...")),
- Some(Ok((_, artifacts))) => Ok((artifacts, vec![])),
+ Some(Ok(results)) => {
+ let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
+ Ok((results, vec![]))
+ },
Some(Err(errors)) => Ok((vec![], errors)),
}
}
@@ -486,11 +489,11 @@ impl<'a> JobTask<'a> {
// if the scheduler run reports success,
// it returns the database artifact objects it created!
- Ok(mut artifacts) => {
+ Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.uuid, artifacts);
- artifacts.extend(received_dependencies.into_iter().map(|(_, v)| v.into_iter()).flatten());
+ received_dependencies.push((self.uuid, artifacts));
self.sender
- .send(Ok((self.uuid, artifacts)))
+ .send(Ok(received_dependencies))
.await?;
},
}
@@ -508,11 +511,11 @@ impl<'a> JobTask<'a> {
/// Return Ok(false) if the channel is empty and we're done receiving
async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
match self.receiver.recv().await {
- Some(Ok(v)) => {
+ Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
// (uuid of the job, [Artifact])
trace!("[{}]: Received: {:?}", self.uuid, v);
- received_dependencies.push(v);
+ received_dependencies.append(&mut v);
Ok(true)
},
Some(Err(mut e)) => {