From 916b19f2c099beaf00d37e5d2cc8e2fe81440996 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Fri, 5 Feb 2021 09:57:37 +0100 Subject: Change implementation to allow multiple senders MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch reimplements the result-propagation to allow multiple senders. .-> E / .-> C -< / \ D >-> A \ / `-> B -ยด In this scenario, A needs to send its results to multiple other jobs. This patch changes the implementation so that each JobTask has a Vec<_> of senders, which allows sending results to multiple parents. In case of error, the error is only propagated to one parent, because it doesn't matter. If there is an error, we fail and abort the whole tree anyways, so the other parents don't need to be notified of the error. Signed-off-by: Matthias Beyer --- src/orchestrator/orchestrator.rs | 45 ++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 11 deletions(-) (limited to 'src/orchestrator') diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs index bed2195..49d95eb 100644 --- a/src/orchestrator/orchestrator.rs +++ b/src/orchestrator/orchestrator.rs @@ -246,7 +246,7 @@ impl<'a> Orchestrator<'a> { database: self.database.clone(), }; - (receiver, tp, sender, std::cell::RefCell::new(None as Option>)) + (receiver, tp, sender, std::cell::RefCell::new(None as Option>>)) }) .collect(); @@ -261,9 +261,26 @@ impl<'a> Orchestrator<'a> { // find the job that depends on this job // use the sender of the found job and set it as sender for this job for job in jobs.iter() { - *job.3.borrow_mut() = jobs.iter() - .find(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) - .map(|j| j.2.clone()); + if let Some(mut v) = job.3.borrow_mut().as_mut() { + v.extend({ + jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + }); + } else { + *job.3.borrow_mut() = { + let depending_on_job = jobs.iter() + .filter(|j| j.1.jobdef.dependencies.contains(job.1.jobdef.job.uuid())) + .map(|j| j.2.clone()) + .collect::>>(); + + if depending_on_job.is_empty() { + None + } else { + Some(depending_on_job) + } + }; + } } // Find the id of the root task @@ -308,7 +325,7 @@ impl<'a> Orchestrator<'a> { receiver: prep.0, // the sender is set or we need to use the root sender - sender: prep.3.into_inner().unwrap_or(root_sender), + sender: prep.3.into_inner().unwrap_or_else(|| vec![root_sender]), } }) .map(|task| task.run()) @@ -365,7 +382,7 @@ struct JobTask<'a> { receiver: Receiver, /// Channel to send the own build outputs to - sender: Sender, + sender: Vec>, } impl<'a> JobTask<'a> { @@ -418,7 +435,10 @@ impl<'a> JobTask<'a> { // if there are any errors from child tasks if !received_errors.is_empty() { // send them to the parent,... - self.sender.send(Err(received_errors)).await; + // + // We only send to one parent, because it doesn't matter + // And we know that we have at least one sender + self.sender[0].send(Err(received_errors)).await; // ... and stop operation, because the whole tree will fail anyways. return Ok(()) @@ -477,7 +497,10 @@ impl<'a> JobTask<'a> { Err(e) => { trace!("[{}]: Scheduler returned error = {:?}", self.jobdef.job.uuid(), e); // ... and we send that to our parent - self.sender.send(Err(vec![(job_uuid, e)])).await?; + // + // We only send to one parent, because it doesn't matter anymore + // We know that we have at least one sender available + self.sender[0].send(Err(vec![(job_uuid, e)])).await?; }, // if the scheduler run reports success, @@ -485,9 +508,9 @@ impl<'a> JobTask<'a> { Ok(artifacts) => { trace!("[{}]: Scheduler returned artifacts = {:?}", self.jobdef.job.uuid(), artifacts); received_dependencies.push((*self.jobdef.job.uuid(), artifacts)); - self.sender - .send(Ok(received_dependencies)) - .await?; + for s in self.sender { + s.send(Ok(received_dependencies.clone())).await?; + } }, } -- cgit v1.2.3