diff options
-rw-r--r-- | src/orchestrator/orchestrator.rs | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 79c97f7..f5dd8cb 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -203,16 +203,16 @@ impl<'a> OrchestratorSetup<'a> { /// It is either a list of artifacts with the UUID of the job they were produced by, /// or a UUID and an Error object, where the UUID is the job UUID and the error is the /// anyhow::Error that was issued. -type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, Vec<(Uuid, Error)>>; +type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>; impl<'a> Orchestrator<'a> { - pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> { + pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> { let (results, errors) = self.run_tree().await?; output.extend(results.into_iter()); Ok(errors) } - async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> { + async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> { let multibar = Arc::new(indicatif::MultiProgress::new()); // For each job in the jobdag, built a tuple with @@ -340,7 +340,7 @@ impl<'a> Orchestrator<'a> { None => Err(anyhow!("No result received...")), Some(Ok(results)) => { let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect(); - Ok((results, vec![])) + Ok((results, HashMap::with_capacity(0))) }, Some(Err(errors)) => Ok((vec![], errors)), } @@ -403,7 +403,7 @@ impl<'a> JobTask<'a> { let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new(); // A list of errors that were received from the tasks for the dependencies - let mut received_errors: Vec<(Uuid, Error)> = vec![]; + let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len()); // Helper function to check whether all UUIDs are in a list of UUIDs let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| { @@ -501,7 +501,9 @@ impl<'a> JobTask<'a> { // // We only send to one parent, because it doesn't matter anymore // We know that we have at least one sender available - self.sender[0].send(Err(vec![(job_uuid, e)])).await?; + let mut errormap = HashMap::with_capacity(1); + errormap.insert(job_uuid, e); + self.sender[0].send(Err(errormap)).await?; }, // if the scheduler run reports success, @@ -526,7 +528,7 @@ impl<'a> JobTask<'a> { /// /// Return Ok(true) if we should continue operation /// Return Ok(false) if the channel is empty and we're done receiving - async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { + async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { match self.receiver.recv().await { Some(Ok(mut v)) => { // The task we depend on succeeded and returned an @@ -539,7 +541,7 @@ impl<'a> JobTask<'a> { // The task we depend on failed // we log that error for now trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e); - received_errors.append(&mut e); + received_errors.extend(e); Ok(true) }, None => { |