summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-05 15:05:17 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-06 11:54:23 +0100
commit9aa8181872c8fc03a3a74b7189cbf634c6df348c (patch)
tree29f934a8b07045f7e2fec9ce4f72ba8f40e2504d /src/orchestrator/orchestrator.rs
parent916b19f2c099beaf00d37e5d2cc8e2fe81440996 (diff)
Change implementation to use HashMap for storing results
This changes the implementation to use a hashmap for storing the results. This way, we are not storing the same result twice. .-> C -, / \ D >-> A \ / `-> B -ยด In this scenario, D gets the result from A propagated via B and via C. Because of this, it would propagate the results from A twice to its caller (the orchestrator itself). By using a hashmap, we prevent this from happening on the JobTask level, thus, artifacts are not getting reported to the user twice. Signed-off-by: Matthias Beyer <mail@beyermatthias.de> Tested-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs21
1 files changed, 11 insertions, 10 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 49d95eb..79c97f7 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -10,6 +10,7 @@
#![allow(unused)]
+use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
@@ -202,7 +203,7 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>;
+type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, Vec<(Uuid, Error)>>;
impl<'a> Orchestrator<'a> {
pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
@@ -399,15 +400,15 @@ impl<'a> JobTask<'a> {
// A list of job run results from dependencies that were received from the tasks for the
// dependencies
- let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![];
+ let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
let mut received_errors: Vec<(Uuid, Error)> = vec![];
// Helper function to check whether all UUIDs are in a list of UUIDs
- let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| {
+ let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| {
dependency_uuids.iter().all(|dependency_uuid| {
- list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid)
+ list.keys().any(|id| id == dependency_uuid)
})
};
@@ -464,8 +465,8 @@ impl<'a> JobTask<'a> {
// to
// Vec<Artifact>
let dependency_artifacts = received_dependencies
- .iter()
- .map(|tpl| tpl.1.iter())
+ .values()
+ .map(|v| v.iter())
.flatten()
.cloned()
.collect();
@@ -507,7 +508,7 @@ impl<'a> JobTask<'a> {
// it returns the database artifact objects it created!
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
- received_dependencies.push((*self.jobdef.job.uuid(), artifacts));
+ received_dependencies.insert(*self.jobdef.job.uuid(), artifacts);
for s in self.sender {
s.send(Ok(received_dependencies.clone())).await?;
}
@@ -525,13 +526,13 @@ impl<'a> JobTask<'a> {
///
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving
- async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
// (uuid of the job, [Artifact])
trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v);
- received_dependencies.append(&mut v);
+ received_dependencies.extend(v);
Ok(true)
},
Some(Err(mut e)) => {
@@ -546,7 +547,7 @@ impl<'a> JobTask<'a> {
trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid());
// Find all dependencies that we need but which are not received
- let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
+ let received = received_dependencies.keys().collect::<Vec<_>>();
let missing_deps: Vec<_> = self.jobdef
.dependencies
.iter()