diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-01-22 17:49:04 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-01-25 15:35:38 +0100 |
commit | 53360d499e2233b30b1aaf745800cd7199009153 (patch) | |
tree | 6c376366683ef7a2e2e7fe37d5f3bf480b84572d /src | |
parent | dee5ef3a4bfbd6f8b6f5242130ca53628d9b33da (diff) |
Outsource receiving, ensure we received it all
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 99 |
1 files changed, 65 insertions, 34 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 2634b5c..5c5b8ec 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -301,40 +301,9 @@ impl<'a> JobTask<'a> { trace!("[{}]: receiving...", self.uuid); // receive from the receiver - match self.receiver.recv().await { - Some(Ok(v)) => { - // The task we depend on succeeded and returned an - // (uuid of the job, [Artifact]) - trace!("[{}]: Received: {:?}", self.uuid, v); - received_dependencies.push(v) - }, - Some(Err(mut e)) => { - // The task we depend on failed - // we log that error for now - trace!("[{}]: Received: {:?}", self.uuid, e); - received_errors.append(&mut e); - }, - None => { - // The task we depend on finished... we must check what we have now... - trace!("[{}]: Received nothing, channel seems to be empty", self.uuid); - - // Find all dependencies that we need but which are not received - let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>(); - let missing_deps: Vec<_> = self.jobdef - .dependencies - .iter() - .filter(|d| !received.contains(d)) - .collect(); - trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps); - - // ... if there are any, error - if !missing_deps.is_empty() { - return Err(anyhow!("Childs finished, but dependencies still missing: {:?}", missing_deps)) - } else { - // all dependencies are received - break; - } - }, + let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?; + if !continue_receiving { + break; } trace!("[{}]: Received errors = {:?}", self.uuid, received_errors); @@ -348,6 +317,20 @@ impl<'a> JobTask<'a> { } } + // receive items until the channel is empty. + // + // In the above loop, it could happen that we have all dependencies to run, but there is + // another job that reports artifacts. + // We need to collect them, too. + // + // This is techically not possible, because in a tree, we need all results from all childs. + // It just feels better having this in place as well. + // + // Sorry, not sorry. + while self.perform_receive(&mut received_dependencies, &mut received_errors).await? { + ; + } + // Map the list of received dependencies from // Vec<(Uuid, Vec<Artifact>)> // to @@ -404,5 +387,53 @@ impl<'a> JobTask<'a> { trace!("[{}]: Finished successfully", self.uuid); Ok(()) } + + /// Performe a recv() call on the receiving side of the channel + /// + /// Put the dependencies you received into the `received_dependencies`, the errors in the + /// `received_errors` + /// + /// Return Ok(true) if we should continue operation + /// Return Ok(false) if the channel is empty and we're done receiving + async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> { + match self.receiver.recv().await { + Some(Ok(v)) => { + // The task we depend on succeeded and returned an + // (uuid of the job, [Artifact]) + trace!("[{}]: Received: {:?}", self.uuid, v); + received_dependencies.push(v); + Ok(true) + }, + Some(Err(mut e)) => { + // The task we depend on failed + // we log that error for now + trace!("[{}]: Received: {:?}", self.uuid, e); + received_errors.append(&mut e); + Ok(true) + }, + None => { + // The task we depend on finished... we must check what we have now... + trace!("[{}]: Received nothing, channel seems to be empty", self.uuid); + + // Find all dependencies that we need but which are not received + let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>(); + let missing_deps: Vec<_> = self.jobdef + .dependencies + .iter() + .filter(|d| !received.contains(d)) + .collect(); + trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps); + + // ... if there are any, error + if !missing_deps.is_empty() { + return Err(anyhow!("Childs finished, but dependencies still missing: {:?}", missing_deps)) + } else { + // all dependencies are received + Ok(false) + } + }, + } + } + } |