diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 18 |
1 files changed, 14 insertions, 4 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index 5df05c1..ff46a00 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -534,9 +534,6 @@ impl<'a> JobTask<'a> { trace!("[{}]: receiving...", self.jobdef.job.uuid()); // receive from the receiver let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?; - if !continue_receiving { - break; - } trace!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors); // if there are any errors from child tasks @@ -554,6 +551,10 @@ impl<'a> JobTask<'a> { self.jobdef.job.package().version())); return Ok(()) } + + if !continue_receiving { + break; + } } // check if a job that looks very similar to this job has already produced artifacts. @@ -726,7 +727,8 @@ impl<'a> JobTask<'a> { /// `received_errors` /// /// Return Ok(true) if we should continue operation - /// Return Ok(false) if the channel is empty and we're done receiving + /// Return Ok(false) if the channel is empty and we're done receiving or if the channel is + /// empty and there were errors collected async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> { match self.receiver.recv().await { Some(Ok(mut v)) => { @@ -747,6 +749,14 @@ impl<'a> JobTask<'a> { // The task we depend on finished... we must check what we have now... trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid()); + // If the channel was closed and there are already errors in the `received_errors` + // buffer, we return Ok(false) to notify the caller that we should not continue + // receiving + if !received_errors.is_empty() { + trace!("[{}]: There are errors, stop receiving", self.jobdef.job.uuid()); + return Ok(false) + } + // Find all dependencies that we need but which are not received let received = received_dependencies.keys().collect::<Vec<_>>(); let missing_deps: Vec<_> = self.jobdef |