summaryrefslogtreecommitdiffstats
path: root/src/orchestrator/orchestrator.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/orchestrator/orchestrator.rs')
-rw-r--r--src/orchestrator/orchestrator.rs18
1 files changed, 14 insertions, 4 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 5df05c1..ff46a00 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -534,9 +534,6 @@ impl<'a> JobTask<'a> {
trace!("[{}]: receiving...", self.jobdef.job.uuid());
// receive from the receiver
let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?;
- if !continue_receiving {
- break;
- }
trace!("[{}]: Received errors = {:?}", self.jobdef.job.uuid(), received_errors);
// if there are any errors from child tasks
@@ -554,6 +551,10 @@ impl<'a> JobTask<'a> {
self.jobdef.job.package().version()));
return Ok(())
}
+
+ if !continue_receiving {
+ break;
+ }
}
// check if a job that looks very similar to this job has already produced artifacts.
@@ -726,7 +727,8 @@ impl<'a> JobTask<'a> {
/// `received_errors`
///
/// Return Ok(true) if we should continue operation
- /// Return Ok(false) if the channel is empty and we're done receiving
+ /// Return Ok(false) if the channel is empty and we're done receiving or if the channel is
+ /// empty and there were errors collected
async fn perform_receive(&mut self, received_dependencies: &mut HashMap<Uuid, Vec<ArtifactPath>>, received_errors: &mut HashMap<Uuid, Error>) -> Result<bool> {
match self.receiver.recv().await {
Some(Ok(mut v)) => {
@@ -747,6 +749,14 @@ impl<'a> JobTask<'a> {
// The task we depend on finished... we must check what we have now...
trace!("[{}]: Received nothing, channel seems to be empty", self.jobdef.job.uuid());
+ // If the channel was closed and there are already errors in the `received_errors`
+ // buffer, we return Ok(false) to notify the caller that we should not continue
+ // receiving
+ if !received_errors.is_empty() {
+ trace!("[{}]: There are errors, stop receiving", self.jobdef.job.uuid());
+ return Ok(false)
+ }
+
// Find all dependencies that we need but which are not received
let received = received_dependencies.keys().collect::<Vec<_>>();
let missing_deps: Vec<_> = self.jobdef