diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-02-05 09:57:37 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-02-06 11:54:23 +0100 |
commit | 916b19f2c099beaf00d37e5d2cc8e2fe81440996 (patch) | |
tree | 3144198ca3d15e7e8f87c41ea65626349b44d11c /src/orchestrator | |
parent | 284f46561253e977691916b1bad6568d7033fc48 (diff) |
Change implementation to allow multiple senders
This patch reimplements the result-propagation to allow multiple
senders.
.-> E
/
.-> C -<
/ \
D >-> A
\ /
`-> B -ยด
In this scenario, A needs to send its results to multiple other jobs.
This patch changes the implementation so that each JobTask has a Vec<_>
of senders, which allows sending results to multiple parents.
In case of error, the error is only propagated to one parent, because it
doesn't matter. If there is an error, we fail and abort the whole tree
anyways, so the other parents don't need to be notified of the error.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/orchestrator')
-rw-r--r-- | src/orchestrator/orchestrator.rs | 45 |
1 files changed, 34 insertions, 11 deletions
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index bed2195..49d95eb 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -246,7 +246,7 @@ impl<'a> Orchestrator<'a> { database: self.database.clone(), }; - (receiver, tp, sender, std::cell::RefCell::new(None as Option<Sender<JobResult>>)) + (receiver, tp, sender, std::cell::RefCell::new(None as Option<Vec<Sender<JobResult>>>)) }) .collect(); @@ -261,9 +261,26 @@ impl<'a> Orchestrator<'a> { // find the job that depends on this job // use the sender of the found job and set it as sender for this job for job in jobs.iter() { - *job.3.borrow_mut() = jobs.iter() - .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) - .map(|j| j.2.clone()); + if let Some(mut v) = job.3.borrow_mut().as_mut() { + v.extend({ + jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + }); + } else { + *job.3.borrow_mut() = { + let depending_on_job = jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + .collect::<Vec<Sender<JobResult>>>(); + + if depending_on_job.is_empty() { + None + } else { + Some(depending_on_job) + } + }; + } } // Find the id of the root task @@ -308,7 +325,7 @@ impl<'a> Orchestrator<'a> { receiver: prep.0, // the sender is set or we need to use the root sender - sender: prep.3.into_inner().unwrap_or(root_sender), + sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]), } }) .map(|task| task.run()) @@ -365,7 +382,7 @@ struct JobTask<'a> { receiver: Receiver<JobResult>, /// Channel to send the own build outputs to - sender: Sender<JobResult>, + sender: Vec<Sender<JobResult>>, } impl<'a> JobTask<'a> { @@ -418,7 +435,10 @@ impl<'a> JobTask<'a> { // if there are any errors from child tasks if !received_errors.is_empty() { // send them to the parent,... - self.sender.send(Err(received_errors)).await; + // + // We only send to one parent, because it doesn't matter + // And we know that we have at least one sender + self.sender[0].send(Err(received_errors)).await; // ... and stop operation, because the whole tree will fail anyways. return Ok(()) @@ -477,7 +497,10 @@ impl<'a> JobTask<'a> { Err(e) => { trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e); // ... and we send that to our parent - self.sender.send(Err(vec![(job_uuid, e)])).await?; + // + // We only send to one parent, because it doesn't matter anymore + // We know that we have at least one sender available + self.sender[0].send(Err(vec![(job_uuid, e)])).await?; }, // if the scheduler run reports success, @@ -485,9 +508,9 @@ impl<'a> JobTask<'a> { Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); received_dependencies.push((*self.jobdef.job.uuid(), artifacts)); - self.sender - .send(Ok(received_dependencies)) - .await?; + for s in self.sender { + s.send(Ok(received_dependencies.clone())).await?; + } }, } |