summaryrefslogtreecommitdiffstats
path: root/src/orchestrator
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-02-05 09:57:37 +0100
committerMatthias Beyer <mail@beyermatthias.de>2021-02-06 11:54:23 +0100
commit916b19f2c099beaf00d37e5d2cc8e2fe81440996 (patch)
tree3144198ca3d15e7e8f87c41ea65626349b44d11c /src/orchestrator
parent284f46561253e977691916b1bad6568d7033fc48 (diff)
Change implementation to allow multiple senders
This patch reimplements the result-propagation to allow multiple senders. .-> E / .-> C -< / \ D >-> A \ / `-> B -ยด In this scenario, A needs to send its results to multiple other jobs. This patch changes the implementation so that each JobTask has a Vec<_> of senders, which allows sending results to multiple parents. In case of error, the error is only propagated to one parent, because it doesn't matter. If there is an error, we fail and abort the whole tree anyways, so the other parents don't need to be notified of the error. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r--src/orchestrator/orchestrator.rs45
1 files changed, 34 insertions, 11 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index bed2195..49d95eb 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -246,7 +246,7 @@ impl<'a> Orchestrator<'a> {
database: self.database.clone(),
};
- (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>))
+ (receiver, tp, sender, std::cell::RefCell::new(None as Option<Vec<Sender<JobResult>>>))
})
.collect();
@@ -261,9 +261,26 @@ impl<'a> Orchestrator<'a> {
// find the job that depends on this job
// use the sender of the found job and set it as sender for this job
for job in jobs.iter() {
- *job.3.borrow_mut() = jobs.iter()
- .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
- .map(|j| j.2.clone());
+ if let Some(mut v) = job.3.borrow_mut().as_mut() {
+ v.extend({
+ jobs.iter()
+ .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
+ .map(|j| j.2.clone())
+ });
+ } else {
+ *job.3.borrow_mut() = {
+ let depending_on_job = jobs.iter()
+ .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid()))
+ .map(|j| j.2.clone())
+ .collect::<Vec<Sender<JobResult>>>();
+
+ if depending_on_job.is_empty() {
+ None
+ } else {
+ Some(depending_on_job)
+ }
+ };
+ }
}
// Find the id of the root task
@@ -308,7 +325,7 @@ impl<'a> Orchestrator<'a> {
receiver: prep.0,
// the sender is set or we need to use the root sender
- sender: prep.3.into_inner().unwrap_or(root_sender),
+ sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]),
}
})
.map(|task| task.run())
@@ -365,7 +382,7 @@ struct JobTask<'a> {
receiver: Receiver<JobResult>,
/// Channel to send the own build outputs to
- sender: Sender<JobResult>,
+ sender: Vec<Sender<JobResult>>,
}
impl<'a> JobTask<'a> {
@@ -418,7 +435,10 @@ impl<'a> JobTask<'a> {
// if there are any errors from child tasks
if !received_errors.is_empty() {
// send them to the parent,...
- self.sender.send(Err(received_errors)).await;
+ //
+ // We only send to one parent, because it doesn't matter
+ // And we know that we have at least one sender
+ self.sender[0].send(Err(received_errors)).await;
// ... and stop operation, because the whole tree will fail anyways.
return Ok(())
@@ -477,7 +497,10 @@ impl<'a> JobTask<'a> {
Err(e) => {
trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e);
// ... and we send that to our parent
- self.sender.send(Err(vec![(job_uuid, e)])).await?;
+ //
+ // We only send to one parent, because it doesn't matter anymore
+ // We know that we have at least one sender available
+ self.sender[0].send(Err(vec![(job_uuid, e)])).await?;
},
// if the scheduler run reports success,
@@ -485,9 +508,9 @@ impl<'a> JobTask<'a> {
Ok(artifacts) => {
trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts);
received_dependencies.push((*self.jobdef.job.uuid(), artifacts));
- self.sender
- .send(Ok(received_dependencies))
- .await?;
+ for s in self.sender {
+ s.send(Ok(received_dependencies.clone())).await?;
+ }
},
}