diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-05 15:05:17 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-06 11:54:23 +0100 |
commit | 9aa8181872c8fc03a3a74b7189cbf634c6df348c (patch) | |
tree | 29f934a8b07045f7e2fec9ce4f72ba8f40e2504d /src/orchestrator/orchestrator.rs | |
parent | 916b19f2c099beaf00d37e5d2cc8e2fe81440996 (diff) |
Change implementation to use HashMap for storing results
This changes the implementation to use a hashmap for storing the
results.
This way, we are not storing the same result twice.
.-> C -,
/ \
D >-> A
\ /
`-> B -ยด
In this scenario, D gets the result from A propagated via B and via C.
Because of this, it would propagate the results from A twice to its
caller (the orchestrator itself).
By using a hashmap, we prevent this from happening on the JobTask level,
thus, artifacts are not getting reported to the user twice.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Tested-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 49d95eb..79c97f7 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -10,6 +10,7 @@ #![allow(unused)] +use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -202,7 +203,7 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts with the UUID of the job they were produced by, /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<Vec<(Uuid, Vec<Artifact>)>, Vec<(Uuid, Error)>>; +type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, Vec<(Uuid, Error)>>; impl<'a> Orchestrator<'a> { pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { @@ -399,15 +400,15 @@ impl<'a> JobTask<'a> { // A list of job run results from dependencies that were received from the tasks for the // dependencies - let mut received_dependencies: Vec<(Uuid, Vec<Artifact>)> = vec![]; + let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new(); // A list of errors that were received from the tasks for the dependencies let mut received_errors: Vec<(Uuid, Error)> = vec![]; // Helper function to check whether all UUIDs are in a list of UUIDs - let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &[(Uuid, Vec<_>)]| { + let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| { dependency_uuids.iter().all(|dependency_uuid| { - list.iter().map(|tpl| tpl.0).any(|id| id == *dependency_uuid) + list.keys().any(|id| id == dependency_uuid) }) }; @@ -464,8 +465,8 @@ impl<'a> JobTask<'a> { // to // Vec<Artifact> let dependency_artifacts = received_dependencies - .iter() - .map(|tpl| tpl.1.iter()) + .values() + .map(|v| v.iter()) .flatten() .cloned() .collect(); @@ -507,7 +508,7 @@ impl<'a> JobTask<'a> { // it returns the database artifact objects it created! Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); - received_dependencies.push((*self.jobdef.job.uuid(), artifacts)); + received_dependencies.insert(*self.jobdef.job.uuid(), artifacts); for s in self.sender { s.send(Ok(received_dependencies.clone())).await?; } @@ -525,13 +526,13 @@ impl<'a> JobTask<'a> { /// /// Return Ok(true) if we should continue operation /// Return Ok(false) if the channel is empty and we're done receiving - async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { + async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { match self.receiver.recv().await { Some(Ok(mut v)) => { // The task we depend on succeeded and returned an // (uuid of the job, [Artifact]) trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), v); - received_dependencies.append(&mut v); + received_dependencies.extend(v); Ok(true) }, Some(Err(mut e)) => { @@ -546,7 +547,7 @@ impl<'a> JobTask<'a> { trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid()); // Find all dependencies that we need but which are not received - let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>(); + let received = received_dependencies.keys().collect::<Vec<_>>(); let missing_deps: Vec<_> = self.jobdef .dependencies .iter() |