summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-01-22 17:49:04 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-01-25 15:35:38 +0100
commit53360d499e2233b30b1aaf745800cd7199009153 (patch)
tree6c376366683ef7a2e2e7fe37d5f3bf480b84572d /src
parentdee5ef3a4bfbd6f8b6f5242130ca53628d9b33da (diff)
Outsource receiving, ensure we received it all
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src')
-rw-r--r--src/orchestrator/orchestrator.rs99
1 files changed, 65 insertions, 34 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 2634b5c..5c5b8ec 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -301,40 +301,9 @@ impl<'a> JobTask<'a> {
trace!("[{}]: receiving...", self.uuid);
// receive from the receiver
- match self.receiver.recv().await {
- Some(Ok(v)) => {
- // The task we depend on succeeded and returned an
- // (uuid of the job, [Artifact])
- trace!("[{}]: Received: {:?}", self.uuid, v);
- received_dependencies.push(v)
- },
- Some(Err(mut e)) => {
- // The task we depend on failed
- // we log that error for now
- trace!("[{}]: Received: {:?}", self.uuid, e);
- received_errors.append(&mut e);
- },
- None => {
- // The task we depend on finished... we must check what we have now...
- trace!("[{}]: Received nothing, channel seems to be empty", self.uuid);
-
- // Find all dependencies that we need but which are not received
- let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
- let missing_deps: Vec<_> = self.jobdef
- .dependencies
- .iter()
- .filter(|d| !received.contains(d))
- .collect();
- trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps);
-
- // ... if there are any, error
- if !missing_deps.is_empty() {
- return Err(anyhow!("Childs finished, but dependencies still missing: {:?}", missing_deps))
- } else {
- // all dependencies are received
- break;
- }
- },
+ let continue_receiving = self.perform_receive(&mut received_dependencies, &mut received_errors).await?;
+ if !continue_receiving {
+ break;
}
trace!("[{}]: Received errors = {:?}", self.uuid, received_errors);
@@ -348,6 +317,20 @@ impl<'a> JobTask<'a> {
}
}
+ // receive items until the channel is empty.
+ //
+ // In the above loop, it could happen that we have all dependencies to run, but there is
+ // another job that reports artifacts.
+ // We need to collect them, too.
+ //
+ // This is techically not possible, because in a tree, we need all results from all childs.
+ // It just feels better having this in place as well.
+ //
+ // Sorry, not sorry.
+ while self.perform_receive(&mut received_dependencies, &mut received_errors).await? {
+ ;
+ }
+
// Map the list of received dependencies from
// Vec<(Uuid, Vec<Artifact>)>
// to
@@ -404,5 +387,53 @@ impl<'a> JobTask<'a> {
trace!("[{}]: Finished successfully", self.uuid);
Ok(())
}
+
+ /// Performe a recv() call on the receiving side of the channel
+ ///
+ /// Put the dependencies you received into the `received_dependencies`, the errors in the
+ /// `received_errors`
+ ///
+ /// Return Ok(true) if we should continue operation
+ /// Return Ok(false) if the channel is empty and we're done receiving
+ async fn perform_receive(&mut self, received_dependencies: &mut Vec<(Uuid, Vec<Artifact>)>, received_errors: &mut Vec<(Uuid, Error)>) -> Result<bool> {
+ match self.receiver.recv().await {
+ Some(Ok(v)) => {
+ // The task we depend on succeeded and returned an
+ // (uuid of the job, [Artifact])
+ trace!("[{}]: Received: {:?}", self.uuid, v);
+ received_dependencies.push(v);
+ Ok(true)
+ },
+ Some(Err(mut e)) => {
+ // The task we depend on failed
+ // we log that error for now
+ trace!("[{}]: Received: {:?}", self.uuid, e);
+ received_errors.append(&mut e);
+ Ok(true)
+ },
+ None => {
+ // The task we depend on finished... we must check what we have now...
+ trace!("[{}]: Received nothing, channel seems to be empty", self.uuid);
+
+ // Find all dependencies that we need but which are not received
+ let received = received_dependencies.iter().map(|tpl| tpl.0).collect::<Vec<_>>();
+ let missing_deps: Vec<_> = self.jobdef
+ .dependencies
+ .iter()
+ .filter(|d| !received.contains(d))
+ .collect();
+ trace!("[{}]: Missing dependencies = {:?}", self.uuid, missing_deps);
+
+ // ... if there are any, error
+ if !missing_deps.is_empty() {
+ return Err(anyhow!("Childs finished, but dependencies still missing: {:?}", missing_deps))
+ } else {
+ // all dependencies are received
+ Ok(false)
+ }
+ },
+ }
+ }
+
}