summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@atos.net>2021-05-11 08:26:23 +0200
committerMatthias Beyer <matthias.beyer@atos.net>2021-05-11 09:54:28 +0200
commit7f3eeeb39c34903f2bd709a78287bc2b655b062c (patch)
tree0b49937f26f7b426366a5e6384a37566b6cfb95b
parent8cf287dd93f95775787b3a83500fee23cd8f07db (diff)
Fix: Do not return "dependencies missing" error on closed channel
This patch fixes the implementation of the perform_receive() helper function to return Ok(false) if the channel was closed (recv() returned None) and there are already errors in the error buffer. This case wasn't taken care of before, which resulted in the inconveniance that the channel was closed but if a task still performed a receive, it got a `None`, checked its dependencies and noted that there were dependencies missing. This resulted in a "Childs finished, but dependencies missing" error being printed to the user. This is not the case, though. This fix changes the implementation so that the `perform_receive()` function simply returns a `Ok(false)` if the channel was closed and there were errors. The call to the function was hence moved _after_ the check whether errors were received (in the caller), so that these errors are propagated appropriately. Signed-off-by: Matthias Beyer <matthias.beyer@atos.net> (cherry picked from commit 338ea0827c6d6ab0ac41b35a7bd066c528c5c811)
-rw-r--r--src/orchestrator/orchestrator.rs18
1 files changed, 14 insertions, 4 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 5df05c1..ff46a00 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -534,9 +534,6 @@ impl<'a> JobTask<'a> {
trace!("[{}]: receiving...", self.jobdef.job.uuid());
// receive from the receiver
let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?;
- if !continue_receiving {
- break;
- }
trace!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors);
// if there are any errors from child tasks
@@ -554,6 +551,10 @@ impl<'a> JobTask<'a> {
self.jobdef.job.package().version()));
return Ok(())
}
+
+ if !continue_receiving {
+ break;
+ }
}
// check if a job that looks very similar to this job has already produced artifacts.
@@ -726,7 +727,8 @@ impl<'a> JobTask<'a> {
/// `received_errors`
///
/// Return Ok(true) if we should continue operation
- /// Return Ok(false) if the channel is empty and we're done receiving
+ /// Return Ok(false) if the channel is empty and we're done receiving or if the channel is
+ /// empty and there were errors collected
async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
@@ -747,6 +749,14 @@ impl<'a> JobTask<'a> {
// The task we depend on finished... we must check what we have now...
trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid());
+ // If the channel was closed and there are already errors in the `received_errors`
+ // buffer, we return Ok(false) to notify the caller that we should not continue
+ // receiving
+ if !received_errors.is_empty() {
+ trace!("[{}]: There are errors, stop receiving", self.jobdef.job.uuid());
+ return Ok(false)
+ }
+
// Find all dependencies that we need but which are not received
let received = received_dependencies.keys().collect::<Vec<_>>();
let missing_deps: Vec<_> = self.jobdef