summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-05 15:10:54 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-06 11:54:23 +0100
commitea416d8c120c2aa202e7e796135c816a8760a6db (patch)
tree13b246d06b2ef3482faec46f9ce8f635935a6982 /src/orchestrator/orchestrator.rs
parent9aa8181872c8fc03a3a74b7189cbf634c6df348c (diff)
Change implementation to use HashMap for errors
This patch changes the implementation to use an HashMap<Uuid, Error> for error propagation. The rationale behind this is the same as with the change to HashMap for the artifacts: Errors are not getting propagated twice if they arrive at a job from different child jobs. This is technically not possible yet, because we propagate errors only to one parent. Though, if the implementation changes one day (which it could), this is one thing less we have to think about. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs18
1 files changed, 10 insertions, 8 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 79c97f7..f5dd8cb 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -203,16 +203,16 @@ impl<'a> OrchestratorSetup<'a> {
/// It is either a list of artifacts with the UUID of the job they were produced by,
/// or a UUID and an Error object, where the UUID is the job UUID and the error is the
/// anyhow::Error that was issued.
-type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, Vec<(Uuid, Error)>>;
+type JobResult = std::result::Result<HashMap<Uuid, Vec<Artifact>>, HashMap<Uuid, Error>>;
impl<'a> Orchestrator<'a> {
- pub async fn run(self, output: &mut Vec<Artifact>) -> Result<Vec<(Uuid, Error)>> {
+ pub async fn run(self, output: &mut Vec<Artifact>) -> Result<HashMap<Uuid, Error>> {
let (results, errors) = self.run_tree().await?;
output.extend(results.into_iter());
Ok(errors)
}
- async fn run_tree(self) -> Result<(Vec<Artifact>, Vec<(Uuid, Error)>)> {
+ async fn run_tree(self) -> Result<(Vec<Artifact>, HashMap<Uuid, Error>)> {
let multibar = Arc::new(indicatif::MultiProgress::new());
// For each job in the jobdag, built a tuple with
@@ -340,7 +340,7 @@ impl<'a> Orchestrator<'a> {
None => Err(anyhow!("No result received...")),
Some(Ok(results)) => {
let results = results.into_iter().map(|tpl| tpl.1.into_iter()).flatten().collect();
- Ok((results, vec![]))
+ Ok((results, HashMap::with_capacity(0)))
},
Some(Err(errors)) => Ok((vec![], errors)),
}
@@ -403,7 +403,7 @@ impl<'a> JobTask<'a> {
let mut received_dependencies: HashMap<Uuid, Vec<Artifact>> = HashMap::new();
// A list of errors that were received from the tasks for the dependencies
- let mut received_errors: Vec<(Uuid, Error)> = vec![];
+ let mut received_errors: HashMap<Uuid, Error> = HashMap::with_capacity(self.jobdef.dependencies.len());
// Helper function to check whether all UUIDs are in a list of UUIDs
let all_dependencies_are_in = |dependency_uuids: &[Uuid], list: &HashMap<Uuid, Vec<_>>| {
@@ -501,7 +501,9 @@ impl<'a> JobTask<'a> {
//
// We only send to one parent, because it doesn't matter anymore
// We know that we have at least one sender available
- self.sender[0].send(Err(vec![(job_uuid, e)])).await?;
+ let mut errormap = HashMap::with_capacity(1);
+ errormap.insert(job_uuid, e);
+ self.sender[0].send(Err(errormap)).await?;
},
// if the scheduler run reports success,
@@ -526,7 +528,7 @@ impl<'a> JobTask<'a> {
///
/// Return Ok(true) if we should continue operation
/// Return Ok(false) if the channel is empty and we're done receiving
- async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
+ async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<Artifact>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
// The task we depend on succeeded and returned an
@@ -539,7 +541,7 @@ impl<'a> JobTask<'a> {
// The task we depend on failed
// we log that error for now
trace!("[{}]: Received: {:?}", self.jobdef.job.uuid(), e);
- received_errors.append(&mut e);
+ received_errors.extend(e);
Ok(true)
},
None => {